当前位置: 首页 > news >正文

windows C++ 有效利用异步代理库(二)

使用限制机制限制数据管道中的消息数

许多消息缓冲区类型(如 concurrency::unbounded_buffer)可以保存无限数量的消息。 当消息生成者向数据管道发送消息的速度快于使用者处理这些消息的速度时,应用程序可能会进入内存不足状态。 可以使用限制机制(例如信号灯)来限制数据管道中并发处于活动状态的消息数。

以下基本示例演示如何使用信号灯来限制数据管道中的消息数。 数据管道使用 concurrency::wait 函数来模拟至少需要 100 毫秒的操作。 由于发送方生成消息的速度比使用者处理这些消息的速度快,因此此示例定义了 semaphore 类,以使应用程序能够限制活动消息的数量。

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>using namespace concurrency;
using namespace std;// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:explicit semaphore(long long capacity): _semaphore_count(capacity){}// Acquires access to the semaphore.void acquire(){// The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context.if (--_semaphore_count < 0){_waiting_contexts.push(Context::CurrentContext());Context::Block();}}// Releases access to the semaphore.void release(){// If the semaphore count is negative, unblock the first waiting context.if (++_semaphore_count <= 0){// A call to acquire might have decremented the counter, but has not// yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available.Context* waiting = NULL;while (!_waiting_contexts.try_pop(waiting)){(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  }// Unblock the context.waiting->Unblock();}}private:// The semaphore count.atomic<long long> _semaphore_count;// A concurrency-safe queue of contexts that must wait to // acquire the semaphore.concurrent_queue<Context*> _waiting_contexts;
};// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:countdown_event(long long count): _current(count) {// Set the event if the initial count is zero.if (_current == 0LL)_event.set();}// Decrements the event counter.void signal() {if(--_current == 0LL) {_event.set();}}// Increments the event counter.void add_count() {if(++_current == 1LL) {_event.reset();}}// Blocks the current context until the event is set.void wait() {_event.wait();}private:// The current count.atomic<long long> _current;// The event that is set when the counter reaches zero.event _event;// Disable copy constructor.countdown_event(const countdown_event&);// Disable assignment.countdown_event const & operator=(countdown_event const&);
};int wmain()
{// The number of messages to send to the consumer.const long long MessageCount = 5;// The number of messages that can be active at the same time.const long long ActiveMessages = 2;// Used to compute the elapsed time.DWORD start_time;// Computes the elapsed time, rounded-down to the nearest// 100 milliseconds.auto elapsed = [&start_time] {return (GetTickCount() - start_time)/100*100;};// Limits the number of active messages.semaphore s(ActiveMessages);// Enables the consumer message buffer to coordinate completion// with the main application.countdown_event e(MessageCount);// Create a data pipeline that has three stages.// The first stage of the pipeline prints a message.transformer<int, int> print_message([&elapsed](int n) -> int {wstringstream ss;ss << elapsed() << L": received " << n << endl;wcout << ss.str();// Send the input to the next pipeline stage.return n;});// The second stage of the pipeline simulates a // time-consuming operation.transformer<int, int> long_operation([](int n) -> int {wait(100);// Send the input to the next pipeline stage.return n;});// The third stage of the pipeline releases the semaphore// and signals to the main appliation that the message has// been processed.call<int> release_and_signal([&](int unused) {// Enable the sender to send the next message.s.release();// Signal that the message has been processed.e.signal();});// Connect the pipeline.print_message.link_target(&long_operation);long_operation.link_target(&release_and_signal);// Send several messages to the pipeline.start_time = GetTickCount();for(auto i = 0; i < MessageCount; ++i){// Acquire access to the semaphore.s.acquire();// Print the message to the console.wstringstream ss;ss << elapsed() << L": sending " << i << L"..." << endl;wcout << ss.str();// Send the message.send(print_message, i);}// Wait for the consumer to process all messages.e.wait();
}
/* Sample output:0: sending 0...0: received 00: sending 1...0: received 1100: sending 2...100: received 2200: sending 3...200: received 3300: sending 4...300: received 4
*/

semaphore 对象将管道限制为最多同时处理两条消息。

此示例中的生成者向使用者发送的消息相对较少。 因此,此示例不演示潜在的内存不足情况。 但是,当数据管道包含相对较多的消息时,此机制很有用。


http://www.mrgr.cn/news/55039.html

相关文章:

  • 【MyBatis-Plus系列】QueryWrapper中or的使用
  • qtcreator 仿制vscode黑色背景主题monokai
  • uniapp,获取头部高度
  • 微信小程序-自定义组件
  • MT1351-MT1360 码题集 (c 语言详解)
  • 前端开发 环境变量 process.env.NODE_ENV 是什么
  • 上海市货运资格证二寸照片要求及手机拍照方法
  • C++编程语言:抽象机制:运算符重载(Bjarne Stroustrup)
  • PostgreSQL模板数据库template0和template1的异同点
  • 033 商品搜索
  • 音视频入门基础:FLV专题(17)——FFmpeg源码中,提取Video Tag的VIDEODATA的实现
  • Linux:基础IO
  • 软件测试技巧-如何定位前后端bug?
  • 营销新境界:解码品牌增长策略
  • [OpenCV] 数字图像处理 C++ 学习——17模板匹配详细讲解+附完整代码
  • 3.订阅者Subscriber的编程实现以及话题消息定义与使用后续课程
  • pgAdmin不显示template1数据库,该如何设置才可以显示?
  • ACM与蓝桥杯竞赛指南 基本输入输出格式二
  • 波浪理论(Elliott Wave Theory)
  • autosar-port/interface学习总结
  • Docker compose 安装Jenkins
  • c++迷宫游戏
  • 揭秘CSS浮动盒:掌握高度塌陷修复、文字环绕特效示艺的秘籍!!(重点秘籍!!)
  • 高清无水印推文视频素材下载网站推荐
  • vite脚手架中安装和按需引入vuetify
  • 了解这些U盘数据恢复工具,不再担忧数据丢失