当前位置: 首页 > news >正文

并发编程(9)——Actor/CSP设计模式

文章目录

  • 九、day9
  • 1. Actor设计模式
  • 2. CSP设计模式
    • 2.1 使用C++实现CSP
    • 2.2 通过CSP实现ATM取款逻辑

九、day9

在并发编程中,多个线程可能需要同时访问相同的内存资源。为了防止不同线程之间的资源冲突,传统并发设计方法通常使用共享内存和加锁机制来确保线程安全。例如,当一个线程在修改共享数据时,其他线程会被“锁住”,无法同时访问该数据。但是传统并发设计方法在频繁加锁的情况下会带来性能开销,降低系统的执行效率;并且共享内存加锁方式要求线程之间对共享数据有很强的依赖关系,这种依赖增加了代码的复杂性和耦合度,使代码难以维护。

新的设计模式

  • Actor模式:Actor模式通过消息传递的方式来实现线程间通信。每个Actor都有自己的状态和行为,它们通过发送消息来完成交互,而不需要共享内存。这种方式避免了加锁的复杂性和性能损耗。
  • CSP(Communicating Sequential Processes)模式:CSP模式也是通过消息传递进行通信,但它强调线程(或进程)之间的严格隔离。各个线程通过通道(Channel)来传递消息,而不直接共享状态,避免了竞争条件和加锁问题。

参考:

博主恋恋风辰的个人博客


1. Actor设计模式

Actor模型的设计模式有以下几个核心要素:

  1. 独立的Actor:每个Actor是独立的个体,拥有自己的状态和行为。Actor之间不共享状态,从而消除了并发编程中的数据共享问题。
  2. 异步消息传递:Actor之间不直接调用方法,而是通过消息传递来通信。消息传递是异步非阻塞的,发送方不需要等待接收方完成处理,而是立即继续执行自己的任务,避免了阻塞等待带来的性能问题。
  3. 顺序消息处理:每个Actor都有一个“邮箱”或“消息队列”,它接收来自其他Actor的消息,并按照顺序缓存起来。Actor处理消息时会从邮箱中取出一条消息并执行相应的操作,并且每个Actor一次只同步能处理一个消息(处理过程中,除了可以接受消息外,不能左任何其他无关操作),保证了消息处理的原子性。Actor在处理消息时不会被其他消息打断,也不会与其他Actor竞争资源,从而减少了并发问题。
  4. 无共享状态:由于Actor之间不共享数据,只能通过消息传递来交互,因此大大降低了并发编程中的数据竞争和死锁等复杂性。
  5. 单线程:每个Actor在独立的线程中运行,Actor之间通过消息队列来通信。例如,Actor1向Actor2发送消息时,消息会投递至Actor2的队列中,Actor2从队列中取出消息并进行处理。这种设计就像邮件通信一样,一个Actor向另一个Actor“投递”一条消息,接收的Actor从“邮箱”中取出消息进行处理。

如下图所示:

在这里插入图片描述

Actor2邮箱处理过程

因为Actor之间不共享数据,只能通过消息传递来交互,因此大大降低了并发编程中的数据竞争和死锁等复杂性。我们需要维护的只有每个Actor接受消息的消息队列,只有保证是线程安全的消息队列即可

之前不管是在网络编程逻辑层消息队列(多个服务线程向逻辑线程消息队列投递消息)设计中,还是在并发编程学习关于条件变量相关知识中,都实现了线程安全的消息队列,可参考:

  1. 逻辑层 消息队列设计
  2. 条件变量实现线程安全的消息队列

2. CSP设计模式

CSP(Communicating Sequential Processes,通信顺序进程)由英国计算机科学家 Tony Hoare 在1978年提出,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。

**CSP(通信顺序进程)**模式将channel视为一等公民(第一类对象),其主要关注点在于通信的通道,而非发送或接收消息的实体。与Actor模型关注"谁在发送或接收消息"不同,CSP更侧重于"通过什么渠道传递消息",即强调通信本身而非通信双方的具体实现。

在CSP中,channel被用作不同进程之间的通信媒介。进程通过channel发送和接收消息来进行同步和数据传递,channel承担了中介的角色。与Actor模型中由每个Actor维护自己的邮箱不同,CSP模型允许进程通过共享的channel进行直接通信(CSP将消息投递给channel,至于谁从channel中取数据,谁从channel中发数据,发送的一方和接收的一方是不关注的)。CSP模型不关心消息从哪来或到哪去,只关心消息是通过哪个channel传递的。这种设计实现了进程的解耦,并且允许进程通过共享channel来实现安全的同步通信

