当前位置: 首页 > news >正文

Linux系统基础-进程间通信(4)_模拟实现进程池

个人主页:C++忠实粉丝
欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C++忠实粉丝 原创

Linux系统基础-进程间通信(4)_模拟实现进程池

收录于专栏[Linux学习]
本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨论💌

目录

1. 基础库引入模块

2. Channel类模块

成员变量

构造函数

成员函数

析构函数

总体分析

3. 进程与管道创建模块

参数

函数逻辑

创建管道:

创建子进程:

父进程操作:

总体分析

4. 任务分配与执行控制模块

NextChannel 函数

SendTaskCommand 函数

ctrlProcessOnce 函数

ctrlProcess 函数

5. 清理与资源回收模块

6. 主函数模块

7. 任务管理模块

8. 效果展示


1. 基础库引入模块

功能 : 包含标准库和自定义任务头文件.

#include <iostream>      // 标准输入输出流
#include <string>       // 字符串处理
#include <vector>       // 向量(动态数组)
#include <unistd.h>     // UNIX 标准函数
#include <sys/types.h>  // 数据类型定义
#include <sys/wait.h>   // 进程等待
#include "Task.hpp"     // 自定义任务头文件,假设其中定义了任务相关的函数和类型

 这里引入必要的标准库和自定义库, 以支持后续功能的实现

2. Channel类模块

功能 : 封装管道及进程信息

class Channel
{
public:Channel(int wfd, pid_t id, const std::string& name):_wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() {return _wfd;}pid_t GetProcessId() {return _subprocessid;}std::string GetName() {return _name;}void closeChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if(rid > 0){std::cout << "wait " << rid << "success" << std::endl;}}~Channel(){}private:int _wfd;pid_t _subprocessid;std::string _name;
};

成员变量

int _wfd : 代表写入文件描述符, 用于进程间通信

pid_t _subprocessid : 存储子进程的pid, 用于管理子进程

_name : 存储通道的名称

构造函数

构造函数初始化成员变量, 使用初始化列表初始化 _wfd, _subprocessid, _name

成员函数

int GetWfd() : 

返回写入文件描述符, 方便其他类或函数使用这个文件描述符进行写操作

pid_t GetProcessid() : 

返回子进程pid

std::string GetName() :

返回通道名称

void closeChannel() :

关闭写入文件描述符, 释放系统资源, 函数中使用 close(_wd) 确保在不需要时清理资源

void Wait() : 

等待进程结束, 使用waitpid 函数来阻塞当前进程, 直到指定进程结束, 如果waitpid返回值大于0, 表示成功等待子进程, 控制台会输出成功信息.

析构函数

由于文件描述符在 closeChannel 中关闭, 析构时不需要额外操作.

总体分析

这个Channel类为管理进程间提供了一个简单而有效的封装, 它使得管道的创建和管理变得更为方便, 提供了很好的接口来处理与子进程的交互

3. 进程与管道创建模块

功能 : 创建管道和子进程的功能

//进程与管道创建模块
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{for(int i = 0; i < num; i++){//1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0) exit(1);//2. 创建子进程pid_t id = fork();if(id == 0){if(!channels->empty()){//第二次之后, 开始创建新的管道for(auto &channel : *channels) channel.CloseChannel();}//child - readclose(pipefd[1]);dup2(pipefd[0], 0); //将管道的读端重定向到标准输入task();close(pipefd[0]);exit(0);}//3. 构建一个channel名称std::string channel_name = "Channel-" + std::to_string(i);//父进程close(pipefd[0]);//a. 子进程的pid b. 父进程关心的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name)); }
}

参数

int num:要创建的子进程数量。

std::vector<Channel> *channels:

指向 Channel 对象的指针,用于存储每个子进程及其对应的管道信息。

task_t task:传入的任务函数指针,子进程执行的任务。

函数逻辑

创建管道:

使用 pipe(pipefd) 创建一个管道,pipefd[0] 为读端,pipefd[1] 为写端。若创建失败,调用 exit(1) 终止程序。

创建子进程:

使用 fork() 创建子进程。返回值 id:

如果 id 为 0,表示当前进程为子进程。

子进程执行以下操作:

        如果 channels 不为空,关闭已存在的管道的写端,确保没有文件描述符泄露。

        关闭管道的写端 (close(pipefd[1])),并使用 dup2(pipefd[0], 0) 将管道的读端重定向到标准输入,以便子进程可以从管道读取数据。

        执行传入的 task() 函数。

        关闭读端并退出。

父进程操作:

父进程关闭管道的读端 (close(pipefd[0])),因为只关心写端。

创建一个通道名称,如 "Channel-0",然后将子进程的 PID 和写端的文件描述符封装到 Channel 对象中,并将其添加到 channels 中。

总体分析

这个函数高效地管理了多进程创建与通信。它确保每个子进程拥有独立的管道,并且在创建新的管道前关闭不再使用的管道,避免资源泄露。task 函数的执行允许用户定义子进程的具体工作,实现灵活的任务分配。这种结构适合需要并行处理的场景,如数据处理或并发计算。

4. 任务分配与执行控制模块

功能 : 负责任务的选择和分配给子进程

//0 1 2 3 4 channelnum
int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}//任务分配与执行控制模块
void SendTaskCommand(Channel &Channel, int taskcommand)
{write(Channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}void ctrlProcessOnce(std::vector<Channel> &Channels)
{sleep(1);//1. 选择一个任务int taskcommand = SelectTask();//2. 选择一个信道和进程int channel_index = NextChannel(Channels.size());//3. 发送任务SendTaskCommand(Channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << "channel: " << Channels[channel_index].GetName() << "sub process: " << Channels[channel_index].GetProcessId() << std::endl;
}void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if(times > 0){while(times--){ctrlProcessOnce(channels);}}else{while(true){ctrlProcessOnce(channels);}}
} 

