当前位置: 首页 > news >正文

分布式锁实现方式

原理

什么是分布式锁

分布式锁是一种用于多个进程或线程之间同步访问共享资源的机制。在分布式系统中,由于多个节点之间的通信延迟和并发访问的问题,没有一个中心节点可以直接控制所有资源的访问。因此,设计一种分布式锁可以确保在分布式环境中的并发访问中只有一个进程或线程可以访问某个共享资源。

分布式锁应用场景

(1)传统的单体应用单机部署情况下,可以使用java并发处理相关的API进行互斥控制。

(2)分布式系统后由于多线程,多进程分布在不同机器上,使单机部署情况下的并发控制锁策略失效,为了解决跨JVM互斥机制来控制共享资源的访问,这就是分布式锁的来源;分布式锁应用场景大都是高并发、大流量场景。

常见的实现分布式锁的方法

  1. 基于数据库的实现:使用数据库的事务机制和唯一索引来实现分布式锁。通过在数据库中创建一个特定的表或记录,来作为锁的标识。当一个进程想要获取锁时,它尝试在数据库中插入一个特定的记录,如果插入成功,则获得了锁。其他进程在尝试获取锁时,会因为唯一索引的限制而失败。

  2. 基于缓存的实现:使用分布式缓存系统,如Redis,来实现分布式锁。通过在缓存中设置一个特定的键值对,来作为锁的标识。当一个进程想要获取锁时,它尝试在缓存中设置这个键值对,如果设置成功,则获得了锁。其他进程在尝试获取锁时,会因为缓存系统的原子操作特性而失败。

  3. 基于Zookeeper的实现:使用Zookeeper来实现分布式锁。Zookeeper是一个分布式协调服务,可以提供分布式锁的功能。通过创建一个临时有序节点,来作为锁的标识。当一个进程想要获取锁时,它在Zookeeper中创建一个临时有序节点,并检查自己是否是最小的节点,如果是,则获得了锁。其他进程在检查节点顺序时,会因为自己不是最小节点而失败

基于Redisson对redis分布式锁的实现

(1)加锁机制:根据hash节点选择一个客户端执行lua脚本

(2)锁互斥机制:再来一个客户端执行同样的lua脚本会提示已经存在锁,然后进入循环一直尝试加锁

(3)可重入机制

(4)watch dog自动延期机制

(5)释放锁机制

首先,你需要在你的项目中引入Redisson的依赖。可以通过Maven来引入依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.5</version>
</dependency>

接下来,你可以使用Redisson的RLock接口来创建和使用分布式锁

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class RedisLockExample {public static void main(String[] args) {// 创建Redisson客户端Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient client = Redisson.create(config);// 使用RLock接口创建分布式锁RLock lock = client.getLock("myLock");try {// 尝试获取锁,最多等待10秒boolean acquired = lock.tryLock(10, TimeUnit.SECONDS);if (acquired) {// 获取锁成功,执行业务逻辑System.out.println("获取到分布式锁");} else {// 获取锁失败,执行其他逻辑System.out.println("获取分布式锁失败");}} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放锁lock.unlock();client.shutdown();}}
}

源码

lock源码

  1. 获取当前线程的ID。
  2. 调用tryAcquire方法,尝试获取锁定,并返回锁的过期时间ttl。
  3. 如果ttl不为null,说明成功获取了锁。然后订阅锁的释放事件,并将返回的Future对象赋给future变量。
  4. 调用commandExecutor的syncSubscription方法,同步等待订阅操作完成。
  5. 进入循环,不断尝试获取锁。
  6. 如果ttl为null,说明锁已被其他线程占用,直接返回。
  7. 如果ttl大于等于0,说明锁已被其他线程占用,并且还有一段时间才会释放。这时候通过调用getEntry方法获取锁的实例,并调用其getLatch方法尝试在ttl时间内获取锁。
  8. 如果ttl小于0,说明锁已被其他线程占用,并且没有设置过期时间。这时候通过调用getEntry方法获取锁的实例,并调用其getLatch方法无限等待获取锁。
  9. 在finally块中调用unsubscribe方法取消订阅事件,并传入future和threadId参数。
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread0.getId0;Long ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl != nul1)RFuture < RedissonLockEntry > future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future);try {while (true) {ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl == nul1) {return;}if (ttl >= 0 L) {this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {this.getEntry(threadId).getLatch().acquire();}}} finally {this.unsubscribe(future, threadId);}
}

Redis单机分布式锁

