[TOC]
1 什么是分布式锁
在单体的应用开发场景中涉及并发同步的时候,大家往往采用Synchronized(同步)或者其他同一个JVM内Lock机制来解决多线程间的同步问题。
在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。
2 分布式锁的实现
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

2.1 基于数据库的分布式锁
基于数据库实现分布式锁主要是利用数据库的唯一索引来实现,唯一索引天然具有排他性,这刚好符合我们对锁的要求
2.1.1 设计原理
- 同一时刻只能允许一个竞争者获取锁。加锁时我们在数据库中插入一条记录,利用唯一键进行防重。
- 当竞争者A加锁成功后,第竞争者B再来加锁就会抛出唯一索引冲突,如果抛出这个异常,我们就判定竞争者B加锁失败
- 竞争者B加锁失败后,会阻塞等待,一直到竞争者A释放锁(也就是删除记录后),再去获取锁

2.1.2 实现注意事项
- 没有锁超时机制。如果程序发生了异常,将无法删除数据,也就是锁无法被释放掉,需要自己写一套锁超时机制,比如:在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据。
- 基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,可以考虑实现数据库的高可用方案。
- 需要自旋实现阻塞效果。当获取锁失败时自旋转。
- 如果使用数据库自增 id ,规律太明显。
- 受单表数据量的限制。 在高并发场景下,我们都知道 MySQL 的单张表根本不可能容纳大量数据(性能等原因的限制);如果是将单表拆成多表,还是用数据库自增 id 的话,就存在了 id 重复的情况了,很显然这是业务不允许的。
db操作性能较差,并且有锁表的风险,一般不考虑。
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| */ public abstract class AbstractLock implements Lock{
@Override public void lock() { if(tryLock()){ System.out.println("---------获取锁---------"); }else { waitLock(); lock(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class MysqlDistributedLock extends AbstractLock {
@Autowired private MethodlockMapper baseMapper;
@Override public boolean tryLock() { try { baseMapper.insert(new Methodlock("lock")); }catch (Exception e){ return false; } return true; }
@Override public void waitLock() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }
@Override public void unlock() { baseMapper.deleteByMethodlock("lock"); System.out.println("-------释放锁------"); }
|
2.2 基于 Redis 的分布式锁
效率最高,加锁速度最快,因为Redis几乎都是纯内存操作。
适用于并发量很大、性能要求很高而可靠性问题可以通过其 他方案去弥补的场景。
2.2.1 设计原理
SETNX key value: 只有当key不存在时才会执行成功,如果key已经存在则命令执行失败。
2.2.2 实现注意事项
Redis 的 increment 并不能满足安全性,如果使用它需要特殊处理增加复杂性,如:
- ID的组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
需要设置一个超时时间,因为有可能宕机或者被运维重启了,无法释放锁,但是这个超时时间的长短却不好确定:
- 设置过短,会导致其他线程提前获得锁,引发线程安全问题
- 设置过长,线程需要额外等待
如果业务执行时间> 过期时间,就需要锁续命:搞一个定时任务,设一个间隔时间,小于失效时间,过一段时间去监控业务是否执行完了,执行没结束,也就是锁还没释放,我就再把锁的过期时间重新设置成初始值
避免线程A释放线程B的锁,需要在释放锁时多加一个判断,每个线程只释放自己的锁,不能释放别人的锁!可以给每个线程分配一个唯一的UUID
需要确保锁可以正常释放
- 使用Lua脚本获取和删除锁,保证原子性
- 必须使用try catch 在finally中释放锁,否则有异常,锁便没法释放,其他线程进来就会一直执行失败
需要支持锁重入,同一个线程可能多次获取同一把锁
需要重试机制,避免锁只尝试一次就返回false
如果Redis提供了主从集群,那么需要考虑主从一致性。因为主从同步存在延迟,当主宕机时,在主节点中的锁数据并没有及时同步到从节点中,则会导致其他线程也能获得锁,引发线程安全问题(延迟时间是在毫秒以下的,所以这种情况概率极低)
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Service public class RedisLockService {
@Autowired private RedisTemplate<String, String> redisTemplate;
private static final String LOCK_PREFIX = "lock:";
public boolean tryLock(String lockKey, String requestId, long expireTime) { String key = LOCK_PREFIX + lockKey; Boolean result = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTime, TimeUnit.MILLISECONDS); return Boolean.TRUE.equals(result); }
public boolean releaseLock(String lockKey, String requestId) { String key = LOCK_PREFIX + lockKey; String lockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; DefaultRedisScript<Long> releaseLockScript = new DefaultRedisScript<>(lockScript, Long.class); Long result = redisTemplate.execute(releaseLockScript, Collections.singletonList(key), requestId); return result != null && result == 1; } }
|
2.2.4 redison使用
Redisson是一个基于Redis的分布式Java对象和服务的框架,它提供了丰富的功能来支持各种分布式系统场景。Redisson提供了多种分布式锁的实现,包括可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)等,用于在分布式环境中实现互斥访问

代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class RedissonLockExample {
private static final String REDIS_URL = "redis://127.0.0.1:6379";
public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress(REDIS_URL); RedissonClient redisson = Redisson.create(config); String lockKey = "myLock";
RLock lock = redisson.getLock(lockKey);
try { boolean res = lock.tryLock(10, 10, TimeUnit.SECONDS); if (res) { try { System.out.println("Lock acquired, executing task..."); Thread.sleep(2000); } finally { lock.unlock(); System.out.println("Lock released"); } } else { System.out.println("Failed to acquire lock"); }
lock.lock(); try { System.out.println("Lock acquired, executing task..."); Thread.sleep(2000); } finally { lock.unlock(); System.out.println("Lock released"); }
} catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { redisson.shutdown(); } } }
|
2.3 基于 Zookeeper 的分布式锁
适用于高可靠(高可用),而并发量不是太高的场景
优点:不存在锁失效的问题,不需要续命。
缺点:因为需要频繁的创建和删除节点,性能上不如Redis。
在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper 的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁
Zookeeper第三方客户端curator中已经实现了基于Zookeeper的分布式锁。
2.3.1 基于Zookeeper设计思路一
利用Zookeeper创建临时节点来实现分布式锁,同一路径下的节点名称不能重复,来行防重
- 同一时刻只能允许一个竞争者获取锁。加锁时我们创建一个临时节点。
- 当竞争者A加锁成功后,第竞争者B再来加锁就会抛出节点名称不能重复错误,如果抛出这个异常,我们就判定竞争者B加锁失败
- 竞争者B加锁失败后,会阻塞等待,监听节点状态,当节点数据删除后,也就是竞争者A释放锁,再去获取锁

代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| public class ZkDistributedLock extends AbstractLock implements IZkDataListener{
private ZkClient zkClient ; private String path = "/lock";
private CountDownLatch countDownLatch ; private String config;
@Override public boolean tryLock() { try { if(zkClient ==null){ zkClient = new ZkClient(config); } zkClient.createEphemeral(path);
}catch (Exception e){ return false; } return true; }
@Override public void waitLock() { zkClient.subscribeDataChanges(path,this); if(zkClient.exists(path)){ countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); zkClient.unsubscribeDataChanges(path,this); } catch (InterruptedException e) { e.printStackTrace(); } }
}
@Override public void unlock() {
if(zkClient !=null){ zkClient.delete(path); System.out.println("-----释放锁资源----"); }
} @Override public void handleDataDeleted(String dataPath) throws Exception { countDownLatch.countDown(); }
@Override public void handleDataChange(String dataPath, Object data) throws Exception {
}
}
|
2.3.2 基于Zookeeper设计思路二

