生产者消费者模型——如何手搓一个线程池
目录
一、基本概念
1.1生产者消费者模型
生产者消费者模型是一种经典的并发编程设计模式,用于解决多线程/多进程协作中生产数据和处理数据的异步协作问题。
其核心思想是通过缓冲区(通常为队列)解耦(减少相互依赖)生产者和消费者,使二者能以不同的速度独立运行,生产者只需关心数据的生产并放入队列,消费者只需关心从队列中取出数据并消费,从而提高系统效率和资源利用率。
flowchart LR
P[生产者] -->|生成数据| B[("缓冲区(队列)")]
B -->|消费数据| C[消费者]
style P fill:#c3e6cb,stroke:#28a745
style B fill:#fff3cd,stroke:#ffc107
style C fill:#f8d7da,stroke:#dc3545
%% 可选:添加同步机制注释
note1["缓冲区满 → 生产者等待"]
note2["缓冲区空 → 消费者等待"]
note1 -.- B
note2 -.- B
1.2 线程池
线程池是一种通过预先创建并维护一组可复用的线程,实现任务的高效调度与资源优化的线程管理机制。其主要由 核心线程,任务队列,线程控制器 三部分构成。
核心线程:核心线程是线程池中长期存在的线程,它们不会因为任务执行完成就被立即销毁,而是保持空闲状态等待新的任务,直到线程池被关闭; 与之相对的,超出核心线程数的部分称为临时线程/扩展线程,空闲超时后会被回收/销毁
任务队列:任务队列用于存储提交到线程池但尚未被核心线程或临时线程处理的任务
线程池控制器:负责线程池的初始化与销毁, 线程数量控制,当前空闲线程等线程池状态的获取
其核心目标是降低线程创建/销毁的开销、提高系统资源利用率、增强任务处理的可控性。
二、线程池构造方式
2.1动态线程池
一般的线程池有两种构造方式,一种是在线程池创建时仅初始化一部分核心线程,当生产者的添加到任务队列的任务超过当前线程池的正在执行任务的核心线程时,再创建新的临时线程来执行任务,这被称为动态线程池
graph TD
A[任务提交到线程池] --> B{线程池中的线程数 < 核心线程数?}
B -- 是 --> C[创建核心线程执行任务]
B -- 否 --> D{任务队列未满?}
D -- 是 --> E[将任务加入任务队列]
D -- 否 --> F{线程池中的线程数 < 最大线程数?}
F -- 是 --> G[创建扩展线程执行任务]
F -- 否 --> H[拒绝任务]
C --> I[任务执行完成]
E --> I
G --> I
H --> I
I --扩展线程回收, 循环--> A
2.2静态线程池
另一种是静态线程池,将所有的线程都当作核心线程,并在一开始就初始化全部线程,任务到来时据线程池线程的空闲状态来进行线程的执行。
graph TD
A[线程池初始化并创建所有线程] --> B[线程进入等待状态]
B --> C[任务提交到任务队列]
C --> D{任务队列未满?}
D --是--> F[将任务加入到任务队列]
D --否--> H[拒接任务/等待队空]
H -.-> D
F --> I{空闲线程 > 0?}
I --是--> G[唤醒线程执行任务]
I --否--> J[任务等待]
J --> I
G --循环--> C
2.3两种构造方式的对比
两种策略有各自的适用场景和权衡点。以下是它们的优劣对比和分析:
| 构造方式 | 核心逻辑 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 动态 | 先初始化核心线程,任务队列满后按需创建临时线程 | 资源利用率高:按需分配线程,空闲线程自动回收 适应性强:据任务压力自动调整线程池大小,避免任务堆积 |
响应延迟:临时线程的创建需要时间 风险性高:频繁进行线程的创建/销毁可能导致内存泄漏 |
✅任务量波动大,如Web服务器的请求处理 ✅资源受限环境,如移动端、容器化部署 |
| 静态 | 启动时直接创建所有线程 | 响应速度快:任务到达时无需等待线程初始化,直接执行 稳定性强:无动态创建/销毁线程的开销 |
灵活性差:无法根据任务量动态调整线程数,突发流量可能导致任务堆积 |
✅低延迟场景,如高频交易系统 ✅稳态任务场景,如批量数据处理 |
三、代码实现(静态)
3.1结构体构建
1 | |
此处线程池结构体pthreadpool也是写的比较简单,我们还可以在此基础上添加当前等待队列的的任务数目,当任务无法加入任务队列的的处理策略,空闲线程等待新任务的最长时间(动态)等标志或方法,同时对应的接口函数也可以添加获取当前线程池线程和任务状态的功能
但这里要注意的一点便是任务队列task_t的构建,为什么next指针的前面要加上 struct 关键词呢?这是因为在结构体定义完成之前,结构体的完整类型尚未确定,使用 struct 关键字可以让编译器知道这是一个指向未定义结构体的指针,否则编译器便会不认识这个类型error: unknown type name ‘task_t’
眼尖的朋友可能发现了结构体task_t与pthreadpool的 typedef 有点不一样,那可不可以去掉typedef struct task_t的task呢?这个问题的原因和上个问题是一样的,如果去掉,编译器会认为结构体内next指针的类型struct task_t和外部的别名task_t是两个独立的个体,从而造成不兼容warning: assignment to ‘task_t *’ from incompatible pointer type ‘struct task_t *’
3.2函数接口实现
3.2.1线程池初始化
1 | |
每个线程需要一个独立的pthread_t标识符,这里选择thread_id[i]数组形式储存线程id是为了方便后续的pthread_join线程统一回收。但是对于动态线程池来说,一次性malloc分配固定的连续内存块是不行的,这时便可以采用链表结构来管理线程 id,malloc放入 for循环 内并一次仅申请一份内存,这样便可以在动态增删线程时无需预先分配固定大小
3.2.2添加任务
1 | |
当任务队列为空时,将队列头指针和尾指针同时指向新加入的任务节点。后续的入队操作便只需要移动尾指针就可以了,这里注意要想移动尾指针当前节点的next指针指向,然后再移动尾指针指向新节点,为了保险还可以在之后将新节点指向 NULL 或将传入节点的next指针指向 NULL
要注意互斥锁框选的代码段要尽可能少,以便减少线程阻塞时间和降低死锁风险。另外大家也应该发现了我在遍历队列打印时使用了uintptr_t将void *强转为了int类型,后面 3.3主函数实现 还会介绍
uintptr_t是 C99 标准中定义的整数类型,包含头文件#include <stdint.h>使用,专门用于安全存储指针值的二进制表示(保证指针和整数可以无损转换)但是只能转换指针的物理地址
如果直接使用int强转,在 64 位系统中,将 8 字节指针强制转换为 4 字节int会丢失高 4 字节数据,所以编译器会出现warning: cast to pointer from integer of different size
3.2.3任务执行
1 | |
pthread_cond_wait:必须与互斥锁和条件检查循环配合使用,阻塞当前线程,直到条件变量
cond被触发(通过pthread_cond_signal或pthread_cond_broadcast唤醒),函数会进行 释放当前互斥锁、阻塞、被唤醒、获取互斥锁 这四个步骤互斥锁:保证在同一时间,只有一个线程能够进入临界区访问被保护的共享资源。当一个线程获取了互斥锁后,其他线程尝试获取该锁时会被阻塞,直到锁被释放
条件变量:用于在线程之间传递信号,使线程能够等待某个特定的条件成立,或者在条件成立时通知其他线程。它通常与互斥锁一起使用,允许线程在等待条件时释放锁,让其他线程有机会获取锁并修改共享状态
那就有人要问了,为什么pthread_cond_wait函数本身会阻塞线程直到条件变量被触发,但还要用while循环来检查条件呢🤪?这是因为
即使没有其他线程显式调用
pthread_cond_signal或pthread_cond_broadcast,操作系统也可能因内部优化或信号中断等原因意外唤醒线程,这被称为虚假唤醒,所以需要while循环强制线程在唤醒后重新检查条件,确保条件真正成立
3.2.4线程池销毁
1 | |
Q:为什么free一个指针后,还要将其指向NULL呢?
A:因为指针指向的内存已被释放,但指针变量仍保留原地址值,此时该指针被称为悬空指针。与之相对的指向一个未定义或无效内存地址的指针被称为野指针,野指针通常是在指针被声明后,没有被正确初始化,或者被赋予了一个不合理的值。若悬空指针被解引用、二次释放等误用,会导致 程序崩溃、数据损坏、安全漏洞等未定义行为
3.3主函数实现
1 | |
为什么你这里一会儿用uintptr_t,一会儿用atoi来将字符串类型转换为整型?问得好,实际上我上面对uintptr_t的说明漏掉了一些つ﹏⊂(已修正)
- uintptr_t 数据类型全称
unsigned long int,是根据指针地址的二进制值进行严格一致转换,*如char x = “123” 地址 0x1000 转换后 → 输出整数 4096,而且任何指针都可转换(但转换后的整型需足够大,如 64 位系统) - atio 函数 是根据字符串的语义来进行转换的,包含
#include <stdlib.h>使用,依赖 ANSI C 标准,但是只能识别转化整型,如char *str = “123abc” 转换后→ 输出整数 123 ,像这种非法错误是不会有错误提示的
由于这两个玩意都比较坑,所以我推荐使用snprintf进行类型转换,没错就是我注释的部分,不仅c语言原生自带,不用依赖外部库,且可移植性更好,支持所有格式
但是话又说回来,即便我在字符串与整型间转来转去,但是这份代码可是能无任何报错、警告成功运行的哦🤪,原因是什么呢?欸🤓,我先卖个关子,你们看代码先思考一下,我在文章结尾再告诉你们🤭,那么接下来先看演示吧
四、运行实现(gif图)
这里特地加了sleep让消费者(任务执行),比生产者(添加任务进队列)慢一些,毕竟任务设置的太简单了(偷懒一下(~ ̄▽ ̄)~)
五、总结
揭晓答案了,为什么使用uintptr_t、void*等强转类型后代码还能正常执行,甚至输出都与纯int类型没差别呢?因为任务队列中传输的一直是 uintptr_t 类型。main函数初始化传入的是 uintptr_t ,传入队列后转化为 void *,中间遍历队列和最后任务执行时都将其又还原成了uintptr_t 无符号长整形,所以输出没有问题,代码完美运行
本章主要对线程池的构建与任务队列的基本概念,运行原理,代码构建进行了介绍,其中还拓展了一些在代码实操时可能遇到的问题,对其原因、解决方法进行分析了解,那么最后如果让你亲手实现一个动态线程池,你能实现吗?