标签:exce factor targe rop nose long 安全 ogg ret
Redis为单进程单线程模式,采用队列模式将并发访问的请求变成串行访问,并且多客户端对Redis的访问不存在竞争关系。
以下将会讲解如何使用Redis实现一个可靠的,自旋分布式锁。以及实现的思路,还有实现时会遇到的常见错误。
当然,这些实现的都是不可重入的。在最后,还会讲一下,实现可重入锁的思路。
Redis提供了一些基本指令可以用来实现分布式锁,例如
SET,SENTX,GETSET,INCR,DEL,GET 等操作,以下是对这些指令的基本用法:
> SET key val [NX|XX] [EX seconds | PX milliseconds]
// 将字符串值key 关联到 value。成功后,返回值为"OK"。后面有两个可选参数
// 可选参数 NX|XX:NX表示只在键不存在时,才对键进行操作,缺省方式是NX。XX表示只在键存在时对键进行操作
// 可选参数 EX|PX:键过期的时间单位,后面跟长整型数字表示过期时间。EX表示秒,PX表示毫秒。缺省不设置过期时间。
> SETNX key val
// 当且仅当key值不存在,将key对应的值设置为value,并且返回1,否则不做任何操作,返回0
> GETSET key val
// 获取key的旧值,并且将新的value放入
> INCR key
// 将key中存储的数字自增1并且返回结果。
> DEL key
// 将对应Key的值删除
为了确保分布式锁可用,我们至少要确保锁的可靠性,要满足一下四个条件:
1)互斥性,在任意时刻,只能有一个客户端(或者说业务请求)获得锁,并且也只能由该客户端请求解锁成功。
2)避免死锁,即使获取了锁的客户端崩溃没有释放锁,也要保证锁正常过期,后续的客户端能正常加锁。
3)容错性,只要大部分Redis节点可用,客户端就能正常加锁。
4)自旋重试,获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。(这一点很重要,我发现网上很多方案并没有把这个功能加上,只尝试一次加锁请求失败就返回了,加了自旋重试更好一些)
这里有三个参数需要考虑,一般来说,设定的值,需要根据实际场景来判断:
锁的过期时间 (EXPIRE_TIME)
太短可能过早的释放锁,造成数据安全问题。太长的话,如果客户端挂掉,会长时间无法释放锁,导致其他客户端锁请求阻塞或者失败(这种场景太少见)
我们一般会预估一下加锁需要进行的操作最长耗时,然后在最长耗时基础上再加一个buffer的时间来确定。(buffer比例多少不确定,这个自行判断吧)需要保证锁在任务执行完之前不会过期。
自旋间隔时间 (WAIT_INTERVAL)
适当间隔就好,一般是50~100ms
获取锁的超时时间 (ACCQUIRE_TIME_OUT)
在激烈的竞争环境下,超时时间设置太短会导致失败次数显著增加。建议至少设置成和锁的过期时间一样。
首先是代码示例,以下是使用了两种方式实现的 Redis锁:
第一种方式是利用了 Redis 的 SET key value [NX|XX] [EX seconds | PX milliseconds]
第二种方式利用了 Redis 的 SETNX key value 和 GETSET key value
/**
* @Author Antony
* @Since 2018/5/25 22:48
*/
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME_SECOND = "EX";
private static final int ACQUIRE_LOCK_TIME_OUT_IN_MS = 5*1000;//获取锁超时时间
private static final int EXPIRE_IN_SECOND = 5; //锁超时时间
private static final int WAIT_INTERVAL_IN_MS = 100; //自旋重试间隔
private static JedisPool jedisPool = JedisPoolFactory.getJedisPool();
/**
* 使用 set key value expireTime 获取锁
* @param lockKey
* @return
*/
public static boolean tryLockWithSet(String lockKey){
boolean flag = false;
long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
try (Jedis jedis = jedisPool.getResource()){
String result;
while (true) {
long now = System.currentTimeMillis();
if(timeoutAt < now){
break;
}
result = jedis.set(lockKey, "", SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME_SECOND, EXPIRE_IN_SECOND);
if(LOCK_SUCCESS.equals(result)){
flag = true;
return flag;
}
TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
}
} catch (InterruptedException e) {
logger.error("accquire redis lock error...", e);
e.printStackTrace();
}
if(!flag){
logger.error("cannot accquire redis lock...");
}
return flag;
}
/**
* 使用 setnx 和 getset 方式获取锁
* @param lockKey
* @return
*/
public static boolean tryLockWithSetnx(String lockKey){
boolean flag = false;
try (Jedis jedis = jedisPool.getResource()) {
long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
while (true){
long now = System.currentTimeMillis();
if