当前位置: 首页 > news >正文

rocketmq

1. 结构及工作原理

在这里插入图片描述
RocketMQ 是一个分布式消息队列系统,最初由阿里巴巴开发,设计用于高吞吐量和高可用性的消息传递系统。

RocketMQ 的整体工作原理围绕着生产者、消费者、Broker 和 NameServer 四个核心组件展开。其消息的发送、存储、消费,以及路由发现的过程,形成了完整的工作流。下面是 RocketMQ 的工作原理详细介绍:

1. Producer 生产消息

  • 消息发送
    • 生产者通过指定的 Topic 向 Broker 发送消息。生产者可以选择同步、异步或单向的方式发送消息。
    • 在发送消息之前,Producer 会从 NameServer 获取与目标 Topic 相关的 Broker 路由信息,包括可以发送消息的 Broker 地址和相应的队列信息。
    • 根据负载均衡算法,Producer 会选择一个 Broker,并将消息发送到该 Broker 上的一个消息队列中。
  • 消息确认
    • 生产者发送消息后,Broker 会接收消息并进行持久化。消息持久化到磁盘(CommitLog)后,Broker 返回一个确认给生产者,以确保消息已成功存储。

2. Broker 存储消息

  • CommitLog 存储
    • Broker 将接收到的消息存储在 CommitLog 中。CommitLog 是按顺序写入磁盘的,保证了高效的写入性能。
  • 消费队列
    • Broker 会为每个 Topic 维护一个或多个 消息队列(Message Queue),每个消息队列中保存的是指向 CommitLog 的消息索引。
    • 消费者从这些消息队列中读取消息,而不是直接访问 CommitLog。
  • Master-Slave 复制
    • 如果 Broker 处于 主从架构中,消息会从 Master Broker 复制到 Slave Broker。这是为了保证高可用性和数据冗余,当 Master Broker 故障时,Slave Broker 可以顶上继续服务。

3. NameServer 管理路由信息

  • 路由发现
    • NameServer 是一个轻量级的服务,负责管理 Broker 的注册与路由信息。每个 Broker 在启动时都会将自身的信息(例如:IP 地址、端口、所在的 Topic 等)注册到 NameServer。
    • 生产者和消费者在发送或消费消息时,都需要先从 NameServer 获取到 Broker 的路由信息。
  • 无状态设计
    • NameServer 是无状态的,因此可以部署多个实例提高可用性。它的主要功能就是维护 Broker 的路由表,并为客户端(Producer 和 Consumer)提供查询服务。

4. Consumer 消费消息

  • 拉取消息
    • 消费者也会从 NameServer 获取与其订阅的 Topic 相关的 Broker 路由信息。
    • 消费者根据负载均衡算法选择一个 Broker,然后从对应的 消息队列 中拉取消息。
  • 消费模式
    • 集群消费:多个消费者组中的消费者共同消费一个 Topic,消息只会被一个消费者消费。
    • 广播消费:消息会被所有订阅该 Topic 的消费者消费,每个消费者都会收到相同的消息。
  • 消息确认
    • 消费者在成功处理完消息后,会向 Broker 确认该消息已被成功消费。确认后,Broker 会将该消息标记为已消费,确保该消息不会再次被消费。

2. 保证消息可靠性

RocketMQ 通过多种机制确保消息的可靠性,主要体现在消息的持久化、确认、重试、事务管理等方面。以下是 RocketMQ 保证消息可靠性的关键方法:

1. 消息持久化

  • CommitLog:RocketMQ 使用 CommitLog 将消息持久化到磁盘。这是一个顺序写入的日志文件,保证了高效的消息写入性能。通过持久化,系统即使在意外故障后也能恢复消息数据。
  • 消息复制:在 Master-Slave 架构中,RocketMQ 会将消息从 Master Broker 复制到 Slave Broker。这提供了数据冗余,确保在 Master Broker 故障时,Slave Broker 可以继续提供服务,保证消息不会丢失。

