码迷,mamicode.com
首页 > 其他好文 > 详细

redis实现分布式锁

时间:2020-01-22 18:22:45      阅读:80      评论:0      收藏:0      [点我收藏+]

标签:exce   factor   targe   rop   nose   long   安全   ogg   ret   

Redis为单进程单线程模式,采用队列模式将并发访问的请求变成串行访问,并且多客户端对Redis的访问不存在竞争关系。

以下将会讲解如何使用Redis实现一个可靠的,自旋分布式锁。以及实现的思路,还有实现时会遇到的常见错误。
当然,这些实现的都是不可重入的。在最后,还会讲一下,实现可重入锁的思路。

 


实现原理

 

Redis操作

Redis提供了一些基本指令可以用来实现分布式锁,例如
SET,SENTX,GETSET,INCR,DEL,GET 等操作,以下是对这些指令的基本用法:

>  SET key val [NX|XX] [EX seconds | PX milliseconds]
// 将字符串值key 关联到 value。成功后,返回值为"OK"。后面有两个可选参数
// 可选参数 NX|XX:NX表示只在键不存在时,才对键进行操作,缺省方式是NX。XX表示只在键存在时对键进行操作
// 可选参数 EX|PX:键过期的时间单位,后面跟长整型数字表示过期时间。EX表示秒,PX表示毫秒。缺省不设置过期时间。

>  SETNX key val 
// 当且仅当key值不存在,将key对应的值设置为value,并且返回1,否则不做任何操作,返回0

>  GETSET key val
// 获取key的旧值,并且将新的value放入

>  INCR key
// 将key中存储的数字自增1并且返回结果。

>  DEL key
// 将对应Key的值删除

 

锁的可靠性

为了确保分布式锁可用,我们至少要确保锁的可靠性,要满足一下四个条件:

1)互斥性,在任意时刻,只能有一个客户端(或者说业务请求)获得锁,并且也只能由该客户端请求解锁成功。
2)避免死锁,即使获取了锁的客户端崩溃没有释放锁,也要保证锁正常过期,后续的客户端能正常加锁。
3)容错性,只要大部分Redis节点可用,客户端就能正常加锁。
4)自旋重试,获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。(这一点很重要,我发现网上很多方案并没有把这个功能加上,只尝试一次加锁请求失败就返回了,加了自旋重试更好一些)

 

参数设置

这里有三个参数需要考虑,一般来说,设定的值,需要根据实际场景来判断:

  • 锁的过期时间 (EXPIRE_TIME)
    太短可能过早的释放锁,造成数据安全问题。太长的话,如果客户端挂掉,会长时间无法释放锁,导致其他客户端锁请求阻塞或者失败(这种场景太少见)
    我们一般会预估一下加锁需要进行的操作最长耗时,然后在最长耗时基础上再加一个buffer的时间来确定。(buffer比例多少不确定,这个自行判断吧)需要保证锁在任务执行完之前不会过期。

  • 自旋间隔时间 (WAIT_INTERVAL)
    适当间隔就好,一般是50~100ms

  • 获取锁的超时时间 (ACCQUIRE_TIME_OUT)
    在激烈的竞争环境下,超时时间设置太短会导致失败次数显著增加。建议至少设置成和锁的过期时间一样。


如何实现

 

代码示例

首先是代码示例,以下是使用了两种方式实现的 Redis锁:
第一种方式是利用了 Redis 的 SET key value [NX|XX] [EX seconds | PX milliseconds]
第二种方式利用了 Redis 的 SETNX key value 和 GETSET key value

/**
 * @Author Antony
 * @Since 2018/5/25 22:48
 */
public class RedisLock {

    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME_SECOND = "EX";

    private static final int ACQUIRE_LOCK_TIME_OUT_IN_MS = 5*1000;//获取锁超时时间
    private static final int EXPIRE_IN_SECOND = 5;                  //锁超时时间
    private static final int WAIT_INTERVAL_IN_MS = 100;             //自旋重试间隔

    private static JedisPool jedisPool = JedisPoolFactory.getJedisPool();


    /**
     * 使用 set key value expireTime 获取锁
     * @param lockKey
     * @return
     */
    public static boolean tryLockWithSet(String lockKey){
        boolean flag = false;
        long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS;    //此次获取锁的超时时间点
        try (Jedis jedis = jedisPool.getResource()){
            String result;
            while (true) {
                long now = System.currentTimeMillis();
                if(timeoutAt < now){
                    break;
                }
                result = jedis.set(lockKey, "", SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME_SECOND, EXPIRE_IN_SECOND);
                if(LOCK_SUCCESS.equals(result)){
                    flag = true;
                    return flag;
                }
                TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
            }

        } catch (InterruptedException e) {
            logger.error("accquire redis lock error...", e);
            e.printStackTrace();
        }

        if(!flag){
            logger.error("cannot accquire redis lock...");
        }

        return flag;
    }

