redis实现分布式锁详细教程,可续锁(看门狗)、可重入
前言
本文将讨论的做一个高并发场景下避不开的话题,即redis分布式锁。比如在淘宝 的秒杀场景、热点新闻和热搜排行榜等。可见分布式锁是一个程序员面向高级的一门必修课,下面请跟着本篇文章好好学习。
redis分布式锁有哪些面试题
1.Redis做分布式的时候需要注意什么问题?
2.你们公司自己实现的分布式锁是否用的setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
3.如果Redis是单点部署的,会带来什么问题?准备怎么解决单点问题呢?
Redis集群模式下,比如主从模式下,CAP方面有没有什么问题?
1.分布式锁是什么?
1.2.锁的种类介绍
锁的种类 | 锁的概念 |
---|---|
单机 | 单机版同一个JVM虚拟机内,synchronized或者lock接口 |
分布式 | 分布式多个不同的java虚拟机,单机的线程锁机制不再起作用了,资源类在不同的服务器之间共享了。 |
1.2一个正经的分布式锁具有哪些刚需
独占性:任何时刻只能有且仅有一个线程持有
高可用:若redis集群环境下,不能因为某个节点挂了而出现获取锁或者释放锁失败。高并发请求下依旧能够保证良好使用。
防止死锁:杜绝死锁,必须有超时控制或者撤销操作,有个兜底终止跳出方案
不乱抢:防止张冠李戴,不能私下uolock别人的锁,只能自己加锁自己释放,自己约的锁自己要释放,可以设置过期时间,或者业务代码执行完毕以后删除对一个的锁。
可重入:同一个节点的同一个线程如果获得锁之后,他也可以再次获得这个锁。
1.3 redis分布式锁
setnx key values
1.4 java实现分布式锁的案例
先来个乞丐版的分布式锁,并没有遵循上面五大原则。然后慢慢进行优化,乞丐版分布锁案例如下代码所示:
public String sale() {String resMessgae = "";String key = "luojiaRedisLocak";String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);// 抢不到的线程继续重试if (!flag) {// 线程休眠20毫秒,进行递归重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}sale();} else {try {// 1 抢锁成功,查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory01");// 2 判断库存书否足够Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);// 3 扣减库存,每次减少一个库存if (inventoryNum > 0) {stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;log.info(resMessgae);} else {resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;log.info(resMessgae);}} finally {stringRedisTemplate.delete(key);}}return resMessgae;
}
请看看以上代码有哪些问题?既没有删除过期时间 ,也没有判断redis获取的redis值进行删除,有可能删除错锁。如果进一步优化可以redis可以存一个流水号,业务代码执行完了以后,判断流水号是否相等,然后进行删除。可重入问题可以通过递归实现重试,但是依旧有问题:手工设置5000个线程来抢占锁,压测OK,但是容易导致StackOverflowError,在高并发不推荐使用,需要进一步完善。改进获取重试方法代码如下所示:
public String sale() {String resMessgae = "";String key = "luojiaRedisLocak";// 标记线程id,知道使哪个线程在执行String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();// 不用递归了,高并发容易出错,我们用自旋代替递归方法重试调用;也不用if,用while代替while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {// 线程休眠20毫秒,进行递归重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}try {// 1 抢锁成功,查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory01");// 2 判断库存书否足够Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);// 3 扣减库存,每次减少一个库存if (inventoryNum > 0) {stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;log.info(resMessgae);} else {resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;log.info(resMessgae);}} finally {stringRedisTemplate.delete(key);}return resMessgae;
}
为了防止出现死锁,需要给锁设置过期时,关键点在于过期时间设置,以避免代码异常出现,而该线程持续占有该锁。其java代码如下所示:
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {// 线程休眠20毫秒,进行递归重试try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}}
为了防止误删key,在执行完了业务代码以后需要删掉锁,在try-catch-finally 中添加如下删除锁的代码 :
try {//和上一个代码块重复,省略掉了} finally {// v5.0 改进点,判断加锁与解锁是不同客户端,自己只能删除自己的锁,不误删别人的锁if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)) {stringRedisTemplate.delete(key);}}
1.5 优化分布式锁
本次优化主要解决的问题有:宕机防止死锁、防止误删key、Lua保证原子性。设置 过期时间的同时,当业务执行时间大于过期时间,自动续锁功能等。java代码如下所示:
自动续锁的Lua脚本:
// 自动续期的LUA脚本
if redis.call('hexists', KEYS[1], ARGV[1]) == 1 thenreturn redis.call('expire', KEYS[1], ARGV[2])
elsereturn 0
end
新增续锁功能,java代码如下所示
package com.luojia.redislock.mylock;import cn.hutool.core.util.IdUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 自研的分布式锁,实现了Lock接口*/
public class RedisDistributedLock implements Lock {private StringRedisTemplate stringRedisTemplate;private String lockName; // KEYS[1]private String uuidValule; // ARGV[1]private long expireTime; // ARGV[2]public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuidValule = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();this.expireTime = 50L;}@Overridepublic void lock() {tryLock();}@Overridepublic boolean tryLock() {try {tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (-1 == time) {String script ="if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[1], 1) " +"redis.call('expire', KEYS[1], ARGV[2]) " +"return 1 " +"else " +"return 0 " +"end";System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);// 加锁失败需要自旋一直获取锁while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),Arrays.asList(lockName),uuidValule,String.valueOf(expireTime))) {// 休眠60毫秒再来重试try {TimeUnit.MILLISECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}}return true;}return false;}@Overridepublic void unlock() {String script = "" +"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +"return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";System.out.println("lockName:" + lockName + "\t" + "uuidValue:" + uuidValule);// LUA脚本由C语言编写,nil -> false; 0 -> false; 1 -> true;// 所以此处DefaultRedisScript构造函数返回值不能是Boolean,Boolean没有nilLong flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class),Arrays.asList(lockName),uuidValule);if (null == flag) {throw new RuntimeException("this lock does not exists.");}}// 下面两个暂时用不到,不用重写@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic Condition newCondition() {return null;}
}
完整的分布式锁java代码如下所示:
// v7.0 使用自研的lock/unlock+LUA脚本自研的Redis分布式锁
Lock redisDistributedLock = new RedisDistributedLock(stringRedisTemplate, "luojiaRedisLock");
public String sale() {String resMessgae = "";redisDistributedLock.lock();try {// 1 抢锁成功,查询库存信息String result = stringRedisTemplate.opsForValue().get("inventory01");// 2 判断库存书否足够Integer inventoryNum = result == null ? 0 : Integer.parseInt(result);// 3 扣减库存,每次减少一个库存if (inventoryNum > 0) {stringRedisTemplate.opsForValue().set("inventory01", String.valueOf(--inventoryNum));resMessgae = "成功卖出一个商品,库存剩余:" + inventoryNum + "\t" + ",服务端口号:" + port;log.info(resMessgae);} else {resMessgae = "商品已售罄。" + "\t" + ",服务端口号:" + port;log.info(resMessgae);}} finally {redisDistributedLock.unlock();}return resMessgae;
}
总结
synchronized单机版OK; -> v1.0
Nginx分布式微服务,轮询多台服务器,单机锁不行;-> v2.0
取消单机锁,上redis分布式锁setnx,中小企业使用没问题;-> v3.1
只是加锁了,没有释放锁,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁 -> v3.2
如果服务宕机,部署了微服务代码层面根本就没有走到finally这块,没办法保证解锁,这个Key没有被删除,需要对锁设置过期时间 -> v3.2
为redis的分布式锁key增加过期时间,还必须要保证setnx+过期时间在同一行,保证原子性 -> v4.1
程序由于执行超过锁的过期时间,所以在finally中必须规定只能自己删除自己的锁,不能把别人的锁删除了,防止张冠李戴 -> v5.0
将Lock、unlock变成LUA脚本保证原子性; -> v6.0
保证锁的可重入性,hset替代setnx+Lock变成LUA脚本,保障可重入性; -> v7.0
锁的自动续期 -> v8.0