Redis分片集群+MQ处理高并发
Redis的三大集群模式:主从复制、哨兵模式和Cluster模式。每种模式都有其特点和应用场景,具体如下:
- 主从复制模式:适用于数据备份和读写分离场景,配置简单,但在主节点故障时需要手动切换。
- 哨兵模式:在主从复制的基础上实现自动故障转移,提高高可用性,适用于高可用性要求较高的场景。
- Cluster模式:通过数据分片和负载均衡实现大规模数据存储和高性能,适用于大规模数据存储和高性能要求场景。
在实际应用中,可以根据系统的需求和特点选择合适的Redis集群模式,以实现高可用性、高性能和大规模数据存储。
一.Redis分片集群:
1.什么是Redis分片集群?
Redis 集群(Redis Cluster)是一种分布式的 Redis 实现,它可以将数据分布到多个 Redis 节点上,从而实现 数据分片(Sharding)和 高可用性。Redis 集群的分片方式是通过哈希槽(Hash Slots)将数据分布到多个节点中,而每个 Redis 集群节点负责一部分哈希槽的管理。
2.工作原理:
哈希槽(Hash Slots): Redis 集群将数据分配到 16384 个哈希槽中。每个键会通过哈希函数计算出它所属的哈希槽,然后根据哈希槽将数据存储到对应的 Redis 节点上。
节点角色:
- 主节点(Master Node):负责存储数据并处理读写请求。
- 从节点(Replica Node):用于备份主节点数据,提供高可用性。在主节点宕机时,从节点可以被提升为新的主节点。
数据迁移: Redis 集群会根据集群状态进行 数据迁移,当节点加入或退出时,数据会自动迁移到新的节点,确保数据分布均衡。
故障转移: 如果某个主节点发生故障,Redis 集群会自动将一个从节点提升为主节点,确保系统持续可用。
客户端与集群的交互: 客户端不直接访问集群中的节点,而是通过集群的代理来实现。客户端会根据哈希槽将请求发送到正确的 Redis 节点。客户端还会根据集群的状态信息自动切换节点。
二.创建Redis集群:
我们可以使用 redis-cli
工具创建 Redis 集群,需要以下几个步骤。首先,确保你已经安装了 Redis,并且有多个 Redis 实例在不同的端口上运行。这些实例将用于构建 Redis 集群。
1.启动 Redis 实例:
在创建 Redis 集群之前,需要至少 6 个 Redis 实例(3 个主节点和 3 个从节点)。每个实例都需要配置为支持集群模式。
(1)配置每个 Redis 实例:
确保每个 Redis 实例的配置文件中启用了集群模式。打开每个 Redis 实例的配置文件(通常位于 redis.conf
),并确保以下选项已启用:
# 启用集群模式
cluster-enabled yes
# 配置集群的配置文件路径
cluster-config-file nodes.conf
# 配置集群的超时时间
cluster-node-timeout 5000
# 启用 AOF(追加文件)以保证数据持久化
appendonly yes
(2)启动 Redis 实例:
假设已经创建了 6 个 Redis 配置文件并且已经修改了配置(每个配置文件的端口不同)。我们可以通过以下命令启动 Redis 实例:
redis-server /path/to/redis.conf
假设你有以下端口配置:
- Redis 实例 1:
127.0.0.1:7000
- Redis 实例 2:
127.0.0.1:7001
- Redis 实例 3:
127.0.0.1:7002
- Redis 实例 4:
127.0.0.1:7003
- Redis 实例 5:
127.0.0.1:7004
- Redis 实例 6:
127.0.0.1:7005
启动这些实例:
redis-server /path/to/redis_7000.conf
redis-server /path/to/redis_7001.conf
redis-server /path/to/redis_7002.conf
redis-server /path/to/redis_7003.conf
redis-server /path/to/redis_7004.conf
redis-server /path/to/redis_7005.conf
目前这只是启动了 6 个 Redis 实例,它们并没有指定主节点和从节点的角色,因此还需要手动配置集群的主从关系。
2.使用 redis-cli
创建 Redis 集群:
一旦 Redis 实例启动并且配置为支持集群模式,我们就可以使用 redis-cli
工具来创建 Redis 集群。
(1)使用 redis-cli --cluster
命令:
redis-cli
提供了一个集群管理命令--cluster
,允许你将多个 Redis 实例组织成一个集群。
通常,Redis 集群需要 至少 3 个主节点,每个主节点都有 一个或多个从节点。我们可以通过运行以下命令来创建 Redis 集群并自动配置主从节点:
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
:这些是要加入集群的 Redis 实例的 IP 和端口。--cluster-replicas 1
:指定每个主节点有 1 个从节点。这样这个命令会自动创建 3 个主节点 和 3 个从节点。
(2)确认集群创建:
在运行上述命令后,redis-cli
会要求你确认是否要创建集群,输入 yes
即可继续。
>>> Proceed with the cluster creation? (yes/no): yes
然后,Redis 会自动将节点配置为 Redis 集群,并且分配槽。
(3)检查集群状态:
创建集群后,你可以使用 redis-cli
连接到集群中的任一节点,执行以下命令查看集群的状态:
redis-cli -c -h 127.0.0.1 -p 7000 cluster info
这会显示集群的状态信息,例如:
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:3
cluster_stats_messages_ping_sent:0
cluster_stats_messages_pong_sent:0
cluster_stats_messages_ping_received:0
cluster_stats_messages_pong_received:0
三.配置 RabbitMQ 消息队列:
(1)安装RabbitMQ:
我们需要先安装 RabbitMQ,这里可以参考 RabbitMQ 官方文档进行安装。
也可以观看下面博客:微服务架构 --- 使用RabbitMQ进行异步处理_rabbtimq 消息异步处理-CSDN博客
(2)添加依赖:
在 pom.xml
中添加 Spring Boot 和 RabbitMQ 的依赖:
<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Data Redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
</dependencies>
(3)配置 RabbitMQ:
在 application.properties
或 application.yml
中配置 RabbitMQ 连接:
spring:rabbitmq:host: localhost # RabbitMQ 服务器地址port: 5672 # RabbitMQ 服务器端口username: guest # RabbitMQ 用户名password: guest # RabbitMQ 密码virtual-host: / # 虚拟主机listener:simple:# 配置消费者并发数concurrency: 3# 配置最大并发数max-concurrency: 10template:# 配置消息发送超时时间receive-timeout: 10000msaddresses:# 如果有多个 RabbitMQ 集群节点,可以配置多个地址- 127.0.0.1:5672- 192.168.1.10:5672
四.使用 Redis 和 RabbitMQ 处理高并发业务:
业务场景:
假设你有一个双十一大促活动,多个用户同时进行商品抢购。你需要保证:
- 商品库存数量的准确性(使用 Redis 分片集群来保证高可用和性能)。
- 用户抢购操作的顺序性(使用 RabbitMQ 消息队列来保证消息的顺序处理)。
系统架构:
- Redis 分片集群:用于存储商品的库存信息,支持高并发的库存操作。
- RabbitMQ:用于异步处理用户的抢购请求,避免过多并发直接操作数据库。
1.创建 Redis 配置类:
在 Spring Boot 项目中,通常使用 Lettuce
或 Jedis
作为 Redis 客户端。以下是使用 Lettuce 客户端配置 Redis 集群的 application.yml
配置示例:
spring:redis:cluster:# Redis 集群节点的地址和端口nodes:- 127.0.0.1:7000- 127.0.0.1:7001- 127.0.0.1:7002- 127.0.0.1:7003- 127.0.0.1:7004- 127.0.0.1:7005timeout: 2000ms # Redis 连接的超时时间jedis:pool:# Redis 连接池配置max-active: 8 # 最大连接数max-wait: 5000ms # 最大等待时间max-idle: 8 # 最大空闲连接数min-idle: 4 # 最小空闲连接数
我们需要创建一个配置类来设置 Redis 集群的连接。Spring Boot 默认使用 Lettuce 作为 Redis 客户端。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ClusterConfiguration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import redis.clients.jedis.Jedis;@Configuration
public class RedisConfig {@Beanpublic JedisConnectionFactory redisConnectionFactory() {RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration();clusterConfiguration.setClusterNodes(Arrays.asList(new RedisNode("127.0.0.1", 7000),new RedisNode("127.0.0.1", 7001),new RedisNode("127.0.0.1", 7002),new RedisNode("127.0.0.1", 7003),new RedisNode("127.0.0.1", 7004),new RedisNode("127.0.0.1", 7005)));return new JedisConnectionFactory(clusterConfiguration);}@Beanpublic RedisTemplate<String, String> redisTemplate() {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory());return redisTemplate;}
}
2.创建 RabbitMQ 配置类:
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue() {return new Queue("orderQueue", true); // 订单队列,持久化}
}
3.创建订单服务:
现在我们来创建一个订单服务,处理抢购业务。假设每个商品有一个库存,并且多个用户抢购同一商品时,库存会减少。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class OrderService {private static final String PRODUCT_STOCK_KEY = "product_stock"; // 商品库存的 Redis 键@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;// 订单请求处理逻辑public void processOrder(String productId) {// 检查 Redis 中的商品库存String stock = redisTemplate.opsForValue().get(PRODUCT_STOCK_KEY + ":" + productId);if (stock != null && Integer.parseInt(stock) > 0) {// 执行购买流程,将库存减1redisTemplate.opsForValue().decrement(PRODUCT_STOCK_KEY + ":" + productId);// 将订单消息发送到消息队列rabbitTemplate.convertAndSend("orderQueue", productId);} else {System.out.println("库存不足,无法购买!");}}// 监听队列消息,处理订单@RabbitListener(queues = "orderQueue")public void handleOrder(Message message) {String productId = new String(message.getBody());System.out.println("处理订单:" + productId);// TODO: 在这里处理订单逻辑,比如更新数据库等}
}
4.创建控制器:
我们为前端提供一个 REST API 来模拟用户发起抢购请求。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {@Autowiredprivate OrderService orderService;@GetMapping("/buy")public String buy(@RequestParam String productId) {// 处理抢购逻辑orderService.processOrder(productId);return "请求已发出,等待处理!";}
}
5.防止超卖问题(Redis分布式锁):
为了防止多个用户同时抢购同一商品并导致库存数据超卖,我们可以使用 Redis 的分布式锁来保证同一时刻只有一个线程能够修改商品库存。
我们可以使用 Redis 的 SETNX
命令来获取锁,操作完成后释放锁。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class OrderService {private static final String PRODUCT_STOCK_KEY = "product_stock"; // 商品库存的 Redis 键private static final String LOCK_KEY = "lock"; // Redis 锁的键@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void processOrder(String productId) {// 尝试获取锁Boolean locked = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "LOCKED");if (locked != null && locked) {try {// 检查库存String stock = redisTemplate.opsForValue().get(PRODUCT_STOCK_KEY + ":" + productId);if (stock != null && Integer.parseInt(stock) > 0) {// 执行购买流程,将库存减1redisTemplate.opsForValue().decrement(PRODUCT_STOCK_KEY + ":" + productId);// 将订单消息发送到消息队列rabbitTemplate.convertAndSend("orderQueue", productId);} else {System.out.println("库存不足,无法购买!");}} finally {// 释放锁redisTemplate.delete(LOCK_KEY);}} else {System.out.println("抢购人数过多,请稍后再试!");
}
setIfAbsent(LOCK_KEY, "LOCKED")
方法用于获取一个分布式锁。它会尝试将指定的键LOCK_KEY
设置为LOCKED
,如果该键不存在时,才会成功设置,并返回true
表示锁定成功。如果该键已经存在(即其他线程或进程已经获得锁),则返回false
,表示锁定失败。- Redis 是通过
setIfAbsent
实现的原子操作,因此可以确保只有一个线程或进程可以获得锁,其他的请求会被阻塞或延迟处理。