标签:动态 多参数 异常 结构 含金量 地址 连接 set latch
业务背景:存储请求参数token ,token唯一 ,且新的生成旧的失效
思路:因为是多台机器,获取token存入redis,保持唯一,考虑使用redis来加锁,其实就是在redis中存一个key,其他机器发现key有值的话就不进行获取token的请求。
SET操作会覆盖原有值,SETEX虽然可设置key过期时间,但也会覆盖原有值,所以考虑可以使用SETNX
SETNX Key value
jedis.setnx("lockName","value"); //这里redis挂掉,就是一个死锁 jedis.expire("lockName",10);
因为这两个操作不具备原子性,所以可能出现死锁,之所以有这样的示例,是因为低版本的redis的SET还不支持多参数命令
从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改 EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。 PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。 NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。 XX :只在键已经存在时,才对键进行设置操作。
这里可以引出 redis正确的加锁示例:
public static boolean lock(Jedis jedis, String lockKey, String uid, int expireTime) { String result = jedis.set(lockKey, uid,"NX" "PX", expireTime); if ("OK".equals(result)) { return true; } return false; }
其实就等于在redis中执行了 :set key value nx px 10000
再来看一下分布式锁的要求:
NX保证互斥性
下面看一个释放锁的错误示例:
public static void wrongUnLock1(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是不是同一个线程 if (requestId.equals(jedis.get(lockKey))) { // lockkey锁失效,下一步删除的就是别人的锁 jedis.del(lockKey); } }
根本问题还是保证操作的原子性,因为是两步操作,即便判断到是当前线程的锁,但是也有可能再删除之前刚好过期,这样删除的就是其他线程的锁。
如果业务要求精细,我们可以使用lua脚本来进行完美解锁
/** * redis可以保证lua中的键的原子操作 unlock:lock调用完之后需unlock,否则需等待lock自动过期 * * @param lock * uid 只有线程已经获取了该锁才能释放它(uid相同表示已获取) */ public void unlock( String lock) { Jedis jedis = new Jedis("localhost"); final String uid= tokenMap.get(); if (StringUtil.isBlank(token)) return; try { final String script = "if redis.call(\"get\",\"" + lock + "\") == \"" + uid + "\"then return redis.call(\"del\",\"" + lock + "\") else return 0 end "; jedis.eval(script); } catch (Exception e) { throw new RedisException("error"); } finally { if (jedis != null) jedis.close(); } }
关于lua:
上面这个脚本很简单
if redis.call(\"get\",\"" + lock + "\") // redisGET命令 == \"" +uid + // 判断是否是当前线程 "\"then return redis.call(\"del\",\"" + lock + "\") // 如果是,执行redis DEL操作,删除锁 else return 0 end
同理我们可以使用lua给线程加锁
local lockkey = KEYS[1] --唯一随机数 local uid = KEYS[2] --失效时间,如果是当前线程,也是续期时间 local time = KEYS[3] if redis.call(‘set‘,lockkey,uid,‘nx‘,‘px‘,time)==‘OK‘ then return ‘OK‘ else if redis.call(‘get‘,lockkey) == uid then if redis.call(‘EXPIRE‘,lockkey,time/1000)==1 then return ‘OOKK‘ end end end
lua脚本也可以通过外部文件读取,方便修改
public void luaUnLock() throws Exception{ Jedis jedis = new Jedis("localhost") ; InputStream input = new FileInputStream("unLock.lua"); byte[] by = new byte[input.available()]; input.read(by); String script = new String(by); Object obj = jedis.eval(script, Arrays.asList("key","123"), Arrays.asList("")); System.out.println("执行结果 " + obj); }
PS:跟同事讨论的时候,想到可不可以利用redis的额事物来解锁,并没有实际使用,怕有坑。
public boolean unLock(Jedis jedis, String lockName, String uid) throws Exception{ jedis.watch(lockName); //这里的判断uid和下面的del虽然不是原子性,有了watch可以保证不会误删锁 if (jedis.get(lockName).equals(uid)) { redis.clients.jedis.Transaction transaction = jedis.multi(); transaction.del(lockName); List<Object> exec = transaction.exec(); if (exec.get(0).equals("OK")) { transaction.close(); return true; } } return false; }
//保存每个线程独有的token private static ThreadLocal<String> tokenMap = new ThreadLocal<>(); /** * 这个例子还不太完善。 * redis实现分布式可重入锁,并不保证在过期时间内完成锁定内的任务,需根据业务逻辑合理分配seconds * * @param lock * 锁的名称 * @param mseconds * 锁定时间,单位 毫秒 * token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取 * */ public boolean lock(final String lock, int mseconds ,Jedis jedis) { // token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取 String token = tokenMap.get(); if (StringUtil.isBlank(token)) { token = UUID.randomUUID().toString().replaceAll("-",""); tokenMap.set(token); } boolean flag = false; try { String ret = jedis.set(lock, token, "NX", "PX", mseconds); if (ret == null) {// 该lock的锁已经存在 String origToken = jedis.get(lock);// 即使lock已经过期也可以 if (token.equals(origToken) || origToken==null) { // token相同默认为同一线程,所以token应该尽量长且随机,保证不同线程的该值不相同 ret = jedis.set(lock, token, "NX", "PX", mseconds);// if ("OK".equalsIgnoreCase(ret)) flag = true; System.out.println("当前线程 " + token); } } else if ("OK".equalsIgnoreCase(ret)) flag = true; System.out.println("当前线程 " + token); } catch (Exception e) { } finally { if (jedis != null) jedis.close(); } return flag; }
继续正题,说到lua脚本 和 可重入锁,就不得不提 redission了
下面分析一下可重入的源码
/** * redission分布式锁-重试时间 秒为单位 * @param lockName 锁名 * @param waitTime 重试时间 * @param leaseTime 锁过期时间 * @return */ public boolean tryLock(String lockName,long waitTime,long leaseTime){ try{ RLock rLock = redissonClient.getLock(lockName); return rLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); }catch (Exception e){ logger.error("redission lock error with waitTime",e); } return false; }
@Override public RLock getLock(String name) { return new RedissonLock(commandExecutor, name, id); }
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 1.尝试获取锁 Long ttl = tryAcquire(leaseTime, unit); // 2.获得锁成功 if (ttl == null) { return; } // 3.等待锁释放,并订阅锁 long threadId = Thread.currentThread().getId(); Future<RedissonLockEntry> future = subscribe(threadId); get(future); try { while (true) { // 4.重试获取锁 ttl = tryAcquire(leaseTime, unit); // 5.成功获得锁 if (ttl == null) { break; } // 6.等待锁释放 if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { // 7.取消订阅 unsubscribe(future, threadId); } }
private Long tryAcquire(long leaseTime, TimeUnit unit) { return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId())); } private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2.用默认的锁超时时间去获取锁 Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // 成功获得锁 if (ttlRemaining == null) { // 3.锁过期时间刷新任务调度 scheduleExpirationRenewal(); } } }); return ttlRemainingFuture; } <T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 3.使用 EVAL 命令执行 Lua 脚本获取锁 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call(‘exists‘, KEYS[1]) == 0) then " + "redis.call(‘hset‘, KEYS[1], ARGV[2], 1); " + "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " + "redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " + "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call(‘pttl‘, KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
public void unlock() { // 1.通过 EVAL 和 Lua 脚本执行 Redis 命令释放锁 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call(‘exists‘, KEYS[1]) == 0) then " + "redis.call(‘publish‘, KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call(‘del‘, KEYS[1]); " + "redis.call(‘publish‘, KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); // 2.非锁的持有者释放锁时抛出异常 if (opStatus == null) { throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 3.释放锁后取消刷新锁失效时间的调度任务 if (opStatus) { cancelExpirationRenewal(); }
可以看到redission最终还是使用了lua脚本来加解锁 :
加锁脚本
if (redis.call(‘exists‘ KEYS[1]) == 0) then + -- exists 判断key是否存在 redis.call(‘hset‘ KEYS[1] ARGV[2] 1); + --如果不存在,hset存哈希表 redis.call(‘pexpire‘ KEYS[1] ARGV[1]); + --设置过期时间 return nil; + -- 返回null 就是加锁成功 end; + if (redis.call(‘hexists‘ KEYS[1] ARGV[2]) == 1) then + -- 如果key存在,查看哈希表中是否存在 redis.call(‘hincrby‘ KEYS[1] ARGV[2] 1); + -- 给哈希中的key加1,代表重入1次,以此类推 redis.call(‘pexpire‘ KEYS[1] ARGV[1]); + -- 重设过期时间 return nil; + end; + return redis.call(‘pttl‘ KEYS[1]); --如果前面的if都没进去,说明ARGV2 的值不同,也就是不是同 一线程的锁,这时候直接返回该锁的过期时间
推荐使用sciTE来编辑lua
解锁的脚本就不分析了,还是操作的redis命令,主要是lua脚本执行的时候能保证原子性。
a = 0 while(a < 3) do print("x = " .. ‘我是循环‘) end
几个lua脚本示例
local key = "rate.limit:" .. KEYS[1] local limit = tonumber(ARGV[1]) local expire_time = ARGV[2] local is_exists = redis.call("EXISTS", key) if is_exists == 1 then if redis.call("INCR", key) > limit then return ‘拒绝访问‘ else return ‘可以访问‘ end else return redis.call("SET", key, "1","NX","PX",expire_time) end
-- 脚本:尝试获得红包,如果成功,则返回json字符串,如果不成功,则返回空 -- 参数:红包队列名, 已消费的队列名,去重的Map名,用户ID -- 返回值:nil 或者 json字符串,包含用户ID:userId,红包ID:id,红包金额:money -- jedis.eval(getScript(), 4, hongBaoList, hongBaoConsumedList, hongBaoConsumedMap, "" + j) if redis.call(‘hexists‘, KEYS[3], KEYS[4]) ~= 0 then return nil else -- 先取出一个小红包 local hongBao = redis.call(‘rpop‘, KEYS[1]) -- hongbao : {"Money":9,"Id":8} if hongBao then local x = cjson.decode(hongBao) -- 加入用户ID信息 x[‘userId‘] = KEYS[4] local re = cjson.encode(x) -- 把用户ID放到去重的set里 redis.call(‘hset‘, KEYS[3], KEYS[4], KEYS[4]) -- 把红包放到已消费队列里 redis.call(‘lpush‘, KEYS[2], re) return re; end end return nil
redis 127.0.0.1:6379> SCRIPT LOAD "return ‘hello moto‘" "232fd51614574cf0867b83d384a5e898cfd24e5a" redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0 "hello moto"
public void scriptLoad()throws Exception{ Jedis jedis = new Jedis("localhost"); //从文件读取lua脚本 InputStream input = new FileInputStream("return.lua"); byte[] by = new byte[input.available()]; input.read(by); byte[] scriptBy = jedis.scriptLoad(by); String sha1 = new String(scriptBy); //直接解析 String sha2 = jedis.scriptLoad("local key1 = KEYS[1]\n" + "local key2 = KEYS[2]\n" + "local argv1 = ARGV[1]\n" + "return \"key1:\"..key1 ..\" key2:\"..key2.. \" argv1:\"..argv1"); System.out.println("sha1 : " + sha1); System.out.println("sha2 : " + sha2); Object obj = jedis.evalsha(sha1, Arrays.asList("value1","value2"), Arrays.asList("value3")); System.out.println("执行结果: "+ obj); }
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
主要内容是以上这些,小弟不才,写的不好,如果大家发现由bug,一定@我
20:49:54
最后是自己学习lua的一些笔记,含金量不高
print("Hello lua!")
print(b)
nil
b=10
print(b)
10
##function
function fun1(n)
if n == 0 then
return 1
else
return n * fun1(n - 1)
end
end
print(fun1(3))
fun2 = fun1
print(fun2(4))
tab = {"Hello","World","hello","lua"}
for k,v in pairs(tab) do
print(k.." "..v)
end
##for循环
for i=1,10
do print(i)
end
--[ 0 为 true ]
if(0)
then
print("0 为 true")
else
print("0 为 false")
end
if(null)
then
print("nil 为 true")
else
print("nil 为 false")
end
标签:动态 多参数 异常 结构 含金量 地址 连接 set latch
原文地址:https://www.cnblogs.com/number7/p/8320259.html