2. 消息确认机制

  • 生产者确认:生产者在发送消息时,可以选择不同的确认模式:
    • 同步确认:生产者在发送消息后,等待 Broker 确认消息已被持久化后再继续发送。这样可以确保消息确实被成功接收和存储。
    • 异步确认:生产者发送消息后不等待 Broker 确认,适用于对延迟要求较高的场景,但在失败情况下,可能会丢失消息。
    • 单向发送:适用于不需要确认的场景,速度快但不保证消息的可靠性。
  • 消费者确认:消费者在成功处理消息后,需要向 Broker 发送确认,标记消息为已消费。未确认的消息将会在消费者出现异常时重投递。

3. 消息重试机制

  • 重试策略:如果消费者在处理消息时发生错误,RocketMQ 支持重试机制。消费者可以重新消费未确认的消息,直到达到最大重试次数为止,确保消息能够被成功处理。
  • 死信队列:当消息在消费时多次重试仍然失败,RocketMQ 可以将这些消息转发到一个专门的死信队列(Dead Letter Queue),以便后续分析和处理。

4. 事务消息

RocketMQ 支持事务消息,可以确保分布式系统中多个操作的一致性。在发送事务消息时,生产者会先发送一个“半消息”,然后在执行相关业务逻辑后提交或回滚该消息。只有在业务成功完成的情况下,消息才会被提交并持久化。

5. 顺序消息

RocketMQ 支持顺序消息的发送和消费,确保消息按照发送顺序被处理。在某些业务场景中,顺序性对于数据的一致性和可靠性至关重要。

6. 高可用性架构

  • Master-Slave 结构:通过设置 Master 和 Slave,RocketMQ 提供了高可用性。如果 Master Broker 故障,消费者可以从 Slave Broker 中拉取消息,保证消息不会丢失。
  • 多副本机制:在集群环境中,消息可以在多个 Broker (一主多从) 之间进行备份,提高系统的可靠性。

RocketMQ 通过持久化存储、消息确认机制、重试策略、事务支持以及高可用性架构等多种方式,确保了消息在发送、存储和消费过程中的可靠性。这些机制使得 RocketMQ 能够有效应对网络故障、系统崩溃等各种不确定性,确保消息传递的高可靠性。

3. 保证消息有序性

1. 消息队列

  • 消息队列的设计:在 RocketMQ 中,每个 Topic 可以有多个消息队列(Message Queue)。为了保证消息的有序性,通常会为一个 Topic 设置多个队列,但确保同一条消息的生产和消费只在同一个队列中进行。
  • 队列选择:在发送消息时,生产者根据一定的逻辑(如 hash 算法)将消息发送到特定的消息队列中。这意味着对于相同的消息键(Key),它们将被发送到同一个队列中,从而保证了这些消息的顺序。

2. 顺序发送

  • 顺序发送模式:RocketMQ 支持顺序消息发送。生产者在发送消息时,如果希望保持顺序,可以选择将消息发送到特定的队列。
  • 限制并发:如果需要保证消息的严格顺序,生产者应避免并发发送同一主题的消息。通常可以通过限制生产者的并发度,确保在同一时刻只有一个线程向某个队列发送消息。

3. 消费顺序

  • 顺序消费:在消费者端,RocketMQ 可以配置为顺序消费。在消费过程中,消费者从消息队列中拉取消息并处理,这样可以保证在消费时消息的顺序。
  • 单一消费者实例:为了确保消费的顺序性,建议在同一个消息队列中只使用一个消费者实例处理消息,避免多个消费者同时处理来自同一队列的消息。

4. 消息键(Key)

  • 消息键的使用:生产者可以使用消息键(Key)来指定消息的分发逻辑。当使用消息键时,RocketMQ 会确保同一键的消息被路由到同一队列中,从而保证这些消息的顺序。

5. 高可用性和故障恢复

  • 容灾恢复:在 Master-Slave 结构中,RocketMQ 确保在故障发生时不会丢失消息顺序。在切换主从 Broker 时,系统会保持消息的一致性,确保消息的顺序性不会受到影响。

6. 限制消息重发

  • 重发机制:在发生消费失败时,RocketMQ 提供重试机制。为了保证重试消息的顺序,重试机制需要被设计为在同一队列中进行处理。

通过合理的消息队列设计、顺序发送和消费、使用消息键、单一消费者实例以及控制并发等机制,RocketMQ 可以有效地保证消息的有序性。这样设计使得在特定场景下,应用可以以高效的方式处理有序消息,满足业务的需求。

4. 事务

