【云岚到家】-day09-2-秒杀抢购
【云岚到家】-day09-2-秒杀抢购
- 4.3 抢券
- 4.3.1 解决超卖问题
- 1)系统需求
- 2)什么是超卖问题
- 3)悲观锁与乐观锁
- 4)数据库行锁控制方案
- 5)Redis分布式锁方案
- 6)Redis原子操作方案
- 4.3.2 Redis原子操作方案
- 1)通过 MULTI 事务命令实现
- 2)了解Pipeline与MULTI 的区别
- 3)Redis+Lua实现
- 4)选择方案
- 5)使用Lua脚本注意点
- 4.3.3 抢券整体方案
- 1)抢券方案分析
- 2)数据流
- 3)Redis数据结构
- 4)小结
- 4.3.4 库存同步
- 1)系统设计
- 2)预热程序中同步库存
- 3)测试
- 4.3.5 抢券Lua脚本
- 1)抢券Lua脚本
- 2)测试
- 3) 小结
- 4.3.6 抢券接口开发
- 1)接口定义
- 2)校验活动有效性
- 3)抢券service方法
- 4)抢券controller方法
- 5)抢券测试
- 6)小结
- 4.4 抢券结果同步
- 4.4.1 Redis到MySQL同步方案分析
- 1)整体思路
- 2)如何使用线程池?
- 3)如何从Redis 批量取数据?
- 4)小结
- 4.4.2 Redisson分布式锁
- 1)问题分析
- 2)Redisson实现分布式锁
- 3)看门狗机制
- 4.4.3 数据同步组件
- 1)测试同步组件
- 2)理解数据同步组件原理
- 4.4.4 抢券结果同步开发
- 1)编写扣减库存方法
- 2)完善数据同步处理器
- 3)抢券结果同步测试
4.3 抢券
4.3.1 解决超卖问题
1)系统需求
在抢券模块中需要实现下边两个需求:
1、提升高并发吞吐量
抢券类似抢购、秒杀等业务场景具有时效性的特点,提前规定好用户在什么时间可以抢购,用户访问集中,这样就会给系统造成高并发,抢券模块在设计时要以提升系统在高并发下的吞吐量为目标,吞吐量表示单位时间内系统处理的总请求数量,吞吐量高意味着系统处理能力强。
衡量系统吞吐量的常用指标有哪些?
QPS(Queries Per Second):
每秒查询数(Queries Per Second),它表示系统在每秒内能够处理的查询或请求的数量,是衡量一个系统处理请求的性能和吞吐量的指标。
计算公式:总请求数/时间窗口大小
示例:
在10秒内处理1万个请求,QPS为1000。每个请求处理的时间越短,QPS越大。
假设:一个网站有10万用户,有2万日活跃用户,并发量是4000,每个用户每秒平均发起2个请求,那么总请求数就是 2*4000,那么QPS就是 8000,如果单机支持2000的qps理论上需要4台服务器。
qps指标是需要根据服务器硬件性能、及具体的业务场景去测试,比如:门户查询数据如果直接走Nginx静态服务器则QPS可以达到上万,如果请求查询Tomcat,并且通过数据库去查询数据库返回,此时QPS会远低于查询Nginx静态服务器的QPS值,如果不走数据库,而是从Redis查询数其QPS也会大大提升。
TPS(Transactions Per Second):
表示系统每秒完成的事务数,与QPS不同,TPS更关注系统的事务处理能力,而不仅仅是单纯的查询或请求,一次事务通常会包括多个请求。在高度事务性的系统中,如在线交易系统、支付系统等,TPS是一个关键指标,用于衡量系统的处理能力。
TPS指标通常会涉及业务处理及数据库存储,在测试时也需要根据服务器硬件性能、及具体的业务场景去测试,拿下单举例:单机支持几十到几百的TPS指标属于正常。
在开发中,对以上性能指标的优化,可通过CDN、缓存、异步处理、数据库优化、多线程、集群、负载均衡等技术去提高系统的吞吐量。当然,再优化也不要忘记系统保护,通过限流技术根据系统的性能指标进行限流保护。
2、解决超卖问题
抢购、秒杀等业务场景还需要解决超卖问题,超卖是最终下单购买数量大于库存数量,比如:库存有100个,用户最终购买了101个,多出这一个就是超卖了,结合抢券业务即用户最终抢到的优惠券总数大于优惠券库存数。
下边先分析超卖问题的解决方案。
2)什么是超卖问题
超卖是最终下单购买数量大于库存数量,比如:库存100个用户最终购买成功了101个,多出这一个就是超卖了,结合抢券业务,用户最终抢到的优惠券总数大于优惠券库存数就出现了超卖问题。
导致超卖问题的原因是什么呢?
下边举例说明超卖问题并分析导致超卖问题的原因。
下图是两个线程更新数据库的库存字段。
线程1:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新数据库库存字段。
线程2:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新数据库库存字段。
线程1和线程2查询到的库存都是1,两个线程分别减1得到剩余库存数0,由于线程2并不是基于线程1扣减库存后的值进行扣减,线程2更新库存覆盖了线程1更新的库存值。
上边的例子就出现了超卖的问题。
造成超卖问题的原因是在高并发场景下对库存这个共享资源进行操作存在线程不安全所导致。
3)悲观锁与乐观锁
提到解决线程安全问题大家想到了锁,下边复习下关于锁的基本概念。
jvm提供了很多锁,比如:synchronized、reentrantLock、CAS等,它们都可以解决线程安全问题,synchronized、reentrantLock可以实现悲观锁,CAS可以实现乐观锁,关于这些锁的 知识掌握不牢固的一定要自行复习。下边理解悲观锁与乐观锁的概念。
Object lock=new Object();
public void test(){synchronized (lock){System.out.println("test");}
}
- 悲观锁
悲观锁是一种悲观思想,总认为会有其它线程修改数据,为了保证线程安全所以在操作前总是先加锁,操作完成后释放锁,其它线程只有当锁释放后才可以获取锁继续操作数据。synchronized和ReentrantLock都可以实现悲观锁。
使用悲观锁后原来的多线程并发执行改为了顺序(同步)执行,当线程2去执行时查询到库存发现为0,不满足条件更新库存失败。
- 乐观锁
乐观锁则是一种乐观思想,认为不会有太多线程去并发修改数据,所以谁都可以去执行代码。
Java提供的CAS机制可以实现乐观锁,CAS即Compare And Swap 比较并交换,在修改数据前先比较版本号,如果数据的版本号没有变化说明数据没有修改,此时再去更改数据。
示例如下:
库存数据对应一个版本,库存每次变化则版本号跟着变化,如下:
库存 | 版本号 |
---|---|
100 | 1 |
99 | 2 |
… | … |
1 | 100 |
0 | 101 |
线程1修改库存前拿到库存及对应的版本号:1和100。
线程1判断库存如果大于0则将库存减1,准备更新库存。
更新库存时要校验当前库存的版本是否和自己之前拿到的一致,如果版本号为1说明自己在执行的这过程没有其它线程去修改过库存,此时将库存更新为99并将版本号加1为2。
线程2执行和线程1一样的逻辑,线程2去更新库存时发现库存的版本号为2与自己之前拿到的不一致,更新库存失败。
- 结论
悲观锁和乐观锁都是一种解决共享资源的线程安全问题的方法,悲观锁是在读数据时就加锁,如果读比较多则加锁频繁影响性能,相比而言乐观锁性能比悲观锁要好。
4)数据库行锁控制方案
数据库的行级锁可以实现悲观锁也可以实现乐观锁。
- 实现悲观锁(排他锁)
执行select … for update 实现加锁,select … for update 会锁住符合条件的行的数据,如下语句会锁一行的数据
select * from 库存表 where id=? for update
通常此语句放在事务中,开启事务开始时执行此语句获取锁,事务提交或回滚自动锁释放,保证在事务处理过程中没有其它线程去修改数据。
测试:
使用一个线程执行下边的命令:
start transaction;select ... for update;COMMIT;
另一个线程修改数据,如果上边不释放锁将无法修改。
高并发场景不推荐使用select … for update方法,同时也可能存在死锁的潜在风险。
- 实现乐观锁
数据库的行级锁也可以实现乐观锁,通用的做法是在表中添加一个version版本字段,在更新时对比版本号,更新成功将版本号加1,SQL为:
update 表名 set 字段=值,version=version+1 where id =? and version =?
针对扣减库存业务扣减库存SQL:
update 库存表 set 库存=库存-1 where 库存>0 and id =?
多线程执行上边的SQL,假如线程1先执行会添加排他锁,当事务没有结束前其它线程去更新同一条记录会被阻塞,等到线程1更新结束其它线程才可以更新库存。
当执行update后返回影响的记录行数为1表示更新成功即扣减库存成功,返回0表示没有更新记录行,即扣减库存失败。
- 结论
悲观锁在查询时就开始加锁,如果读比较多则加锁频繁影响性能,相比而言乐观锁性能比悲观锁要好。
对于并发不高的场景可以使用数据乐观锁去控制扣减库存,由于抢购业务并发较高且对性能要求也高,如果使用数据库行锁去控制,并发高就会对数据造成压力,如果进行限流控制并发数又无法满足性能要求,所以对于抢购业务使用数据库行锁进行控制是不适合的。
5)Redis分布式锁方案
数据库乐观锁不适用高并发场景,我们能否将库存数据放在Redis,并且通过JVM锁去控制扣减库存呢?
上边介绍的synchronized、reentrantLock、CAS只控制了JVM本身的线程争抢同一个锁,无法控制多个JVM之间争抢同一个锁。
如下图,有两个JVM进程,每个JVM进程都有一个Lock01锁,这两个JVM进程中的线程1仍然会同时去修改库存:
线程1:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新Redis库存数据。
线程2:先查询库存为1,判断是否大于0,如果大于则库存减1,最后更新Redis库存数据。
此时就会出现修改库存数据的线程不安全问题。
所以,如果是单机环境下,使用JVM的锁在内存加锁可以解决资源并发访问的线程安全问题。
微服务架构的项目在部署时每个微服务会部署多个实例(JVM),每个实例就是一个JVM,如果要控制多个JVM之间争抢资源需要用到分布式锁,分布式锁是由一个统一的服务提供分布式锁服务,比如:使用redis、数据库都可以实现分布式锁,下边介绍分布式锁控制争抢资源的方法。
如下图:每个JVM中的线程去争抢同一个分布式锁,在扣减库存前先获取分布式锁,拿到锁再扣减库存,执行完释放锁之后其它JVM的线程才可以获取锁继续扣减库存,如下图:
上边的方案将库存放在Redis中避免与数据库交互,很大的提高的了执行效率,在分布式场景下使用分布式锁是一种常用的控制共享资源的方案。
分布式锁需要搭建独立的分布式锁服务(例如Redis、Zookeeper等),每次操作需要远程与分布式锁服务交互获取锁、释放锁,还有没有性能更高的方案呢?
6)Redis原子操作方案
上边使用分布式锁的方案每次操作需要远程与分布式锁服务交互获取锁、释放锁,有没有优化的方法避免申请锁与释放锁的交互呢?
在分布式锁方案中是在java程序扣减库存最后更新redis库存的值,能否使用redis的decr命令去扣减库存呢?
Redis Decr 命令将 key 中储存的数字值减一,并且具有原子性,Redis中所有命令都具有原子性。
原子性表示该命令在执行过程中是不被中断的,也就实现了多线程去执行decr命令扣减库存是顺序执行的,假如库存原来是100,扣减到0结束,多线程并发执行decr命令不会出现扣减次数超过100次,如下图:
基于这个思想可以对分布式锁方案优化如下:
此方案中没有使用分布式锁,而是基于Redis命令具有原子性的特点实现。
本项目使用Redis原子操作控制超卖问题。
4.3.2 Redis原子操作方案
在Redis原子操作方案中扣减库存使用decr命令实现,decr命令具有原子性,如果在扣减库存操作中有多个操作 ,那么整体还是原子性吗?如下图:
扣减库存逻辑如下:
1、首先查询库存
2、判断库存大小,如果大于0则扣减库存,否则 直接返回
3、记录抢券成功的记录,用于判断用户不能重复抢券的依据。
4、记录抢券同步的记录,用于后续的异步处理,将抢券结果保存到数据库。
如果上述四步整体不具有原子性仍然没有办法控制超卖问题,所以必须保证1、2、3步逻辑放在一起整体具有原子性。
如何保证多个Redis命令具有原子性呢?本节介绍两个保证Redis多个命令具有原子性的方法。
1)通过 MULTI 事务命令实现
对于redis单个命令都是原子操作,现在要求扣减库存、写入抢券成功队列及写入同步队列保证原子性,多个redis命令如何保证原子性呢?
1、通过 MULTI 事务命令实现
下边的命令执行流程如下:
执行MULTI 标记首先标记一个事务块开始。
然后将要执行的命令加入队列。
将“HSET key1 field1 value2 field2 value2” 命令放入队列中,表示向key1中写入两个hashkey。
将“INCR key2”命令放入队列中,表示对key2自增1。
运行EXEC命令按顺序执行,整体保证原子性。
MULTI
HSET key1 field1 value2 field2 value2
INCR key2
EXEC
测试:
2)了解Pipeline与MULTI 的区别
学习过Redis的同学听说过pipline管道命令,pipline也可实现批量执行多个 redis命令,pipline与multi的区别是:
pipeline 是把多个redis指令一起发出去,redis并没有保证这些命令的执行是原子的;multi实现的是将多个命令作为事务块去执行,保证整个操作的原子性。
如果仅是执行多个命令不保证原子性那么使用pipeline 的性能要比multi要高,但是针对本项目要保证多个命令实现原子性的需求那么pipeline 不符合要求。
3)Redis+Lua实现
Lua 是一种强大、高效、轻量级、可嵌入的脚本语言,Lua体积小、启动速度快,从而适合嵌入在别的程序里,Lua可以用于web开发、游戏开发、嵌入式开发等领域。
参考:http://www.lua.org/docs.html,或者去百度搜索Lua中文教程。
对于Lua脚本语法非常容易理解,先不用系统的去学习,先把本项目使用的Lua脚本读懂即可,实际工作中用到时再参考本项目的脚本去写即可,不会的再查Lua 的语法。
先看一个例子,对上边的例子编写Lua脚本,如下:
local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);
redis.call('incr', KEYS[2]);
return ret..'';
说明:
KEYS:表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,KEYS[1]表示第一个key,KEYS[2]表示第2个key,依次类推。
ARGV:表示在脚本中所用到的参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类),ARGV[1]表示第一个参数,ARGV[2]表示第二个参数,依次类推。
如何执行上边的Lua脚本呢?
使用EVAL 命令执行Lua脚本。
EVAL是redis的命令本身具有原子性,整个脚本的执行具有原子性。
EVAL script numkeys key [key ...] arg [arg ...]
参数说明:
- script: 是一段 Lua 5.1 脚本程序。
- numkeys: 用于指定键名参数的个数。(操作大key的数量,小key不管)
- key [key …]: 从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
- arg [arg …]: 附加参数,在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。
执行下边的命令:
eval "local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);redis.call('incr', KEYS[2]);return ret..'';" 2 test_key01 test_key02 field1 aa field2 bb
说明:
eval后边的script参数即脚本程序,将上边的Lua脚本使用双引号括起来。
numkeys:为2表示2个key
之后传入key的名称(多key中间用空格分隔):test_key01 test_key02
key后边再传入ARGV 参数(多ARGV 中间用空格分隔):field1 aa field2 bb
测试结果如下所示:
返回2表示向hash中写入2个key
下边是使用RedisTemplate执行Lua脚本的方法:
<T> T execute(RedisScript<T> script, List<K> keys, Object... args)
通过第一个参数类型指定要执行的Lua脚本,RedisScript的实现类是DefaultRedisScript,下边查阅DefaultRedisScript的源代码。
第一种方法是将Lua脚本的内容作为字符串传入DefaultRedisScript对象并且指定返回值类型,代码如下:
public DefaultRedisScript(String script, @Nullable Class<T> resultType) {this.shaModifiedMonitor = new Object();this.setScriptText(script);this.resultType = resultType;
}
如果脚本内容比较多使用第一种方法显得很麻烦。
第二种方法是指定Lua脚本的位置,通过DefaultRedisScript的setScriptSource方法完成,如下:
public void setScriptSource(ScriptSource scriptSource) {this.scriptSource = scriptSource;
}
本项目使用第二种方法,在RedisLuaConfiguration中定义DefaultRedisScript bean
@Bean("Lua_test01")
public DefaultRedisScript<Integer> getLuaTest01() {DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();//resource目录下的scripts文件下的Lua_test01.Lua文件redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/Lua_test01.Lua")));redisScript.setResultType(Integer.class);return redisScript;
}
创建lua脚本
创建RedisTest测试类:
注入上边定义的DefaultRedisScript,注意注入时指定名称“Lua_test01”。
package com.jzo2o.market.service;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;/*** @author Mr.M* @version 1.0* @description TODO* @date 2023/10/13 16:28*/
@SpringBootTest
@Slf4j
public class RedisLuaTest {@Resource(name = "redisTemplate")RedisTemplate redisTemplate;@Resource(name = "Lua_test01")DefaultRedisScript script;//测试Lua@Testpublic void test_Luafirst() {//参数1:key ,key1:test_key01 key2:test_key02List<String> keys = Arrays.asList("test_key01","test_key02");//参数2:传入Lua脚本的参数,"field1","aa","field2", "bb"Object result = redisTemplate.execute(script, keys, "field1","aa","field2", "bb");log.info("执行结果:{}",result);}}
执行test_Luafirst() 测试方法。
4)选择方案
上边学习了Multi和Redis+Lua两种实现Redis原子操作的方案,在本项目你会选择哪一种方案?
你肯定会说是第一种方案,因为简单,使用Lua脚本还要写Lua脚本,去学习它的语法。
不过,在实际使用中要根据具体的需求去确定方案,比如下边的Lua脚本在执行过程中就会有一些业务逻辑判断,不满足条件提前返回结果,而MULTI 执行命令是执行完成最后一起拿到所有命令的执行结果,并且MULTI 不适合写带有业务逻辑的脚本内容。
下边的lua脚本实现了项目抢券功能,可以尝试阅读,稍后会详细讲解。
-- 抢券Lua实现
-- key: 抢券同步队列,资源库存,抢券成功列表
-- argv:活动id,用户id--优惠券是否已经抢过
local couponNum = redis.call("HGET", KEYS[3], ARGV[2])
-- hget 获取不到数据返回false而不是nil
if couponNum ~= false and tonumber(couponNum) >= 1
thenreturn "-1";
end
-- --库存是否充足校验
local stockNum = redis.call("HGET",KEYS[2], ARGV[1])
if stockNum == false or tonumber(stockNum) < 1
thenreturn "-2";
end
--抢券列表
local listNum = redis.call("HSET",KEYS[3], ARGV[2], 1)
if listNum == false or tonumber(listNum) < 1
thenreturn "-3";
end--减库存
stockNum = redis.call("HINCRBY",KEYS[2], ARGV[1], -1)
if tonumber(stockNum) < 0
thenreturn "-4"
end
-- 抢单结果写入同步队列
local result = redis.call("HSETNX", KEYS[1], ARGV[2],ARGV[1])
if result > 0
thenreturn ARGV[1] ..""
end
return "-5"
根据需求本项目使用Redis执行Lua脚本的方式保证多命令的原子性,完成抢券功能。
在Spring的测试中,因为config配置中针对不同的脚本文件使用不同的bean,自然测试的时候需要引入对应的bean,并且我们现在是单元测试,自然lua_test01的脚本文件应该放在test下的resource的script文件夹中
@SpringBootTest
@Slf4j
public class RedisLuaTest {@Resource(name = "redisTemplate")RedisTemplate redisTemplate;@Resource(name = "lua_test01")DefaultRedisScript script;//测试lua@Testpublic void test_luafirst() {//参数1:key ,key1:test_key01 key2:test_key02List<String> keys = Arrays.asList("bblb1","bblb2");//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"Object result = redisTemplate.execute(script, keys, "test1","bb","test2", "lb");log.info("执行结果:{}",result);}@Testpublic void test_luafirst2() {//参数1:key ,key1:test_key01List<String> keys = Arrays.asList("test_key01{1}","test_key02{1}");//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"Object result = redisTemplate.execute(script, keys, "field1","aa","field2", "bb");log.info("执行结果:{}",result);}}
对应的lua_test01中的内容
local ret = redis.call('hset', KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4]);
redis.call('incr', KEYS[2]);
return ret..'';
测试理论上,应该是两个大key,分别是bblb1和bblb2,然后bblb2里有自增的1,bblb2有俩键值对,分别是(test1,bb)和(test2,lb)
测试成功
5)使用Lua脚本注意点
Lua脚本在redis集群上执行需要注意什么?
在redis集群下执行redis命令会根据key求哈希,确定具体的槽位(slot),然后将命令路由到负责该槽位的 Redis 节点上。
执行一次Lua脚本会涉及到多个key,在redis集群下执行lua脚本要求多个key必须最终落到同一个节点,否则调用Lua脚本会报错:ERR eval/evalsha command keys must be in same slot。
如何保证多个key落地到一个redis节点呢?
只要保证多个key的哈希值一致即可保证多个key落到一个redis节点上,这个如何实现呢?
解决方法:一次执行Lua脚本的所有key中使用大括号‘{}’且保证大括号中的内容相同,此时会根据大括号中的内容求哈希,因为内容相同所以求得的哈希数据相同所以就落在了同一个Redis节点。
测试如下:
所以我们的代码要变成在key名称后边添加{},大括号中写一个固定的值。
@Test
public void test_luafirst2() {//参数1:key ,key1:test_key01List<String> keys = Arrays.asList("bblb1{1}","bblb{1}");//参数2:传入lua脚本的参数,"field1","aa","field2", "bb"Object result = redisTemplate.execute(script, keys, "test1","bb","test2", "lb");log.info("执行结果:{}",result);
}
执行测试成功,观察redis多了两个key:"bblb1{1}“和"bblb{1}”
4.3.3 抢券整体方案
1)抢券方案分析
抢券的架构设计思想同抢券查询,将库存保存在Redis,避免抢券操作请求数据库,通过异步任务将Redis中的抢券结果同步到数据库。
抢券的交互流程如下:
如下图:
说明如下:
1、由预热程序将待生效库存同步到redis(活动开始将不允许更改库存)
2、活动开始后,抢券程序请求Redis扣减库存,扣减库存成功向抢券成功队列和抢券同步队列写入记录
Redis中两个队列的作用如下:
抢券成功队列:为了校验用户是否抢过该优惠券。
抢券同步队列:将抢券结果同步到数据库
3、通过定时任务程序根据Redis中同步队列记录的用户抢券结果信息将数据同步到MySQL,具体操作如下:
向优惠券表插入用户抢券记录。
更新优惠券活动表的库存。
写入数据库完成后删除Redis中同步队列的相应记录,删除后表示同步完成,如果同步过程失败将保留Redis同步队列的相应记录。
2)数据流
根据交互流程分析数据流如下:
3)Redis数据结构
- 活动信息
缓存结构:String类型:
key: “ACTIVITY:LIST”
value: 符合条件的优惠券活动列表JSON数据。
过期时间:永不过期
缓存一致性方案:通过预热程序保证缓存一致性
- 优惠券活动库存
缓存结构:Hash
RedisKey:COUPON:RESOURCE:STOCK:{活动id%10}
{活动id%10}表示根据活动id除以10求余,通过这种方法将key分散到不同的redis服务器上,通过“活动id%10”表达式可知优惠券活动库存hash最多有10个。
HashKey:活动id
HashValue: 库存
过期时间:永不过期
缓存一致性方案:通过预热程序保证缓存一致性
举例:
如果n为10,1号活动的库存是100,将1号活动库存存储Redis的效果如下:
- 抢券成功队列
缓存结构:Hash
RedisKey:COUPON:SEIZE:LIST:活动id_{活动id%10}
HashKey:用户id
HashValue:1
过期时间:永不过期
- 抢券同步队列
缓存结构:Hash
RedisKey:QUEUE:COUPON:SEIZE:SYNC:{活动id%10}
HashKey:用户id
HashValue:活动id
过期时间:永不过期
4)小结
抢券是怎么做的?或方案是什么?
抢券业务的Redis数据结构用的什么?具体说说
秒杀系统中如何进行流量削峰?
在秒杀系统中进行流量削峰是非常重要的,因为瞬时的高流量可能会导致系统崩溃或性能下降。以下是一些常见的流量削峰策略:
- **限流措施:**通过控制请求的发放速率,可以有效地平滑流量,避免瞬时的高并发。
- 队列缓冲: 使用消息队列来缓冲请求,将瞬时的高并发请求进行缓存和排队。秒杀系统可以异步地从队列中取出请求进行处理,以平滑处理流量。
- **分批处理:**将瞬时的高并发请求分批处理。不需要一次性处理所有请求,可以将请求按照一定的规模分批处理,以减轻数据库和系统的压力。
- 负载均衡:采用多节点部署,通过负载均衡器将流量分发到不同的服务器上。
- 熔断机制:
- 熔断策略: 实现熔断机制,当系统达到一定的负载阈值时,暂时停止接受新的请求,防止系统崩溃。等到系统恢复后再重新开启。
- **缓存预热:**在秒杀开始之前,提前将秒杀商品的信息加载到缓存中,减轻数据库的压力。
- **验证码和身份验证:**引入验证码和身份验证机制,防止机器人或恶意请求,减少无效请求对系统的冲击。
- **数据库优化:**对于秒杀系统,数据库通常是瓶颈之一。通过优化数据库结构、建立索引、使用缓存等手段来提高数据库的读写性能。
4.3.4 库存同步
1)系统设计
根据整体方案分析,用户抢券要在Redis扣减库存,所以需要提前将优惠券活动的库存同步到Redis。
可以通过定时预热程序中将优惠券活动的库存同步到Redis,同步规则如下:
- 对于待生效的活动更新库存。
- 对于已生效的活动如果库存已经同步则不再同步,只更新没有同步库存的活动。
做第二点的原因是为了避免时间差问题,活动状态更改为进行中了但是库存还没有同步到Redis。
当用户抢券成功,Redis中的库存有了变化,如何将最新库存由Redis同步MySQL呢?
根据整体方案分析,在抢券结果同步程序中根据抢券结果修改数据库中的库存,此部分在抢券结果同步章节再确定具体的方法。
交互流程如下:
2)预热程序中同步库存
下边实现在预热程序中同步库存。
在com.jzo2o.market.service.impl.ActivityServiceImpl#preHeat的预热程序中添加:
@Override
public void preHeat() {/***SELECT *FROM activity tWHERE t.distribute_start_time <= DATE_ADD(NOW(), INTERVAL 30 DAY) AND t.status IN (1, 2)ORDER BY t.distribute_start_time ASC;*/// 1.查询准备LocalDateTime now = DateUtils.now();LocalDateTime preHeatTime = now.plusDays(30);LambdaQueryWrapper<Activity> lambdaQueryWrapper = new LambdaQueryWrapper<>();// 查询条件lambdaQueryWrapper.le(Activity::getDistributeStartTime, preHeatTime).in(Activity::getStatus, Arrays.asList(NO_DISTRIBUTE.getStatus(), DISTRIBUTING.getStatus())).orderByAsc(Activity::getDistributeStartTime);// 查询List<Activity> activities = baseMapper.selectList(lambdaQueryWrapper);if (CollUtils.isEmpty(activities)) {//防止缓存穿透activities = new ArrayList<>();}// 2.数据转换: 将List<Activity> 转为List<SeizeCouponInfoResDTO>List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = BeanUtils.copyToList(activities, SeizeCouponInfoResDTO.class);// 3.再转为json字符串String json = JsonUtils.toJsonStr(seizeCouponInfoResDTOS);// 4.存入redis,操作string用opsForValue(),操作哈希用opsForHash()redisTemplate.opsForValue().set(ACTIVITY_CACHE_LIST, json);// 5.对未开始的活动的库存直接更新到redisactivities.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==NO_DISTRIBUTE.getStatus()).forEach(v->{redisTemplate.opsForHash().put(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());});// 6.对已经开始的活动的库存,如果redis中没有,则更新到redisactivities.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==DISTRIBUTING.getStatus()).forEach(v->{redisTemplate.opsForHash().putIfAbsent(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());});
说明:
对于待生效的活动库存使用put方法,可以对已设置的记录进行更改。
对已生效的活动库存使用putIfAbsent实现,当key不存在时才执行设置操作。
String.format(COUPON_RESOURCE_STOCK, v.getId() % 10) 用来拼装 key,库存的redis key为:
COUPON:RESOURCE:STOCK:{活动id%10}。
3)测试
测试流程:
启动定时预热程序任务。
观察redis中是否成功存储库存信息。
先清空redis后,直接执行一次定时任务
我们当前只有洗澡大派送是进行中,自然只有一个。
4.3.5 抢券Lua脚本
1)抢券Lua脚本
本节对抢券Lua脚本进行阅读并测试,理解抢券的过程。
- 阅读下边的Lua脚本
-- 抢券lua实现
-- key: 抢券同步队列,资源库存,抢券成功列表
-- argv:活动id,用户id--优惠券是否已经抢过
local couponNum = redis.call("HGET", KEYS[3], ARGV[2])
-- hget 获取不到数据返回false而不是nil
if couponNum ~= false and tonumber(couponNum) >= 1
thenreturn "-1";
end
-- --库存是否充足校验
local stockNum = redis.call("HGET",KEYS[2], ARGV[1])
if stockNum == false or tonumber(stockNum) < 1
thenreturn "-2";
end
--抢券列表
local listNum = redis.call("HSET",KEYS[3], ARGV[2], 1)
if listNum == false or tonumber(listNum) < 1
thenreturn "-3";
end--减库存
stockNum = redis.call("HINCRBY",KEYS[2], ARGV[1], -1)
if tonumber(stockNum) < 0
thenreturn "-4"
end
-- 抢券结果写入同步队列
local result = redis.call("HSETNX", KEYS[1], ARGV[2],ARGV[1])
if result > 0
thenreturn ARGV[1] ..""
end
return "-5"
错误代码:
-1: 限领一张
-2: 已抢光
-3: 写入抢券成功队列失败,返回给用户为:抢券失败
-4: 已抢光
-5: 写入抢券同步队列失败,返回给用户为:抢券失败
2)测试
编写测试方法,准备好调用抢券Lua脚本需要传入的key和参数。
代码如下:
@Test
void test_seizeCouponScriptLua() {//argv:抢券活动idlong activityId = 1851921214852177920L;// argv: 用户idLong userId = 1828787045661319168L;//index:就是rediskey后大括号里的内容,多个rediskey的内容必须一直,否则会出现数据不一致,就是活动id%10//int index = (int) (activityId % 10);int index =0;//key: 抢券同步队列,资源库存,抢券成功列表// 同步队列redisKeyString couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index);// 资源库存redisKeyString resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index);// 抢券成功列表String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST,activityId, index);// 抢券Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey),activityId, userId);log.debug("seize coupon result : {}", execute);
}
执行成功,观察redis:
示例:
库存是否减少:
抢券成功队列是否存在相应记录:
抢券同步队列是否存在相应记录:
3) 小结
抢券的Lua脚本做的什么工作?
- 判断用户是否在该活动抢过券。
- 判断库存是否充足
- 写入抢券成功列表
- 扣减库存
- 写入抢券同步列表
4.3.6 抢券接口开发
1)接口定义
下边进行接口分析,定义抢券接口。
在下边的界面中点击“立即领取”即开始抢券。
请求哪些参数?
抢券需要明确两个元素: 哪个用户抢的是哪个活动的优惠券。
用户的身份信息在token中由前端传入服务端。
所以,本接口需要传入服务端的参数是活动ID。
传入参数:活动ID。
响应结果:无,通过状态码判断。
接口定义如下:
接口名称:抢券接口
接口路径:POST/market/consumer/coupon/seize
编写controller方法:
@RestController("consumerCouponController")
@RequestMapping("/consumer/coupon")
@Api(tags = "用户端-优惠券相关接口")
public class CouponController {@PostMapping("/seize")
public void seizeCoupon(@RequestBody SeizeCouponReqDTO seizeCouponReqDTO) {}
...
2)校验活动有效性
下边定义service方法,在service方法中需要做哪些事?
- 校验活动是否有效
- 调用Lua脚本执行抢券
本节实现第一步,校验活动的有效性。
定义 service接口如下:
public interface ICouponService extends IService<Coupon> {/*** 抢券** @param seizeCouponReqDTO*/
void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO);
...
实现类:
@Overridepublic void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO) {// 1.校验活动开始时间或结束// 首先从缓存查询活动// 2.抢券准备
// key: 抢券同步队列,资源库存,抢券列表
// argv:抢券id,用户id// 3.执行lua脚本进行抢券结果// 4.处理lua脚本结果,失败的抛出异常,成功的正常返回}
如何校验活动是否有效?
1、从缓存中查询指定活动的信息
抢券接口避免与数据库交互。
2、根据活动时间校验活动是否未开始或者已经结束,这两类活动不允许抢券
实现方法如下:
- 定义从缓存查询指定活动信息的方法
public interface IActivityService extends IService<Activity> {/*** 从缓存中获取活动信息* @param id* @return*/
ActivityInfoResDTO getActivityInfoByIdFromCache(Long id);
实现方法如下:
@Override
public ActivityInfoResDTO getActivityInfoByIdFromCache(Long id) {// 1.从缓存中获取活动信息Object activityList = redisTemplate.opsForValue().get(ACTIVITY_CACHE_LIST);if (ObjectUtils.isNull(activityList)) {return null;}// 2.过滤指定活动信息List<ActivityInfoResDTO> list = JsonUtils.toList(activityList.toString(), ActivityInfoResDTO.class);if (CollUtils.isEmpty(list)) {return null;}// 3.过滤指定活动return list.stream().filter(activityInfoResDTO -> activityInfoResDTO.getId().equals(id)).findFirst().orElse(null);
}
3)抢券service方法
下边编写抢券的service方法
@Overridepublic void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO) {// 1.校验活动开始时间或结束// 抢券时间ActivityInfoResDTO activity = activityService.getActivityInfoByIdFromCache(seizeCouponReqDTO.getId());LocalDateTime now = DateUtils.now();if (activity == null ||activity.getDistributeStartTime().isAfter(now)) {throw new CommonException(SEIZE_COUPON_FAILD, "活动未开始");}if (activity.getDistributeEndTime().isBefore(now)) {throw new CommonException(SEIZE_COUPON_FAILD, "活动已结束");}// 2.抢券准备
// key: 抢券同步队列,资源库存,抢券列表
// argv:抢券id,用户idint index = (int) (seizeCouponReqDTO.getId() % 10);// 同步队列redisKeyString couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index);// 资源库存redisKeyString resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index);// 抢券列表String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST, activity.getId(), index);log.debug("seize coupon keys -> couponSeizeListRedisKey->{},resourceStockRedisKey->{},couponSeizeListRedisKey->{},seizeCouponReqDTO.getId()->{},UserContext.currentUserId():{}",couponSeizeListRedisKey, resourceStockRedisKey, couponSeizeListRedisKey, seizeCouponReqDTO.getId(), UserContext.currentUserId());// 3.抢券结果Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey),seizeCouponReqDTO.getId(), UserContext.currentUserId());log.debug("seize coupon result : {}", execute);// 4.处理lua脚本结果if (execute == null) {throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");}long result = NumberUtils.parseLong(execute.toString());if (result > 0) {return;}if (result == -1) {throw new CommonException(SEIZE_COUPON_FAILD, "限领一张");}if (result == -2 || result == -4) {throw new CommonException(SEIZE_COUPON_FAILD, "已抢光!");}throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");}
4)抢券controller方法
@ApiOperation("抢券接口")
@PostMapping("/seize")
public void seizeCoupon(@RequestBody SeizeCouponReqDTO seizeCouponReqDTO) {couponService.seizeCoupon(seizeCouponReqDTO);
}
5)抢券测试
启动网关
启动优惠券活动管理工程
启动xxl-job
打开小程序,进入抢券页面
点击领取,观察写入redis的数据是否正确:
示例:
抢券成功存入redis抢券成功队列:
存入redis抢券同步队列:
抢券失败情况测试:
限领一张提示:
已抢光就不测试了
6)小结
项目是怎么实现抢券功能的?
1)将优惠券活动的库存同步到Redis
2)用户抢券请求redis,执行Lua脚本,具体如下:
先判断当前用户是否抢过该优惠券,如果抢过则返回-1
判断库存是否充足,如果不充足返回-2
向抢券成功列表写入记录
扣减库存
向抢券同步列表写入记录
3)由异步任务将redis抢券成功记录同步到数据库中
4.4 抢券结果同步
4.4.1 Redis到MySQL同步方案分析
如何将Redis中的抢券结果同步到MySQL的优惠券表(coupon)呢?
1)整体思路
基本思路: 遍历Redis中的抢券结果同步队列,拿到一个元素就向数据库的优惠券表插入记录,插入完成后删除Redis中的这条记录。
下图是存储抢券结果同步数据的Hash表。
我们可以一次从Hash表中拿一批数据,每个元素包括了用户id和活动id,根据这两个参数插入coupon表。
从下图可以看出,只要拿到用户id和活动id即可向优惠券表插入一条记录。
基本思路清楚,现在需要考虑:系统有多个活动,如何提高同步程序的处理性能呢?
假如同步队列的key为:QUEUE:COUPON:SEIZE:SYNC:{活动id % 10},这说明最多有10个同步列表。
我们可以用多线程,每个线程处理一个同步队列。
由定时任务去调度,每隔1分钟由多线程对同步队列中的数据进行处理。
2)如何使用线程池?
根据我们需求,假如同步队列个数为10我们需要定义一个最多有10个活跃线程的线程池,满负荷工作下每个线程处理一个同步队列,当满负荷工作时如果再有新的任务线程池拒绝任务。
定义一个线程池需要以下参数:
- corePoolSize(核心线程数): 核心线程一直存活的线程池中,即使它们是空闲的也会被保留在池中。在执行新任务时,如果线程池中的线程数小于
corePoolSize
,将会创建一个新的线程来执行任务。 - maximumPoolSize(最大线程数): 池中允许的最大线程数。如果队列满了,并且活动线程数小于
maximumPoolSize
,则会创建新的线程来处理任务。 - keepAliveTime(线程空闲时间): 当线程池中的线程数量超过
corePoolSize
时,多余的空闲线程的存活时间如果超过这个时间会被终止,直到线程数量不超过corePoolSize
。这样可以确保在低负载时最小化资源消耗。 - unit(时间单位):
keepAliveTime
的时间单位,可以是TimeUnit.SECONDS
、TimeUnit.MINUTES
等。 - workQueue(阻塞队列): 用于保存等待执行的任务的阻塞队列。
常用的阻塞队列:
LinkedBlockingQueue:链表结构,无界队列。
ArrayBlockingQueue:数组结构,有界队列。
SynchronousQueue:容量为1,在没有线程去消费时不会保存任务。
- **线程池的拒绝策略(RejectedExecutionHandler)**定义了当线程池无法执行新任务时应该采取的策略。当线程池中的工作队列已满,并且线程池中的线程数已达到最大值时,新任务的处理方式由拒绝策略来确定。
如下:
ThreadPoolExecutor.AbortPolicy 是默认的饱和策略。当任务添加到线程池中被拒绝时,会抛出 RejectedExecutionException 异常。 ThreadPoolExecutor.CallerRunsPolicy 当任务被拒绝时,会使用调用线程池的线程来执行被拒绝的任务。 ThreadPoolExecutor.DiscardPolicy 当任务被拒绝时,会默默地丢弃被拒绝的任务,不会抛出异常也不会执行被拒绝的任务。 ThreadPoolExecutor.DiscardOldestPolicy 当任务被拒绝时,会丢弃队列中最老的一个任务,并尝试重新提交被拒绝的任务。
在com.jzo2o.market.config.ThreadPoolConfiguration中定义线程如下:
@Configuration
public class ThreadPoolConfiguration {@Bean("syncThreadPool")public ThreadPoolExecutor synchronizeThreadPool(RedisSyncProperties redisSyncProperties) {// 定义线程池参数int corePoolSize = 1; // 核心线程数int maxPoolSize = redisSyncProperties.getQueueNum(); // 最大线程数long keepAliveTime = 120; // 线程空闲时间TimeUnit unit = TimeUnit.SECONDS; // 时间单位// 指定拒绝策略为 DiscardPolicy RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.DiscardPolicy();// 任务队列,使用SynchronousQueue容量为1,在没有线程去消费时不会保存任务ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit,new SynchronousQueue<>(),rejectedHandler);return executor;}
}
线程池使用规则:(用返回的结果使用Callable()),不用返回结果用Runnable())
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAXIMUM_POOL_SIZE,KEEP_ALIVE,TimeUnit.SECONDS,sPoolWorkQueue,sThreadFactory);
// 向线程池提交任务
threadPool.execute(new Runnable() {@Overridepublic void run() {... // 线程执行的任务}
});
// 关闭线程池
threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
下边测试线程池:
分两轮向线程池提交任务,每轮提交10个任务。
@SpringBootTest
@Slf4j
public class SyncThreadPoolTest {public class RunnableSimple implements Runnable{//任务序号private int index;public RunnableSimple(int index){this.index = index;}@Overridepublic void run() {//执行任务log.info("{}执行任务:{}",Thread.currentThread().getId(),index);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}}@Resource(name="syncThreadPool")private ThreadPoolExecutor threadPoolExecutor;@Testpublic void test_threadPool() throws InterruptedException {for (int i = 0; i < 10; i++) {threadPoolExecutor.execute(new RunnableSimple(i));}
// Thread.sleep(3000);延迟三秒,线程池的线程执行任务后空闲下来for (int i = 10; i < 20; i++) {threadPoolExecutor.execute(new RunnableSimple(i));}//主线程休眠一定的时间防止程序结束Thread.sleep(9999999);}}
如果注释Thread.sleep(3000);,由于线程池最大线程数是10,提交10个任务后线程还没有空闲再提交新任务会被拒绝。
2024-11-06 20:31:44.214 INFO 15048 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 113执行任务:1
2024-11-06 20:31:44.214 INFO 15048 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : 112执行任务:0
2024-11-06 20:31:44.214 INFO 15048 --- [pool-3-thread-3] c.j.market.service.SyncThreadPoolTest : 114执行任务:2
2024-11-06 20:31:44.214 INFO 15048 --- [pool-3-thread-4] c.j.market.service.SyncThreadPoolTest : 115执行任务:3
2024-11-06 20:31:44.214 INFO 15048 --- [pool-3-thread-5] c.j.market.service.SyncThreadPoolTest : 116执行任务:4
2024-11-06 20:31:44.215 INFO 15048 --- [pool-3-thread-6] c.j.market.service.SyncThreadPoolTest : 117执行任务:5
2024-11-06 20:31:44.215 INFO 15048 --- [pool-3-thread-7] c.j.market.service.SyncThreadPoolTest : 118执行任务:6
2024-11-06 20:31:44.215 INFO 15048 --- [pool-3-thread-8] c.j.market.service.SyncThreadPoolTest : 119执行任务:7
2024-11-06 20:31:44.215 INFO 15048 --- [pool-3-thread-9] c.j.market.service.SyncThreadPoolTest : 120执行任务:8
2024-11-06 20:31:44.215 INFO 15048 --- [ool-3-thread-10] c.j.market.service.SyncThreadPoolTest : 121执行任务:9
如果放开Thread.sleep(3000);,20个任务都会执行完成。
3)如何从Redis 批量取数据?
使用线程池处理的任务的代码完成测试,下边需要解决的是每个任务如何从Hash中拿一批数据呢?
我们使用redisTemplate.opsForHash().scan(H key, ScanOptions options)方法,scan方法通过游标的方式实现从hash中批量获取数据。
测试代码如下:
@SpringBootTest
@Slf4j
public class SyncThreadPoolTest {public class RunnableSimple implements Runnable{private int index;public RunnableSimple(int index){this.index = index;}@Overridepublic void run() {//执行任务log.info("{}执行任务:{}",Thread.currentThread().getId(),index);//获取数据String queue = String.format("QUEUE:COUPON:SEIZE:SYNC:{%s}",index);log.info("开始获取{}队列的数据",queue);getData(queue);}
}
/*** 从同步队列拿数据* @param queue 队列名称*/public void getData(String queue) {Cursor<Map.Entry<String, Object>> cursor = null;// 通过scan从redis hash数据中批量获取数据,获取完数据需要手动关闭游标ScanOptions scanOptions = ScanOptions.scanOptions().count(10).build();try {// sscan获取数据cursor = redisTemplate.opsForHash().scan(queue, scanOptions);// 遍历数据转换成SyncMessage列表List<SyncMessage<Object>> collect = cursor.stream().map(entry -> SyncMessage.builder().key(entry.getKey().toString()).value(entry.getValue()).build()).collect(Collectors.toList());log.info("{}获取{}数据{}条", Thread.currentThread().getId(),queue,collect.size());collect.stream().forEach(System.out::println);}catch (Exception e){log.error("同步处理异常,e:", e);throw new RuntimeException(e);} finally {// 关闭游标if (cursor != null) {cursor.close();}}}}
下边进行测试:
保证存在“QUEUE:COUPON:SEIZE:SYNC:{%s}”的队列且其中有数据
再次运行test_threadPool()方法进行测试。
观察控制台的日志"{}获取{}队列的数据{}条",并观察是否输出了从Hash表中拿到的数据。
输出日志示例:
2024-11-06 20:47:42.762 INFO 25576 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : 109获取QUEUE:COUPON:SEIZE:SYNC:{0}数据2条
SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
4)小结
本节我们分析了从Redis到MySQL同步方案:
1、使用线程池从多个同步队列中查询数据,每个线程处理一个同步队列。
注意:同步队列的个数可以灵活配置,但不宜过多,因为同步队列个数为最大线程数,通常配置10到20即可。
2、使用redisTemplate.opsForHash().scan(H key, ScanOptions options)方法从hash表获取数据。
这里需要注意游标的使用,一定要在finally 中关闭游标。
项目中如何定义的线程池?
4.4.2 Redisson分布式锁
1)问题分析
多线程在执行任务时,如果多个线程从同一个Redis Hash中获取数据就会出现重复处理数据的问题。
下边的情况出现这个问题:
定时任务每隔1分钟调用线程池处理一个数据同步任务,假如同步队列1的数据非常多,一轮结束后线程1还在处理同步队列1的数据,此时当第二轮开始又会从同步队列1开始分配线程去处理,将会找一个空闲线程去处理同步队列1的数据,此时将会有多个线程处理一个同步队列的数据。
如下图:
这个问题如何解决呢?
在前边我们介绍超卖方案时提到了使用锁去控制共享资源访问,对于分布式系统我们使用分布式锁去控制。
如下图:每个同步队列使用一把锁进行控制,当线程1还没有处理完时锁不进行释放,这样线程2就无法获取同步队列1的锁,解决了多个线程处理同一个队列的问题。
2)Redisson实现分布式锁
实现分布式锁的方案有很多:Redis、数据库、Zookeeper等,我们使用Redis实现分布式锁,使用Redis实现分布式锁可以使用的它的setNx命令,也可以使用Redisson工具去实现,本项目使用Redission去实现,关于setNx的知识可以参考视频(https://www.bilibili.com/video/BV1j8411N7Bm?p=181)自学。
Redisson 是一个用于 Java 开发的 Redis 客户端和分布式锁框架,它不仅可以实现分布式锁,还可以实现分布式集合(如 List、Set、Map 等)和分布式对象(如 AtomicInteger、AtomicLong、CountDownLatch 等),简单理解就是将JVM中内存存储的List、Set、AtomicInteger这些对象使用Redis去存储和管理。
使用Redisson的基本用法如下:
// 创建Redisson客户端
RedissonClient redissonClient = Redisson.create();
// 获取名为myLock的分布式锁实例,通过此实例进行加锁、解锁
RLock lock = redissonClient.getLock("myLock");try {// 尝试获取锁,最多等待3秒,持锁时间为5秒boolean isLockAcquired = lock.tryLock(3, 5, TimeUnit.SECONDS);if (isLockAcquired) {// 获取锁成功,执行业务逻辑} else {// 获取锁失败,处理相应逻辑}
} catch (InterruptedException e) {// 处理中断异常
} finally {// 释放锁lock.unlock();
}
说明:
lock.tryLock方法是一种非阻塞获取锁的方式,没有获取锁可以直接返回,而lock.lock()是一种阻塞获取锁的方法,多个线程通过lock()方法获取锁,只有一个线程获取到锁,其它线程将阻塞等待。
通常lock.tryLock方法使用的更广泛。
- 使用tryLock方法获取锁时传3个参数:
- waitTime:尝试获取锁的最大等待时间,在这个时间范围内会不断地尝试获取锁,如果在
waitTime
时间内未能获取到锁,则返回false
。waitTime默认为-1,表示获取锁失败后立刻返回不重试。 - leaseTime:表示持锁的时间,即锁的自动释放时间。在获取锁成功后,锁会在
leaseTime
时间后自动释放。如果在持锁的时间内未手动释放锁,锁也会在leaseTime
时间后自动释放。 - TimeUnit:表示时间单位,可以是秒、毫秒等。
- tryLock方法返回值:
true:获取到了锁
false:未获取到锁
下边我们进行测试
- 注意释放锁
lock.tryLock代码放在try中,在finally 中释放锁。
下边进行测试:
我们实现一个队列只有一个线程去处理数据,锁的粒度为队列,每个锁与队列对应,我们定义锁名称为"Lock:"加队列名称。
代码如下:
public class RunnableSimple2 implements Runnable {//任务序号private int index;public RunnableSimple2(int index) {this.index = index;}@Overridepublic void run() {// 指定锁名称,每个队列只有一个线程去处理,锁粒度为队列,锁名称在队列名称前加Lock:String lockName = String.format("Lock:QUEUE:COUPON:SEIZE:SYNC:{%s}",index);//队列名称String queue = String.format("QUEUE:COUPON:SEIZE:SYNC:{%s}",index);RLock lock = redissonClient.getLock(lockName);try {// 2.尝试获取锁,参数:waitTime、leaseTime、时间单位boolean isLock = lock.tryLock(3, 10, TimeUnit.SECONDS);if (isLock) {//执行任务log.info("{}执行任务:{}",Thread.currentThread().getId(),index);log.info("{}开始获取{}队列的数据",Thread.currentThread().getId(),queue);getDate(queue);//模拟执行任务的时长Thread.sleep(3000);}else{log.info("!!!!!!{}获取{}队列的锁失败",Thread.currentThread().getId(),queue);}} catch (Exception e) {e.printStackTrace();} finally {// 4.释放锁if (lock != null && lock.isLocked()) {lock.unlock();}}}
}
下边进行测试:
编写测试方法:
@Test
public void test_lock() throws InterruptedException {threadPoolExecutor.execute(new RunnableSimple2(0));//Thread.sleep(3000);//延迟两秒,线程池的线程执行任务后空闲下来threadPoolExecutor.execute(new RunnableSimple2(0));//延迟一定的时间避免程序结束Thread.sleep(20000000);}
运行上边的test_lock方法,预期结果:
第一个线程成功执行任务,第二个线程由于无法获取分布式锁导致执行任务失败。
示例如下:
2024-11-06 21:14:57.477 INFO 3768 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 161执行任务:0
2024-11-06 21:14:57.478 INFO 3768 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 161开始获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的数据
2024-11-06 21:14:58.132 INFO 3768 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 161获取QUEUE:COUPON:SEIZE:SYNC:{0}数据2条
SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
2024-11-06 21:15:00.432 INFO 3768 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : !!!!!!160获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的锁失败
如果将 Thread.sleep(3000);代码放开注释,预期结果两个线程都可以成功执行任务,因为第一个线程执行3秒后释放锁,第二个线程获取锁成功。
示例:
2024-11-06 21:19:53.458 INFO 8196 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : 158执行任务:0
2024-11-06 21:19:53.458 INFO 8196 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : 158开始获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的数据
2024-11-06 21:19:53.781 INFO 8196 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : 158获取QUEUE:COUPON:SEIZE:SYNC:{0}数据2条
SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
2024-11-06 21:19:56.798 INFO 8196 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 164执行任务:0
2024-11-06 21:19:56.798 INFO 8196 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 164开始获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的数据
2024-11-06 21:19:56.801 INFO 8196 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 164获取QUEUE:COUPON:SEIZE:SYNC:{0}数据2条
SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
3)看门狗机制
- 什么是Redisson看门狗
学习了Redisson的基本使用现在有一个问题:
当设置了leaseTime的时间为10秒,结果任务执行了20秒,会出现什么问题?
由于锁的自动释放时间为10秒,当到达到10秒即使任务还没有结束锁将自动释放,此时就会有新线程获取该锁去执行任务,设置分布式锁的本意是当前只有一个线程去执行,出现这个问题会导致多个线程共同去执行任务,可能在并发处理上存在问题。
当执行任务的时间可以控制在一个范围就可以指定leaseTime锁自动释放时间,如果执行任务的时间不容易通过leaseTime去设置,此时可以使用Redisson的看门狗机制避免在任务没有完成时自动释放锁的问题发生。
Redisson的"看门狗机制"(Watchdog)是一种用于监测和维护锁的超时时间的机制,它可以确保在任务没有完成时对锁的过期时间进行自动续期,以避免任务没有完成时锁自动释放的问题。
开启看门狗后针对当前锁创建一个线程执行延迟任务,默认每隔10秒将锁的过期时间重新续期为30秒。
看门狗线程会首先判断锁是否存在,如果不存在将不再续期,当程序执行unlock()方法释放锁时会将该锁的对应的延迟任务取消,此时看门狗线程结束任务。
注意:任务结束一定要执行unlock()方法释放锁,否则看门狗线程一直进行续期,导致锁无法释放。
- 测试看门狗
调用下边的方法都可以开启看门狗:
方法1:tryLock(long waitTime, TimeUnit unit)
方法2:tryLock(long waitTime, -1,TimeUnit unit)
方法2正是上边讲解Redisson基本使用时用的方法,传入leaseTime参数为-1可以开启看门狗。
下边进行测试:
修改tryLock的方法,leaseTime参数传入-1,开启看门狗。
boolean isLock = lock.tryLock(1, -1, TimeUnit.SECONDS);//开启看门狗
将Thread.sleep(1000);改为50秒:Thread.sleep(50000);
代码如下:
public class RunnableSimple3 implements Runnable {//任务序号private int index;public RunnableSimple3(int index) {this.index = index;}@Overridepublic void run() {// 指定锁名称,每个队列只有一个线程去处理,锁粒度为队列,锁名称在队列名称前加Lock:String lockName = String.format("Lock:QUEUE:COUPON:SEIZE:SYNC:{%s}",index);//队列名称String queue = String.format("QUEUE:COUPON:SEIZE:SYNC:{%s}",index);RLock lock = redissonClient.getLock(lockName);try {// 2.尝试获取锁,参数:waitTime、leaseTime、时间单位,设置为-1启动看门狗boolean isLock = lock.tryLock(1, -1, TimeUnit.SECONDS);if (isLock) {//执行任务log.info("{}执行任务:{}",Thread.currentThread().getId(),index);log.info("{}开始获取{}队列的数据",Thread.currentThread().getId(),queue);getDate(queue);//模拟执行任务的时长Thread.sleep(50000);}else{log.info("!!!!!!{}获取{}队列的锁失败",Thread.currentThread().getId(),queue);}} catch (Exception e) {e.printStackTrace();} finally {// 4.释放锁,这里一定要释放锁,否则看门狗线程一直运行if (lock != null && lock.isLocked()) {lock.unlock();}}}
}
编写测试方法:
@Testpublic void test_lock2() throws InterruptedException {threadPoolExecutor.execute(new RunnableSimple3(0));
// Thread.sleep(3000);threadPoolExecutor.execute(new RunnableSimple3(0));//延迟一定的时间避免程序结束Thread.sleep(20000000);}
运行test_lock2()方法。
预期结果:
2024-11-06 21:24:04.210 INFO 23868 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 113执行任务:0
2024-11-06 21:24:04.210 INFO 23868 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 113开始获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的数据
2024-11-06 21:24:04.579 INFO 23868 --- [pool-3-thread-2] c.j.market.service.SyncThreadPoolTest : 113获取QUEUE:COUPON:SEIZE:SYNC:{0}数据2条
SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
2024-11-06 21:24:05.235 INFO 23868 --- [pool-3-thread-1] c.j.market.service.SyncThreadPoolTest : !!!!!!112获取QUEUE:COUPON:SEIZE:SYNC:{0}队列的锁失败
“Lock:QUEUE:COUPON:SEIZE:SYNC:{0}”锁自动续期,并观察续期现象,每隔10秒续期一次,将锁续期为30秒。
程序结束不再续期,锁到期自动释放。可以看到lock的ttl在不断的变化
示例图如下:
4.4.3 数据同步组件
1)测试同步组件
针对Redis到MySQL数据同步的需求本项目开发数据同步组件,使用组件可以提高开发效率,数据同步组件的代码在jzo2o-framework下的jzo2o-redis工程,下边我们先学会使用同步组件,再去理解它的工作原理。
我们的目标是要将抢券同步队列的数据同步到数据库,我们手动向抢券同步队列添加一些数据方便进行测试:
抢券同步队列的key为用户id,value是活动id。
从jzo2o-customer数据库的common_user表找一些用户id添加到同步队列,如下图:
如何使用同步组件呢?非常简单,只需要两步:
第一步我们定义处理器:
同步组件会自动从上图的Hash结构中读取数据,处理器负责接收到数据后写入数据库。
第二步启动同步任务:
调用组件syncManager接口的start方法即启动。
- 编写数据同步处理器
数据同步处理需要实现SyncProcessHandler接口。
处理器的实例会放在spring容器,bean的名称的命名规则如下:
从队列名称中截取一部分:
队列名称:QUEUE:COUPON:SEIZE:SYNC:{8},把开头部分(QUEUE:)和序号部分(:{8})去掉。
截取后为:COUPON:SEIZE:SYNC
处理器定义如下:
/*** 抢单成功同步任务*/
@Component(COUPON_SEIZE_SYNC_QUEUE_NAME)
@Slf4j
public class SeizeCouponSyncProcessHandler implements SyncProcessHandler<Object> {@Overridepublic void batchProcess(List<SyncMessage<Object>> multiData) {throw new RuntimeException("不支持批量处理");}/*** signleData key activityId, value 抢单用户id* @param singleData*/@Override@Transactional(rollbackFor = Exception.class)public void singleProcess(SyncMessage<Object> singleData) {log.info("获取要同步的数据: {}",singleData);//用户idlong userId = NumberUtils.parseLong(singleData.getKey());//活动idlong activityId = NumberUtils.parseLong(singleData.getValue().toString());log.info("userId={},activity={}",userId,activityId);//todo: 向优惠券表插入数据//todo:扣减数据库表中的库存}
}
- 启动同步任务
数据同步任务仍然由xxl-job调度,下边先编写任务调度方法然后在xxl-job调度中心进行配置。
在定时任务方法中调用组件提供的syncManager接口的start方法:
在XxlJobHandler 中注入syncManager接口的bean:
@Component
public class XxlJobHandler {@Resourceprivate SyncManager syncManager;...
syncManager接口提供两个start方法如下,阅读start方法的注释:
package com.jzo2o.redis.sync;import java.util.concurrent.Executor;/*** 同步程序管理器*/
public interface SyncManager {/*** 开始同步,使用默认线程池* @param queueName 同步队列名称* @param storageType 数据存储类型,1:redis hash数据结构,2:redis list数据结构,3:redis zSet结构* @param mode 单条执行 2批量执行*/void start(String queueName, int storageType, int mode);/*** 开始同步,可以使用自定义线程池,如果不设置使用默认线程池* @param queueName 同步队列名称* @param storageType 数据存储类型,1:redis hash数据结构,2:redis list数据结构,3:redis zSet结构* @param mode 1 单条执行 2批量执行* @param dataSyncExecutor 数据同步线程池*/void start(String queueName, int storageType, int mode, Executor dataSyncExecutor);}
如果使用组件默认线程池调用第一个start方法启动任务,如果使用自定义的线程池调用第二个start方法。
queueName 同步队列名称参数:是将同步队列名称裁剪后的名称。
storageType 数据存储类型参数:选择hash数据结构
mode :选择单条执行,目前针对hash结构数据的同步只支持单条同步
dataSyncExecutor :线程池
首先在XxlJobHandler中注入自定义的线程池:
@Resource(name="syncThreadPool")
private ThreadPoolExecutor threadPoolExecutor;
编写定时任务方法:
/*** 抢券同步队列* 10秒一次*/
@XxlJob("seizeCouponSyncJob")
public void seizeCouponSyncJob() {syncManager.start(COUPON_SEIZE_SYNC_QUEUE_NAME, RedisSyncQueueConstants.STORAGE_TYPE_HASH, RedisSyncQueueConstants.MODE_SINGLE,threadPoolExecutor);
}
- 下边在xxl-job配置定时任务
启动任务:
在定时任务方法中打断点:
在数据同步处理器中打断点:
重启优惠券服务,等待xxl-job调度,调度成功跟踪数据同步处理器,观察是否获取到hash中的数据:
当数据同步完后将redis Hash中的数据删除。
2024-11-06 21:50:56.109 INFO 18968 --- [Pool-1827364862] c.xxl.job.core.executor.XxlJobExecutor : >>>>>>>>>>> xxl-job regist JobThread success, jobId:18, handler:com.xxl.job.core.handler.impl.MethodJobHandler@34989ceb[class com.jzo2o.market.handler.XxlJobHandler#seizeCouponSyncJob]
2024-11-06 21:50:56.512 INFO 18968 --- [pool-3-thread-1] c.j.m.h.SeizeCouponSyncProcessHandler : 获取要同步的数据: SyncMessage(key=1828787045661319168, value=1851921214852177920, data=null)
2024-11-06 21:50:56.522 INFO 18968 --- [pool-3-thread-1] c.j.m.h.SeizeCouponSyncProcessHandler : userId=1828787045661319168,activity=1851921214852177920
2024-11-06 21:50:56.528 INFO 18968 --- [pool-3-thread-1] c.j.m.h.SeizeCouponSyncProcessHandler : 获取要同步的数据: SyncMessage(key=1837769073443094528, value=1851921214852177920, data=null)
2024-11-06 21:50:56.528 INFO 18968 --- [pool-3-thread-1] c.j.m.h.SeizeCouponSyncProcessHandler : userId=1837769073443094528,activity=1851921214852177920
2)理解数据同步组件原理
- SyncManager 接口
数据同步组件在jzo2o-framework的jzo2o-redis工程中定义,提供以下SyncManager 接口供使用。
SyncManager 接口提供两个方法:如果使用组件默认线程池调用第一个start方法启动任务,如果使用自定义的线程池调用第二个start方法,项目使用第二个start方法。
package com.jzo2o.redis.sync;import java.util.concurrent.Executor;/*** 同步程序管理器*/
public interface SyncManager {/*** 开始同步,使用默认线程池* @param queueName 同步队列名称* @param storageType 数据存储类型,1:redis hash数据结构,2:redis list数据结构,3:redis zSet结构* @param mode 单条执行 2批量执行*/void start(String queueName, int storageType, int mode);/*** 开始同步,可以使用自定义线程池,如果不设置使用默认线程池* @param queueName 同步队列名称* @param storageType 数据存储类型,1:redis hash数据结构,2:redis list数据结构,3:redis zSet结构* @param mode 1 单条执行 2批量执行* @param dataSyncExecutor 数据同步线程池*/void start(String queueName, int storageType, int mode, Executor dataSyncExecutor);}
组件提供SyncManager 接口的实现类SyncManagerImpl如下:
@Override
public void start(String queueName, int storageType, int mode) {this.start(queueName, storageType, mode, DEFAULT_SYNC_EXECUTOR);
}@Override
public void start(String queueName, int storageType, int mode, final Executor dataSyncExecutor) {//根据队列的数量循环,将每个队列的数据同步任务提交到线程池for (int index = 0; index < redisSyncProperties.getQueueNum(); index++) {try {if (dataSyncExecutor == null) {//使用默认线程池//使用getSyncThread方法获取任务对象DEFAULT_SYNC_EXECUTOR.execute(getSyncThread(queueName, index, storageType, mode));} else {//使用自定义线程池dataSyncExecutor.execute(getSyncThread(queueName, index, storageType, mode));}} catch (Exception e) {log.error("同步数据处理异常,e:", e);}}
}/*** 获取线程对象** @param queueName 队列名称* @param index 队列序号* @param storageType 存储结构 1:redis hash数据结构,2:redis list数据结构,3:redis zSet结构* @param mode 1 单条处理,2 批量处理* @return*/
private SyncThread getSyncThread(String queueName, int index, Integer storageType, int mode) {switch (storageType) {//目前组件支付同步Redis Hash结构的数据case STORAGE_TYPE_HASH:return new HashSyncThread(redissonClient, queueName, index, redisTemplate, redisSyncProperties.getPerCount(), mode);case STORAGE_TYPE_LIST:return null;case STORAGE_TYPE_ZSET:return null;}return null;
}
查看start的源代码,根据配置的队列数量,循环创建针对每个队列的数据同步任务,将任务放入线程池。
- HashSyncThread类
查看SyncManagerImpl源代码,通过getSyncThread方法获取任务对象,目前组件提供从Redis的Hash结构同步数据任务类HashSyncThread。
下边进入HashSyncThread的源代码,HashSyncThread继承抽象类AbstractSyncThread,重写getData()和process(List<SyncMessage> data)两个方法:
getData()方法从Hash结构中获取数据。
process(List<SyncMessage> data)是对获取的数据进行处理。
下边查看AbstractSyncThread抽象类的源代码:
任务对象的run方法在抽象类AbstractSyncThread中,执行逻辑如下:
首先获取该队列的分布式锁。
获取锁成功调用getData()方法从Hash结构查询数据。
再调用 process(List<SyncMessage> data)方法进行处理。
直到调用getData()方法获取不到数据时本次任务执行完成。
源代码如下:
@Override
public void run() {// 1.使用redssion看门狗模式锁定当前序号的队列String lockName = LOCK_PREFIX + RedisSyncQueueUtils.getQueueRedisKey(queueName, index);RLock lock = redissonClient.getLock(lockName);try {if(!lock.tryLock(0, -1, TimeUnit.SECONDS)){return;}// 2.获取数据List<SyncMessage<T>> data = null;while (CollUtils.isNotEmpty(data = this.getData())) {// 3.处理数据this.process(data);try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}} catch (Exception e){return;} finally {// 4.解锁if (lock != null && lock.isLocked()) {lock.unlock();}}
}
在run方法中,使用分布式锁控制每个队列只有一个线程在处理数据。
首先调用this.getData()从redis查询数据,再调用this.process(data)去处理数据。
调用getData()和process(data)方法 实际是调用 HashSyncThread子类方法,下边继续进入子类HashSyncThread查看源代码:
getData()方法正是使用我们前边学习的游标的方法从Hash查询数据
protected List<SyncMessage<Object>> getData() {Cursor<Map.Entry<String, Object>> cursor = null;// 通过scan从redis hash数据中批量获取数据,获取完数据需要手动关闭游标ScanOptions scanOptions = ScanOptions.scanOptions().count(perCount).build();try {// sscan获取数据cursor = redisTemplate.opsForHash().scan(RedisSyncQueueUtils.getQueueRedisKey(getQueueName(), getIndex()), scanOptions);// 遍历数据转换成SyncMessage列表return cursor.stream().map(entry -> SyncMessage.builder().key(entry.getKey().toString()).value(entry.getValue()).build()).collect(Collectors.toList());}catch (Exception e){log.error("同步处理异常,e:", e);throw new RuntimeException(e);} finally {// 关闭游标if (cursor != null) {cursor.close();}}
}
process(List<SyncMessage> data)方法对数据进行处理,根据处理模式(单条处理还是批量处理)进行数据处理。
protected boolean process(List<SyncMessage<Object>> data) {// 校验数据是否为空,为空停止循环遍历if (CollUtils.isEmpty(data)) {return false;}// 根据名称获取同步处理器SyncProcessHandler<Object> syncProcessHandler = getSyncProcessHandler();// 队列redisKeyString queueRedisKey = RedisSyncQueueUtils.getQueueRedisKey(getQueueName(), getIndex());// 单条执行模式if (mode == RedisSyncQueueConstants.MODE_SINGLE) {//逐条执行data.stream().forEach(objectSyncMessage -> {try {// 执行单条数据syncProcessHandler.singleProcess(objectSyncMessage);// 从hash表中删除数据redisTemplate.opsForHash().delete(queueRedisKey, objectSyncMessage.getKey());} catch (Exception e) {log.error("hash结构同步消息单独处理异常,e:", e);}});} else {// 批量执行模式try {// 批量处理数据syncProcessHandler.batchProcess(data);// 获取所有hashKeyList<String> hashKeys = data.stream().map(SyncMessage::getKey).collect(Collectors.toList());// 根据redisKey和hashKey列表从hash表中删除数据redisTemplate.opsForHash().delete(queueRedisKey, hashKeys);} catch (Exception e) {log.error("hash结构同步消息批量处理异常,e:", e);}}return true;
}
通过getSyncProcessHandler()方法得到数据同步处理器,getSyncProcessHandler()方法的源代码如下:
protected SyncProcessHandler<T> getSyncProcessHandler() {SyncProcessHandler syncProcessHandler = SpringUtil.getBean(queueName, SyncProcessHandler.class);if (syncProcessHandler != null) {return syncProcessHandler;}throw new RuntimeException("未找到名称(" + queueName + ")redis 队列数据处理器");
}
阅读代码方法就是从spring容器获取一个等于队列名称的bean进行处理。
注意:
在配置处理器时我们是不是指定了bean的名称为 裁剪后的队列名称即“COUPON:SEIZE:SYNC”,如下:
@Component(COUPON_SEIZE_SYNC_QUEUE_NAME)//COUPON:SEIZE:SYNC
@Slf4j
public class SeizeCouponSyncProcessHandler implements SyncProcessHandler<Object> {
接下来获取redis key,如下代码:
String queueRedisKey = RedisSyncQueueUtils.getQueueRedisKey(getQueueName(), getIndex());
查看getQueueRedisKey源代码:
package com.jzo2o.redis.utils;public class RedisSyncQueueUtils {private static final String QUEUE_REDIS_KEY_FORMAT = "QUEUE:%s:{%s}";/*** 获取redis队列redisKey** @param queueName redis队列名称* @param index 队列序号* @return*/public static String getQueueRedisKey(String queueName, int index){return String.format(QUEUE_REDIS_KEY_FORMAT, queueName, index);}
}
这里是根据裁剪的名称及队列序号最终拿到redis中的Hash结构的Key。
继续阅读process(List<SyncMessage> data)方法:
下边的代码是对数据一条一条进行处理,处理完一条删除一条。
if (mode == RedisSyncQueueConstants.MODE_SINGLE) {//逐条执行data.stream().forEach(objectSyncMessage -> {try {// 执行单条数据syncProcessHandler.singleProcess(objectSyncMessage);// 从hash表中删除数据redisTemplate.opsForHash().delete(queueRedisKey, objectSyncMessage.getKey());} catch (Exception e) {log.error("hash结构同步消息单独处理异常,e:", e);}});}
处理器正是我们自定义的SeizeCouponSyncProcessHandler 。
整个数据处理的序列图如下:
4.4.4 抢券结果同步开发
1)编写扣减库存方法
项目使用数据同步组件完成数据从Redis的Hash结构同步到MySQL中。
关于数据同步组件的使用前边已经讲解,下边我们需要完善数据同步处理器即可将抢券同步队列的数据同步到MySQL。
数据同步处理器拿到抢券结果做两件事:
- 插入优惠券表
- 扣减库存
首先编写扣减库存的方法
public interface IActivityService extends IService<Activity> {/*** 扣减库存* @param id 活动id* 如果扣减库存失败抛出异常*/
public void deductStock(Long id);
...
实现方法:
@Service
public class ActivityServiceImpl extends ServiceImpl<ActivityMapper, Activity> implements IActivityService {/*** 扣减库存* @param id 活动id* 如果扣减库存失败抛出异常*/
public void deductStock(Long id){boolean update = lambdaUpdate().setSql("stock_num = stock_num-1").eq(Activity::getId, id).gt(Activity::getStockNum, 0).update();if(!update){throw new CommonException("扣减优惠券库存失败,活动id:"+id);}
}
...
2)完善数据同步处理器
下边完善数据同步处理器:
/*** 抢单成功同步任务*/
@Component(COUPON_SEIZE_SYNC_QUEUE_NAME)
@Slf4j
public class SeizeCouponSyncProcessHandler implements SyncProcessHandler<Object> {@Resourceprivate IActivityService activityService;@Resourceprivate ICouponService couponService;@Resourceprivate CommonUserApi commonUserApi;@Overridepublic void batchProcess(List<SyncMessage<Object>> multiData) {throw new RuntimeException("不支持批量处理");}/*** signleData key activityId, value 抢单用户id* @param singleData*/@Override@Transactional(rollbackFor = Exception.class)public void singleProcess(SyncMessage<Object> singleData) {log.info("获取要同步抢券结果数据: {}",singleData);//用户idlong userId = NumberUtils.parseLong(singleData.getKey());//活动idlong activityId = NumberUtils.parseLong(singleData.getValue().toString());log.info("userId={},activity={}",userId,activityId);// 1.获取活动Activity activity = activityService.getById(activityId);if (activity == null) {return;}CommonUserResDTO commonUserResDTO = commonUserApi.findById(userId);if(commonUserResDTO == null){return;}// 2.新增优惠券Coupon coupon = new Coupon();coupon.setId(IdUtils.getSnowflakeNextId());coupon.setActivityId(activityId);coupon.setUserId(userId);coupon.setUserName(commonUserResDTO.getNickname());coupon.setUserPhone(commonUserResDTO.getPhone());coupon.setName(activity.getName());coupon.setType(activity.getType());coupon.setDiscountAmount(activity.getDiscountAmount());coupon.setDiscountRate(activity.getDiscountRate());coupon.setAmountCondition(activity.getAmountCondition());coupon.setValidityTime(DateUtils.now().plusDays(activity.getValidityDays()));coupon.setStatus(CouponStatusEnum.NO_USE.getStatus());couponService.save(coupon);//扣减库存activityService.deductStock(activityId);}
}
3)抢券结果同步测试
测试流程:
启动客户管理工程(需要根据用户id查询抢券人的信息)
抢券测试,保证抢券成功后在Redis同步队列写入数据成功
示例:上回测试给我直接把同步队列删了,我真的服了,手动加回来
redisTemplate.opsForHash().delete(queueRedisKey, objectSyncMessage.getKey());
在抢券数据同步处理器中打断点
启动xxl-job抢券结果同步任务
跟踪断点,观察数据是否正确:
数据同步成功删除同步 HashKey
最终写入优惠券表并扣减库存。
跟踪控制台SQL
2024-11-06 22:33:50.762 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.CouponMapper.insert : ==> Preparing: INSERT INTO coupon ( id, name, user_id, user_name, activity_id, type, discount_rate, discount_amount, amount_condition, validity_time, status ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
2024-11-06 22:33:50.763 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.CouponMapper.insert : ==> Parameters: 1854170332542791680(Long), 码农洗澡大派送(String), 1837769073443094528(Long), 普通用户77071(String), 1851921214852177920(Long), 1(Integer), 0(Integer), 10.00(BigDecimal), 100.00(BigDecimal), 2024-11-16T22:33:50.754837200(LocalDateTime), 1(Integer)
2024-11-06 22:33:50.770 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.CouponMapper.insert : <== Updates: 1
2024-11-06 22:33:50.776 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.ActivityMapper.update : ==> Preparing: UPDATE activity SET stock_num = stock_num-1 WHERE is_deleted=0 AND (id = ? AND stock_num > ?)
2024-11-06 22:33:50.777 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.ActivityMapper.update : ==> Parameters: 1851921214852177920(Long), 0(Integer)
2024-11-06 22:33:50.779 DEBUG 18724 --- [pool-3-thread-1] c.j.market.mapper.ActivityMapper.update : <== Updates: 1
2024-11-06 22:33:53.438 INFO 18724 --- [eoutChecker_1_1] i.s.c.r.netty.NettyClientChannelManager : will connect to 192.168.101.68:8091
示例:
查询coupon表,发现用户抢券结果已成功写入优惠券表:
活动的库存数据正常扣除。
抢券结果同步完成,redis中记录的活动的库存与数据库活动表的库存一致:
redis中已经没有了
看看小程序
非常的完美