分布式锁

[TOC]

1 什么是分布式锁

在单体的应用开发场景中涉及并发同步的时候,大家往往采用Synchronized(同步)或者其他同一个JVM内Lock机制来解决多线程间的同步问题。

在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。

2 分布式锁的实现

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

图片

2.1 基于数据库的分布式锁

基于数据库实现分布式锁主要是利用数据库的唯一索引来实现,唯一索引天然具有排他性,这刚好符合我们对锁的要求

2.1.1 设计原理

  • 同一时刻只能允许一个竞争者获取锁。加锁时我们在数据库中插入一条记录,利用唯一键进行防重。
  • 当竞争者A加锁成功后,第竞争者B再来加锁就会抛出唯一索引冲突,如果抛出这个异常,我们就判定竞争者B加锁失败
  • 竞争者B加锁失败后,会阻塞等待,一直到竞争者A释放锁(也就是删除记录后),再去获取锁

image-20230228222626149

2.1.2 实现注意事项

  • 没有锁超时机制。如果程序发生了异常,将无法删除数据,也就是锁无法被释放掉,需要自己写一套锁超时机制,比如:在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据。
  • 基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,可以考虑实现数据库的高可用方案。
  • 需要自旋实现阻塞效果。当获取锁失败时自旋转。
  • 如果使用数据库自增 id ,规律太明显。
  • 受单表数据量的限制。 在高并发场景下,我们都知道 MySQL 的单张表根本不可能容纳大量数据(性能等原因的限制);如果是将单表拆成多表,还是用数据库自增 id 的话,就存在了 id 重复的情况了,很显然这是业务不允许的。

db操作性能较差,并且有锁表的风险,一般不考虑。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 */
public abstract class AbstractLock implements Lock{

/**
* 加锁,增加重试逻辑
*/
@Override
public void lock() {
//尝试获取锁
if(tryLock()){
System.out.println("---------获取锁---------");
}else {
//等待锁 阻塞
waitLock();
//重试
lock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MysqlDistributedLock extends AbstractLock {

@Autowired
private MethodlockMapper baseMapper;

@Override
public boolean tryLock() {
try {
//插入一条数据 insert into
baseMapper.insert(new Methodlock("lock"));
}catch (Exception e){
//插入失败
return false;
}
return true;
}

@Override
public void waitLock() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void unlock() {
//删除数据 delete
baseMapper.deleteByMethodlock("lock");
System.out.println("-------释放锁------");
}

2.2 基于 Redis 的分布式锁

效率最高,加锁速度最快,因为Redis几乎都是纯内存操作。

适用于并发量很大、性能要求很高而可靠性问题可以通过其 他方案去弥补的场景。

2.2.1 设计原理

  • 利用利用Redis的SETNX key value这个命令获取锁,并设置过期时间,保存线程标示

  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁 (Lua 脚本保证原子性)

SETNX key value: 只有当key不存在时才会执行成功,如果key已经存在则命令执行失败。

2.2.2 实现注意事项

  • Redis 的 increment 并不能满足安全性,如果使用它需要特殊处理增加复杂性,如:

    • ID的组成部分:
      • 符号位:1bit,永远为0
      • 时间戳:31bit,以秒为单位,可以使用69年
      • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
  • 需要设置一个超时时间,因为有可能宕机或者被运维重启了,无法释放锁,但是这个超时时间的长短却不好确定:

    • 设置过短,会导致其他线程提前获得锁,引发线程安全问题
    • 设置过长,线程需要额外等待
  • 如果业务执行时间> 过期时间,就需要锁续命:搞一个定时任务,设一个间隔时间,小于失效时间,过一段时间去监控业务是否执行完了,执行没结束,也就是锁还没释放,我就再把锁的过期时间重新设置成初始值

  • 避免线程A释放线程B的锁,需要在释放锁时多加一个判断,每个线程只释放自己的锁,不能释放别人的锁!可以给每个线程分配一个唯一的UUID

  • 需要确保锁可以正常释放

    • 使用Lua脚本获取和删除锁,保证原子性
    • 必须使用try catch 在finally中释放锁,否则有异常,锁便没法释放,其他线程进来就会一直执行失败
  • 需要支持锁重入,同一个线程可能多次获取同一把锁

  • 需要重试机制,避免锁只尝试一次就返回false

  • 如果Redis提供了主从集群,那么需要考虑主从一致性。因为主从同步存在延迟,当主宕机时,在主节点中的锁数据并没有及时同步到从节点中,则会导致其他线程也能获得锁,引发线程安全问题(延迟时间是在毫秒以下的,所以这种情况概率极低)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Service
public class RedisLockService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final String LOCK_PREFIX = "lock:";

/**
* 尝试获取锁
*
* @param lockKey 锁的键
* @param requestId 请求ID,用于解锁时验证
* @param expireTime 锁的超时时间,毫秒
* @return 锁是否获取成功
*/
public boolean tryLock(String lockKey, String requestId, long expireTime) {
String key = LOCK_PREFIX + lockKey;
// 尝试设置锁,这里SETNX不是原子操作,实际使用时应考虑使用Lua脚本或Redis 2.6.12+的SET命令
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTime, TimeUnit.MILLISECONDS);
return Boolean.TRUE.equals(result);
}

/**
* 释放锁
*
* @param lockKey 锁的键
* @param requestId 请求ID,用于验证
* @return 锁是否释放成功
*/
public boolean releaseLock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;
// // Lua 脚本,用于释放锁,Lua 脚本可以确保操作的原子性。
String lockScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> releaseLockScript = new DefaultRedisScript<>(lockScript, Long.class);
Long result = redisTemplate.execute(releaseLockScript, Collections.singletonList(key), requestId);
return result != null && result == 1;
}
}

2.2.4 redison使用

Redisson是一个基于Redis的分布式Java对象和服务的框架,它提供了丰富的功能来支持各种分布式系统场景。Redisson提供了多种分布式锁的实现,包括可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)等,用于在分布式环境中实现互斥访问