简单来说,Actor在发送消息前必须知道接收方是谁,而接受方收到消息后也需知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的,不关心消息从哪来或到哪去,只关心消息是通过哪个channel传递的。

2.1 使用C++实现CSP

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>template <typename T>
class Channel {
private:std::queue<T> queue_;std::mutex mtx_;std::condition_variable cv_producer_;std::condition_variable cv_consumer_;size_t capacity_;bool closed_;
public:Channel(size_t capacity = 0) : capacity_(capacity), closed_(false) {}bool send(T value) {std::unique_lock<std::mutex> lock(mtx_);cv_producer_.wait(lock, [this]() {// 对于无缓冲的channel,我们应该等待直到有消费者准备好return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;});if (closed_) {return false;}queue_.push(value);cv_consumer_.notify_one();return true;}bool receive(T& value) {std::unique_lock<std::mutex> lock(mtx_);cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });if (closed_ && queue_.empty()) {return false;}value = queue_.front();queue_.pop();cv_producer_.notify_one();return true;}void close() {std::unique_lock<std::mutex> lock(mtx_);closed_ = true;cv_producer_.notify_all();cv_consumer_.notify_all();}
};
  1. 类的成员变量

    • std::queue<T> queue_: 存储发送到 channel 的消息的队列。
    • std::mutex mtx_: 互斥锁,用于保护共享资源(消息队列)避免多线程的竞争。
    • std::condition_variable cv_producer_: 用于控制生产者(send操作)线程的条件变量。
    • std::condition_variable cv_consumer_: 用于控制消费者(receive操作)线程的条件变量。
    • size_t capacity_: 表示 channel 的容量。如果 capacity_ 为 0,则表示这是一个无缓冲的 channel,如果没有缓冲,那就相当于同步应用,生产者放入数据后生产者马上就会取。
    • bool closed_: 表示 channel 是否已关闭。true 表示 channel 已关闭,无法再发送消息。
  2. send 方法:向 channel 发送消息

    bool send(T value) {std::unique_lock<std::mutex> lock(mtx_);cv_producer_.wait(lock, [this]() {return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;});if (closed_) {return false;}queue_.push(value);cv_consumer_.notify_one();return true;
    }
    

    主要步骤是通过条件变量挂起当前线程,若当前线程被唤醒,并且满足判断条件,则继续执行下面的代码,过程如下

    使用 cv_producer_.wait 等待以下条件之一:

    • 无缓冲 channel 且队列为空:只有当无缓冲 channel 且没有未消费的消息时,才能继续执行 send的代码。
    • 缓冲 channel 且队列未满:缓冲 channel 时,只有队列中消息数量小于 capacity_ 时才能继续 send
    • channel 已关闭:如果 closed_true,不再等待,直接返回。

    将数据push至消息队列,并唤醒消费者进行消费

  3. receive 方法:从 channel 接收消息

    bool receive(T& value) {std::unique_lock<std::mutex> lock(mtx_);cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });if (closed_ && queue_.empty()) {return false;}value = queue_.front();queue_.pop();cv_producer_.notify_one();return true;
    }
    

    若当前线程被唤醒,并且满足队列不为空或者closed_为true任意之一时,继续执行receive下面的代码;如果不满足,线程继续挂起,等待被再次唤醒;

    消息从队列弹出后,唤醒生产者线程生产;

    注意,这里从消息队列中receice相当于之前通过条件变量实现的线程安全的消息队列中的pop函数,这个对引用进行赋值,必须先将消息队列中的值赋值给引用对象,然后才能从消息队列中弹出。这是为了防止内存爆炸造成的数据丢失,具体分析可以参考我之前写的文章:条件变量实现线程安全的消息队列

  4. close 方法:用于关闭 channel

    void close() {std::unique_lock<std::mutex> lock(mtx_);closed_ = true;cv_producer_.notify_all();cv_consumer_.notify_all();
    }
    

    将判断变量closed_置为true,表示直接退出,然后使用notify_all()唤醒全部工作线程。

  5. 测试

    int main() {Channel<int> ch(10);  // 10缓冲的channelstd::thread producer([&]() {for (int i = 0; i < 5; ++i) {ch.send(i);std::cout << "Sent: " << i << std::endl;}ch.close();});std::thread consumer([&]() {std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 故意延迟消费者开始消费int val;while (ch.receive(val)) {std::cout << "Received: " << val << std::endl;}});producer.join();consumer.join();return 0;
    }
    

    创建一个有 10 个缓冲的 Channel<int> 类型的对象 ch

