协作式 Saga 模式
在 协作式 Saga 模式(Choreographed Saga)中,不同服务之间通过事件进行通信,而没有中央协调者。每个服务都知道自己需要执行哪些操作,并在执行完后发布事件来通知其他服务进行下一步操作。当某个服务执行失败时,其他服务会根据业务需求执行补偿操作,确保最终一致性。
在协作式 Saga 模式中,服务之间是通过事件驱动进行交互的,消息队列(如 RabbitMQ 或 Kafka)用于服务之间传递事件,确保跨服务事务的执行。
协作式 Saga 模式的核心概念
- 事件驱动:每个服务根据事件来启动自己的事务,并在事务成功或失败后发布事件通知其他服务。
- 无中央协调者:不同服务之间相互了解自己的职责,并且是独立工作的。
- 补偿机制:如果某个服务失败,它会发布补偿事件,其他服务根据这些补偿事件执行回滚或补偿操作。
协作式 Saga 模式实现步骤
1. 系统设计
假设有两个服务:
- Order Service:负责创建订单。
- Inventory Service:负责扣减库存。
这两个服务之间将通过事件进行通信,确保订单的创建和库存的扣减是可靠的。
2. 技术栈
- Spring Boot:用于开发微服务。
- RabbitMQ 或 Kafka:用于事件传递。
- Spring AMQP 或 Spring Kafka:用于集成消息队列。
- Spring Data JPA:用于数据库操作。
3. 工作流程
Order Service 接收到订单请求时,创建订单并发布 OrderCreatedEvent,通知其他服务进行下一步操作。
Inventory Service 听到 OrderCreatedEvent 后,执行扣减库存操作。如果成功,它发布 InventoryReservedEvent,通知订单服务库存已预留;如果失败,则发布 InventoryRollbackEvent,通知订单服务进行补偿操作。
补偿服务 听到 InventoryRollbackEvent 后,执行回滚操作,将订单状态设置为“取消”。
4. Spring Boot 示例实现
4.1 Order Service
Order Service 在收到订单创建请求时,首先会创建订单并发布一个事件 OrderCreatedEvent,通知其他服务(如库存服务)进行库存扣减操作。
// OrderService.java
@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate OrderRepository orderRepository;// 创建订单@Transactionalpublic void createOrder(Order order) {// Step 1: 创建订单orderRepository.save(order);// Step 2: 发布事件,通知库存服务进行扣减库存OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(order.getId(), order.getItemId(), order.getQuantity());rabbitTemplate.convertAndSend("orderExchange", "order.created", orderCreatedEvent);}
}
4.2 Inventory Service
Inventory Service 监听 OrderCreatedEvent 事件,执行库存扣减操作。如果库存扣减成功,发布 InventoryReservedEvent,否则发布 InventoryRollbackEvent 进行补偿。
// InventoryService.java
@Service
public class InventoryService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate InventoryRepository inventoryRepository;// 监听创建订单事件,执行库存扣减@RabbitListener(queues = "order.created.queue")public void handleOrderCreated(OrderCreatedEvent event) {try {// Step 1: 扣减库存boolean success = decreaseInventory(event.getItemId(), event.getQuantity());if (!success) {throw new Exception("Insufficient inventory");}// Step 2: 发布库存预留成功事件InventoryReservedEvent reservedEvent = new InventoryReservedEvent(event.getOrderId(), event.getItemId(), event.getQuantity());rabbitTemplate.convertAndSend("inventoryExchange", "inventory.reserved", reservedEvent);} catch (Exception e) {// Step 3: 发布库存扣减失败,回滚事件InventoryRollbackEvent rollbackEvent = new InventoryRollbackEvent(event.getOrderId(), event.getItemId(), event.getQuantity());rabbitTemplate.convertAndSend("inventoryExchange", "inventory.rollback", rollbackEvent);}}// 扣减库存private boolean decreaseInventory(Long itemId, int quantity) {Inventory inventory = inventoryRepository.findByItemId(itemId);if (inventory.getStock() < quantity) {return false; // 库存不足}inventory.setStock(inventory.getStock() - quantity);inventoryRepository.save(inventory);return true; // 库存扣减成功}
}
4.3 补偿操作
如果某个服务执行失败,应该发布补偿事件。InventoryService 如果在扣减库存时失败,将发布 InventoryRollbackEvent 来通知其他服务进行回滚操作。
// CompensationService.java
@Service
public class CompensationService {@Autowiredprivate OrderRepository orderRepository;@RabbitListener(queues = "inventory.rollback.queue")public void handleInventoryRollback(InventoryRollbackEvent event) {// Step 1: 回滚订单Order order = orderRepository.findById(event.getOrderId()).orElseThrow(() -> new RuntimeException("Order not found"));order.setStatus("Cancelled");orderRepository.save(order);}
}
4.4 消息队列配置
配置 RabbitMQ(或 Kafka)交换机和队列:
@Configuration
public class RabbitMQConfig {// 创建交换机@Beanpublic TopicExchange orderExchange() {return new TopicExchange("orderExchange");}@Beanpublic TopicExchange inventoryExchange() {return new TopicExchange("inventoryExchange");}// 创建队列@Beanpublic Queue orderCreatedQueue() {return new Queue("order.created.queue");}@Beanpublic Queue inventoryReservedQueue() {return new Queue("inventory.reserved.queue");}@Beanpublic Queue inventoryRollbackQueue() {return new Queue("inventory.rollback.queue");}// 创建绑定@Beanpublic Binding orderCreatedBinding() {return BindingBuilder.bind(orderCreatedQueue()).to(orderExchange()).with("order.created");}@Beanpublic Binding inventoryReservedBinding() {return BindingBuilder.bind(inventoryReservedQueue()).to(inventoryExchange()).with("inventory.reserved");}@Beanpublic Binding inventoryRollbackBinding() {return BindingBuilder.bind(inventoryRollbackQueue()).to(inventoryExchange()).with("inventory.rollback");}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);return rabbitTemplate;}
}
4.5 事件定义
定义事件类,表示不同的操作。
// OrderCreatedEvent.java
public class OrderCreatedEvent {private Long orderId;private Long itemId;private int quantity;// 构造函数,getter 和 setter
}
// InventoryReservedEvent.java
public class InventoryReservedEvent {private Long orderId;private Long itemId;private int quantity;// 构造函数,getter 和 setter
}
// InventoryRollbackEvent.java
public class InventoryRollbackEvent {private Long orderId;private Long itemId;private int quantity;// 构造函数,getter 和 setter
}
5.最终一致性
- 补偿机制:如果某个服务执行失败,会通过发布补偿事件来保证系统的最终一致性。
- 幂等性:为了保证补偿事件的幂等性,服务需要确保同一个补偿事件被多次处理时不会产生不一致的状态。例如,库存回滚操作应该是幂等的。
- 事件的可靠传递:使用可靠的消息队列(如 RabbitMQ 或 Kafka),确保消息不丢失且能够正确投递。
6. 总结
协作式 Saga 模式的核心在于事件驱动和服务间的去中心化协调。每个微服务在收到事件后,执行自己的事务操作,并根据执行结果发布事件或补偿事件,最终通过一系列事件的流转来确保系统的一致性。与编排式 Saga 模式不同,协作式 Saga 模式更加灵活和解耦,但也需要保证事件的可靠性、幂等性以及服务间的正确协调。