当前位置: 首页 > news >正文

Orleans Stream测试

服务端

配置

 全局变量

   /// <summary>/// 测试用全局数据定义/// </summary>public static class GlobalValueDefinition{public const string StreamProviderName = "StreamProvider";public const string ImplicitStreamSubscriptionName = "RANDOMDATA";public const string TestStreamID = "clientStream";public const string GrainStorageName = "PubSubStore";public const string MySqlInvariant = "MySql.Data.MySqlClient";public const string MySqlConnection = "server=localhost;user id=root;database=orleans_test;port=3306;password=123qwe!@#";}

承载Stream的Grain定义

    public interface IStream : IGrainWithStringKey{}[ImplicitStreamSubscription(GlobalValueDefinition.ImplicitStreamSubscriptionName)]public class StreamGrains: Grain, IStream{private readonly ILogger _logger;public StreamGrains(ILogger<StreamGrains> logger){_logger = logger;Debug.WriteLine($"====={GrainContext.Address.SiloAddress?.Endpoint.Port ?? 0}.{GrainContext.GrainId}");DelayDeactivation(TimeSpan.FromDays(1000));}private IAsyncStream<StreamMessage> _demoStream = null;private async Task CreateStreamSubscribe(){if (_demoStream != null)return;var guid = this.GetPrimaryKeyString();var streamProvider = this.GetStreamProvider(GlobalValueDefinition.StreamProviderName);var streamId = StreamId.Create(GlobalValueDefinition.ImplicitStreamSubscriptionName, GlobalValueDefinition.TestStreamID);_demoStream = streamProvider.GetStream<StreamMessage>(streamId);await _demoStream.SubscribeAsync<StreamMessage>(async (data, token) =>{Console.WriteLine($"*****Hello {data}");await Task.CompletedTask;});}public override Task OnActivateAsync(CancellationToken cancellationToken){base.OnActivateAsync(cancellationToken);#region 流的支持CreateStreamSubscribe().GetAwaiter().GetResult();#endregionreturn Task.CompletedTask;}}

定义一个空Grain接口,和一个实现类,使用ImplicitStreamSubscription注解对流的支持;激活时创建流对象并订阅处理函数。

客户端

 

在配置中引入Stream支持。

        private async Task<IAsyncStream<StreamMessage>> CreateStreamSubscribe(IClusterClient client){             var streamProvider = client.GetStreamProvider(GlobalValueDefinition.StreamProviderName);var streamId = StreamId.Create(GlobalValueDefinition.ImplicitStreamSubscriptionName, GlobalValueDefinition.TestStreamID);var demoStream = streamProvider.GetStream<StreamMessage>(streamId);await demoStream.SubscribeAsync<StreamMessage>(async (data, token) =>{ShowLog($"*****StreamDemo:Hello bytesLen = {data.Data.Length}");await Task.CompletedTask;});return demoStream;}

调用上面的函数创建stream引用:

_demoStream001 = await CreateStreamSubscribe(client01);

使用流引用对象推送消息,多个订阅方都可收到通知。

await _demoStream001.OnNextAsync(new StreamMessage { ID = 1, MessageName = "Test00100", Data = new byte[1024] });

注意:Stream也是一个Grain,使用StreamId获取流对象时,服务端会创建一个对应的Grain实例。 本例创建了内存流,也可创建基于MQ的流。


http://www.mrgr.cn/news/75086.html

相关文章:

  • 一文通透OpenVLA及其源码剖析——基于Prismatic VLM(SigLIP、DinoV2、Llama 2)及离散化动作预测
  • python 3个线程轮流打印A、B、C
  • 为什么Transformer使用LayerNorm而不是BatchNorm?
  • UE材质节点Fresnel
  • NLP中常见的分词算法(BPE、WordPiece、Unigram、SentencePiece)
  • Webshell工具内网穿透
  • 大数据新视界 -- 大数据大厂之 Impala 性能飞跃:动态分区调整的策略与方法(上)(21 / 30)
  • python语言基础-4 常用模块-4.12 namedtuple(名称元组)
  • 第12章 系统部署
  • 一道C语言关于距离的期末题及答案
  • 光伏储能微电网协调控制器
  • 20241114给荣品PRO-RK3566开发板刷Rockchip原厂的Android13下适配RJ45以太网卡
  • STM32学习笔记-----UART的概念
  • 远程开发测试必看:如何在群晖NAS上运行网页版Ubuntu
  • Docker 篇-Docker 详细安装、了解和使用 Docker 核心功能(数据卷、自定义镜像 Dockerfile、网络)
  • 三、模板与配置(上)
  • springboot学科竞赛管理(代码+数据库+LW)
  • P10901 [蓝桥杯 2024 省 C] 封闭图形个数
  • 【话题讨论】AI赋能电商:创新应用与销售效率的双轮驱动
  • 【AiPPT-注册/登录安全分析报告-无验证方式导致安全隐患】
  • python 字典 详解
  • 分享个好玩的,在k8s上部署web版macos
  • 【debug】QT 相关问题error汇总 QT运行闪退 QT5升级到QT6注意要点
  • QSerialPort高频接收数据不完整的问题
  • 【MySQL 保姆级教学】事务的隔离级别(详细)--下(13)
  • 赛力斯业绩飙升,董事长为何说“穷怕了”?