RocketMQ 通过以下机制来保证事务消息的可靠性和一致性,确保在分布式系统中进行消息发送时的事务管理:

1. 事务消息模型

RocketMQ 采用两阶段提交(Two-Phase Commit)模型来处理事务消息。这一过程分为两个阶段:

1.1. 第一阶段(发送半消息)

  • 发送半消息:生产者在开始事务时,会先发送一条“半消息”(Half Message)到 Broker。此时,Broker 只记录消息,但不立即将其可见(未提交)。
  • 本地事务执行:生产者随后执行本地业务逻辑(如更新数据库),确保本地事务完成的条件。

1.2. 第二阶段(提交或回滚)

  • 提交:如果本地事务成功执行,生产者将向 Broker 发送提交消息的请求。Broker 接收到提交请求后,将半消息变为可见状态,完成消息的提交。
  • 回滚:如果本地事务执行失败,生产者向 Broker 发送回滚请求,Broker 将删除未提交的半消息,确保消息不会被消费。

应用场景

  • 电商订单处理:在创建订单的同时发出支付请求,保证订单创建成功时支付请求的成功性。
  • 库存系统:减库存操作和发送通知消息的事务一致性,确保在库存扣减成功后通知其他系统库存变化。

2. 事务状态查询

RocketMQ 提供了事务状态查询的机制,允许 Broker 在一定条件下主动查询生产者的事务状态。

流程概述

  • 在某些情况下,生产者可能在完成本地事务后未能发送提交或回滚请求(如生产者故障)。
  • Broker 会保留这些未决的半消息,并定期向生产者查询这些事务的状态。
  • 生产者可以根据本地事务的执行情况,返回 COMMIT(提交)或 ROLLBACK(回滚)的响应,告知 Broker 应该如何处理这些半消息。

应用场景

  • 网络故障或生产者崩溃:在生产者网络异常、宕机等情况下,Broker 可以通过查询机制确保事务消息的最终一致性。

3. 分布式事务

RocketMQ 的事务消息功能可以用于解决分布式系统中的分布式事务问题,即跨多个系统或服务的事务处理。

流程概述

  • RocketMQ 的事务消息机制能够确保多个不同系统或服务之间的操作一致性,避免了传统分布式事务(如两阶段提交协议)的复杂性。
  • 通过事务消息,系统可以确保消息的可靠投递,同时与数据库或其他外部资源的操作在一个事务上下文中进行处理。

应用场景

  • 金融系统:跨银行或支付系统的交易一致性,确保资金划拨和支付状态一致。
  • 跨微服务的事务一致性:在微服务架构中,不同服务之间的调用往往需要事务保证,RocketMQ 通过事务消息实现跨服务操作的事务性。

4. 延迟事务处理

RocketMQ 支持通过延迟消息来处理某些复杂的事务场景。例如,当本地事务执行成功后,消息可以在一段时间后才被消费者消费。

应用场景

  • 订单超时处理:某些电商场景中,订单创建后,如果用户未在规定时间内支付,可以通过延迟消息触发订单超时取消的逻辑。

5. 多步事务处理

在某些复杂业务中,事务可能包含多个步骤。RocketMQ 的事务消息机制可以用于管理多个步骤的事务一致性。

应用场景

  • 复杂工作流处理:比如订单创建、支付、发货等多个步骤的事务,使用 RocketMQ 事务消息可以确保每个步骤的顺序执行和事务一致性。

6. 保证消息的可靠性

  • 消息持久化:RocketMQ 确保半消息在 Broker 中持久化到磁盘,以防止 Broker 崩溃导致消息丢失。
  • 高可用性:通过 Master-Slave 架构,RocketMQ 在 Broker 故障时可以确保消息不会丢失,并能够保证事务的一致性。

7. 消息重试机制

  • 重试策略:在事务提交过程中,如果出现异常或失败,RocketMQ 提供消息重试机制。通过重试,系统可以确保本地事务最终能够成功完成,进而提交消息。

RocketMQ 通过两阶段提交模型、事务状态查询、消息持久化和高可用性等机制,实现了事务消息的可靠性和一致性。主要包括本地事务(两阶段提交)、事务状态查询、分布式事务、延迟事务处理和多步事务处理。这些设计确保了在分布式环境中,生产者可以在处理业务逻辑时可靠地发送消息,保证系统的数据一致性。

