Orleans Stream测试
服务端
配置
全局变量
/// <summary>
/// Global constants shared by the Orleans stream test (silo side and client side).
/// </summary>
public static class GlobalValueDefinition
{
    /// <summary>Name used to look up the stream provider via <c>GetStreamProvider</c>.</summary>
    public const string StreamProviderName = "StreamProvider";

    /// <summary>
    /// Stream namespace; used both in the <c>ImplicitStreamSubscription</c> attribute
    /// and as the namespace argument of <c>StreamId.Create</c>.
    /// </summary>
    public const string ImplicitStreamSubscriptionName = "RANDOMDATA";

    /// <summary>Stream key passed to <c>StreamId.Create</c> for the test stream.</summary>
    public const string TestStreamID = "clientStream";

    /// <summary>
    /// Grain storage provider name. NOTE(review): presumably the storage that backs
    /// stream pub/sub state; the registration is not shown here, so confirm against the silo setup.
    /// </summary>
    public const string GrainStorageName = "PubSubStore";

    /// <summary>
    /// ADO.NET invariant name for MySQL. NOTE(review): presumably consumed by the
    /// storage/clustering configuration, which is not visible in this file.
    /// </summary>
    public const string MySqlInvariant = "MySql.Data.MySqlClient";

    /// <summary>
    /// MySQL connection string for the local test database.
    /// NOTE(review): this embeds a root password in source. It is acceptable only for a
    /// throw-away local test database; move it to configuration or an environment
    /// variable before using this anywhere shared.
    /// </summary>
    public const string MySqlConnection = "server=localhost;user id=root;database=orleans_test;port=3306;password=123qwe!@#";
}
承载Stream的Grain定义
/// <summary>
/// Marker grain interface for the stream-hosting grain; it declares no methods.
/// </summary>
public interface IStream : IGrainWithStringKey
{
}

/// <summary>
/// Grain that is implicitly subscribed to the
/// <see cref="GlobalValueDefinition.ImplicitStreamSubscriptionName"/> stream namespace and,
/// on activation, subscribes a handler that prints every received <c>StreamMessage</c>.
/// </summary>
[ImplicitStreamSubscription(GlobalValueDefinition.ImplicitStreamSubscriptionName)]
public class StreamGrains : Grain, IStream
{
    private readonly ILogger _logger;

    // Cached stream reference; non-null only once the subscription has been established.
    private IAsyncStream<StreamMessage> _demoStream = null;

    /// <summary>
    /// Creates the grain, writes the hosting silo port and grain id to the debug output,
    /// and asks the runtime to keep the activation alive for 1000 days.
    /// </summary>
    /// <param name="logger">Logger injected by the container.</param>
    public StreamGrains(ILogger<StreamGrains> logger)
    {
        _logger = logger;
        Debug.WriteLine($"====={GrainContext.Address.SiloAddress?.Endpoint.Port ?? 0}.{GrainContext.GrainId}");
        DelayDeactivation(TimeSpan.FromDays(1000));
    }

    /// <summary>
    /// Resolves the test stream and subscribes the console-printing handler.
    /// Does nothing if a subscription was already established by this activation.
    /// </summary>
    private async Task CreateStreamSubscribe()
    {
        if (_demoStream != null)
        {
            return;
        }

        var streamProvider = this.GetStreamProvider(GlobalValueDefinition.StreamProviderName);
        var streamId = StreamId.Create(GlobalValueDefinition.ImplicitStreamSubscriptionName, GlobalValueDefinition.TestStreamID);
        var stream = streamProvider.GetStream<StreamMessage>(streamId);

        // NOTE(review): this adds an explicit subscription on every activation. If the pub/sub
        // state is persisted, repeated activations may accumulate duplicate subscriptions;
        // consider resuming existing handles instead. Confirm against the provider setup.
        await stream.SubscribeAsync<StreamMessage>((data, token) =>
        {
            Console.WriteLine($"*****Hello {data}");
            return Task.CompletedTask;
        });

        // Cache only after the subscription succeeded, so a failed attempt can be retried.
        _demoStream = stream;
    }

    /// <summary>
    /// Runs the base activation logic and then establishes the stream subscription.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the base activation.</param>
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Await the base call so its failures are observed instead of being dropped.
        await base.OnActivateAsync(cancellationToken);

        #region Stream support
        // Await rather than block: blocking a grain turn with GetResult() can stall or deadlock it.
        await CreateStreamSubscribe();
        #endregion
    }
}
定义一个空的Grain接口和一个实现类,通过ImplicitStreamSubscription特性声明对流的隐式订阅;Grain激活时创建流对象并订阅处理函数。
客户端
在配置中引入Stream支持。
/// <summary>
/// Resolves the test stream through the given cluster client, subscribes a handler that
/// logs the payload length of each received message, and returns the stream reference.
/// </summary>
/// <param name="client">Cluster client used to obtain the stream provider.</param>
/// <returns>The stream the handler was subscribed to.</returns>
private async Task<IAsyncStream<StreamMessage>> CreateStreamSubscribe(IClusterClient client)
{
    var provider = client.GetStreamProvider(GlobalValueDefinition.StreamProviderName);

    // Same namespace/key pair the silo-side grain uses, so both ends address the same stream.
    var id = StreamId.Create(
        GlobalValueDefinition.ImplicitStreamSubscriptionName,
        GlobalValueDefinition.TestStreamID);

    IAsyncStream<StreamMessage> stream = provider.GetStream<StreamMessage>(id);

    await stream.SubscribeAsync<StreamMessage>(
        async (message, sequenceToken) =>
        {
            ShowLog($"*****StreamDemo:Hello bytesLen = {message.Data.Length}");
            await Task.CompletedTask;
        });

    return stream;
}
调用上面的函数创建stream引用:
// Subscribe through client01 and keep the returned stream reference for publishing.
_demoStream001 = await CreateStreamSubscribe(client01);
使用流引用对象推送消息,多个订阅方都可收到通知。
// Publish one StreamMessage carrying a 1024-byte payload to the stream.
await _demoStream001.OnNextAsync(new StreamMessage { ID = 1, MessageName = "Test00100", Data = new byte[1024] });
注意:Stream本身是虚拟的逻辑对象;由于StreamGrains标注了ImplicitStreamSubscription,当有消息推送到对应命名空间的流时,服务端会以该流的Key激活一个对应的Grain实例。本例创建了内存流,也可创建基于消息队列(MQ)的流。