标签:默认值 源码 host server return 语言 oca 错误 als
现如今项目集群化,大部分都是采用了多节点部署。在越来越复杂的业务中,java自身的锁机制已经满足不了现在的业务需求了。所以此时需要分布式锁来避免一些因为并发导致的业务错误。
现在业务中分布式锁主要有redis与zookeeper两种方式实现。
redis锁可以自己手撸也可以直接使用现在比较流行的解决方案,如redisson
本篇在接下来主要介绍redis分布式锁(redisson)的实现,项目github地址为: https://github.com/redisson/redisson
根据readme.md中文档介绍,首先我们需要创建客户端实例
// 是否集群
private boolean cluster;
private String host;
private int port;
private String password;
private int database;
Config config = new Config();
if (cluster) {
String[] preHosts = host.split(",");
String[] hosts = new String[preHosts.length];
for (int i = 0; i < preHosts.length; i++) {
if (!"http://".startsWith(preHosts[i])) {
hosts[i] = "http://" + preHosts[i];
}
}
config.useClusterServers()
.addNodeAddress(hosts)
.setPassword(password);
} else {
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("http://" + host + ":" + port)
.setDatabase(database);
if (!StringUtils.isEmpty(password)) {
singleServerConfig.setPassword(password);
}
}
return Redisson.create(config);
获取到Redisson后,我们便可以使用相关api
RLock lock = client.getLock(lockPath);
boolean locked = false;
try{
locked = lock.tryLock(waitTime, releaseTime, TimeUnit.SECONDS);
if (!locked) {
throw new LockException(String.format("Lock[path=%s] failed", lockPath));
}
// TODO something
} finally {
if (locked) {
try {
lock.unlock();
} catch (Exception e) {
log.error(String.format("An error happened when try to release lock[path=%s], call method forceUnlock", lockPath), e);
lock.forceUnlock();
}
}
}
首先,我们知道redis是支持Lua脚本语言的。
我们知道Lua脚本整体是原子操作,那么我们便可以利用这点来搞一些事情。Redisson也是如此
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
"redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
"redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call(‘pttl‘, KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上图代码摘自Redisson源码中RedissonLock类,我们可以从代码中看出Redisson使用了一段Lua脚本来进行了一些reids操作。
# KEYS[1]为之前client.getLock(lockPath)的lockPath
# ARGV[1]为internalLockLeaseTime 默认值是30s
# ARGV[2]为getLockName(threadId) 代表的是 id:threadId 用锁对象id+线程id, 表示当前访问线程,用于区分不同服务器上的线程。
# 判断当前节点是否存在
if (redis.call(‘exists‘, KEYS[1]) == 0) then
# 若不存在
# redis hash 将当前lockPath作为key添加到reids,value为[getLockName(threadId), 1]键值对 表示此线程的重入次数为1
redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1);
# 设置过期时间为传入参数internalLockLeaseTime,默认值是30s
redis.call(‘pexpire‘, KEYS[1], ARGV[1]);
# 结束操作
return nil;
end;
# 判断当前节点下的key是否是getLockName(threadId),即检测是否是当前线程持有锁
if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then
# 如果是,则该线程的重入次数累加1
redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1);
# 重置过期时间
redis.call(‘pexpire‘, KEYS[1], ARGV[1]);
# 结束操作
return nil;
end;
# 如果都存在,则返回当前节点的过期时间
return redis.call(‘pttl‘, KEYS[1]);
可以通过lock.unlock()往下查看到此段代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call(‘del‘, KEYS[1]); " +
"redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Lua脚本分析:
# KEYS[1] 表示的是getName() 为之前client.getLock(lockPath)的lockPath
# KEYS[2] 表示getChanelName() 表示的是发布订阅过程中使用的Chanel
# ARGV[1] 表示的是LockPubSub.unLockMessage 是解锁消息,实际代表的是数字 0,代表解锁消息
# ARGV[2] 表示的是internalLockLeaseTime 默认的有效时间 30s
# ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是当前锁id+线程id
# 判断当前节点下的key是否是getLockName(thread.currentThread().getId()),即检测是否是当前线程持有锁
if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then
# 如果不是,结束操作
return nil;
end;
# 获取当前节点下当前线程的重入次数,并减一
local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1);
# 若重入次数仍然大于0,则代表当前锁下还有其他任务在执行
if (counter > 0) then
# 重置过期时间
redis.call(‘pexpire‘, KEYS[1], ARGV[2]);
# 返回0 结束操作
return 0;
else
# 若减一后,当前线程下已经没有任务执行了,则删除当前节点
redis.call(‘del‘, KEYS[1]);
# 发布当前锁消息
redis.call(‘publish‘, KEYS[2], ARGV[1]);
# 返回1 结束操作
return 1;
end;
# 返回nil 结束操作
return nil;
可通过lock.forceUnlock()方法往下查看到此段代码
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call(‘del‘, KEYS[1]) == 1) then "
+ "redis.call(‘publish‘, KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}
Lua脚本分析
# 强行删除当前节点
if (redis.call(‘del‘, KEYS[1]) == 1) then
# 发布当前锁消息
redis.call(‘publish‘, KEYS[2], ARGV[1]);
# 返回1 结束操作
return 1
else
# 返回0 结束操作
return 0
end
针对redisson的redis锁与curator的zookeeper锁做了简单的封装,github项目地址如下:
https://github.com/loveowie/distributed-lock.git
标签:默认值 源码 host server return 语言 oca 错误 als
原文地址:https://www.cnblogs.com/heyouxin/p/13564217.html