当前位置: 首页 > news >正文

【C】一文速学----线程池原理与实战

文章目录

  • 线程池简介
  • 线程池原理
  • 线程池实现
    • 数据结构设计
    • 接口设计
    • 线程池实现

线程池简介

线程池是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。之所以采用线程池,因为如果每次接收请求就创建一个线程,不断地对线程进行创建和销毁对服务器来说是一种资源的浪费,尤其当接入数量持续增加,系统资源是有限的,这会成为系统的负担。

为了规避上述问题,给系统固定数量的线程来处理请求,这就是线程池。对于如何确定线程池中应该有多少线程,应该依据任务类型来看,如果是CPU密集型,由系统cpu核心数量确定,如果是IO密集型则需要至少cpu核心数量两倍的线程数量。

线程池原理

线程池工作原理如下:
在这里插入图片描述

线程池实现

数据结构设计


typedef struct thrdpool_s thrdpool_t;
// 任务执行的规范 ctx 上下文
typedef void (*handler_pt)(void * /* ctx */);//任务
typedef struct task_s {void *next;  //下一个任务handler_pt func; //回调函数void *arg;  //回调函数参数
} task_t;//任务队列
typedef struct task_queue_s {void *head; //队列头指针void **tail; //队列尾指针int block;  //标识队列是否为空//为了学习不同的锁而在这里设置spinlock_t lock; //队列锁  原子锁适合轻量级、细粒度的操作,更关注性能,主要用于低竞争场景。pthread_mutex_t mutex; //互斥锁 适合复杂的同步需求,更关注功能,主要用于高竞争或复杂的资源保护。pthread_cond_t cond; //条件变量
} task_queue_t;//线程池
struct thrdpool_s {task_queue_t *task_queue; //任务队列atomic_int quit; //线程池是否关闭int thrd_count; //线程数量pthread_t *threads; //保存线程ID
};

接口设计

只向用户暴露关于线程池的创建,以及向线程池中输入任务和任务执行的函数。对于线程池内部关于任务的添加,任务的消费全部封装。

thrdpool_t *thrdpool_create(int thrd_count);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);

线程池实现

基于上面的设定,对线程池给予实现