5. 延迟消息

RocketMQ 的延迟消息主要通过消息的延迟级别(delay level)来控制发送的消息在一定时间后才可被消费者消费。

1. 实现原理

RocketMQ 并非支持任意精确的延迟时间,而是通过预定义的延迟级别来控制消息的延迟发送。消息发送时,生产者可以指定延迟级别,Broker 会根据该级别将消息存储在一个特殊的队列中,等到指定的延迟时间后,Broker 才会将该消息转移到对应的消息队列中供消费者消费。

RocketMQ 的延迟消息只能使用预定义的延迟级别,无法指定任意精确的延迟时间。如果业务需要比默认的延迟级别更精确的延迟时间,可能需要通过调整延迟级别配置或结合其他定时机制实现。

默认情况下,RocketMQ 支持 18 个延迟级别,但可以通过修改 RocketMQ 配置文件来增加延迟级别。

2. 使用方式

发送延迟消息的步骤:

1. 创建生产者实例:

  • 创建一个 DefaultMQProducer 对象,用于发送消息。

2. 设置延迟级别:

  • 当生产者发送消息时,可以通过设置消息的 delayTimeLevel 属性来指定延迟级别。

3. 消息存储与延迟处理:

  • 当消息被发送到 Broker 时,Broker 会根据 delayTimeLevel 将该消息存储在一个特殊的队列中,称为“定时消息队列”。
  • 当达到延迟时间时,Broker 会将消息从定时队列转移到该 Topic 的正常队列中,消息变得可消费。
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();// 创建一条消息
Message msg = new Message("TestTopic", "TagA", "OrderID188", "Hello RocketMQ".getBytes());// 设置消息的延迟级别,例如延迟 5 秒
msg.setDelayTimeLevel(2);  // 使用延迟级别 2(5 秒)// 发送消息
producer.send(msg);// 关闭生产者实例
producer.shutdown();

RocketMQ 通过内置的延迟级别机制支持延迟消息功能。生产者可以在发送消息时指定延迟级别,Broker 会根据该级别在延迟时间到达后将消息投递给消费者。延迟消息在定时任务、订单超时处理等场景中非常有用,同时也可以与事务消息结合使用,增强分布式事务的灵活性和可靠性。

6. 死信队列

在 RocketMQ 中,死信队列(Dead Letter Queue,DLQ) 是一种特殊的消息队列,用于处理无法正常消费的消息。它是消息重试和处理失败后的一种机制,确保系统能够有效地处理异常情况,从而不影响正常的消息流转。

1. 工作原理

当消息在消费过程中发生错误,且多次重试仍然无法成功消费时,这些消息会被转发到死信队列。RocketMQ 的死信队列机制如下:

  1. 消费失败:消费者在消费消息时,如果遇到异常情况(如处理逻辑错误、业务逻辑异常等),会导致消息消费失败。
  2. 重试次数限制:每条消息在消费失败后会被自动重试,RocketMQ 会对每条消息设置一个最大重试次数(可以在 Broker 的配置中设置,默认为 16 次)。如果超出这个重试次数,消息将被转发到死信队列。
  3. 转发到死信队列:当消息达到最大重试次数仍未成功消费时,RocketMQ 会将该消息移动到指定的死信队列。死信队列的名称通常为 TopicName%DLQ,例如,若消息属于主题 TopicA,则对应的死信队列为 TopicA%DLQ。
  4. 监控和处理:开发者可以单独订阅死信队列,对其中的消息进行监控和处理。通常可以设计一个特殊的消费者来处理死信队列中的消息,例如记录错误信息、发送告警、重新处理等。

2. 配置

在 RocketMQ 中,可以通过以下参数来配置死信队列:

  • 最大重试次数:设置每条消息的最大重试次数,超过此次数的消息会被发送到死信队列。
  • 死信队列的名称:默认情况下,死信队列的名称会自动生成为 TopicName%DLQ。

3. 消费者处理死信队列的示例

当消费者处理死信队列时,可以使用普通的消费逻辑来读取死信消息,并根据业务需要进行处理。

