当前位置: 首页 > news >正文

C语言实现高并发编程——线程池

目录

 

概念

实现原理

1、线程池结构

2、线程池中存储的所有信息

线程池接口设计

1、线程池初始化

2、往线程池中投送任务

3、增加活跃线程 

4、删除活跃线程

5、销毁线程池

6、例子


概念

        线程池就是假设在程序设计时需要有很多任务需要按一定顺序执行(可能是并行执行也可能有先后顺序),所以我们为了满足所有任务的需要初始化了很多个线程,

但是这些任务的执行有些是按先后顺序执行的,如果让这些线程都同时运行无疑要占用大量的系统资源,

        为了节省系统资源,我们把所有的线程都设置为休眠态,然后我们将所有准备要执行的任务用一个单链表(或者队列,总之就是一个链式结构)进行存储,

        每一个准备要执行的任务对应单链表的一个节点,需要执行一个任务的时候我们就将这个任务所在的节点从单链表中剔除,然后唤醒一个正在休眠的进程去执行他,这样的话我们线程的操纵就会很灵活,不会导致一些线程在空转浪费内存资源的情况,

        同理,我们也可以去增加唤醒的线程,就是此刻正在执行的线程和需要执行的任务都完全受程序员的操控,如此一来就能实现高并发状态下服务器内存资源的节省,线程池需要搭配互斥锁的使用,为了避免某些共享资源的访问冲突

实现原理

1、线程池结构

任务节点队列设计

任务节点队列由一个回调函数指针,参数,还有链表指针构成

struct task
{void *(*do_task)(void *arg); // 函数指针,指向要执行任务void *arg;					 // 传递给任务函数的参数struct task *next;
};

2、线程池中存储的所有信息

typedef struct thread_pool
{pthread_mutex_t lock;  //互斥锁,保护任务队列pthread_cond_t cond;   //条件变量,同步所有线程bool shutdown;   //线程池销毁标记struct task *task_list;   //任务链队列指针pthread_t *tids;   //线程ID存放位置unsigned max_waiting_tasks;   unsigned waiting_tasks;   //任务链队列中等待的任务个数unsigned active_threads;   //当前活跃线程个数
} thread_pool;

线程池接口设计

1、线程池初始化

bool init_pool(thread_pool *pool, unsigned int threads_number);

//初始化线程池结构体 
bool init_pool(thread_pool *pool, unsigned int threads_number)
{//初始化锁和条件变量  pthread_mutex_init(&pool->lock, NULL);pthread_cond_init(&pool->cond, NULL);pool->shutdown = false; //关键标记设置为假 pool->task_list = malloc(sizeof(struct task)); //初始化头节点  pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); //sizeof(pthread_t) * 20 ,数组有20个元素        //判断堆空间是否分配成功if(pool->task_list == NULL || pool->tids == NULL){perror("allocate memory error");return false;}//初始化任务链表头节点pool->task_list->next = NULL;//初始化最大任务数 pool->max_waiting_tasks = MAX_WAITING_TASKS; //最多可以有 1000 个任务  pool->waiting_tasks = 0; //当前等待的任务数为 0  pool->active_threads = threads_number; //需要创建的活跃线程数 int i;for(i=0; i<pool->active_threads; i++)  //循环创建多个线程{  //&((pool->tids)[i] 把tid存储到堆空间 ,routine 线程执行的工作  ,把整个管理结构体传递给线程 if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0){perror("create threads error");return false;}#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);#endif}return true;
}

2、往线程池中投送任务

bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{// 新建一个任务节点struct task *new_task = malloc(sizeof(struct task));if (new_task == NULL){perror("allocate memory error");return false;}// 初始化任务节点new_task->do_task = do_task;new_task->arg = arg;new_task->next = NULL;// 上锁 ,防止在添加任务的时候,有别的线程去拿任务//============ LOCK =============//pthread_mutex_lock(&pool->lock);//===============================//// 如果当前等待的任务 大于 最大任务数if (pool->waiting_tasks >= MAX_WAITING_TASKS){pthread_mutex_unlock(&pool->lock); // 解锁fprintf(stderr, "too many tasks.\n");free(new_task); // 放弃任务return false;	// 结束函数}struct task *tmp = pool->task_list;while (tmp->next != NULL)tmp = tmp->next; // 偏移链表末尾tmp->next = new_task;  // 尾插pool->waiting_tasks++; // 任务数量增加//=========== UNLOCK ============//pthread_mutex_unlock(&pool->lock); // 任务已经添加完毕了,可以解锁了//===============================//#ifdef DEBUGprintf("[%u][%s] ==> a new task has been added.\n",(unsigned)pthread_self(), __FUNCTION__);
#endifpthread_cond_signal(&pool->cond); // 通知正在休眠的线程干活!return true;
}