image-20230228222105112

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class RedissonLockExample {

// Redis 服务器配置
private static final String REDIS_URL = "redis://127.0.0.1:6379";

public static void main(String[] args) {

Config config = new Config();
config.useSingleServer().setAddress(REDIS_URL);
RedissonClient redisson = Redisson.create(config);
String lockKey = "myLock";

// 获取锁对象
RLock lock = redisson.getLock(lockKey);

try {
boolean res = lock.tryLock(10, 10, TimeUnit.SECONDS); // 尝试加锁,这里只是示例,Redisson 中通常不这样用
if (res) {
try {
// 执行需要同步的代码
System.out.println("Lock acquired, executing task...");
// 模拟任务执行
Thread.sleep(2000);
} finally {
// 释放锁
lock.unlock();
System.out.println("Lock released");
}
} else {
System.out.println("Failed to acquire lock");
}

// 或者,更常见的用法是直接调用 lock() 方法阻塞等待锁,并在 finally 块中释放锁
lock.lock(); // 阻塞等待锁
try {
// 执行需要同步的代码
System.out.println("Lock acquired, executing task...");
// 模拟任务执行
Thread.sleep(2000);
} finally {
// 释放锁
lock.unlock();
System.out.println("Lock released");
}

// 注意:在 Redisson 中,您还可以设置锁的自动过期时间,以避免死锁
// lock.lock(10, TimeUnit.SECONDS); // 上锁10秒后自动解锁

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
} finally {
// 关闭 Redisson 客户端
redisson.shutdown();
}
}
}

2.3 基于 Zookeeper 的分布式锁

适用于高可靠(高可用),而并发量不是太高的场景

优点:不存在锁失效的问题,不需要续命。

缺点:因为需要频繁的创建和删除节点,性能上不如Redis。

在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper 的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁

Zookeeper第三方客户端curator中已经实现了基于Zookeeper的分布式锁。

2.3.1 基于Zookeeper设计思路一

利用Zookeeper创建临时节点来实现分布式锁,同一路径下的节点名称不能重复,来行防重

  • 同一时刻只能允许一个竞争者获取锁。加锁时我们创建一个临时节点。
  • 当竞争者A加锁成功后,第竞争者B再来加锁就会抛出节点名称不能重复错误,如果抛出这个异常,我们就判定竞争者B加锁失败
  • 竞争者B加锁失败后,会阻塞等待,监听节点状态,当节点数据删除后,也就是竞争者A释放锁,再去获取锁