public class DeadLetterQueueConsumer {public static void main(String[] args) {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DeadLetterQueueGroup");consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅死信队列consumer.subscribe("TopicA%DLQ", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {System.out.println("Received dead letter message: " + new String(message.getBody()));// 处理死信消息的逻辑,例如记录日志、发送告警等}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});try {consumer.start();System.out.println("DeadLetterQueueConsumer started.");} catch (MQException e) {e.printStackTrace();}}
}
  • 死信队列 是 RocketMQ 中用于处理无法消费的消息的一种机制,能够有效地管理和监控消费异常的消息。
  • 通过设置最大重试次数和配置死信队列,开发者可以确保系统在遇到消费问题时不会崩溃,并可以采取适当的措施来处理这些异常情况。
  • 处理死信队列中的消息是维护系统稳定性和可靠性的一个重要环节。

7. 漏消费和重复消费

1. 漏消费问题

漏消费指的是消费者由于某些原因未能消费某些消息,导致这些消息丢失。RocketMQ 通过以下机制来解决漏消费的问题:

(1)消费进度(Offset)管理

  • 消费进度管理:RocketMQ 为每个消费者组维护了一个消费进度(Offset)记录。这个 Offset 记录了消费者消费到消息队列的哪个位置。消费者每消费完一条消息后,会更新消费进度。这样,当消费者重新启动或发生故障时,能够从上次消费的位置继续消费。
  • 自动与手动提交消费进度
    • 自动提交:默认情况下,RocketMQ 自动更新消费者的 Offset,确保在消费消息后消费进度能够及时更新。
    • 手动提交:消费者也可以选择手动提交消费进度,以便更灵活地控制消费的确认时机。

(2)消费失败的重试机制

  • 如果消费者在处理某条消息时遇到异常,可以通过 重试机制 来重新消费该消息。RocketMQ 提供了自动重试机制,如果消息处理失败(比如消费者抛出异常或未确认消息),RocketMQ 会将该消息重新放回队列,允许消费者再次消费。

(3)消息的持久化

  • RocketMQ 将所有的消息持久化在磁盘上,确保即使消费者在消费过程中宕机或失去连接,未消费的消息依然存储在 Broker 中,能够在消费者恢复后重新消费。

2. 重复消费问题

重复消费指的是消费者可能会多次消费同一条消息。这可能由于网络异常、消费失败、或者消费进度提交失败等原因引发。为了应对重复消费,RocketMQ 提供了以下机制:

(1)幂等性设计

  • 幂等性设计:消费者端需要设计成 幂等性 的,即对同一条消息的多次消费不会对系统造成不同的影响。这样,即使消费者不小心重复消费了一条消息,业务逻辑的执行结果也会保持一致。例如,消费者可以通过消息的唯一 ID 来判断该消息是否已经处理过。

(2)精确一次消费

  • RocketMQ 通过 AT-Least-Once(至少一次) 的消息投递语义来保障消息不会漏掉,即消息会被至少投递一次。但这也意味着在某些情况下消息可能会被多次投递。因此,消费者需要在业务端实现消费的幂等性,避免因为重复消费带来的副作用。

(3)事务消息机制

  • RocketMQ 提供了事务消息机制,可以通过使用事务来确保消息的一致性和可靠性。在处理一些分布式事务时,消费者可以确保消费操作和事务操作是原子性的,防止因为事务失败导致消息重复消费。

3. 总结

  • 漏消费问题:通过消费进度管理、重试机制和消息持久化,RocketMQ 能有效防止漏消费。
  • 重复消费问题:虽然 RocketMQ 保证至少一次投递,但消费者需要通过幂等性设计来避免重复消费对业务造成的影响。
  • 事务消息机制:RocketMQ 提供的事务消息机制可以帮助在分布式事务场景中,确保消息消费的准确性和一致性。

通过以上机制,RocketMQ 在高可靠性消息系统中提供了强大的保障,防止消息漏消费和重复消费问题。

8. 生产者组

在 RocketMQ 中,生产者组(Producer Group) 是一个非常重要的概念,主要用于管理多个生产者的事务消息和容错机制。

1. 事务消息的管理

通过生产者组,RocketMQ 可以准确定位哪个生产者负责哪条消息的事务状态,从而确保事务消息的一致性。

2. 生产者实例的管理

在 RocketMQ 中,生产者组 用于逻辑上将多个生产者实例组织在一起。具体来说,它有以下功能:

