RocketMQ延迟消息机制
架构设计
延迟级别(Delay Level)
RocketMQ 通过预定义的延迟级别来实现延迟消息。每个延迟级别对应一个固定的延迟时间。
消息存储和处理:
- 当消息到达 broker 后,如果设置了延迟级别,broker 将不会立即将消息投递到消费队列中,而是将其存储在特定的延迟队列中。
- RocketMQ 的 broker 定时任务会检查延迟队列中消息的时间,当消息到达指定的延迟时间后,broker 会将消息存入实际的消费队列中以供消费者消费。
示例代码
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class DelayedMessageProducer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");producer.setNamesrvAddr("localhost:9876");// 启动生产者producer.start();// 创建消息,并指定主题Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello RocketMQ".getBytes());// 设置延迟级别msg.setDelayTimeLevel(3); // 表示延迟 10 秒 (根据 delay level 3)// 发送消息producer.send(msg);// 关闭生产者producer.shutdown();}
}