代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| public class ZkDistributedLock2 extends AbstractLock implements IZkDataListener {
private ZkClient zkClient; private String path; private String config;
private CountDownLatch countDownLatch ; private String beforePath; private String currentPath;
public ZkDistributedLock2(String config, String path){ this.path = path; this.config = config; zkClient = new ZkClient(config); if (!zkClient.exists(path)) { zkClient.createPersistent(path); } }
@Override public void lock(){ boolean locked = tryLock(); if(locked){ System.out.println("---------获取锁---------"); } while (!locked){ waitLock(); List<String> childrens = zkClient.getChildren(path); if (checkLocked(childrens)) { locked = true; } }
}
@Override public boolean tryLock() { try { currentPath = zkClient.createEphemeralSequential(path+"/",null); List<String> childrens = zkClient.getChildren(path); if (checkLocked(childrens)) { return true; } int index = Collections.binarySearch(childrens, currentPath.substring(currentPath.lastIndexOf("/") + 1)); if(index<0){ throw new Exception(currentPath+"节点没有找到" ); } beforePath = path + "/" + childrens.get(index-1); }catch (Exception e){ e.printStackTrace(); } return false; }
private boolean checkLocked(List<String> childrens) {
Collections.sort(childrens);
if (currentPath.equals(path + "/" +childrens.get(0))) { System.out.println("成功的获取分布式锁,节点为"+ currentPath); return true; } return false; }
@Override public void waitLock() { try { countDownLatch = new CountDownLatch(1); if(zkClient.exists(beforePath)) { zkClient.subscribeDataChanges(beforePath, this); countDownLatch.await(); zkClient.unsubscribeDataChanges(beforePath, this); } } catch (InterruptedException e) { e.printStackTrace(); } }
@Override public void unlock() {
if(zkClient !=null){ zkClient.delete(currentPath,-1); System.out.println(currentPath+" 节点释放锁资源"); } }
@Override public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override public void handleDataDeleted(String dataPath) throws Exception { countDownLatch.countDown(); } }
|
2.3.3 Apache Curator 使用
Apache Curator 是一个开源的客户端库,用于简化 Apache ZooKeeper 的使用。Curator 提供了丰富的 API,其中包括实现分布式锁的功能。Curator 的 InterProcessMutex
类是一个分布式可重入互斥锁的实现,非常适合在分布式系统中用来同步资源访问。
1. 添加 Curator 依赖
首先,你需要在你的项目中添加 Curator 的 Maven 依赖。以下是一个例子:
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>你的版本号</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>你的版本号</version> </dependency>
|
请替换 你的版本号
为你想要使用的 Curator 版本。
2. 配置 Curator 客户端
你需要配置一个 Curator 客户端,指定 ZooKeeper 的服务器地址等。
1 2 3 4 5 6 7 8 9
| import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; CuratorFramework client = CuratorFrameworkFactory.newClient( "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3) ); client.start();
|
3. Curator实现分布式锁
使用 Curator 的 InterProcessMutex
类来实现分布式锁。
注意事项
- 确保 ZooKeeper 服务正在运行。
- 在生产环境中,使用 Curator 的配置可能需要更多的细化和优化,例如使用更复杂的重试策略或配置连接池。
InterProcessMutex
提供了可重入锁的功能,这意味着同一个客户端可以多次获取同一把锁。
- 在
finally
块中释放锁是非常重要的,以避免死锁。
- Curator 还提供了其他类型的锁,如
InterProcessSemaphoreMutex
(信号量锁)和 InterProcessReadWriteLock
(读写锁),你可以根据需要选择使用。
使用 Curator 的分布式锁可以极大地简化在分布式系统中的同步控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.LockAcquireException;
String lockPath = "/locks/myLock";
InterProcessMutex lock = new InterProcessMutex(client, lockPath); try { if (lock.acquire(10, TimeUnit.SECONDS)) { try { System.out.println("Lock acquired, executing task..."); Thread.sleep(2000); } finally { lock.release(); System.out.println("Lock released"); } } else { System.out.println("Failed to acquire lock"); } } catch (InterruptedException | LockAcquireException e) { Thread.currentThread().interrupt(); }
|