该方法的参数包括leaseTime(锁的租约时间)、unit(时间单位)、threadid(线程ID)和RedisStrictCommand(Redis命令)。

该方法首先将leaseTime转换为毫秒,并将其保存在internalLockLeaseTime变量中。

然后,它调用commandExecutor对象的evalWriteAsync方法执行Lua脚本。

Lua脚本中包含以下几个步骤:

  1. 首先,脚本检查键KEYS[1]是否存在(通过调用exists命令)。如果键不存在,则执行以下操作:

    • 使用hset命令将键KEYS[1]的值设置为ARGV[2](线程ID)的值为1。
    • 使用pexpire命令设置键KEYS[1]的过期时间为ARGV[1](锁的租约时间)的值。
    • 返回nil表示锁获取成功。
  2. 接着,脚本检查键KEYS[1]中是否存在ARGV[2](线程ID)。如果存在,则执行以下操作:

    • 使用hincrby命令将键KEYS[1]中ARGV[2]的值加1。
    • 使用pexpire命令设置键KEYS[1]的过期时间为ARGV[1](锁的租约时间)的值。
    • 返回nil表示锁获取成功。
  3. 最后,脚本调用pttl命令返回键KEYS[1]的剩余过期时间(以毫秒为单位)。

Redis红锁

  1. 首先,根据当前已经获取锁的数量计算出一个基础的等待时间(baseWaitTime),每个锁对应的等待时间为1500毫秒。

  2. 判断传入的参数leaseTime是否为-1,如果是-1,则将等待时间设置为baseWaitTime并将时间单位设置为毫秒。

  3. 如果leaseTime不为-1,则将等待时间转换为毫秒,并进行以下判断:

    • 如果waitTime小于等于2000毫秒,则将等待时间设置为2000毫秒。
    • 如果waitTime小于等于baseWaitTime,则将等待时间设置为一个介于waitTime/2和waitTime之间的随机数。
    • 如果waitTime大于baseWaitTime,则将等待时间设置为一个介于baseWaitTime和waitTime之间的随机数。
  4. 最后,将等待时间转换为传入的时间单位,并使用tryLock方法尝试获取锁。如果成功获取锁,则方法返回。如果未能获取锁,则进入下一次循环继续尝试获取锁,直到成功获取锁为止。

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long baseWaitTime = locks.size() * 1500;long waitTime = -1;if (leaseTime == -1) {waitTime = baseWaitTime;unit = TimeUnit.MILLISECONDS;} else {waitTime = unit.toMillis(leaseTime);if (waitTime <= 2000) {waitTime = 2000;} else if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(least: waitTime / 2, waitTime);} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);}while (true) {if (tryLock(waitTime, leaseTime, unit)) {return;}}

通过遍历一个包含锁对象的列表,尝试获取每个锁。获取锁的方式是使用Redis的tryLock方法,如果获取成功,则将该锁添加到acquiredLocks列表中。如果获取失败,则通过捕获不同类型的异常来处理,如RedisConnectionClosedException和RedisResponseTimeoutException。对于其他类型的异常,将lockAcquired设置为false。

首先,判断是否成功获取锁(lockAcquired)。如果成功,将该锁添加到已获取锁的集合(acquiredLocks)中。

如果未成功获取锁,就会执行else分支的代码。

首先,通过计算当前未成功获取锁的次数和已获取锁的次数,判断是否达到了失败锁的限制(failedLocksLimit)。如果达到了失败锁的限制,就会跳出循环。

接着判断是否设置了失败锁的限制。如果失败锁的限制为0,表示不允许再进行获取锁的操作,此时会释放已获取的锁(unlockInner(acquiredLocks))。然后判断是否设置了等待时间(waitTime == -1)和租约时间(leaseTime == -1),如果没有设置等待时间和租约时间,返回false表示获取锁失败。

重新设置失败锁的限制为初始值(failedLocksLimit0),并清空已获取锁的集合(acquiredLocks)。

最后,通过循环将迭代器(iterator)重置为起始位置,或者将失败锁的限制(failedLocksLimit)减1。