  • 高可用性:生产者组可以包含多个生产者实例,它们可以部署在多个节点上,以应对单点故障的情况。如果某个生产者实例出现问题,其他生产者实例可以继续工作,从而保证消息的可靠发送。
  • 事务回查:在分布式环境中,多个生产者实例可能会处理同一事务。如果其中一个生产者没有及时反馈事务状态,Broker 可以通过生产者组查找到其他生产者实例并进行事务回查,以确保事务消息的准确性。

3. 负载均衡和容错

  • 虽然 RocketMQ 的生产者没有负载均衡机制(不像消费者),但生产者组在分布式环境下可以帮助管理多个生产者实例的运行,确保高可用性。不同实例的生产者组成员可以并行工作,提高消息发送的吞吐量和容错能力。

4. 故障恢复

  • 如果某个生产者发送了事务消息但没有及时提交或回滚事务,由于生产者组的存在,RocketMQ 可以通过该组的其他生产者实例查询和恢复未完成的事务状态。这有效地提高了系统的容错性,避免了事务消息卡住的情况。

5. 生产者重试机制

  • 在 RocketMQ 中,如果消息发送失败,生产者组可以配置重试机制,确保消息最终能够成功投递到 Broker。生产者组使得多个生产者实例可以共享这类重试策略,从而保证消息发送的高可靠性。

6. 总结

在 RocketMQ 中,生产者组 的主要作用包括:

  • 管理 事务消息:在事务消息中,生产者组负责维护生产者实例之间的协同工作,确保事务消息的提交、回滚和事务状态回查。
  • 容错和高可用性:通过将多个生产者实例组织到同一个生产者组中,提高系统的容错能力和高可用性。
  • 支持 重试机制 和 故障恢复:在消息发送失败或事务未完成的情况下,生产者组帮助协调多个生产者实例,确保消息的正确处理。

生产者组是 RocketMQ 实现高可靠性和分布式事务处理的关键机制之一。

在 RocketMQ 中,分布式事务是以生产者组为单位进行管理的。RocketMQ 的分布式事务机制依赖于生产者组来确保事务消息的最终一致性。不同生产者组之间无法直接参与同一个分布式事务。因为事务消息的处理和状态回查只限于同一个生产者组内部。

不同生产者组之间的事务隔离

  • 每个生产者组都是一个逻辑隔离单位,不同的生产者组在 RocketMQ 中是相互独立的。即便两个生产者组同时在处理同一个 Topic,它们的事务消息处理依然是独立的,不会互相影响。
  • 如果一个生产者组中的事务消息未能完成提交或回滚,RocketMQ 不会去回查其他生产者组中的生产者实例,因为它们没有共享的事务上下文。

跨生产者组的事务协调

  • 如果业务场景需要多个不同生产者组参与同一个分布式事务,这种需求不能直接依赖 RocketMQ 内置的事务消息机制来完成。
  • RocketMQ 并不提供跨生产者组的事务管理功能,因此跨生产者组的分布式事务需要通过其他方式来协调。通常的做法是使用第三方的 分布式事务协调器(例如:Seata、TCC 模式),在业务层面实现跨服务、跨系统的事务协调。

7. 示例代码

1. 配置 application.yml

配置 RocketMQ 的 NameServer 和生产者组:

rocketmq:name-server: 127.0.0.1:9876producer:group: my-producer-groupsend-message-timeout: 3000

2. 生产者代码

创建一个 RocketMQProducer 类,使用生产者组发送消息。

@Service
public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final String TOPIC = "my-topic";  // 指定要发送消息的Topic/*** 发送普通消息*/public void sendMessage(String message) {// 使用生产者组发送消息rocketMQTemplate.convertAndSend(TOPIC, message);System.out.println("Message sent: " + message);}/*** 发送事务消息*/public void sendTransactionalMessage(String message) {// 发送事务消息,回调事务逻辑rocketMQTemplate.sendMessageInTransaction(TOPIC, MessageBuilder.withPayload(message).build(), null);System.out.println("Transactional message sent: " + message);}
}

这里,在 Spring Boot 中使用 RocketMQ 的时候,rocketMQTemplate.convertAndSend(TOPIC, message); 这段代码虽然没有明确指定使用哪个生产者组,但它实际上通过 Spring Boot 自动配置机制 和 配置文件(application.yml) 中的属性,默认绑定了你在 application.yml 文件中定义的生产者组。

