当前位置: 首页 > news >正文

SpringBoot Redis 消息队列

文章目录

  • 参考
  • 消息队列
  • list
    • 源码
  • pub/sub
    • 源码

参考

https://www.cnblogs.com/uniqueDong/p/15904837.html
https://www.cnblogs.com/wzh2010/p/17205390.html
https://blog.csdn.net/qq_16557637/article/details/121015736
https://developer.aliyun.com/article/1095035
https://blog.csdn.net/sco5282/article/details/132904956

消息队列

消息队列可以实现消息解耦、消息路由、异步处理、流量削峰填谷。主流消息队列有kafka, rabbitmq, rocketmq
Redis也可以实现消息队列。方式有

  1. list
  2. pub/sub
  3. stream

list

redis的list底层是链表,满足先进先出。
list实现队列比较方便。同时可以满足有序,消息去重。缺点是

  1. 没有订阅功能,消费者要主动查询队列。而为了避免频繁查询队列消耗CPU资源,可以采用阻塞式查询。redis中阻塞查询命令是brpop
  2. 无法保证可靠性。缺少消息确认机制,无法及时感知遗漏消息,导致数据不一致。

源码

完整项目在https://gitcode.com/zsss1/redis_mq/overview
pom.xml添加redisson依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.40.2</version></dependency>

Redission封装依赖。

@SpringBootTest(classes = DemoApplication.class)
public class RedisListTest {@Autowiredprivate RedissonClient client;private static final String REDIS_QUEUE = "list_queue";private static final Logger LOGGER = LoggerFactory.getLogger(RedisListTest.class);@Testpublic void test_redis_list_mq() throws Exception {RedissonBlockingDeque r;new Thread(() -> {for (int i = 0; i < 10; i++) {producer("message" + i);}}).start();new Thread(() -> {consumer();}).start();Thread.currentThread().join();}// 消费者,阻塞public void consumer() {RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);boolean isCheck = true;while (isCheck) {try {String message = deque.takeLast();System.out.println("consumer: " + message);} catch (InterruptedException e) {LOGGER.error("consumer failed, cause: {}", e.getMessage());}}}// 生产者public void producer(String message) {RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);System.out.println(deque.getClass());try {deque.putFirst(message);} catch (InterruptedException e) {LOGGER.error("producer failed, msg: {}, cause: {}", message, e.getMessage());}}
}

pub/sub

发布订阅模式是一种消息传递模式。发送者将消息发送到频道,订阅者订阅频道即可及时收到消息。
它支持组生产者与消费者。但是它会丢失消息。
Redis在server端为每个消费者保留一块内存区域,存储该消费者订阅的数据。如果消费者处理速度慢,内存区域满了,那么Redis会断开消费者连接,这会导致消息丢失。

源码

  1. 定义频道。
public class TopicChannel {public static final String SEND_PHONE = "send_phone";public static final String SEND_EMAIL = "send_email";
}
  1. 定义监听频道的订阅者。分清org.springframework.data.redis.connection.MessageListenerorg.redisson.api.listener.MessageListener
public class MyMessageListener implements MessageListener {private static Map<String, Consumer<String>> RULE = new HashMap<>();static {RULE.put(TopicChannel.SEND_EMAIL, MyMessageListener::sendEmail);RULE.put(TopicChannel.SEND_PHONE, MyMessageListener::sendPhone);}public static void sendEmail(String msg) {System.out.println("listen email:" + msg);}public static void sendPhone(String msg) {System.out.println("listen phone:" + msg);}@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] byteChannel = message.getChannel();byte[] byteBody = message.getBody();try {String channel = new String(byteChannel);String body = new String(byteBody);System.out.println("channel: + " + channel + ", body: " + body);RULE.get(channel).accept(body);} catch (Exception e) {System.out.println(e.getMessage());}}
}
  1. 在redis注册订阅者。
@Component
public class RedisConfig {@Beanpublic MessageListenerAdapter messageListenerAdapter() {return new MessageListenerAdapter(new MyMessageListener());}@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// messageListenerAdapter 订阅 SEND_EMAIL 频道container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_EMAIL));// messageListenerAdapter 订阅 SEND_PHONE 频道container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_PHONE));return container;}
}
  1. 测试
@SpringBootTest(classes = DemoApplication.class)
public class MyListener {@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Testpublic void test_pub() {redisTemplate.convertAndSend(TopicChannel.SEND_EMAIL, "pub email message");redisTemplate.convertAndSend(TopicChannel.SEND_PHONE, "pub phone message");}
}

测试结果

channel: + send_email, body: pub email message
listen email:pub email message
channel: + send_phone, body: pub phone message
listen phone:pub phone message

http://www.mrgr.cn/news/80990.html

相关文章:

  • 前端【技术方案】浏览器兼容问题(含解决方案、CSS Hacks、条件注释、特性检测、Polyfill 等)
  • 疯狂前端面试题(四)
  • [7] 游戏机项目说明
  • graylog初体验
  • 采用DDNS-GO与cloudflare实现双域名同时访问NAS
  • 【cursor破解】【cursor白嫖】
  • JWT,OAuth 2.0,Apigee的区别与关系
  • MySQL的详细使用教程
  • .NET重点
  • iOS + watchOS Tourism App(含源码可简单复现)
  • 【Lua热更新】上篇
  • Restaurants WebAPI(三)——Serilog/
  • BenchmarkSQL使用教程
  • 使用RTP 协议 对 H264 封包和解包
  • 使用“NodeMCU”、“红外模块”实现空调控制
  • Day12 梯度下降法的含义与公式
  • php各个版本的特性以及绕过方式
  • 基础电路的学习
  • 在VBA中结合正则表达式和查找功能给文档添加交叉连接
  • 分析excel硕士序列数据提示词——包含对特征的筛选
  • k8s迁移——岁月云实战笔记
  • JWT令牌与微服务
  • Pytorch | 利用MI-FGSM针对CIFAR10上的ResNet分类器进行对抗攻击
  • GTID详解
  • (耗时4天制作)详细介绍macOS系统 本博文含有全英版 (全文翻译稿)
  • 本地计算机上的MySQL服务启动后停止(connection refused: connect)解决一系列数据库连接不上的问题