黑马点评自学03
分布式锁
分布式锁介绍
分布式锁:满足在分布式系统或者集群模式下多进程可见并且进程间获取锁的操作是互斥的锁。
在之前的测试中,当我们进入到集群或分布式的环境中时,一人一单业务在不同集群中可以被同时给用户id加锁,出现了并发问题,这是因为每个集群单独有jvm环境,而之前的锁机制是在自己的jvm环境中进行加锁的,流程如下:
为了解决这个问题,我们要引入分布式锁机制,通过一个锁监视器,使得左右集群都在一个共同的锁监视器下去对用户加锁,从而保证加锁的一致性,业务流程如下:
一个分布式锁需哟爱满足下面的几个特性:
分布式锁的核心是实现多进程间的互斥,常见的有三种方式:
这三种实现方式之间各有优劣,之后,我们将使用Redis实现分布式锁。
基于Redis实现分布式锁流程
实现分布式锁需需要实现加锁和解锁两个操作
加锁:
- 互斥:确保同时只能有一个线程获取锁
两种方案:
方案一:SETNX加expire的方式:
SETNX方法给key设置值成功返回1,失败返回0,根据返回值判断加锁是否成功,expire方法对key设置超时过期时间,在业务期间出现服务宕机或超时会自动删除key以释放锁。
缺陷:如果在SETNX执行后出现服务宕机,expire未成功执行,可能导致后续无法加锁。
方案二:SET方法实现
NX表示互斥,EX设置超时时间,单位为s
SET lock thread1 EX 10 NX命令可以实现SETNX功能,同时设置超时删除时间为10s。
设置成功返回ok,失败返回nil,根据返回值判断加锁成功与否。
如此可保证互斥和超时时间是原子操作。
- 采用非阻塞:只尝试一次获取锁,成功返回true,失败返回false(也可以采用阻塞等待,直到获取到锁为止)
释放锁:
- 手动释放
DEL key可以主动删除key从而完成释放锁的操作。
- 超时自动释放:SET方法设置超时时间
整体流程如下:
Redis实现分布式锁(简单版)
ILock接口
public interface ILock {/*** 尝试获取锁* @param timeoutSec* @return*/boolean tryLock(long timeoutSec);void unLock();
}
SimpleRedisLock类来实现ILock接口,实现对应的TryLock和unLock方法,
public class SimpleRedisLock implements ILock{private String name;// 业务名称,用于拼接keyprivate StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";// key前缀@Overridepublic boolean tryLock(long timeoutSec) {String key = KEY_PREFIX + name;// 拼接key// 获取线程标识,作为value值long threadId = Thread.currentThread().getId();// 尝试获取锁,这里使用的setIfAbsent方法,如果key不存在,则设置value,并返回true,否则返回falseBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId + "", timeoutSec, TimeUnit.SECONDS);// 由于自动拆箱可能存在null,所以这里采用Boolean.equals方法进行比较,防止空指针错误return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
}
在VoucherOrderServiceImpl的seckillVoucher方法中使用该分布式锁,
public Result seckillVoucher(Long voucherId) {// 1. 查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);// 2. 判断秒杀活动是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail(MessageConstant.SECKILL_IS_NOT_START);// 活动还未开始}//3. 判断秒杀活动是否结束if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail(MessageConstant.SECKILL_IS_OVER);// 秒杀已经结束}//4. 判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail(MessageConstant.STOCK_IS_NOT_ENOUGH);// 库存不足}// 对当前用户加锁Long userId = UserHolder.getUser().getId();// 创建锁对象SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 尝试获取锁boolean isLock = lock.tryLock(1200);// 判断获取锁是否成功if (!isLock) {// 获取锁失败,返回对应的错误信息return Result.fail(MessageConstant.REPEAT_BUY);}// 获取锁成功,进行业务操作try {// 获取代理对象,保证下面的事务能够正确执行IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {// 释放锁lock.unLock();}
// synchronized (userId.toString().intern()) {// intern方法获取字符串常量池中的对象,保证同一个id是一个对象}
测试:通过测试,之前的一人一单功能,不再会同时有两个程序对当前用户加锁了,只会有一个加锁成功。
存在的问题
当前分布式锁在一些极端的情况下还是会产生并发问题,如果线程1在阻塞过程中,锁因为超时过期了,此时线程2成功加锁,但在线程2执行业务的过程中,线程1完成业务并执行了释放锁操作,此时,线程1会将线程2的锁释放掉,导致线程3能在线程2业务未完成时加锁成功,从而带来并发问题。
要解决这个问题,我们需要在释放锁的时候进行一次判断,只有当前锁是自己获取的才进行释放,通过一个标识来判断锁的一致性,一旦锁不一致,则表示自己的锁是超时过期了,因此不需要释放锁的操作。
整体业务流程更改如下:(用UUID+当前线程ID作为标识)
修改之前的分布式锁实现,满足:
- 在获取锁时存入线程标识(这里用UUID+线程ID作为标识)
- 在释放锁时先获取锁中的线程标识,然后与当前线程标识进行判断
- 如果一致,则释放该锁
- 如果不一致,则不进行任何操作
修改加锁操作,增加标识
private static final String KEY_PREFIX = "lock:";// key前缀private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";// 标识前缀@Overridepublic boolean tryLock(long timeoutSec) {String key = KEY_PREFIX + name;// 拼接key// 获取线程标识,作为value值String threadId = ID_PREFIX + Thread.currentThread().getId();// 尝试获取锁,这里使用的setIfAbsent方法,如果key不存在,则设置value,并返回true,否则返回falseBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, timeoutSec, TimeUnit.SECONDS);// 由于自动拆箱可能存在null,所以这里采用Boolean.equals方法进行比较,防止空指针错误
修改释放锁操作,增加标识判断功能
@Overridepublic void unLock() {// 线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断是否为当前线程的锁,如果是则删除锁,不是则不进行任何操作if (threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
测试,成功实现只释放自己的锁的功能。
另一种极端情况
在上面的代码中,我们通过在获取锁时添加标识,释放锁之前判断标识的方式来防止出现锁误删的情况。但这种方式存在另一种极端情况,那就是在判断锁和释放锁之间出现了阻塞,导致线程1误删了线程2的锁,从而产生线程3在线程2业务过程中获取到锁的情况。如下图所示
要解决这种问题,我们需要保证判断锁和释放锁的操作是原子性的。
Lua脚本功能
在Redis中提供了一个Lua脚本功能,我们可以在脚本中编写多条Redis命令,以确保多条命令执行的原子性。基本语法可参照Lua 教程 | 菜鸟教程
Lua脚本的Redis调用函数,语法命令如下:注意括号内是单引号字符串
Lua脚本设置key=name,value=jack
EVAl "return redis.call('set', 'name', 'jack')" 0
Lua脚本获取对应的值
EVAL "return redis.call('get', 'name')" 0
注意:EVAL是脚本命令,双引号内是脚本,0表示key参数个数
还可以不写死参数,采用动态传参的方式。
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 age 10
正常释放分布式锁的流程如下:
使用Lua脚本编写为:
-- 获取锁中的线程标识
local id = redis.call('get', KEYS[1])
-- 比较线程标识与锁中的标识是否一致
if(id == ARGV[1]) then-- 释放锁return redis.call('del',KEYS[1])
end
return 0
改进分布式锁
基于Lua脚本实现分布式锁的释放锁逻辑,保证线程标识判断和释放锁的操作是原子操作。
要使用Java执行Lua脚本,使用RedisTemplate中的execute方法即可
- script是一个RedisScript类型的对象,它表示Lua脚本
- keys是一个List集合,存储的待执行的命令
- args是对应的参数,是可变长的
- 不需要指定key的个数numkeys,List的大小即可代表
编写Lua脚本,放到文件中,便于后续修改
-- 比较线程标识与锁只能够的标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then-- 释放锁return redis.call('del', KEYS[1])
end
return 0
修改SimpleRedisLock工具类
提前加载Lua脚本
// 提前加载脚本private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 加载脚本UNLOCK_SCRIPT.setResultType(Long.class);// 设置脚本返回值类型}
修改unlock方法,采用脚本执行
// 脚本修改释放锁代码@Overridepublic void unLock() {// 获取锁标识,封装到keys集合中List<String> keys = Collections.singletonList(KEY_PREFIX + name);// 线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT, keys, threadId);}
测试:发现成功实现了标识判断和释放锁的原子性,避免了锁误删的情况。
至此,一个基本的基于Redis的分布式锁实现完成。
基于Redis的分布式锁实现思路为:
- 利用SET NX EX的方式获取锁,保证互斥性,并设置过期时间保底,防止业务宕机导致其他线程无法加锁。
- 保存线程标识来解决防止误删锁的问题,在释放锁的时候,判断锁中的线程标识和自身的一致性,一致才删除,并使用Lua脚本实现两个操作的原子性
实现思路的特性:
- 利用SET NX保证线程间的互斥
- 利用SET EX保证业务宕机时也能释放锁,从而避免死锁问题,提高整体安全性
- 能够用于Redis集群,保证集群的高可用和高并发特性
Redisson
之前设计的分布式锁在大多数业务场景下已经够用了,但在一些特殊场景或者说一些极端情况下,该锁还存在下面的几个问题:
- 不可重入问题:同一个线程无法多次获取同一把锁,
- 不可重试问题:每次获取锁一次失败就返回,没有重试机制,大大增加了失败的概率
- 超时释放问题:使用超时释放机制虽然能够避免死锁问题的产生,但如果一个业务执行时间过程,而锁自动超时释放了,另一个线程的业务可能给当前线程带来安全隐患。
- 主从一致性问题:如果Redis提供了主从集群,由于主从同步间存在延迟(虽然很短),当主宕机时,从可能由于还未同步,因此不存在锁表示,导致其他线程能够获取到锁。
Redisson作为一个在Redis的基础上实现的工具,提供了一系列分布式服务,包括对各种分布式锁的实现,我们只需要调用它进行对应的功能使用即可。
Redisson使用
导入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
使用配置类配置Redisson,也可以使用Spring-starter,但需要替换本身的配置
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient(){// 配置类Config config = new Config();// 添加Redis地址,这里是单点地址,如果使用集群则使用config.useClusterServers()config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword(null);// 创建客户端return Redisson.create(config);}
}
使用Redisson
@Resourceprivate RedissonClient redissonClient; @Testvoid testRedisson() throws InterruptedException {// 获取锁,指定锁的名称RLock lock = redissonClient.getLock("anyLock");// 尝试获取锁,无参则有默认值,参数分别是:获取锁的最大等待时间,默认为-1不等待,锁自动释放时,默认为30s,时间单位boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);// 判断锁是否获取成功if (isLock) {try {System.out.println("加锁成功,执行业务逻辑");}finally {lock.unlock();}}}
测试成功
使用Redisson改造一人一单业务的锁代码
将锁对象的常见和使用用Redisson替换
// 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order" + userId);// 尝试获取锁,使用无参构造,默认不等待,超时时间为30sboolean isLock = lock.tryLock();// 判断获取锁是否成功if (!isLock) {// 获取锁失败,返回对应的错误信息return Result.fail(MessageConstant.REPEAT_BUY);}// 获取锁成功,进行业务操作try {// 获取代理对象,保证下面的事务能够正确执行IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {// 释放锁lock.unlock();}
测试发现能够满足分布式锁的各项功能,即使通过多线程压测也只会生成一个订单。
不可重入问题
对于下面的代码,当我们使用之前编写的分布式锁时,method1获取锁以后,进行method2,此时,由于Redis中存在该锁,导致其获取锁失败,从而提前结束method2,因此是不可重入的。
要实现重入功能,我们需要使用Redis的Hash类型,统计锁的重入次数,在加锁的时候,对于同一个线程的加锁,采用增加重入次数的方式,在释放锁的时候,采用减少重入次数的方式,只有当重入次数等0时,才执行删除对应key的操作。整体流程如下:
对于这些操作,我们同样需要采用Lua脚本保证原子性。
加锁操作:
-- 获取锁的Lua脚本
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否存在
if (redis.call('exists', key) == 0) then-- 不存在,则设置锁,使用hash结构redis.call('hset', key, threadId, '1');-- 设置锁的有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果,加锁成功
end;
-- 锁已存在,判断是否是自己的锁
if(redis.call('hexists', key, threadId) == 1) then-- 是自己的锁,进行重入次数加1redis.call('hincrby', key, threadId, '1');-- 更新有效期redis.call('expire', key, releaseTime);return 1; -- 重入成功
end;
return 0; -- 锁不是自己的,获取锁失败
释放锁操作:
-- 重入方式的释放锁
local key = KEY[1]; -- 锁标识
local threadId = ARGV[1]; -- 线程标识
local releaseTime = ARGV[2]; -- 锁自动释放时间
-- 判断锁是否存在
if (redis.call('hexists', key, threadId) == 0) then-- 锁不是以后自己的,直接返回return nil;
end;
-- 是自己的锁,重入次数-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数是否为0,为0则需要删除锁
if (count > 0) then-- 重入次数大于0,重置锁过期时间redis.call('expire', key, releaseTime);return nil;
else-- 重入次数为0,删除锁redis.call('del', key);return nil;
end;
改造之前的分布式锁,使其满足可重入功能(注意,脚本执行的时候,传入的都是String类型的数据,传入其他类型的会报错)
// 提前加载脚本private static final DefaultRedisScript<Long> TRYLOCK_SCRIPT;private static final DefaultRedisScript<String> UNLOCK_SCRIPT;static {TRYLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT = new DefaultRedisScript<>();TRYLOCK_SCRIPT.setLocation(new ClassPathResource("tryLock.lua"));UNLOCK_SCRIPT.setLocation(new ClassPathResource("unLock2.lua"));// 加载脚本TRYLOCK_SCRIPT.setResultType(Long.class);UNLOCK_SCRIPT.setResultType(String.class);// 设置脚本返回值类型}/*** Lua脚本实现待重入次数的加锁和释放锁*/@Overridepublic boolean tryLock(long timeoutSec) {String key = KEY_PREFIX + name;// 拼接key// 获取线程标识,作为value值String threadId = ID_PREFIX + Thread.currentThread().getId();// 执行脚本List<String> keys = Collections.singletonList(key);Long success = stringRedisTemplate.execute(TRYLOCK_SCRIPT, keys, threadId, String.valueOf(timeoutSec));// 由于自动拆箱可能存在null,所以这里采用Boolean.equals方法进行比较,防止空指针错误return success.equals(1L);}// 脚本修改释放锁代码@Overridepublic void unLock() {// 获取锁标识,封装到keys集合中List<String> keys = Collections.singletonList(KEY_PREFIX + name);// 线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT, keys, threadId, "100");}
通过测试,验证了可重入功能的实现,同一个线程在持有锁期间能够多次获取该分布式锁。
不可重试和超时释放问题
Redisson获取锁和释放锁的流程:
Redisson分布式锁解决问题原理:
- 可重入问题:
采用hash结构,同时记录线程标识和重入次数,在获取锁时,增加重入次数,在释放锁时,减少重入次数,直到重入次数为0时正式释放锁。
- 可重试问题
利用信号量和Redis的PubSub功能来实现等待、唤醒,以及获取锁失败时的重试机制。
- 超时续约问题:
超时释放时间不设置即默认为-1,利用watchDog机制,每隔一段时间,递归调用函数重置超时时间。
主从一致性问题
在一些环境下,只使用一个Redis作为缓存,当Redis出现故障时,会出现数据丢失问题,因此,通常会采用多个Redis服务器构建一个主从模式。在这种模式下,通常采用Redis Master进行写操作,然后将数据同步给Slave节点,Slave节点主要进行读操作。
在这样一个主从模式下,只需要向Master节点获取锁,当Mater节点被攻击时,选择一个Slave节点作为主节点,但此时,原本的Master节点中的锁未同步到新的Master节点,导致请求不需要获取锁即可访问新的Master节点,从而出现主从一致性问题。
为了解决这种问题,Redisson采用所有Redis节点作为单独的节点,请求需要对所有的节点获取锁,才能够进行数据操作。
还可以进一步建立主从关系,但在这种情况下的,即使一个Master Node节点出现问题,新的Master Node节点也不会在未获取锁的时候被访问,保证了数据安全问题。
总结:
秒杀优化
在之前的代码中,我们的秒杀优惠券功能的流程如下所示,在一个串行的执行流程中,需哟爱多次对数据库进行操作,这使得其并发效率低下,在多个用户同时请求的时候耗时较长。
针对上面的情况,我们通过将判断库存和校验一人一单的功能单独提出,通过提前缓存库存的方式,采用异步执行的方法,使用新的线程在后续进行数据库的减缓存和订单创建功能。
通过提前查询库存存储到redis中,后续判断库存直接在redis中判断,然后通过set数据结构来存储当前优惠券的下单用户,后续校验一人一单功能只需要在set集合中查询当前用户即可,异步线程只需要读取set中的数据,后续做减库存和创建订单操作。
由于redis并发速度较快,因此整体性能较高,但注意,在这种情况下,逻辑上订单完成以后,实际数据库中的订单可能还未创建完成。
整体流程:
左边的流程使用Lua脚本执行,整体是对redis中的数据进行操作,右边的流程则是异步线程后续的操作。
需求1:创建秒杀订单时将库存存入Redis中
只需要在创建秒杀订单的方法中addSeckillVoucher,增加一个对Redis的操作即可,这里我们用String类型来存储库存。
@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀券库存到Redis中stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(), voucher.getStock().toString());}
测试:
在redis中存在库存的缓存
需求2:基于Lua脚本,实现判断秒杀库存、一人一单功能
库存不足返回1,重复下单返回2,抢购成功返回0
-- 1. 传入的参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1];
-- 1.2 用户id
local userId = ARGV[2];
-- 2. 数据key
-- 2.1 库存key Lua脚本中使用..来拼接字符串
local stockKey = 'seckill:stock:' .. voucherId;
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId;
-- 3. 脚本业务
-- 3.1 判断库存是否充足,tonumber用于将字符串转变成数组进行比较
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1;
end;
-- 3.2 判断 用户是否已下单
if (redis.call('sismember', orderKey, userId) == 1) then-- 3.3 用户已下单,返回2return 2;
end;
-- 3.4 扣减库存
redis.call('incrby', stockKey, -1);
-- 3.5 下单,即保存用户到订单集合中
redis.call('sadd', orderKey, userId);
-- 返回抢购成功
return 0;
在秒杀订单功能中使用:
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}/*** 优惠券秒杀订单优化* @param voucherId* @return*/@Overridepublic Result seckillVoucher(Long voucherId) {// 获取当前用户IDLong userId = UserHolder.getUser().getId();// 1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());// 2.判断执行的结果int r = result.intValue();if (r != 0) {// 2.0 执行结果不为0,返回对应的错误信息,1为库存不足,2为重复下单return Result.fail(r == 1? MessageConstant.STOCK_IS_NOT_ENOUGH: MessageConstant.REPEAT_BUY);}// 2.2 执行结果为0,把下单信息保存到阻塞队列中// TODO 保存阻塞队列// 3. 返回订单idLong orderId = redisIdWorker.nextId("order");return Result.ok(orderId);}
测试:在redis中成功实现减库存和一人一单功能,此时,因为还未编写阻塞队列代码,所以数据库中不变。
需求3:抢购成功后,将优惠券id和用户id封装后存入阻塞队列
创建一个阻塞队列,后续开启异步下单后,不存在数据时会阻塞
// 阻塞队列,用于保存订单信息,以便于异步创建订单private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
在seckillVoucher方法中,将用户id、订单id、秒杀券id封装为订单,加入到阻塞队列中,在这里获取当前代理对象用于后续保证创建订单的业务能够正确执行。
@Overridepublic Result seckillVoucher(Long voucherId) {// 获取当前用户IDLong userId = UserHolder.getUser().getId();// 1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());// 2.判断执行的结果int r = result.intValue();if (r != 0) {// 2.0 执行结果不为0,返回对应的错误信息,1为库存不足,2为重复下单return Result.fail(r == 1? MessageConstant.STOCK_IS_NOT_ENOUGH: MessageConstant.REPEAT_BUY);}// 2.2 执行结果为0,把下单信息保存到阻塞队列中VoucherOrder voucherOrder = new VoucherOrder();// 2.3 订单idLong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 2.4 用户idvoucherOrder.setUserId(userId);// 2.5 秒杀券idvoucherOrder.setVoucherId(voucherId);// 2.6 放入到阻塞队列中orderTasks.add(voucherOrder);// 3. 获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4. 返回订单idreturn Result.ok(orderId);}
需求4:开启线程任务,不断从阻塞队列中获取订单信息,实现异步下单的功能
创建一个内部类,用于获取阻塞队列中的信息并创建订单
/*** 内部类,用于处理阻塞队列中的订单,进行订单创建*/private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {// 1. 获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();// 2. 创建订单handleVoucherOrder(voucherOrder);}catch (Exception e) {log.error("处理订单异常", e);}}}}
一个新的创建订单方法,用户ID和秒杀券id信息通过传入的订单信息获取,因为后续执行的是异步操作,无法通过ThreadLocal获取这些信息,这里查询用户和秒杀券的操作其实可以不做,因为在之前的redis中已经做了一人一单操作,这里为了防止redis出错,进行兜底。
/*** 新的订单创建方式,传入的是封装好的订单信息,包括用户id,订单id和秒杀券id* @param voucherOrder*/@Overridepublic void createVoucherOrder(VoucherOrder voucherOrder) {// 1.获取当前用户IdLong userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();// 2.查询订单中是否存在当前用户和秒杀券信息Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 3.判断订单是否存在if (count > 0) {log.error(MessageConstant.REPEAT_BUY);return;// 禁止重复购买}// 4. 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)
// .eq("stock", seckillVoucher.getStock()) // 乐观锁,判断库存是否被修改.gt("stock", 0)// 剩余库存大于0才扣减库存.update();if (!success) {// 5扣减库存失败,返回错误信息log.error(MessageConstant.STOCK_IS_NOT_ENOUGH);return;}// 6. 不用创建订单了,直接插入传入的订单save(voucherOrder);// 插入到数据库中}
内部类调用的创建订单的方法,这里其实也可以不加锁,加锁同样是为了兜底,其实前面redis只要不出错,这里每个到达的订单信息都是不同用户,且不超过库存的。
/*** 新建订单的方法,增加一道分布式锁的机制,来防止redis删减库存等步骤出问题,进一步保证一人一单功能* @param voucherOrder*/private void handleVoucherOrder(VoucherOrder voucherOrder) {// 对当前用户加锁,注意这里需要从voucherOrder中获取用户(因为是异步执行的)Long userId = voucherOrder.getUserId();// 创建锁对象RLock lock = redissonClient.getLock("lock:order" + userId);// 尝试获取锁,使用无参构造,默认不等待,超时时间为30sboolean isLock = lock.tryLock();// 判断获取锁是否成功if (!isLock) {// 获取锁失败,返回对应的错误信息log.error(MessageConstant.REPEAT_BUY);return;}// 获取锁成功,进行业务操作try {// 代理对象保证下面的事务能够正确执行proxy.createVoucherOrder(voucherOrder);}finally {// 释放锁lock.unlock();}}
创建线程池并使用Spring的@PostConstruct注解将异步执行方法用新线程执行,即确保init方法提交一个 VoucherOrderHandler 实例到 SECKILL_ORDER_EXECUTOR 线程池中执行,用于异步处理订单创建任务。
// 线程池,用于异步执行创建订单的业务private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}
测试:成功实现异步创建订单功能,每次请求的线程只对redis中的库存和订单进行处理,将订单信息加入阻塞队列中,使用异步线程实现对数据库的写入操作。
总结一些这里的秒杀订单优化思路:
- 利用Redis完成库存判断、一人一单判断,实现抢单业务,业务执行速度更快
- 将订单放入阻塞队列中,利用独立的线程进行异步下单操作,进行实际的数据库插入操作
基于阻塞队列的异步秒杀存在的问题:
- 内存限制问题:阻塞队列是在jvm中存在的,在高并发情况下会创建多个阻塞队列,且目前限制了阻塞队列的大小,导致部分数据由于内存不足无法存储。
- 数据安全问题:阻塞队列不具有持久性,异步线程创建订单具有延时性,当redis完成抢单业务,但异步线程还未执行完时发生故障或服务重启时,阻塞队列中的数据会丢失,导致实际的抢单业务未完成,造成数据库和redis中数据不一致,导致后续带来安全问题。
Redis消息队列
消息队列(Message Queue)字面上的意思就是存方消息的队列,一个最简单的消息队列模型包括3个角色:
- 消息队列:用于存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列中
- 消费者:从消息队列中获取消息并处理消息
以上面的订单秒杀功能为例,整个模型如下:
使用消息队列的好处:
- 将业务功能分离,能够实现redis业务和数据库业务的分离
- 消息队列会对数据进行持久化,不会想阻塞队列那样因为故障而导致数据丢失
- 消息队列保证存储的消息至少处理一次,即使出现服务器重启,下次也会继续处理未处理完的消息
Redis提供了三种不同的方式来实现消息队列:
- List结构:基于List的结构可以用来模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:redis5.0新提供的,比较完善的消息队列模型
基于List结构模拟消息队列
消息队列是一个存储消息的队列,而redis中的List结构是一个双向链表,它能够很好的模拟处队列的效果。
队列需要一边出一边如,可以使用redis的LPUSH(左入)结合RPOP(右出)、或者是RPUSH(右入)结合LPOP(左出)来实现消息队列。
但RPOP和LPOP操作在消息队列中没有消息时会返回null,而我们期望的是消息队列中没有消息时阻塞等待 ,因此我们需要使用BRPOP和BLPOP操作来实现阻塞的效果。如果在超过等待时间还没有消息的话就会返回null。
基于List的消息队列的优缺点:
优点:
- 利用redis进行存储,不受限与JVM的内存上限
- 基于Redis的持久化机制,能够有效的保证数据的安全性
- 可以满足消息的有序性(消息队列的要求)
缺点:
- 无法避免消息丢失:当从消息队列中获取消息但未处理时发生故障,此时该消息就会丢失。
- 只支持单消费者:一个消息只能被一个消费者获,在一些场景下会有一个消息需要被多个消息获取的需求,但这里无法满足。
基于PubSub的消息队列
PubSub(发布订阅)是Publish和Subscribe的缩写,它是Redis2.0引入的消息传递模型。在这种模式下,一个消费者可以订阅一个或多个Channel,生产者向对应的channel中发送消息后,所对应的所有订阅者都能够收到相关的消息。PubSub是天生阻塞的。
常用命令:
- SUBSCRIBE channel[channel]:订阅一或多个频道
- PUBLISH channel msg:向频道 发送消息msg
- PSUBSCRIBE patter[patter]:订阅与patter格式 匹配的所有频道(使用通配符时,必须用该命令进行频道匹配)
下面是频道的通配符标识
- ?表示匹配一个字符
- *表示中间匹配任意种类和数量的字符
- [ae]表示匹配中间的任意一个个字符
一个基本模型为:
测试:
SUBSCRIBE命令
PSUBSCRIBE命令
PUBLISH命令
基于SubPub的优缺点:
优点:
- 采用发布订阅抹模型,支持多生产者多消费者场景
缺点:
- 不支持数据持久化:不像list那样存储在Redis中,服务器故障则数据丢失
- 无法避免消息丢失:服务器宕机则数据丢失
- 消息堆积有上限,超出上限时数据丢失:消费者有缓冲区存储收到的消息,当消费者处理消息太慢而无法存储新来的消息时会出现数据丢失的情况。
基于Stream的消息队列
Stream是Redis在5.0版本的新的数据类型,它与List等数据类型一样,都是持久化的数据。
Stream常用命令:
- XADD:stream的新增命令
基本格式为:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] thresholdd [LIMIT count]] *|ID field value [field value ...]
各个变量含义为:
- key:stream数据的key
- NOMKSTREAM:表示如果队列不存在时,是否自动创建,不写则默认自动创建,写了则不需要自动创建。
- [MAXLEN|MINID [=|~] thresholdd [LIMIT count]]:用于限制Stream数据的长度
- *|ID:表示数据的唯一Id,*表示Redis随机生成(推荐使用该方式),格式通常为“时间戳-递增数字”,如“1644804662707-0”。
- field value:是存储的数据,即发送到队列中的消息,称为Entry,一个Entry是一个key-value键值对的格式,可以发送多个键值对数据。
- XREAD:Stream类型读取数据的命令,基本格式为:
XREAD [COUNT count] [BLOCK milliseconds] [STREAMS key [key ...]] ID [ID ...]
各个变量含义:
- COUNT:表示每次读取消息的最大数量,可以同时读取多个数据
- BLOCK:表示当队列里没有消息时,是否阻塞,不写或时间设置为0都表示不阻塞
- STREAMS:表示要从那个队列读取消息,key是队列名,可以同时读取多个
- ID:起始id只返回 大于该ID的消息,0表示从第一个消息开始返回,$表示从最新的消息开始返回。
注意:当我们读取数据时用$作为起始Id时,可能会出现消息漏读的情况,当我们在处理一条消息的过程中,又有超过1条以上的消息达到队列,则下次获取消息也只会阻塞等到最新得到消息,从而漏读在处理消息期间到达队列的消息。
测试:
STERAM类型消息队列的特点:
- 消息可回溯:读取完的消息不会消失,而是存储在Redis中,可以再次本访问
- 一个消息可以被多个消费者读取
- 可以阻塞读取:$符号的利用
- 有消息漏读的额风险(上面已经提到了)
消费者组
为了解决消息处理较慢和消息漏读的问题,消费者组被设计了出来。
消费者组(Consumer Group):将多个消费者划分到一个组中,用于监听同一个队列,它具有下列特点:
- 消息分流:一个队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组中会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启了,还是会从标示之后开始读取消息,从而保证每一个消息被消费,降低漏读的风险。
- 消息确认:消费者获取消息后,消息处于一个pending状态,并存入一个pending-list列表中,当处理完成一个消息后,需要通过XACK来确认消息,标记消息为已处理,从pending-list中移除,进一步避免了消息漏读的问题。
创建消费者组
命令格式为:
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组的名称
- ID:起始ID标示,$表示队列中的最后一个消息,0表示队列中的第一个消息
- MKSTREAM:队列不存在时,自动创建队列
从消费者在读取消息
命令格式为:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费者组的名称
- consumer:消费者名称,当消费者不存在时,会自动创建一个对应的消费者。
- count:本次读取的最大消息数量
- BLOCK milliseconds:当没有消息时的阻塞时间
- NOACK:无需手动ACK,获取消息后自动确认,可以理解为不放入pending-list中
- STREAMS key:指定消息队列的名称
- ID:回去消息的起始Id:
- ">":表示从下一个未消费的消息开始读取
- 其它:根据指定的id从pending-list中获取已消费但未确认的消息,例如0表示从pending-list中的第一个消息开始获取
测试:
为s1创建一个名为group1的消费者组
正常获取消息
处理完这五条消息后确认,才能才能够pending-list中删除
再次获取消息,当出现故障时,需要获取未处理的消息,即pending-list中的消息。
测试完成。
消费者组的Java伪代码
外层while中获取消息队列中的消息,当出现异常时,catch中的while循环处理pending-list中的消息,从而保证每个 消息至少处理一次。
STREAM类型消息队列 的特点:
- 消息可回溯:消息可被多次读取
- 可以多个消费者处理同一个队列中的消息,进而加快消费的速度,提高效率
- 可以阻塞读取
- 没有消息漏读风险:ACK和pending-list机制的存在保证不会漏读
- 有消息确认机制,能够保证消息至少被消费一次
Redis三种消息队列的比较:
Redis的STREAM类型作为消息队列实现异步秒杀功能
需求1:创建一个STREAM类型的消息队列,名为stream.orders
需求2:修改秒杀下单的Lu脚本,在抢购成功以后,直接向stream.orders中添加消息,内容包含voucherId,userId,orderId
增加订单id,并将其发送到消息队列中,使用id作为订单id名称存储,与VoucherOrder实体类中的属性对应,以便后续封装。
-- 1. 传入的参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1];
-- 1.2 用户id
local userId = ARGV[2];
-- 1.3 订单id
local orderId = ARGV[3];
-- 2. 数据key
-- 2.1 库存key Lua脚本中使用..来拼接字符串
local stockKey = 'seckill:stock:' .. voucherId;
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId;
-- 3. 脚本业务
-- 3.1 判断库存是否充足,tonumber用于将字符串转变成数组进行比较
if (tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1;
end;
-- 3.2 判断 用户是否已下单
if (redis.call('sismember', orderKey, userId) == 1) then-- 3.3 用户已下单,返回2return 2;
end;
-- 3.4 扣减库存
redis.call('incrby', stockKey, -1);
-- 3.5 下单,即保存用户到订单集合中
redis.call('sadd', orderKey, userId);
-- 3.6 发送到消息队列中,这里订单id存储为id,与VoucherOrder实体类中的属性对应
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
-- 返回抢购成功
return 0;
修改seckill方法,订单信息的发送在Lua脚本中执行,这里不在需要封装订单,只需要针对脚本的执行结果进行判断即可。
/*** 优惠券秒杀订单优化,使用STREAM* @param voucherId* @return*/@Overridepublic Result seckillVoucher(Long voucherId) {// 获取当前用户IDLong userId = UserHolder.getUser().getId();// 订单idLong orderId = redisIdWorker.nextId("order");// 1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),orderId.toString());// 2.判断执行的结果int r = result.intValue();if (r != 0) {// 2.0 执行结果不为0,返回对应的错误信息,1为库存不足,2为重复下单return Result.fail(r == 1? MessageConstant.STOCK_IS_NOT_ENOUGH: MessageConstant.REPEAT_BUY);}// 3. 获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4. 返回订单idreturn Result.ok(orderId);}
需求3:项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
还是需要线程池和@PostConstruct注解在项目启动时自动处理消息队列中的消息。
// 线程池,用于异步执行创建订单的业务private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}
处理消息队列消息,while循环保证不断处理消息队列的消息
final String queueName = "stream.orders";/*** 处理消息队列中的信息,异步创建订单*/private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {// 1. 获取消息队列中的订单信息 XREADGROUP GROUP gg1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),//使用spring相关的ConsumerStreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),// 读取1个消息,阻塞时间2sStreamOffset.create(queueName, ReadOffset.lastConsumed())// 相当于>符号,表示上一个未消费的消息);// 2. 判断消息是否获取成功if (list == null || list.isEmpty()){continue;// 获取失败,没有消息,继续下次循环}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);// 这里我们只取一个消息Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 创建订单handleVoucherOrder(voucherOrder);// 5. 进行消息ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());}catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}}
handlePendingList方法是在前面处理消息时报错的情况下,对pending-list中的消息进行处理的方法。
private void handlePendingList() {while (true) {try {// 1. 获取handing-list中的消息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),//使用spring相关的ConsumerStreamReadOptions.empty().count(1),// 读取1个消息,不需要阻塞StreamOffset.create(queueName, ReadOffset.from("0"))// 自己定义一个"0",表示handing-list中未确认的第一个);// 2. 判断消息是否获取成功if (list == null || list.isEmpty()){break;// 获取失败,表示handing-list中没有消息,结束当前循环}// 3. 解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);// 这里我们只取一个消息Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 创建订单handleVoucherOrder(voucherOrder);// 5. 进行消息ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());}catch (Exception e) {// 异常结束会重复当前循环,直到handing-list中没有消息log.error("处理pending-list订单异常", e);try {Thread.sleep(20);// 休眠20ms,避免不断进行尝试,浪费资源} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}
测试:实现了STREAM作为消息队列异步订单秒杀功能的修改。
秒杀订单业务整体流程和优化