3、增加活跃线程 

int add_thread(thread_pool *pool, unsigned int additional_threads_number);

int add_thread(thread_pool *pool, unsigned additional_threads)
{// 如果添加的线程数为 0 则 退出if (additional_threads == 0)return 0;// 总线程数                 当前的              +  添加的unsigned total_threads = pool->active_threads + additional_threads;int i, actual_increment = 0;// 循环创建新的任务线程for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){perror("add threads error");// no threads has been created, return failif (actual_increment == 0)return -1;break;}// 增加创建成功的线程数actual_increment++;#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}pool->active_threads += actual_increment;// 返回成功创建的线程数return actual_increment;
}

4、删除活跃线程

int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{// 如果需要删除 0 个直接退出if (removing_threads == 0)return pool->active_threads;//              剩余      = 当前的  -  需要删除的int remaining_threads = pool->active_threads - removing_threads;// 如果剩余的 小于 0 ,则保留一个线程,执行任务remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int i;for (i = pool->active_threads - 1; i > remaining_threads - 1; i--){// 循环取消线程errno = pthread_cancel(pool->tids[i]);if (errno != 0)break;
#ifdef DEBUGprintf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}// 返回剩下的活跃线程if (i == pool->active_threads - 1)return -1;else{pool->active_threads = i + 1;return i + 1;}
}

5、销毁线程池

 bool destroy_pool(thread_pool *pool);

bool destroy_pool(thread_pool *pool)
{// 1, activate all threadspool->shutdown = true; // 设置关机标记为真// 广播条件变量,唤醒所有休眠的线程pthread_cond_broadcast(&pool->cond);// 2, wait for their exitingint i;for (i = 0; i < pool->active_threads; i++){errno = pthread_join(pool->tids[i], NULL); // 循环回收线程资源if (errno != 0){printf("join tids[%d] error: %s\n",i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}// 3, free memories 释放所有的堆空间free(pool->task_list);free(pool->tids);free(pool);return true;
}

6、例子

通过线程池复制目录

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>#include <errno.h>
#include <pthread.h>#define MAX_WAITING_TASKS 1000
#define MAX_ACTIVE_THREADS 20struct task
{void *(*do_task)(void *arg); // 函数指针,指向要执行任务void *arg;					 // 传递给任务函数的参数struct task *next;
};typedef struct thread_pool
{pthread_mutex_t lock;pthread_cond_t cond;bool shutdown;struct task *task_list;pthread_t *tids;unsigned max_waiting_tasks;unsigned waiting_tasks;unsigned active_threads;
} thread_pool;bool init_pool(thread_pool *pool, unsigned int threads_number);
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
void *routine(void *arg);//添加活跃线程 
//pool:线程池结构体 
//additional_threads_number:需要添加的线程数
int add_thread(thread_pool *pool, unsigned int additional_threads_number);//删除活跃的删除 
//pool:线程池结构体 
//removing_threads_number:需要删除的线程数
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);//销毁线程池
bool destroy_pool(thread_pool *pool);#endif

thread_pool.c