    /**
     * 使用 setnx 和 getset 方式获取锁
     * @param lockKey
     * @return
     */
    public static boolean tryLockWithSetnx(String lockKey){
        boolean flag = false;
        try (Jedis jedis = jedisPool.getResource()) {
            long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS;    //此次获取锁的超时时间点
            while (true){
                long now = System.currentTimeMillis();
                if(timeoutAt < now){
                    break;
                }

                String expireAt = String.valueOf(now + EXPIRE_IN_SECOND*1000);  //过期时间戳作为value
                long ret = jedis.setnx(lockKey, expireAt);
                if(ret == 1){//已取得锁
                    flag = true;
                    return flag;
                }else {
                    // 未获取锁,尝试重新获取
                    // 此处使用double check 的思想,防止多线程同时竞争到锁
                    // 1) 先获取上一个锁的过期时间,校验当前是否过期。
                    // 2) 如果过期了,尝试使用getset方式获取锁。此处可能存在多个线程同时执行到的情况。
                    // 3) getset更新过期时间,并且获取上一个锁的过期时间。
                    // 4) 如果getset获取到的oldExpireAt 已过期,说明获取锁成功。
                    //    如果和当前比未过期,说明已经有另一个线程提前获取到了锁
                    //    这样也没问题,只是短暂的将上一个锁稍微延后一点时间(只有在A和B线程同时执行到getset时,才会出现,延长的时间很短)
                    String oldExpireAt = jedis.get(lockKey);
                    if(oldExpireAt != null && Long.valueOf(oldExpireAt) < now){
                        oldExpireAt = jedis.getSet(lockKey, expireAt);
                        if(Long.parseLong(oldExpireAt) < now){
                            flag = true;
                            return flag;
                        }
                    }
                }

                TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
            }
        } catch (InterruptedException e) {
            logger.error("accquire redis lock error...", e);
            e.printStackTrace();
        }

        if(!flag){
            logger.error("cannot accquire redis lock...");
        }

        return flag;
    }

    /**
     * 释放锁
     * @param lockKey
     */
    public static void unLock(String lockKey){
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(lockKey);
        }
    }

}

 

思路详解

1)第一种方式,tryLockWithSet 是使用了 Redis set 的同时指定过期时间的功能。
这个方式的特点就是,简单有效,并且只有一个指令操作。一般也推荐这么使用。

注意,有一种常见的错误方式是使用 setnxexpire 组合实现加锁,这是两个操作,并没有保证原子性。如果客户端在setnx之后崩溃,那么将导致锁无法释放
错误代码如下:

    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
        jedis.expire(lockKey, expireTime);
    }

 

2)第二种方式,tryLockWithSetnx,是把锁的过期时间,当做value存储起来。
这个方式,解决了刚才提出的setnx 和 expire 操作无法保证原子性的问题,虽然使用了setnx操作,但是没有给redis的key设置过期时间。而是把该锁的过期时间作为value保存,在获取锁的时候判断是否过期期并抢占锁。这就需要保证 各个客户端的系统时间都严格一致,不然锁的持有时间就无法真正保证。

在这里简单解释一下部分核心逻辑,主要是获取锁失败的重试阶段:

  • 如果锁获取失败,代表当前已有其他客户端持有锁,那么就根据key获取value,得到该锁的过期时间和当前时间比较,可以知道是锁否过期。如果没过期,则进入下一个自旋。

  • 如果过期,则使用getset操作,尝试抢占锁。该操作将当前锁的过期时间放入,成功后将旧值返回,并进行再一次check,确认是否拿到锁。
    注意:这里可能会出现竞争,两个线程get到旧值后都判断过期,然后都执行了getset操作。
    关键在于getset拿到的value后,进行的再一次check,和当前时间判断,替换掉的旧值是否是已过期的值。如果小于当前时间,则表示替换掉的是已过期的锁。获取锁成功。如果判断没有小于,则表示替换掉的是另一个线程设置进去的值,进入下一个自旋。
    尽管执行成功了getset操作,这也只是将上一个成功拿到的锁过期时间稍微延迟,这个延迟时间很小,可以忽略不计。

举个栗子
线程A和B尝试setnx失败,然后同时拿到了value,并且都发现过期,然后都尝试进行getset操作。A线程先执行了getset操作,获取锁成功。B线程后执行了getset操作,那么B执行的就是把A的过期时间拿到,然后把自己的过期时间设置过去。这样的操作相当于把A的锁过期时间重置。
由于A和B同时到达了竞态条件,那么这两个尝试设置的过期时间也不会相差太大,差别可以忽略不计。

 

可重入锁

上面的实现方式,都是不可重入的分布式锁,任何重入锁的尝试都会导致死锁的发生。导致响应超时。

那么,要实现分布式锁的可重入,那就需要设计的可以存储更多信息。
目前我知道的有两种方式(只提供思路):

1)此种方式实现较为简单:value中多存储一个 全局唯一的requestId,代表客户端请求标识。具体可以使用UUID。在重入的情况下使用同一个UUID,就能判断是否是一个请求的锁重入,从而获取锁。

2)存储锁的重入次数,以及分布式环境下唯一的线程标识。
如何在分布式线程中标识唯一线程:
MAC地址 + jvm进程ID + 线程ID(或者线程地址都行),三者结合即可唯一分布式环境中的线程。
锁的信息采用json,存储格式如下:

{
    "count":1,
    "expireAt":147506817232,
    "jvmPid":22224,
    "mac":"28-D2-44-0E-0D-9A",
    "threadId":14
}

 

转载:https://www.jianshu.com/p/1c5c1a592088

redis实现分布式锁

标签:exce   factor   targe   rop   nose   long   安全   ogg   ret   

原文地址:https://www.cnblogs.com/minikobe/p/12228998.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!