当前位置: 首页 > news >正文

Redis分片集群+MQ处理高并发

Redis的三大集群模式:主从复制、哨兵模式和Cluster模式。每种模式都有其特点和应用场景,具体如下:

  1. 主从复制模式:适用于数据备份和读写分离场景,配置简单,但在主节点故障时需要手动切换。
  2. 哨兵模式:在主从复制的基础上实现自动故障转移,提高高可用性,适用于高可用性要求较高的场景。
  3. Cluster模式:通过数据分片和负载均衡实现大规模数据存储和高性能,适用于大规模数据存储和高性能要求场景。

在实际应用中,可以根据系统的需求和特点选择合适的Redis集群模式,以实现高可用性、高性能和大规模数据存储。

一.Redis分片集群:

1.什么是Redis分片集群?

Redis 集群(Redis Cluster)是一种分布式的 Redis 实现,它可以将数据分布到多个 Redis 节点上,从而实现 数据分片(Sharding)和 高可用性。Redis 集群的分片方式是通过哈希槽(Hash Slots)将数据分布到多个节点中,而每个 Redis 集群节点负责一部分哈希槽的管理。

2.工作原理:

哈希槽(Hash Slots): Redis 集群将数据分配到 16384 个哈希槽中。每个键会通过哈希函数计算出它所属的哈希槽,然后根据哈希槽将数据存储到对应的 Redis 节点上。

节点角色

  • 主节点(Master Node):负责存储数据并处理读写请求。
  • 从节点(Replica Node):用于备份主节点数据,提供高可用性。在主节点宕机时,从节点可以被提升为新的主节点。

数据迁移: Redis 集群会根据集群状态进行 数据迁移,当节点加入或退出时,数据会自动迁移到新的节点,确保数据分布均衡。

故障转移: 如果某个主节点发生故障,Redis 集群会自动将一个从节点提升为主节点,确保系统持续可用。

客户端与集群的交互: 客户端不直接访问集群中的节点,而是通过集群的代理来实现。客户端会根据哈希槽将请求发送到正确的 Redis 节点。客户端还会根据集群的状态信息自动切换节点。

二.创建Redis集群:

我们可以使用 redis-cli 工具创建 Redis 集群,需要以下几个步骤。首先,确保你已经安装了 Redis,并且有多个 Redis 实例在不同的端口上运行。这些实例将用于构建 Redis 集群。

1.启动 Redis 实例:

在创建 Redis 集群之前,需要至少 6 个 Redis 实例(3 个主节点和 3 个从节点)。每个实例都需要配置为支持集群模式。

(1)配置每个 Redis 实例:

确保每个 Redis 实例的配置文件中启用了集群模式。打开每个 Redis 实例的配置文件(通常位于 redis.conf),并确保以下选项已启用:

# 启用集群模式
cluster-enabled yes
# 配置集群的配置文件路径
cluster-config-file nodes.conf
# 配置集群的超时时间
cluster-node-timeout 5000
# 启用 AOF(追加文件)以保证数据持久化
appendonly yes

(2)启动 Redis 实例:

假设已经创建了 6 个 Redis 配置文件并且已经修改了配置(每个配置文件的端口不同)。我们可以通过以下命令启动 Redis 实例:

redis-server /path/to/redis.conf

假设你有以下端口配置:

  • Redis 实例 1:127.0.0.1:7000
  • Redis 实例 2:127.0.0.1:7001
  • Redis 实例 3:127.0.0.1:7002
  • Redis 实例 4:127.0.0.1:7003
  • Redis 实例 5:127.0.0.1:7004
  • Redis 实例 6:127.0.0.1:7005

启动这些实例:

redis-server /path/to/redis_7000.conf
redis-server /path/to/redis_7001.conf
redis-server /path/to/redis_7002.conf
redis-server /path/to/redis_7003.conf
redis-server /path/to/redis_7004.conf
redis-server /path/to/redis_7005.conf

目前这只是启动了 6 个 Redis 实例,它们并没有指定主节点和从节点的角色,因此还需要手动配置集群的主从关系。 

2.使用 redis-cli 创建 Redis 集群:

一旦 Redis 实例启动并且配置为支持集群模式,我们就可以使用 redis-cli 工具来创建 Redis 集群。

(1)使用 redis-cli --cluster 命令:

redis-cli 提供了一个集群管理命令 --cluster,允许你将多个 Redis 实例组织成一个集群。

通常,Redis 集群需要 至少 3 个主节点,每个主节点都有 一个或多个从节点。我们可以通过运行以下命令来创建 Redis 集群并自动配置主从节点:

redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1
  • 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005:这些是要加入集群的 Redis 实例的 IP 和端口。
  • --cluster-replicas 1:指定每个主节点有 1 个从节点。这样这个命令会自动创建 3 个主节点3 个从节点

 (2)确认集群创建:

在运行上述命令后,redis-cli 会要求你确认是否要创建集群,输入 yes 即可继续。

>>> Proceed with the cluster creation? (yes/no): yes

然后,Redis 会自动将节点配置为 Redis 集群,并且分配槽。

(3)检查集群状态:

创建集群后,你可以使用 redis-cli 连接到集群中的任一节点,执行以下命令查看集群的状态:

redis-cli -c -h 127.0.0.1 -p 7000 cluster info

这会显示集群的状态信息,例如:

cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:3
cluster_stats_messages_ping_sent:0
cluster_stats_messages_pong_sent:0
cluster_stats_messages_ping_received:0
cluster_stats_messages_pong_received:0

三.配置 RabbitMQ 消息队列:

(1)安装RabbitMQ:

我们需要先安装 RabbitMQ,这里可以参考 RabbitMQ 官方文档进行安装。

也可以观看下面博客:微服务架构 --- 使用RabbitMQ进行异步处理_rabbtimq 消息异步处理-CSDN博客

(2)添加依赖:

pom.xml 中添加 Spring Boot 和 RabbitMQ 的依赖:

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Data Redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
</dependencies>

(3)配置 RabbitMQ:

application.propertiesapplication.yml 中配置 RabbitMQ 连接:

spring:rabbitmq:host: localhost  # RabbitMQ 服务器地址port: 5672       # RabbitMQ 服务器端口username: guest   # RabbitMQ 用户名password: guest   # RabbitMQ 密码virtual-host: /  # 虚拟主机listener:simple:# 配置消费者并发数concurrency: 3# 配置最大并发数max-concurrency: 10template:# 配置消息发送超时时间receive-timeout: 10000msaddresses:# 如果有多个 RabbitMQ 集群节点,可以配置多个地址- 127.0.0.1:5672- 192.168.1.10:5672

四.使用 Redis 和 RabbitMQ 处理高并发业务:

 业务场景:

假设你有一个双十一大促活动,多个用户同时进行商品抢购。你需要保证:

  1. 商品库存数量的准确性(使用 Redis 分片集群来保证高可用和性能)。
  2. 用户抢购操作的顺序性(使用 RabbitMQ 消息队列来保证消息的顺序处理)。

系统架构:

  1. Redis 分片集群:用于存储商品的库存信息,支持高并发的库存操作。
  2. RabbitMQ:用于异步处理用户的抢购请求,避免过多并发直接操作数据库。

1.创建 Redis 配置类:

在 Spring Boot 项目中,通常使用 LettuceJedis 作为 Redis 客户端。以下是使用 Lettuce 客户端配置 Redis 集群的 application.yml 配置示例:

spring:redis:cluster:# Redis 集群节点的地址和端口nodes:- 127.0.0.1:7000- 127.0.0.1:7001- 127.0.0.1:7002- 127.0.0.1:7003- 127.0.0.1:7004- 127.0.0.1:7005timeout: 2000ms  # Redis 连接的超时时间jedis:pool:# Redis 连接池配置max-active: 8  # 最大连接数max-wait: 5000ms  # 最大等待时间max-idle: 8  # 最大空闲连接数min-idle: 4  # 最小空闲连接数

我们需要创建一个配置类来设置 Redis 集群的连接。Spring Boot 默认使用 Lettuce 作为 Redis 客户端。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ClusterConfiguration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import redis.clients.jedis.Jedis;@Configuration
public class RedisConfig {@Beanpublic JedisConnectionFactory redisConnectionFactory() {RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration();clusterConfiguration.setClusterNodes(Arrays.asList(new RedisNode("127.0.0.1", 7000),new RedisNode("127.0.0.1", 7001),new RedisNode("127.0.0.1", 7002),new RedisNode("127.0.0.1", 7003),new RedisNode("127.0.0.1", 7004),new RedisNode("127.0.0.1", 7005)));return new JedisConnectionFactory(clusterConfiguration);}@Beanpublic RedisTemplate<String, String> redisTemplate() {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory());return redisTemplate;}
}

2.创建 RabbitMQ 配置类:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue() {return new Queue("orderQueue", true); // 订单队列,持久化}
}

3.创建订单服务:

现在我们来创建一个订单服务,处理抢购业务。假设每个商品有一个库存,并且多个用户抢购同一商品时,库存会减少。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class OrderService {private static final String PRODUCT_STOCK_KEY = "product_stock"; // 商品库存的 Redis 键@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;// 订单请求处理逻辑public void processOrder(String productId) {// 检查 Redis 中的商品库存String stock = redisTemplate.opsForValue().get(PRODUCT_STOCK_KEY + ":" + productId);if (stock != null && Integer.parseInt(stock) > 0) {// 执行购买流程,将库存减1redisTemplate.opsForValue().decrement(PRODUCT_STOCK_KEY + ":" + productId);// 将订单消息发送到消息队列rabbitTemplate.convertAndSend("orderQueue", productId);} else {System.out.println("库存不足,无法购买!");}}// 监听队列消息,处理订单@RabbitListener(queues = "orderQueue")public void handleOrder(Message message) {String productId = new String(message.getBody());System.out.println("处理订单:" + productId);// TODO: 在这里处理订单逻辑,比如更新数据库等}
}

4.创建控制器:

我们为前端提供一个 REST API 来模拟用户发起抢购请求。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class OrderController {@Autowiredprivate OrderService orderService;@GetMapping("/buy")public String buy(@RequestParam String productId) {// 处理抢购逻辑orderService.processOrder(productId);return "请求已发出,等待处理!";}
}

5.防止超卖问题(Redis分布式锁):

为了防止多个用户同时抢购同一商品并导致库存数据超卖,我们可以使用 Redis 的分布式锁来保证同一时刻只有一个线程能够修改商品库存。

我们可以使用 Redis 的 SETNX 命令来获取锁,操作完成后释放锁。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class OrderService {private static final String PRODUCT_STOCK_KEY = "product_stock"; // 商品库存的 Redis 键private static final String LOCK_KEY = "lock"; // Redis 锁的键@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void processOrder(String productId) {// 尝试获取锁Boolean locked = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "LOCKED");if (locked != null && locked) {try {// 检查库存String stock = redisTemplate.opsForValue().get(PRODUCT_STOCK_KEY + ":" + productId);if (stock != null && Integer.parseInt(stock) > 0) {// 执行购买流程,将库存减1redisTemplate.opsForValue().decrement(PRODUCT_STOCK_KEY + ":" + productId);// 将订单消息发送到消息队列rabbitTemplate.convertAndSend("orderQueue", productId);} else {System.out.println("库存不足,无法购买!");}} finally {// 释放锁redisTemplate.delete(LOCK_KEY);}} else {System.out.println("抢购人数过多,请稍后再试!");
}
  • setIfAbsent(LOCK_KEY, "LOCKED") 方法用于获取一个分布式锁。它会尝试将指定的键 LOCK_KEY 设置为 LOCKED,如果该键不存在时,才会成功设置,并返回 true 表示锁定成功。如果该键已经存在(即其他线程或进程已经获得锁),则返回 false,表示锁定失败。
  • Redis 是通过 setIfAbsent 实现的原子操作,因此可以确保只有一个线程或进程可以获得锁,其他的请求会被阻塞或延迟处理。

http://www.mrgr.cn/news/81130.html

相关文章:

  • 准备写一个内网穿透的工具
  • 安卓从Excel文件导入数据到SQLite数据库的实现
  • 【微信小程序】1|底部图标 | 我的咖啡店-综合实训
  • 2024年图像处理、多媒体技术与机器学习
  • PostgreSQL编译安装教程
  • [LeetCode-Python版] 定长滑动窗口1(1456 / 643 / 1343 / 2090 / 2379)
  • 从零创建一个 Django 项目
  • GIS数据处理/程序/指导,街景百度热力图POI路网建筑物AOI等
  • Pandas系列|第二期:Pandas中的数据结构
  • Redis安装、启动、卸载
  • GitCode 光引计划投稿|MilvusPlus:开启向量数据库新篇章
  • NIPS2014 | GAN: 生成对抗网络
  • C语言初阶习题【15】猜数字游戏
  • OpenEuler 22.03 不依赖zookeeper安装 kafka 3.3.2集群
  • 智慧商城:编辑切换状态,删除功能,空购物车处理
  • 华为实训课笔记 2024 1223-
  • 简易CPU设计入门:本系统中的通用寄存器(一)
  • 设计模式期末复习
  • JavaScriptEs6 - String类和Array类扩展内容
  • Kamailio db_text 之使用
  • 计算机毕业设计PySpark+PyFlink+Hive地震预测系统 地震数据分析可视化 地震爬虫 大数据毕业设计 Hadoop 机器学习 深度学习
  • 基于OpenAI API使用Fastchat部署调用本地大模型
  • spring cache源码解析(四)——从@EnableCaching开始来阅读源码
  • 【数据结构练习题】栈与队列
  • 浏览器工作原理与实践-12|栈空间和堆空间:数据是如何存储的
  • 【Linux进程】进程间通信(共享内存、消息队列、信号量)