#include "thread_pool.h"void handler(void *arg)
{printf("[%u] is ended.\n",(unsigned)pthread_self());pthread_mutex_unlock((pthread_mutex_t *)arg);
}// 线程的任务函数 ,所有重点操作都在该函数中
void *routine(void *arg)
{#ifdef DEBUGprintf("[%u] is started.\n", (unsigned)pthread_self()); // 打印线程的TID
#endif// 获取线程池结构体的地址,因为条件变量和互斥锁都在该结构体定义thread_pool *pool = (thread_pool *)arg;// 定义任务指针,指向要提取的任务struct task *p;while (1){/*** push a cleanup functon handler(), make sure that** the calling thread will release the mutex properly** even if it is cancelled during holding the mutex.**** NOTE:** pthread_cleanup_push() is a macro which includes a** loop in it, so if the specified field of codes that** paired within pthread_cleanup_push() and pthread_** cleanup_pop() use 'break' may NOT break out of the** truely loop but break out of these two macros.** see line 61 below.*///================================================//// 注册取消处理函数,&pool->lock 把锁传递过去,当前线程突然被取消时,防止死锁pthread_cleanup_push(handler, (void *)&pool->lock);// 上锁 ,因为所有线程都是在同一个任务列表中提取任务,用互斥锁保护链表中的任务pthread_mutex_lock(&pool->lock);//================================================//// 1, no task, and is NOT shutting down, then waitwhile (pool->waiting_tasks == 0 && !pool->shutdown){pthread_cond_wait(&pool->cond, &pool->lock); // 如果没有任何任务,则进入等待状态}// 2, no task, and is shutting down, then exitif (pool->waiting_tasks == 0 && pool->shutdown == true){pthread_mutex_unlock(&pool->lock);pthread_exit(NULL); // CANNOT use 'break'; //如果没有任务且是关机状态则,退出线程}// 3, have some task, then consume it 如果有任务p = pool->task_list->next;		 // 指向第一个任务节点pool->task_list->next = p->next; // 把任务节点从链表提取出来pool->waiting_tasks--;			 // 任务数量减少//================================================//pthread_mutex_unlock(&pool->lock); // 解锁 ,因为已经取完了任务pthread_cleanup_pop(0);//================================================//pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 关闭取消请求,确保任务被执行完毕(p->do_task)(p->arg);								  // 执行任务并传递参数 void *(*do_task)(void *arg);pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  // 任务执行完毕后重启开启取消请求free(p); // 释放任务节点}pthread_exit(NULL);
}// 初始化线程池结构体
bool init_pool(thread_pool *pool, unsigned int threads_number)
{pthread_mutex_init(&pool->lock, NULL);pthread_cond_init(&pool->cond, NULL);pool->shutdown = false;pool->task_list = malloc(sizeof(struct task));pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);if (pool->task_list == NULL || pool->tids == NULL){perror("allocate memory error");return false;}pool->task_list->next = NULL;pool->max_waiting_tasks = MAX_WAITING_TASKS;pool->waiting_tasks = 0;pool->active_threads = threads_number;int i;for (i = 0; i < pool->active_threads; i++){if (pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0){perror("create threads error");return false;}#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}return true;
}bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{// 新建一个任务节点struct task *new_task = malloc(sizeof(struct task));if (new_task == NULL){perror("allocate memory error");return false;}// 初始化任务节点new_task->do_task = do_task;new_task->arg = arg;new_task->next = NULL;// 上锁 ,防止在添加任务的时候,有别的线程去拿任务//============ LOCK =============//pthread_mutex_lock(&pool->lock);//===============================//// 如果当前等待的任务 大于 最大任务数if (pool->waiting_tasks >= MAX_WAITING_TASKS){pthread_mutex_unlock(&pool->lock); // 解锁fprintf(stderr, "too many tasks.\n");free(new_task); // 放弃任务return false;	// 结束函数}struct task *tmp = pool->task_list;while (tmp->next != NULL)tmp = tmp->next; // 偏移链表末尾tmp->next = new_task;  // 尾插pool->waiting_tasks++; // 任务数量增加//=========== UNLOCK ============//pthread_mutex_unlock(&pool->lock); // 任务已经添加完毕了,可以解锁了//===============================//#ifdef DEBUGprintf("[%u][%s] ==> a new task has been added.\n",(unsigned)pthread_self(), __FUNCTION__);
#endifpthread_cond_signal(&pool->cond); // 通知正在休眠的线程干活!return true;
}int add_thread(thread_pool *pool, unsigned additional_threads)
{// 如果添加的线程数为 0 则 退出if (additional_threads == 0)return 0;// 总线程数                 当前的              +  添加的unsigned total_threads = pool->active_threads + additional_threads;int i, actual_increment = 0;// 循环创建新的任务线程for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){perror("add threads error");// no threads has been created, return failif (actual_increment == 0)return -1;break;}// 增加创建成功的线程数actual_increment++;#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}pool->active_threads += actual_increment;// 返回成功创建的线程数return actual_increment;
}int remove_thread(thread_pool *pool, unsigned int removing_threads)
{// 如果需要删除 0 个直接退出if (removing_threads == 0)return pool->active_threads;//              剩余      = 当前的  -  需要删除的int remaining_threads = pool->active_threads - removing_threads;// 如果剩余的 小于 0 ,则保留一个线程,执行任务remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int i;for (i = pool->active_threads - 1; i > remaining_threads - 1; i--){// 循环取消线程errno = pthread_cancel(pool->tids[i]);if (errno != 0)break;
#ifdef DEBUGprintf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}// 返回剩下的活跃线程if (i == pool->active_threads - 1)return -1;else{pool->active_threads = i + 1;return i + 1;}
}bool destroy_pool(thread_pool *pool)
{// 1, activate all threadspool->shutdown = true; // 设置关机标记为真// 广播条件变量,唤醒所有休眠的线程pthread_cond_broadcast(&pool->cond);// 2, wait for their exitingint i;for (i = 0; i < pool->active_threads; i++){errno = pthread_join(pool->tids[i], NULL); // 循环回收线程资源if (errno != 0){printf("join tids[%d] error: %s\n",i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}// 3, free memories 释放所有的堆空间free(pool->task_list);free(pool->tids);free(pool);return true;
}