if (lockAcquired) {acquiredLocks.add(lock);
} else {if (1 ocks.size() - acquiredLocks.size() == failedLocksLimit() {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit0;acquiredLocks.clear(;//reset iteratorwhile (iterator.hasPrevious() {iterator.previous(;} else {failedLocksLimit--}

Redis 单机分布式锁测试

private RedissonClient getClient() {Config config = new Config();config.useSingleServer().setAddress("redis://39.103.132.33:7004”).setPassword("Redis@ 123456 ");RedissonClient redissonClient =Redisson.create(config);return redissonClient; }private ExecutorService executorService = Executors.newCachedThreadPool();
int[] count = {0
};
for (int i = 0; i < 10; i++) {RedissonClient client = getClient();final RedisLock redisLock = new RedisLock(client, key: "1ock_key");executorService.submit(() - > {try {redisLock.lock();count[0] ++;} catch (Exception e) {e.printStackTrace();} finally {redisLock.unlock();}})}

红锁

public static RLock create(String url, String key) {Config config = new Config();config.useSingleServer().setAddress(url);RedissonClient redissonClient = Redisson.create(config);return redissonClient.getLock(key);
}
RedissonRedLock redissonRedLock = new RedissonRedLock(create(url: "redis://172.16.10.164:6379", key: "1ock_key1"), create(url: "redis://172.16.10.164:6380", key: "lockkey2”),create(url:"redis: //172.16.10.164:6381",key:"lock_key3"));RedisRedLock redLock =new RedisRedLock(redissonRedLock);private ExecutorService executorService = Executors.newCachedThreadPool();

基于ETCD实现分布式锁分析

基于 ETCD的锁实现的基础机制

Lease机制:租约机制(TTL,Time To Live),Etcd 可以为存储的 key-value 对设置租约, 当租约到期,key-value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约, 以避免 key-value 对过期失效。Lease 机制可以保证分布式锁的安全性,为锁对应的 key 配置租约, 即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放

  Revision机制:每个 key 带有一个 Revision 号,每进行一次事务加一,它是全局唯一的, 通过 Revision 的大小就可以知道进行写操作的顺序。在实现分布式锁时,多个客户端同时抢锁, 根据 Revision 号大小依次获得锁,可以避免 “羊群效应” ,实现公平锁

 Prefix机制:即前缀机制。例如,一个名为 /etcdlock 的锁,两个争抢它的客户端进行写操作, 实际写入的 key 分别为:key1="/etcdlock/UUID1",key2="/etcdlock/UUID2", 其中,UUID 表示全局唯一的 ID,确保两个 key 的唯一性。写操作都会成功,但返回的 Revision 不一样, 那么,如何判断谁获得了锁呢?通过前缀 /etcdlock 查询,返回包含两个 key-value 对的的 KeyValue 列表, 同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁

Watch机制:即监听机制,Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个范围(前缀机制), 当被 Watch 的 key 或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败, 可通过 Prefix 机制返回的 KeyValue 列表获得 Revision 比自己小且相差最小的 key(称为 pre-key), 对 pre-key 进行监听,因为只有它释放锁,自己才能获得锁,如果 Watch 到 pre-key 的 DELETE 事件, 则说明 pre-key 已经释放,自己已经持有锁

基于 ETCD的分布式锁的原理

基于ETCD分布式锁的实现流程

 1. 建立连接 客户端连接 Etcd,以 /etcd/lock 为前缀创建全局唯一的 key, 假设第一个客户端对应的 key="/etcd/lock/UUID1",第二个为 key="/etcd/lock/UUID2"; 客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定;

2: 创建定时任务作为租约的“心跳” 当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效, 客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃, 心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁

3: 客户端将自己全局唯一的 key 写入 Etcd 执行 put 操作,将

      步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制, 假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以 接下来判断自己是否获得锁

 4: 客户端判断是否获得锁 客户端以前缀 /etcd/lock/ 读取 keyValue 列表,判断自己 key 的 Revision 是否为当前列表中 最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

 5: 执行业务 获得锁后,操作共享资源,执行业务代码

步骤 6: 释放锁 完成业务流程后,删除对应的key释放锁

基于ETCD的分布式锁源码核心接口

@
0 verride
public void lock() {//检查重入性1. 检查锁重入性Thread currentThread = Thread.currentThread();LockData oldLockData = threadData.get(currentThread);if (oldLockData != null && oldLockData.isLockSuccess()) {2. 设置租约//re-enteringoldLockData.lockCount.incrementAndGet();3. 开启定时任务心跳检查return;}4. 阻塞获取锁//记录租约ID5. 加锁成功, 返回锁对象Long leaseId = 0 L;try {leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL))· get()· getID();//续租心跳周期long period = leaseTTL - leaseTTL / 5;//启动定时任务续约service.scheduleAtFixedRate(new EtcdDistributeLock.KeepAliveRunnable(leaseClient, leaseId), initialDelay, period, TimeUnit.NANOSECONDS);LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();if (lockResponse != null) {LockPath = lockResponse.getKey().toString(Charset.forName("utf-8"));log.info("获取锁成功,锁路径:{},线程:{}", LockPath, currentThread.getName());}} catch (InterruptedException | ExecutionException e) {log.error("获取锁失败", e);return;} //获取锁成功,锁对象设置LockData newLockData = new LockData(currentThread, lockKey);newLockData.setLeaseId(leaseId);newLockData.setService(service);threadData.put(currentThread, newLockData);
@
0 verride
public void unlock() {Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if (lockData == null) {throw new IllegalMonitorStateException("You do not own the lock:" + lockKey);}int newLockCount = lockData.lockCount.decrementAndGet();if (newLockCount > 0) {return;7 上if (newLockCount < 0) {throw new IllegalMonitorStateException("Lock count has gone negative for lock:" + lockKey);}try {//释放锁if (LockPath != null) {lockClient, unlock(ByteSequence.from(LockPath.getBytes()))· get();}if (LockData != null) {解锁过程://关闭定时任务lockData.getService().shutdown();1. 进行重入性检查//删除租约if (lockData.getLeaseId() != 0 L) {2. 移除当前锁的节点路径, 释放锁leaseClient.revoke(lockData.getLeaseId());}}} catch (InterruptedException | ExecutionException e) {log.error("解锁失败", e);3. 清除重入的线程资源} finally {//移除当前线程资源threadData.remove(currentThread);}}

基于Zookeeper分布式锁的使用

封装了Curator内实现的分布式锁,且实现了jdk8的Lock接口,面向接口编程,使用的时候可以随时更换锁的具体实现。

Curator的几种锁实现:

1. InterProcessMutex:分布式可重入排它锁 ---------------------上面的ZKLock就是用的这个

2. InterProcessSemaphoreMutex:基于信号量实现的分布式排它锁(资源个数是1)

3. InterProcessReadWriteLock:分布式读写锁 4. InterProcessMultiLock:将多个锁作为单个实体管理的容器

基于Zookeeper分布式锁原理

zookeeper作为高性能分布式协调框架,可以把其看做一个文件系统,其中有节点的概念,并且分为4种:1.持久性节点2.持久性顺序节点3.临时性节点4.临时性顺序节点。

分布式锁的实现主要思路就是:监控其他客户端的状态,来判断自己是否可以获得锁。

采用临时性顺序节点的原因:

1.zk服务器维护了客户端的会话有效性,当会话失效的时候,其会话所创建的临时性节点都会被删除,通过这一特点,可以通过watch临时节点来监控其他客户端的情况,方便自己做出相应动作。

2.因为zk对写操作是顺序性的,所以并发创建的顺序节点会有一个唯一确定的序号,当前锁是公平锁的一种实现,所以依靠这种顺序性可以很好的解释—节点序列小的获取到锁   并且可以采用watch自己的前一个节点来避免惊群现象(这样watch事件的传播是线性的)


http://www.mrgr.cn/news/69449.html

相关文章:

  • centos7 升级openssl 与升级openssh 安装卸载 telnet-server
  • Photoshop(PS)——人像磨皮
  • 接口文档的定义
  • Django 2024全栈开发指南(二):Django项目配置详解
  • 机器学习: LightGBM模型(优化版)——高效且强大的树形模型
  • 力扣 二叉树的直径-543
  • 深入理解 Vue 3 中的 Props
  • 2024年下半年系统分析师论文
  • 基于Multisim心率脉搏测量电路(含仿真和报告)
  • 数据结构:顺序表(动态顺序表)
  • 云计算在esxi 主机上创建 4g磁盘,同时在此磁盘上部署linux
  • 呼叫中心外呼主要用于哪些场景?
  • 编程新星挑战赛-题解
  • C/C++语言基础--C++模板与元编程系列四(类型模板参数、整数、指针 、模板类型)
  • Windows 11 安装 MySQL 8.4 LTS 详细安装配置教程(入门篇)
  • Cesium着色器的创意和方法(五——Polyline)
  • Linux中的用户创建及参数说明
  • nvm 切换 Node.js 版本
  • ElasticSearch向量检索技术方案介绍
  • Java异常处理
  • Android setContentView执行流程(一)-生成DecorView
  • AcWing 29:删除链表中重复的节点
  • 动态规划 —— dp 问题-买卖股票的最佳时机含冷冻期
  • Mysql基础 03 pymysql库、事务命令
  • 《Python编程实训快速上手》第四天--字符串操作
  • 【Java 多线程】:线程状态 线程操作 线程同步