【C】一文速学----线程池原理与实战
文章目录
- 线程池简介
- 线程池原理
- 线程池实现
- 数据结构设计
- 接口设计
- 线程池实现
线程池简介
线程池是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。之所以采用线程池,因为如果每次接收请求就创建一个线程,不断地对线程进行创建和销毁对服务器来说是一种资源的浪费,尤其当接入数量持续增加,系统资源是有限的,这会成为系统的负担。
为了规避上述问题,给系统固定数量的线程来处理请求,这就是线程池。对于如何确定线程池中应该有多少线程,应该依据任务类型来看,如果是CPU密集型,由系统cpu核心数量确定,如果是IO密集型则需要至少cpu核心数量两倍的线程数量。
线程池原理
线程池工作原理如下:
线程池实现
数据结构设计
typedef struct thrdpool_s thrdpool_t;
// 任务执行的规范 ctx 上下文
typedef void (*handler_pt)(void * /* ctx */);//任务
typedef struct task_s {void *next; //下一个任务handler_pt func; //回调函数void *arg; //回调函数参数
} task_t;//任务队列
typedef struct task_queue_s {void *head; //队列头指针void **tail; //队列尾指针int block; //标识队列是否为空//为了学习不同的锁而在这里设置spinlock_t lock; //队列锁 原子锁适合轻量级、细粒度的操作,更关注性能,主要用于低竞争场景。pthread_mutex_t mutex; //互斥锁 适合复杂的同步需求,更关注功能,主要用于高竞争或复杂的资源保护。pthread_cond_t cond; //条件变量
} task_queue_t;//线程池
struct thrdpool_s {task_queue_t *task_queue; //任务队列atomic_int quit; //线程池是否关闭int thrd_count; //线程数量pthread_t *threads; //保存线程ID
};
接口设计
只向用户暴露关于线程池的创建,以及向线程池中输入任务和任务执行的函数。对于线程池内部关于任务的添加,任务的消费全部封装。
thrdpool_t *thrdpool_create(int thrd_count);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
线程池实现
基于上面的设定,对线程池给予实现
//创建任务队列
static task_queue_t *_taskqueue_create()
{int ret;task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));if (queue){ret = pthread_mutex_init(&queue->mutex, NULL);if (ret == 0){ret = pthread_cond_init(&queue->cond, NULL);if (ret == 0){queue->head = NULL;//二级指针,指向head指针的地址queue->tail = &queue->head;//设置队列是锁住的,无任务queue->block = 1;return queue;}pthread_mutex_destroy(&queue->mutex);}free(queue);}return NULL;
}
//从队列弹出任务
static void * _pop_task(task_queue_t *queue){//原子锁spinlock_lock(&queue->lock);if(queue->head==NULL){spinlock_unlock(&queue->lock);return NULL;}task_t *task;task=queue->head;//强制转换task为二级指针,因为task结构体第一个是next指针,其实link指向的是nextvoid **link = (void**)task;//*link的值就是next指向的节点queue->head=*link;if(queue->head==NULL){queue->tail=&queue->head;}spinlock_unlock(&queue->lock);return task;
}
//获得任务
static void * _get_task(task_queue_t *queue){task_t *task;//当无法再弹出任务是进入循环while((task=_pop_task(queue))==NULL){pthread_mutex_lock(&queue->mutex);//当任务队列锁住时退出互斥锁if (queue->block == 0) {pthread_mutex_unlock(&queue->mutex);return NULL;}// 1. 先 unlock(&mtx)// 2. 在 cond 休眠// --- __add_task 时唤醒// 3. 在 cond 唤醒// 4. 加上 lock(&mtx);//释放锁,等待信号量变化;当信号量变化,再次重新获得锁pthread_cond_wait(&queue->cond, &queue->mutex);pthread_mutex_unlock(&queue->mutex);}return task;
}
//线程执行函数
static void *_thrdpool_worker(void *arg){thrdpool_t *pool=(thrdpool_t *)arg;task_t* task;void *ctx;//检测线程池是否在运行状态while(atomic_load(&pool->quit)==0){task=(task_t*)_get_task(pool->task_queue);if(!task) break;handler_pt func=task->func;ctx=task->arg;free(task);//执行任务函数func(ctx);}
}
//任务队列添加任务
static void _add_task(task_queue_t *queue,void *task){//强制转换二级指针,link指向nextvoid **link = (void **)task;//next指向为NULL*link=NULL;spinlock_lock(&queue->lock);//*queue->tail表示next的值,link即是指向next,同时那个地址也是task的首地址*queue->tail=link;queue->tail=link;spinlock_unlock(&queue->lock);
}
//往线程池中放入任务
int thrdpool_post(thrdpool_t* pool,handler_pt func,void *arg){if(atomic_load(&pool->quit)==1) return -1;task_t * task =(task_t*)malloc(sizeof(task_t));if(!task) return -1;task->func=func;task->arg=arg;_add_task(pool->task_queue,task);return 0;
}
//创建线程
static int _threads_create(thrdpool_t *pool, int count)
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret == 0){pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * count);if (pool->threads){int i = 0;for (i = 0; i < count; i++){if ((pthread_create(&pool->threads[i], &attr, _thrdpool_worker, pool)) != 0){break;}}pool->thrd_count = i;pthread_attr_destroy(&attr);}ret = -1;}return ret;
}
//线程池创建
thrdpool_t *thrdpool_create(int thrd_count)
{thrdpool_t *pool;pool = (thrdpool_t *)malloc(sizeof(thrdpool_t));if (pool){//创建任务队列task_queue_t *queue = _taskqueue_create();if (queue){pool->task_queue = queue;//初始化线程池标志,0表示线程池在运行状态,1表示停止运行状态atomic_init(&pool->quit,0);if (__threads_create(pool, thrd_count) == 0)return pool;_taskqueue_destroy(queue);}free(pool);}return NULL;
}
//销毁任务队列
static void _taskqueue_destroy(task_queue_t *queue) {task_t *task;while ((task = __pop_task(queue))) {free(task);}spinlock_destroy(&queue->lock);pthread_cond_destroy(&queue->cond);pthread_mutex_destroy(&queue->mutex);free(queue);
}
//关闭线程池
void thrdpool_waitdone(thrdpool_t *pool) {int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}_taskqueue_destroy(pool->task_queue);free(pool->threads);free(pool);
}
在上面的代码中涉及到强制转换二级指针的操作,如果不能很好的理解,可以参考【C】二级指针的原理与应用