main.c

#include <stdio.h>
#include <sys/types.h>
#include <dirent.h>
#include "thread_pool.h"
#include <sys/stat.h>thread_pool *pool = NULL;struct file
{char old_file[4096];char new_file[4096];
};// 拷贝需要两个文件名
void *cpfile(void *arg)
{struct file *p = arg;// 1.打开源文件FILE *fp = fopen(p->old_file, "r+");if (fp == NULL){printf("打开 %s 失败\n", p->old_file);exit(0);}// 1.创建新文件FILE *xfp = fopen(p->new_file, "w+");if (xfp == NULL){printf("创建新文件 %s 失败\n", p->new_file);exit(0);}while (1){// 读取源文件int ch = fgetc(fp);if (feof(fp)) // 没有到达末尾返回 0        -> 假break;    // 返回非零值代表已到达文件尾 -> 真// 写入新文件fputc(ch, xfp);}printf("%s 拷贝成功\n", p->old_file);// 关闭所有打开的文件fclose(fp);fclose(xfp);// 释放堆空间free(p);
}void cpdir(char *old_dir, char *new_dir)
{// 1.打开目录DIR *dp = opendir(old_dir);if (dp == NULL){perror("打开目录失败\n");exit(0);}// 2.创建新目录mkdir(new_dir, 0777);// 2.读取目录while (1){struct dirent *p = readdir(dp);if (p == NULL){break;}// 跳过隐藏文件if (p->d_name[0] == '.'){continue;}// 判断是否为目录文件if (p->d_type == DT_DIR){// 继续检索char o_path[4096] = {0};char n_path[4096] = {0};sprintf(o_path, "%s/%s", old_dir, p->d_name);sprintf(n_path, "%s/%s", new_dir, p->d_name);cpdir(o_path, n_path); // 递归调用}// 判断是否为普通文件if (p->d_type == DT_REG){// 初始化文件结构体struct file *f = malloc(sizeof(struct file));sprintf(f->old_file, "%s/%s", old_dir, p->d_name);sprintf(f->new_file, "%s/%s", new_dir, p->d_name);printf("正在拷贝 %s\n", f->new_file);// 添加拷贝任务add_task(pool, cpfile, f);}}closedir(dp);
}int main(int argc, char *argv[])
{if (argc != 3){printf("请输入源目录与新目录\n");return -1;}// 1.创建一个线程池结构体pool = malloc(sizeof(thread_pool));// 2.初始化线程池结构体init_pool(pool, 10);// 3.执行拷贝目录的操作cpdir(argv[1], argv[2]);// 退出主线程pthread_exit(0);
}


http://www.mrgr.cn/news/60701.html

相关文章:

  • Docker 常用命令全解析:提升对雷池社区版的使用经验
  • 如何排查断连问题——《OceanBase诊断系列》十三
  • SSA-CNN-LSTM-MATT多头注意力机制多特征分类预测
  • 【小白学机器学习21】 理解假设检验的关键:反证法
  • 深度学习_循环神经网络_预测平安中国股价(文末附带数据集下载链接, 长期有效, 如果有大佬愿意帮忙, 我先在这磕一个,感谢)
  • Redis 过期策略 总结
  • Open3D-Geometry-14:Distance Queries距离查询方法将网格生成为隐式表示
  • 【专题】关系模型的基本理论
  • 使用chatglm API处理论文
  • 排序算法简记
  • 五、Hadoop 分布式文件系统(HDFS)的原理与架构专业解析
  • python 数据结构 1
  • 一文贯通RAG的技术介绍和构建(简易版+附详细代码)
  • 2024年【制冷与空调设备安装修理】考试内容及制冷与空调设备安装修理最新解析
  • Java程序设计:spring boot(12)——定时调度集成 - Quartz
  • 怎么把照片恢复至手机?一文读懂详细教程与多种方法!
  • 从JDK 17 到 JDK 21:Java 新特性
  • 北理工计算机考研难度分析
  • ctfshow(151->154)--文件上传漏洞--.user.ini
  • 热门四款深度数据恢复软件大比拼!!!
  • 一个临床数据收集/调查问卷APP模板(streamlit+MongoDB)
  • rand5生成rand7
  • 代码随想录之字符串
  • Linux 进程间通信_匿名管道
  • IE快捷方式加载特定主页
  • 二叉树的存储方式和遍历方式