NextChannel 函数

功能:循环返回下一个信道的索引。

实现:

使用静态变量 next 来保存当前的信道索引。

每次调用时返回当前的 next 值,并将其增加,随后使用取模操作确保它不会超过 channelnum - 1。

特点:保证信道的选择是循环的,这样可以平均分配任务到各个信道上。

SendTaskCommand 函数

功能:发送任务命令到指定的信道。

实现:

使用 write 函数将 taskcommand 的值写入信道的写端。

Channel.GetWfd() 获取信道的写文件描述符(WFD),确保命令能够被子进程接收。

特点:该函数直接进行系统调用以发送数据.

ctrlProcessOnce 函数

功能:控制单次任务分配和发送。

实现:

首先调用 sleep(1) 暂停执行,以控制任务发送的频率。

使用 SelectTask() 选择一个任务命令。

调用 NextChannel 获取下一个信道的索引。

发送选定的任务到对应信道。

打印发送的任务信息,包括任务命令、信道名称和子进程 ID。

ctrlProcess 函数

功能:控制多个任务的发送。

实现:

接受一个可选的参数 times,用于指定任务发送的次数。

如果 times 大于 0,则执行 times 次任务发送;否则,进入无限循环,不断发送任务。

特点:通过这个函数,用户可以灵活控制任务的发送次数,适应不同的需求场景。

5. 清理与资源回收模块

功能 : 负责关闭管道和回收子进程

void CleanUpChannel(std::vector<Channel> &channels)
{for(auto &channels : channels){channels.CloseChannel();channels.Wait();}
}

Wait() 会调用操作系统的 wait 系统调用,确保主进程在子进程结束之前不会继续执行,从而避免僵尸进程。 

6. 主函数模块

功能 : 处理命令行参数和程序的主逻辑。

//主函数模块
//处理命令行参数和程序的主逻辑。int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);LoadTask();std::vector<Channel> channels;// 1. 创建信道和子进程CreateChannelAndSub(num, &channels, work1);// 2. 通过channel控制子进程ctrlProcess(channels, 5);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);// sleep(100);return 0;
}

int main(int argc, char *argv[]): 这是程序的入口点。argc 表示命令行参数的数量,argv 是一个字符串数组,存储所有参数。

if (argc != 2): 这个条件检查传入的参数数量是否等于 2。第一个参数(argv[0])是程序本身的名称,第二个参数(argv[1])应该是用户提供的。

7. 任务管理模块

功能 : 主要用于创建, 加载, 和执行一些简单的任务 

#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针类型void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 17777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if (number < 0 || number > 2)return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}void work1()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}void work2()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}

typedef void (*task_t)();:

这行代码的意思是:task_t 是一个新的类型别名,它代表一个指向返回类型为 void,且不接受任何参数的函数的指针。

8. 效果展示


http://www.mrgr.cn/news/56994.html

相关文章:

  • 面经整理 八股 虾皮购物 Java后端开发 上
  • 计算机三级嵌入式知识点总结(一)
  • 输入输出管理器的使用
  • 桌面文件和图片等拖动不了了怎么办?
  • 简单汇编教程10 数组
  • Mapbox GL 加载GeoServer底图服务器的WMS source
  • 智慧楼宇平台,构筑未来智慧城市的基石
  • 聊一聊电的产生和输送联接到桌面PDU插座的那些事儿
  • Shiro授权
  • OpenHarmony4.0配置应用开机自启
  • 高效休息法
  • CSS背景
  • 【Java SE 】抽象类 和 接口 详解
  • 高标准农田信息化推动农业产业链升级
  • Scala的内部类
  • uniapp学习(007-3 壁纸项目:系统高度等信息的操作)
  • 线程池常见面试题
  • hadoop
  • linux 编译安装的php7.4 开启pgsql,pdo_pgsql的扩展
  • 软件设计师考试大纲整理
  • JavaEE进阶----18.<Mybatis补充($和#的区别+数据库连接池)>
  • 如何设置Page Cache的大小为默认值
  • 32 类和对象 · 中
  • 卡牌抽卡机小程序,带来新鲜有趣的拆卡体验
  • 2025秋招八股文--mysql篇
  • 日志分析工具-应急响应实战笔记