3. 编写事务监听器(用于事务消息)

事务消息需要一个事务监听器来处理事务的提交或回滚逻辑。

@Component
public class TransactionListenerImpl implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 本地事务逻辑处理try {// 模拟本地事务执行System.out.println("Executing local transaction for message: " + new String(msg.getBody()));// 如果本地事务执行成功,提交消息return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {// 如果本地事务执行失败,回滚消息return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 当事务消息的状态未知时,Broker 会进行回查System.out.println("Checking local transaction for message: " + new String(msg.getBody()));// 根据业务逻辑确认是否提交或回滚return LocalTransactionState.COMMIT_MESSAGE;  // 这里可以根据实际情况调整}
}

4. 注册事务监听器

在 RocketMQProducer 中,将事务监听器注册到生产者:

@Autowired
private TransactionListenerImpl transactionListener;// 在需要时传入transactionListener给rocketMQTemplate
rocketMQTemplate.setTransactionListener(transactionListener);

在 Spring Boot 项目中,使用 RocketMQ 可以配置多个生产者组。可以通过多种方式实现多个生产者组的配置,做法如下:

使用多个 RocketMQTemplate Bean

可以为每个生产者组定义一个 RocketMQTemplate Bean,并为每个 Bean 配置不同的生产者组。下面是一个示例:

1. 配置文件 (application.yml)

rocketmq:name-server: 127.0.0.1:9876# 生产者组 1
producer1:group: producer-group-1# 生产者组 2
producer2:group: producer-group-2

2. 创建多个 RocketMQTemplate Bean

@Configuration
public class RocketMQConfig {@Bean(name = "rocketMQTemplate1")public RocketMQTemplate rocketMQTemplate1() {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducerGroup("producer-group-1");// 设置其他配置,如 NameServer 地址等return rocketMQTemplate;}@Bean(name = "rocketMQTemplate2")public RocketMQTemplate rocketMQTemplate2() {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducerGroup("producer-group-2");// 设置其他配置,如 NameServer 地址等return rocketMQTemplate;}
}

3. 使用指定的 RocketMQTemplate

在服务中,可以通过 @Qualifier 注解来指定使用哪个 RocketMQTemplate:

@Service
public class MessageService {@Autowired@Qualifier("rocketMQTemplate1")private RocketMQTemplate rocketMQTemplate1;@Autowired@Qualifier("rocketMQTemplate2")private RocketMQTemplate rocketMQTemplate2;public void sendMessageToGroup1(String message) {rocketMQTemplate1.convertAndSend("my-topic", message);}public void sendMessageToGroup2(String message) {rocketMQTemplate2.convertAndSend("my-topic", message);}
}

4. 使用 @RocketMQMessageListener 注解

如果在消息监听器中需要使用不同的生产者组,可以为每个监听器配置不同的生产者组。

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "consumer-group-1")
public class Listener1 {// 处理消息的逻辑
}@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "consumer-group-2")
public class Listener2 {// 处理消息的逻辑
}

9. 示例代码

rocketmq 是模仿着 kafka 进行开发的,它的许多特性和后者是一样的,在此不做过多讲述了,kafka 的生态极其强大,以它作为重点掌握对象就可以了。

在一个 Spring Boot 项目中使用 RocketMQ 搭建一个生产者与消费者的集群环境,涉及到三个 Producer、三个 NameServer、三个 Broker(每个 Broker 是一主一从)的场景。通过代码示例展示如何配置与使用。

示例代码结构:

  • 3个 Producer 用于发送消息到不同的 Topic。
  • 消费者从集群中消费消息。
  • 3个 NameServer 和 3个 Broker,每个 Broker 一主一从,具体集群部署需要在 RocketMQ 安装环境中进行配置,这部分不涉及 Spring Boot 中的代码。

1. RocketMQ 集群部署说明

在实际环境中,RocketMQ 集群部署需要配置 NameServer 和 Broker。以下是集群配置的大致步骤(需要手动配置 RocketMQ 部署环境):

  • 部署 3 个 NameServer,启动时指定不同的端口。
  • 部署 3 对 Broker(3 主 + 3 从),每对中的从节点与主节点通过相同的 brokerName 关联,使用不同的 brokerId 区分主从。