//创建任务队列
static task_queue_t *_taskqueue_create()
{int ret;task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));if (queue){ret = pthread_mutex_init(&queue->mutex, NULL);if (ret == 0){ret = pthread_cond_init(&queue->cond, NULL);if (ret == 0){queue->head = NULL;//二级指针,指向head指针的地址queue->tail = &queue->head;//设置队列是锁住的,无任务queue->block = 1;return queue;}pthread_mutex_destroy(&queue->mutex);}free(queue);}return NULL;
}
//从队列弹出任务
static void * _pop_task(task_queue_t *queue){//原子锁spinlock_lock(&queue->lock);if(queue->head==NULL){spinlock_unlock(&queue->lock);return NULL;}task_t *task;task=queue->head;//强制转换task为二级指针,因为task结构体第一个是next指针,其实link指向的是nextvoid **link = (void**)task;//*link的值就是next指向的节点queue->head=*link;if(queue->head==NULL){queue->tail=&queue->head;}spinlock_unlock(&queue->lock);return task;
}
//获得任务
static void * _get_task(task_queue_t *queue){task_t *task;//当无法再弹出任务是进入循环while((task=_pop_task(queue))==NULL){pthread_mutex_lock(&queue->mutex);//当任务队列锁住时退出互斥锁if (queue->block == 0) {pthread_mutex_unlock(&queue->mutex);return NULL;}// 1. 先 unlock(&mtx)// 2. 在 cond 休眠// --- __add_task 时唤醒// 3. 在 cond 唤醒// 4. 加上 lock(&mtx);//释放锁,等待信号量变化;当信号量变化,再次重新获得锁pthread_cond_wait(&queue->cond, &queue->mutex);pthread_mutex_unlock(&queue->mutex);}return task;
}
//线程执行函数
static void *_thrdpool_worker(void *arg){thrdpool_t *pool=(thrdpool_t *)arg;task_t* task;void *ctx;//检测线程池是否在运行状态while(atomic_load(&pool->quit)==0){task=(task_t*)_get_task(pool->task_queue);if(!task) break;handler_pt func=task->func;ctx=task->arg;free(task);//执行任务函数func(ctx);}
}
//任务队列添加任务
static void _add_task(task_queue_t *queue,void *task){//强制转换二级指针,link指向nextvoid **link = (void **)task;//next指向为NULL*link=NULL;spinlock_lock(&queue->lock);//*queue->tail表示next的值,link即是指向next,同时那个地址也是task的首地址*queue->tail=link;queue->tail=link;spinlock_unlock(&queue->lock);
}
//往线程池中放入任务
int thrdpool_post(thrdpool_t* pool,handler_pt func,void *arg){if(atomic_load(&pool->quit)==1) return -1;task_t * task =(task_t*)malloc(sizeof(task_t));if(!task) return -1;task->func=func;task->arg=arg;_add_task(pool->task_queue,task);return 0;
}
//创建线程
static int _threads_create(thrdpool_t *pool, int count)
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret == 0){pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * count);if (pool->threads){int i = 0;for (i = 0; i < count; i++){if ((pthread_create(&pool->threads[i], &attr, _thrdpool_worker, pool)) != 0){break;}}pool->thrd_count = i;pthread_attr_destroy(&attr);}ret = -1;}return ret;
}
//线程池创建
thrdpool_t *thrdpool_create(int thrd_count)
{thrdpool_t *pool;pool = (thrdpool_t *)malloc(sizeof(thrdpool_t));if (pool){//创建任务队列task_queue_t *queue = _taskqueue_create();if (queue){pool->task_queue = queue;//初始化线程池标志,0表示线程池在运行状态,1表示停止运行状态atomic_init(&pool->quit,0);if (__threads_create(pool, thrd_count) == 0)return pool;_taskqueue_destroy(queue);}free(pool);}return NULL;
}
//销毁任务队列
static void _taskqueue_destroy(task_queue_t *queue) {task_t *task;while ((task = __pop_task(queue))) {free(task);}spinlock_destroy(&queue->lock);pthread_cond_destroy(&queue->cond);pthread_mutex_destroy(&queue->mutex);free(queue);
}
//关闭线程池
void thrdpool_waitdone(thrdpool_t *pool) {int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}_taskqueue_destroy(pool->task_queue);free(pool->threads);free(pool);
}

在上面的代码中涉及到强制转换二级指针的操作,如果不能很好的理解,可以参考【C】二级指针的原理与应用


http://www.mrgr.cn/news/70731.html

相关文章:

  • 开源 - Ideal库 - 常用枚举扩展方法(二)
  • Elasticsearch中什么是倒排索引?
  • 【Python】轻松实现机器翻译:Transformers库使用教程
  • Conpair: 配对样本一致性concordance与污染contamination分析
  • python opencv3
  • 计算机网络:运输层 —— 运输层端口号
  • 【计算机网络】网络框架
  • C0028.在Clion中快速生成头文件中声明的函数的方法
  • 车载诊断架构---域控下挂节点信息同步策略
  • 基于51单片机密码锁—有3个密码lcd1602显示
  • 【项目开发】RESTful架构及RESTful API设计指南
  • dapp获取钱包地址,及签名
  • js.零钱兑换
  • python:用 sklearn 转换器处理数据
  • 【C++ 篇】类之华章:超越固有模式,品味面向对象的璀璨光芒
  • OSG开发笔记(三十一):OSG中LOD层次细节模型介绍和使用
  • MySQL数据库的备份与还原
  • 大模型论文精华—20241111
  • 贪心算法day05(k次取反后最大数组和 田径赛马)
  • 3.keeplived配置文件
  • VideoChat:开源的数字人实时对话系统,支持自定义数字人的形象和音色
  • 二维差分矩阵 模板题
  • 李佳琦回到巅峰背后,双11成直播电商分水岭
  • 链式结构二叉树
  • 【QT常用技术讲解】任务栏图标+socket网络服务+开机自启动
  • 项目管理平台盘点:2024推荐的9款优质工具