标签:结束 evo semaphore org 数据对象 original utils 避免 adl
在许多场景中,数据一致性是一个比较重要的话题,在单机环境中,我们可以通过Java提供的并发API来解决;而在分布式环境(会遇到网络故障、消息重复、消息丢失等各种问题)下要复杂得多,常见的解决方案是分布式事务、分布式锁等。
本文主要探讨如何利用Zookeeper来实现分布式锁。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
在实现分布式锁的过程中需要注意的:
在使用分布式锁时需要注意:
以下是几种常见的实现分布式锁的方案及其优缺点。
1. 基于数据库表
最简单的方式可能就是直接创建一张锁表,当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。给某字段添加唯一性约束,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
会引入数据库单点、无失效时间、不阻塞、不可重入等问题。
2. 基于数据库排他锁
如果使用的是MySql的InnoDB引擎,在查询语句后面增加for update
,数据库会在查询过程中(须通过唯一索引查询)给数据库表增加排他锁,我们可以认为获得排它锁的线程即可获得分布式锁,通过 connection.commit() 操作来释放锁。
会引入数据库单点、不可重入、无法保证一定使用行锁(部分情况下MySQL自动使用表锁而不是行锁)、排他锁长时间不提交导致占用数据库连接等问题。
3. 数据库实现分布式锁总结
优点:
缺点:
相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点。目前有很多成熟的缓存产品,包括Redis、memcached、tair等。
这里以Redis为例举出几种实现方法:
1. 基于 redis 的 setnx()、expire() 方法做分布式锁
setnx 的含义就是 SET if Not Exists
,其主要有两个参数 setnx(key, value)
。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire() 来对 key 设置。
2. 基于 redis 的 setnx()、get()、getset()方法做分布式锁
getset 这个命令主要有两个参数 getset(key,newValue)
,该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。
3. 基于 Redlock 做分布式锁
Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)
4. 基于 redisson 做分布式锁
redisson 是 redis 官方的分布式锁组件,GitHub 地址:https://github.com/redisson/redisson
基于缓存实现分布式锁总结
优点:
缺点:
大致思想为:每个客户端对某个方法加锁时,在 Zookeeper 上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个临时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题
Zookeeper实现分布式锁总结
优点:
缺点:
下面讲如何实现排他锁和共享锁,以及如何解决羊群效应。
排他锁,又称写锁或独占锁。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取或更新操作,其他任务事务都不能对这个数据对象进行任何操作,直到T1释放了排他锁。
排他锁核心是保证当前有且仅有一个事务获得锁,并且锁释放之后,所有正在等待获取锁的事务都能够被通知到。
Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。可以利用Zookeeper这个特性,实现排他锁。
create
方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况基于Zookeeper实现排他锁流程:
共享锁,又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。
共享锁与排他锁的区别在于,加了排他锁之后,数据对象只对当前事务可见,而加了共享锁之后,数据对象对所有事务都可见。
/lockpath/[hostname]-请求类型-序号
的临时顺序节点create
方法创建表示锁的临时顺序节点,如果是读请求,则创建 /lockpath/[hostname]-R-序号
节点,如果是写请求则创建 /lockpath/[hostname]-W-序号
节点/lockpath
节点下的所有子节点,并对该节点注册子节点变更的Watcher监听基于Zookeeper实现共享锁流程:
在实现共享锁的 "判断读写顺序" 的第1个步骤是:创建完节点后,获取 /lockpath
节点下的所有子节点,并对该节点注册子节点变更的Watcher监听。这样的话,任何一次客户端移除共享锁之后,Zookeeper将会发送子节点变更的Watcher通知给所有机器,系统中将有大量的 "Watcher通知" 和 "子节点列表获取" 这个操作重复执行,然后所有节点再判断自己是否是序号最小的节点(写请求)或者判断比自己序号小的子节点是否都是读请求(读请求),从而继续等待下一次通知。
然而,这些重复操作很多都是 "无用的",实际上每个锁竞争者只需要关注序号比自己小的那个节点是否存在即可
当集群规模比较大时,这些 "无用的" 操作不仅会对Zookeeper造成巨大的性能影响和网络冲击,更为严重的是,如果同一时间有多个客户端释放了共享锁,Zookeeper服务器就会在短时间内向其余客户端发送大量的事件通知--这就是所谓的 "羊群效应"。
改进后的分布式锁实现:
具体实现如下:
create
方法创建一个类似于 /lockpath/[hostname]-请求类型-序号
的临时顺序节点getChildren
方法获取所有已经创建的子节点列表(这里不注册任何Watcher)exist
来对比自己小的那个节点注册WatcherZookeeper羊群效应改进前后Watcher监听图
Apache Curator是一个Zookeeper的开源客户端,它提供了Zookeeper各种应用场景(Recipe,如共享锁服务、master选举、分布式计数器等)的抽象封装,接下来将利用Curator提供的类来实现分布式锁。
Curator提供的跟分布式锁相关的类有5个,分别是:
关于错误处理:还是强烈推荐使用ConnectionStateListener处理连接状态的改变。当连接LOST时你不再拥有锁。
Shared Reentrant Lock,全局可重入锁,所有客户端都可以请求,同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类 InterProcessMutex
来实现,它的主要方法:
// 构造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 通过acquire获得锁,并提供超时机制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤销锁
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
定义一个 FakeLimitedResource 类来模拟一个共享资源,该资源一次只能被一个线程使用,直到使用结束,下一个线程才能使用,否则会抛出异常
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
// 模拟只能单线程操作的资源
public void use() throws InterruptedException {
if (!inUse.compareAndSet(false, true)) {
// 在正确使用锁的情况下,此异常不可能抛出
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (100 * Math.random()));
} finally {
inUse.set(false);
}
}
}
下面的代码将创建 N 个线程来模拟分布式系统中的节点,系统将通过 InterProcessMutex 来控制对资源的同步使用;每个节点都将发起10次请求,完成 请求锁--访问资源--再次请求锁--释放锁--释放锁
的过程;客户端通过 acquire
请求锁,通过 release
释放锁,获得几把锁就要释放几把锁;这个共享资源一次只能被一个线程使用,如果控制同步失败,将抛异常。
public class SharedReentrantLockTest {
private static final String lockPath = "/testZK/sharedreentrantlock";
private static final Integer clientNums = 5;
final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的资源
private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework client = ZKUtils.getClient();
client.start();
Random random = new Random();
try {
final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 每个客户端请求10次共享资源
for (int j = 0; j < 10; j++) {
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥锁");
}
try {
System.out.println(j + ". " + clientName + " 已获取到互斥锁");
resource.use(); // 使用资源
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥锁");
}
System.out.println(j + ". " + clientName + " 已再次获取到互斥锁");
lock.release(); // 申请几次锁就要释放几次锁
} finally {
System.out.println(j + ". " + clientName + " 释放互斥锁");
lock.release(); // 总是在finally中释放
}
Thread.sleep(random.nextInt(100));