Broker 和 NameServer 的配置样例如下:

  • NameServer 配置:无特别定制,启动多个 NameServer,监听不同端口。
  • Broker 配置(主从模式)
# broker-a.properties (主 broker)
brokerClusterName=MyCluster
brokerName=broker-a
brokerId=0  # 0 代表主 broker
namesrvAddr=localhost:9876;localhost:9877;localhost:9878  # 配置 NameServer 地址
storePathRootDir=/rocketmq/store/a
storePathCommitLog=/rocketmq/store/a/commitlog
# Other broker configuration...# broker-a-s.properties (从 broker)
brokerClusterName=MyCluster
brokerName=broker-a
brokerId=1  # 1 代表从 broker
namesrvAddr=localhost:9876;localhost:9877;localhost:9878
storePathRootDir=/rocketmq/store/a-s
storePathCommitLog=/rocketmq/store/a-s/commitlog
# Other broker configuration...

2. Spring Boot 项目中的配置与代码

1. 配置文件 application.yml

配置三个 NameServer 的地址和其他相关配置:

rocketmq:name-server: 127.0.0.1:9876;127.0.0.1:9877;127.0.0.1:9878producer:group: producer-groupconsumer:group: consumer-grouptopic: myTopic

2. 生产者代码

假设有 3 个 Producer,它们分别发送消息到不同的 Topic。以下是使用 RocketMQTemplate 的生产者示例:

@RestController
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// Producer 1@GetMapping("/sendProducer1")public String sendMessage1(@RequestParam String message) {rocketMQTemplate.convertAndSend("Topic1", message);return "Message sent to Topic1: " + message;}// Producer 2@GetMapping("/sendProducer2")public String sendMessage2(@RequestParam String message) {rocketMQTemplate.convertAndSend("Topic2", message);return "Message sent to Topic2: " + message;}// Producer 3@GetMapping("/sendProducer3")public String sendMessage3(@RequestParam String message) {rocketMQTemplate.convertAndSend("Topic3", message);return "Message sent to Topic3: " + message;}
}

3. 消费者代码

消费者需要订阅相应的 Topic 来消费消息。可以使用 @RocketMQMessageListener 注解来创建多个消费者订阅不同的 Topic。每个消费者可以属于同一个消费组,也可以属于不同的消费组。

// 消费者 1 - 消费 Topic1
@Service
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumer-group-1")
public class Consumer1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Consumer1 received message: " + message);}
}// 消费者 2 - 消费 Topic2
@Service
@RocketMQMessageListener(topic = "Topic2", consumerGroup = "consumer-group-2")
public class Consumer2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Consumer2 received message: " + message);}
}// 消费者 3 - 消费 Topic3
@Service
@RocketMQMessageListener(topic = "Topic3", consumerGroup = "consumer-group-3")
public class Consumer3 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Consumer3 received message: " + message);}
}

http://www.mrgr.cn/news/44302.html

相关文章:

  • 05:(寄存器开发)定时器一
  • 内网Debian\Ubuntu服务器安装dep包,基于apt-rdepends下载相关依赖
  • ZJYYC2360. 圆球的最大得分
  • 【python实操】python小程序之定义类
  • 【Linux】Linux命令与操作详解(一)文件管理(文件命令)、用户与用户组管理(创建、删除用户/组)
  • cisco交换机命令大全
  • [SAP ABAP] 程序调用
  • 解决方案:batch_size跟epoch有什么不同
  • 学校周赛(3)
  • 【Llamaindex RAG实践】
  • CSS——文字打字机效果
  • 有趣幽默彩虹屁文案生成工具微信小程序源码
  • OpenAI预计明年将推出“代理”系统
  • Nacos-Feign-Gateway-SpringCloud微服务
  • 【机器学习】智驭未来:探索机器学习在食品生产中的革新之路
  • 《Linux从小白到高手》理论篇补充:深入理解Linux中的输入输出及重定向
  • 功耗电流图的对比技巧
  • 免费版U盘数据恢复软件大揭秘,拯救你的重要数据
  • Spring Boot 2.1.6.RELEASE 中,javax.persistence缺失问题
  • 数据结构与算法——Java实现 30.合并多个有序链表 小顶堆实现