当前位置: 首页 > news >正文

记录一次gRpc流式操作(jedis版)

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}

服务端写法

@Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}", "0");RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {MQListener mqListener=new MQListener(responseObserver);try {CacheKey cacheKey= AppKey.appReport;String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC).replace("{APPTYPE}","0");RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserver<StringValue> responseObserver){_responseObserver=responseObserver;}private StreamObserver<StringValue> _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder = StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}

客户端写法

public static void receiveFacebookAndroidMsg() {try {log.info("facebook android msg");// 接收消息StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue msgProto) {try {log.info("facebook android msg 接收到消息: {}", msgProto.getValue());JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error("facebook ios msg 消费失败{}", e.getMessage());// 发给mq重新消费...}}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());log.info("facebook android Error occurred: {}", throwable.getMessage());}@Overridepublic void onCompleted() {System.out.println("Stream completed.");log.info("facebook android Stream completed.");}};log.info("接收fb android msg 开始");ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info("接收fb android msg 成功");} catch (Exception e) {log.info("出错了");}}

源码下载


http://www.mrgr.cn/news/39798.html

相关文章:

  • 课程记录,实验4,
  • HashMap底层原理是什么?从源码入手,沉浸式解读HashMap序列化、存储、扩容、获取等方法具体实现
  • 在外打工,我看谁还分不清定金和订金!
  • java如何查看线程死锁?
  • ipad平替笔哪个好?2024实意推荐五款热门电容笔,新手必看篇!
  • Halcon内部和外部函数,区分明白
  • 深圳市软件行业协会领导到访开源网安,共筑大湾区数字经济安全未来
  • react crash course 2024(3) jsx语法及组件
  • 微服务sentinel解析部署使用全流程
  • 鸿蒙开发(NEXT/API 12)【申请接入Wear Engine服务】 穿戴服务
  • Ubuntu下安装Zookeeper集群
  • cheese安卓版纯本地离线文字识别插件
  • React 生命周期 - useEffect 介绍
  • 普通人怎样才能找到可靠的副业?
  • 局部整体(七)利用python绘制圆形嵌套图
  • LangChain进阶技巧:提高聊天机器人性能的策略[第三课]
  • Linux内核 -- 读写文件系统文件之kernel_read与kernel_write
  • SpringBoot整合weixin-java-pay实现微信小程序支付
  • 高效开发:SpringBoot网上租赁系统实现细节
  • Leetcode 981. 基于时间的键值存储