    • 最多可以存储 10 个未被消费的数据。
    • 生产者可以最多连续发送 10 条数据,而无需等待消费者立即接收(消费者可以等一会儿再继续消费)。
    • 缓冲区满后,生产者会暂停(若缓冲区满后,除了将closed_置为true外,不满足任何判断条件可以退出线程等待),直到消费者消费一些数据,从而腾出空间。

    创建并启动生产者线程:

    • 在循环中,将值 i 发送到 ch,并在控制台输出 “Sent: i”。
    • 生产者线程发送 5 个值(0 到 4),每发送一个值后会打印一次。
    • 发送完成后,调用 ch.close(),通知 channel 已关闭,表示生产者不再发送数据。

    创建并启动消费者线程:

    • 线程开始后延迟 500 毫秒,模拟消费者稍后开始消费的情况。
    • 进入循环,尝试从 ch 接收数据(因为在生产者从队列插入数据后,会唤醒消费者线程进行消费)。若 receive 成功接收到值,则输出 “Received: val”。
    • 当 channel 关闭并且所有消息都已被消费后,receive 将返回 false,使得循环结束。

    这里有一个问题需要注意:消费者线程是在生产者线程结束之后才运行的(消费者线程延迟500ms),那么就没有代码唤醒消费者线程,为什么消费者线程可以正常运行?

    在生产者线程调用 ch.send(i) 时,send 方法的最后一行是 cv_consumer_.notify_one();,它会在成功推送数据到 queue_ 后通知等待的消费者线程。这一行为虽然发生在消费者进入等待状态(500 毫秒后)之前,但 notify_one() 的通知会被记录下来

    这种行为在 condition_variable 的底层实现中是合理的:在消费者延迟执行后调用 cv_consumer_.wait() 时,条件变量会再次检查 predicate。如果 queue_ 已有数据(即 !queue_.empty()),则判断条件 predicatetrue,消费者线程就会立即获取消息并继续执行,而不会阻塞。

    这是因为当消费者线程进入 wait 时,它会首先检查判断条件是否满足,如果条件已经为 true,消费者不再等待,而是直接继续执行。并不是唤醒线程后才会检查,而是调用wait函数后会立即检查判断条件,然后每次被唤醒再次检查。

    测试代码

    输出结果为:

    Sent: 0
    Sent: 1
    Sent: 2
    Sent: 3
    Sent: 4
    Received: 0
    Received: 1
    Received: 2
    Received: 3
    Received: 4
    

    如果将缓冲capacity_设为0的话,就相当于同步应用,生产者放入数据后生产者马上就会取。

    测试代码

    输出结果为:

    Sent: 0
    Received: 0
    Sent: 1
    Received: 1
    Sent: 2
    Received: 2
    Sent: 3
    Received: 3
    Sent: 4
    Received: 4
    

2.2 通过CSP实现ATM取款逻辑

在这里插入图片描述

整体逻辑如上图所示。

代码可参考:代码


http://www.mrgr.cn/news/70698.html

相关文章:

  • C#实现在windows上实现指定句柄窗口的指定窗口坐标点击鼠标左键和右键的详细情况
  • spring中entity的作用
  • Java 中使用Mockito 模拟对象的单元测试的快速示例
  • PICO+Unity MR空间锚点
  • 创新体验触手可及 紫光展锐携手影目科技推出AI眼镜开放平台
  • uniapp 集成 uview
  • js 数据类型=》理解=》应用
  • 【Android、IOS、Flutter、鸿蒙、ReactNative 】文本点击事件
  • 工具收集 - java-decompiler / jd-gui
  • Property ‘name‘ does not exist on type ‘Object‘.
  • 三十七、Python基础语法(异常)
  • 没有想到AI以这样的方式入侵人类——AI泛滥——关于AI的冷思考
  • 避免内存陷阱:掌握memcpy和memmove的正确用法
  • 智慧园区解决方案:科技赋能,打造未来管理新典范
  • 常见计算机网络知识整理(未完,整理中。。。)
  • Linux(CentOS)项目总结(前后端分离)
  • 【寻找重复数字】——脑筋急转弯...
  • 深入理解分支预测原理,揭开AMD Zen 5的高性能秘诀
  • 项目管理中不可或缺的能力
  • Qt文件系统-二进制文件读写
  • 【优选算法 — 滑动窗口】水果成篮 找到字符串中所有字母异位词
  • 函数
  • Flink独立集群+Flink整合yarn
  • MySQL-建表原则和方式
  • C语言中,“extern”关键字的含义与用法
  • [线程池]