标签:pre zhang == 发布订阅 之一 keep uuid 备份 Fix
原文:Redis分布式锁在多线程开发中我们使用锁来避免线程争夺共享资源。在分布式系统中,程序在多个节点上运行无法使用单机锁来避免资源竞争,因此我们需要一个锁服务来避免多个节点上的进程争夺资源。
Redis数据库基于内存,具有高吞吐量、便于执行原子性操作等特点非常适合开发对一致性要求不高的锁服务。
本文介绍了简单分布式锁、Redisson分布式锁的实现以及解决单点服务的RedLock分布式锁概念。
Redis是一致性较低的数据库,若对锁服务的一致性要求较高建议使用zookeeper等中间件开发锁服务。
Redis实现分布式锁的原理非常简单, 节点在访问共享资源前先查询redis中是否有该资源对应的锁记录, 若不存在锁记录则写入一条锁记录(即获取锁)随后访问共享资源. 若节点查询到redis中已经存在了资源对应的锁记录, 则放弃操作共享资源.
下面给出一个非常简单的分布式锁示例:
import redis.clients.jedis.Jedis;
import java.util.Random;
import java.util.UUID;
public class MyRedisLock {
private Jedis jedis;
private String lockKey;
private String value;
private static final Integer DEFAULT_TIMEOUT = 30;
private static final String SUFFIX = ":lock";
public MyRedisLock(Jedis jedis) {
this.jedis = jedis;
}
public boolean acquire(String key, long time) throws InterruptedException {
Long outdatedTime = System.currentTimeMillis() + time;
lockKey = key + SUFFIX;
while (true) {
if (System.currentTimeMillis() >= outdatedTime) {
return false;
}
value = UUID.randomUUID().toString(); // 1
return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
}
}
public boolean check() {
return value != null && value.equals(jedis.get(lockKey)); // 3
}
public boolean release() {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
}
}
加锁后所有对共享资源的操作都应该先检查当前线程是否仍持有锁。
在分布式锁的实现中有几点需要注意:
set key value EX seconds NX
命令进行加锁,不要使用setnx和expire两个命令加锁。上文只是提供了简单示例,还有一些重要功能没有实现:
总结来看实现Redis分布式锁有几点需要注意:
这里我们以基于Java的Redisson为例讨论一下成熟的Redis分布式锁的实现。
redisson实现了java.util.concurrent.locks.Lock
接口,可以像使用普通锁一样使用redisson:
RLock lock = redisson.getLock("key");
lock.lock();
try {
// do sth.
} finally {
lock.unlock();
}
分析一下RLock的实现类org.redisson.RedissonLock
:
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
再看等待加锁的方法lockInterruptibly
:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
lockInterruptibly
方法会尝试获取锁,若获取失败则会订阅释放锁的消息。收到锁被释放的通知后再次尝试获取锁,直到成功或者超时。
接下来分析tryAcquire
:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId)); // 调用异步获得锁的实现,使用get(future)实现同步
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 设置了超时时间
if (leaseTime != -1) {
// tryLockInnerAsync 加锁成功返回 null, 加锁失败在 Future 中返回锁记录剩余的有效时间
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 未设置超时时间,尝试获得无限期的锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 避免对共享资源操作完成前锁就被释放掉,定期刷新锁失效的时间
// 默认锁失效时间的三分之一即进行刷新
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryAcquireAsync
中主要逻辑是无限期锁的实现,Redisson并非设置了永久的锁记录,而是定期刷新锁失效的时间。
这种方式避免了持有锁的进程崩溃无法释放锁导致死锁。
真正实现获取锁逻辑的是tryLockInnerAsync
方法:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 写入锁记录, 锁记录是一个hash; key:共享资源名称, field:锁实例名称(Redisson客户端ID:线程ID), value: 1(value是一个计数器,记录当前线程获取该锁的次数,实现可重入锁)
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置锁记录过期时间
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若当前线程已经持有该资源的锁
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将锁计数器加1,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 资源已被其它线程加锁,加锁失败。获取锁剩余生存时间后返回
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述操作使用eval命令执行lua脚本保证了操作的原子性。
解锁过程相对简单:
@Override
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
}
unlockInnerAsync
方法实现了具体的解锁逻辑:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁,可能锁已被超时释放
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁被释放的消息
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 锁的持有者不是自己,抛出异常
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 自己持有锁,因为锁是可重入的将计数器减1
"if (counter > 0) then " + // 计数器大于0,锁未被完全释放,刷新锁过期时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + // 锁被完全释放,删除锁记录,发布锁被释放的消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
基于单点的分布式锁无法解决redis故障的问题. 为了保证redis的可用性我们通常采用主从备份的方法, 即 使用一个master实例和至少一个slave实例.
当有写入请求时先写入master然后写入到所有slave, 当master实例故障时选择一个slave实例升级为master实例继续提供服务.
其中存在的问题是, 写入master和写入slave存在时间差. 若线程A成功将锁记录写入了master, 随后在同步写入slave之前, master故障转移到slave.
因为slave(新master)中没有锁记录, 因此线程B也可以成功加锁, 因此可能出现A和B同时持有锁的错误.
为了解决redis失效可能造成的问题, redis的作者antirez提出了RedLock实现方案:
客户端获取当前时间
客户端尝试获取N个节点的锁, 每个节点使用相同的key和value. 请求超时时间要远小于锁超时时间, 避免在节点或者网络故障时浪费时间.
客户端计算在加锁时消耗的时间, 只有客户端成功获得超过一半节点的锁且总时间小于锁超时间时才能成功加锁. 客户端持有锁的时间为锁超时时间减去加锁消耗的时间.
若获取锁失败则访问所有节点, 发起释放锁的请求.
释放锁时需要向所有Redis节点发出释放锁的请求, 原因在于可能某个Redis实例中成功写入了锁记录, 但是没有响应没有到达客户端.
为了保证所有锁记录都被正确释放, 所以需要向所有Redis实例发送释放请求.
关于RedLock的安全性问题, Martin Kleppmann和作者antirez进行了一些讨论:
关于这场讨论的分析可以参考:
标签:pre zhang == 发布订阅 之一 keep uuid 备份 Fix
原文地址:https://www.cnblogs.com/lonelyxmas/p/10674120.html