image-20230228221910782

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class ZkDistributedLock extends AbstractLock implements IZkDataListener{

private ZkClient zkClient ;
private String path = "/lock";

private CountDownLatch countDownLatch ;
private String config;


@Override
public boolean tryLock() {
try {
if(zkClient ==null){
//建立连接
zkClient = new ZkClient(config);
}
// 创建临时节点
zkClient.createEphemeral(path);

}catch (Exception e){
//存在节点
return false;
}
return true;
}



@Override
public void waitLock() {
//注册监听
zkClient.subscribeDataChanges(path,this);
if(zkClient.exists(path)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await(); //计数器变为0之前,都会阻塞
// 解除监听
zkClient.unsubscribeDataChanges(path,this);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

@Override
public void unlock() {

if(zkClient !=null){
zkClient.delete(path);
System.out.println("-----释放锁资源----");
}

}

@Override
public void handleDataDeleted(String dataPath) throws Exception {
countDownLatch.countDown();
}

@Override
public void handleDataChange(String dataPath, Object data) throws Exception {

}

}

2.3.2 基于Zookeeper设计思路二

  • 利用Zookeeper创建临时有序节点来实现分布式锁,谁创建的节点序号最小,谁就获得了锁,

  • 其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁

image-20230228221920388

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class ZkDistributedLock2 extends AbstractLock implements IZkDataListener {

private ZkClient zkClient;
private String path;
private String config;

private CountDownLatch countDownLatch ;
private String beforePath; // 当前节点前一个节点
private String currentPath; // 当前节点


public ZkDistributedLock2(String config, String path){
this.path = path;
this.config = config;
zkClient = new ZkClient(config);
if (!zkClient.exists(path)) {
// 如果根节点不存在,则创建根节点
zkClient.createPersistent(path);
}
}

@Override
public void lock(){
//尝试获取锁
boolean locked = tryLock();
if(locked){
System.out.println("---------获取锁---------");
}
while (!locked){
//等待锁 阻塞
waitLock();
//重试
//获取等待的子节点列表
List<String> childrens = zkClient.getChildren(path);
//判断,是否加锁成功
if (checkLocked(childrens)) {
locked = true;
}
}

}



@Override
public boolean tryLock() {
try {
//创建临时有序的节点 -e -s
currentPath = zkClient.createEphemeralSequential(path+"/",null);
//获取到所有子节点
List<String> childrens = zkClient.getChildren(path);
//获取等待的子节点列表,判断自己是否第一个
if (checkLocked(childrens)) {
return true;
}
// 若不是第一个,则找到自己的前一个节点
int index = Collections.binarySearch(childrens,
currentPath.substring(currentPath.lastIndexOf("/") + 1));
if(index<0){
throw new Exception(currentPath+"节点没有找到" );
}
//如果自己没有获得锁,则要监听前一个节点
beforePath = path + "/" + childrens.get(index-1);
}catch (Exception e){
e.printStackTrace();
}
return false;
}


private boolean checkLocked(List<String> childrens) {

//节点按照编号,升序排列
Collections.sort(childrens);

// 如果是第一个,代表自己已经获得了锁
if (currentPath.equals(path + "/" +childrens.get(0))) {
System.out.println("成功的获取分布式锁,节点为"+ currentPath);
return true;
}
return false;
}

@Override
public void waitLock() {
try {
countDownLatch = new CountDownLatch(1);
if(zkClient.exists(beforePath)) {
//订阅比自己次小顺序节点的删除事件 index-1
zkClient.subscribeDataChanges(beforePath, this);
countDownLatch.await();
zkClient.unsubscribeDataChanges(beforePath, this);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}


@Override
public void unlock() {

if(zkClient !=null){
//删除临时节点
zkClient.delete(currentPath,-1);
System.out.println(currentPath+" 节点释放锁资源");
}
}


@Override
public void handleDataChange(String dataPath, Object data) throws Exception {

}

@Override
public void handleDataDeleted(String dataPath) throws Exception {
countDownLatch.countDown(); //减1
}
}

2.3.3 Apache Curator 使用

Apache Curator 是一个开源的客户端库,用于简化 Apache ZooKeeper 的使用。Curator 提供了丰富的 API,其中包括实现分布式锁的功能。Curator 的 InterProcessMutex 类是一个分布式可重入互斥锁的实现,非常适合在分布式系统中用来同步资源访问。

1. 添加 Curator 依赖

首先,你需要在你的项目中添加 Curator 的 Maven 依赖。以下是一个例子:

1
2
3
4
5
6
7
8
9
10
<dependency>  
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>你的版本号</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>你的版本号</version>
</dependency>

请替换 你的版本号 为你想要使用的 Curator 版本。

2. 配置 Curator 客户端

你需要配置一个 Curator 客户端,指定 ZooKeeper 的服务器地址等。

1
2
3
4
5
6
7
8
9
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181", // ZooKeeper 地址
new ExponentialBackoffRetry(1000, 3) // 重试策略
);
client.start();

3. Curator实现分布式锁

使用 Curator 的 InterProcessMutex 类来实现分布式锁。

注意事项

  • 确保 ZooKeeper 服务正在运行。
  • 在生产环境中,使用 Curator 的配置可能需要更多的细化和优化,例如使用更复杂的重试策略或配置连接池。
  • InterProcessMutex 提供了可重入锁的功能,这意味着同一个客户端可以多次获取同一把锁。
  • finally 块中释放锁是非常重要的,以避免死锁。
  • Curator 还提供了其他类型的锁,如 InterProcessSemaphoreMutex(信号量锁)和 InterProcessReadWriteLock(读写锁),你可以根据需要选择使用。

使用 Curator 的分布式锁可以极大地简化在分布式系统中的同步控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.curator.framework.recipes.locks.InterProcessMutex;  
import org.apache.curator.framework.recipes.locks.LockAcquireException;

// 锁的路径(通常是 ZooKeeper 中的一个节点路径)
String lockPath = "/locks/myLock";

// 创建锁对象
InterProcessMutex lock = new InterProcessMutex(client, lockPath);

try {
// 尝试获取锁
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
// 执行你的同步代码
System.out.println("Lock acquired, executing task...");
// 模拟任务执行
Thread.sleep(2000);
} finally {
// 释放锁
lock.release();
System.out.println("Lock released");
}
} else {
// 如果在规定时间内没有获取到锁
System.out.println("Failed to acquire lock");
}
} catch (InterruptedException | LockAcquireException e) {
Thread.currentThread().interrupt();
// 处理异常
}

分布式锁
http://example.com/分布式锁/
作者
Panyurou
发布于
2024年8月1日
许可协议