标签:作者 doc 为什么 sync 存在 当前时间 部署 情况 等于
本篇文章主要介绍基于Redis的分布式锁实现到底是怎么一回事,其中参考了许多大佬写的文章,算是对分布式锁做一个总结
在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问,Java中我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。但是现在公司都是流行分布式架构,在分布式环境下,如何保证不同节点的线程同步执行呢?
实际上,对于分布式场景,我们可以使用分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。
比如说在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。大致意思如下图所示(不一定准确):
分布式锁一般有如下的特点:
我们一般实现分布式锁有以下几种方式:
本篇文章主要介绍基于Redis如何实现分布式锁
Redis的SETNX命令,setnx key value,将key设置为value,当键不存在时,才能成功,若键存在,什么也不做,成功返回1,失败返回0 。 SETNX实际上就是SET IF NOT Exists的缩写
因为分布式锁还需要超时机制,所以我们利用expire命令来设置,所以利用setnx+expire命令的核心代码如下:
public boolean tryLock(String key,String requset,int timeout) { Long result = jedis.setnx(key, requset); // result = 1时,设置成功,否则设置失败 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } }
实际上上面的步骤是有问题的,setnx和expire是分开的两步操作,不具有原子性,如果执行完第一条指令应用异常或者重启了,锁将无法过期。
一种改善方案就是使用Lua脚本来保证原子性(包含setnx和expire两条指令)
代码如下
public boolean tryLock_with_lua(String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call(‘setnx‘,KEYS[1],ARGV[1]) == 1 then" + "redis.call(‘expire‘,KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判断是否成功 return result.equals(1L); }
Redis在 2.6.12 版本开始,为 SET 命令增加一系列选项:
SET key value[EX seconds][PX milliseconds][NX|XX]
set命令的nx选项,就等同于setnx命令,代码过程如下:
public boolean tryLock_with_set(String key, String UniqueId, int seconds) { return "OK".equals(jedis.set(key, UniqueId, "NX", "EX", seconds)); }
value必须要具有唯一性,我们可以用UUID来做,设置随机字符串保证唯一性,至于为什么要保证唯一性?假如value不是随机字符串,而是一个固定值,那么就可能存在下面的问题:
所以通常来说,在释放锁时,我们需要对value进行验证
释放锁时需要验证value值,也就是说我们在获取锁的时候需要设置一个value,不能直接用del key这种粗暴的方式,因为直接del key任何客户端都可以进行解锁了,所以解锁时,我们需要判断锁是否是自己的,基于value值来判断,代码如下:
public boolean releaseLock_with_lua(String key,String value) { String luaScript = "if redis.call(‘get‘,KEYS[1]) == ARGV[1] then " + "return redis.call(‘del‘,KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); }
这里使用Lua脚本的方式,尽量保证原子性。
使用 set key value [EX seconds][PX milliseconds][NX|XX]
命令 看上去很OK,实际上在Redis集群的时候也会出现问题,比如说A客户端在Redis的master节点上拿到了锁,但是这个加锁的key还没有同步到slave节点,master故障,发生故障转移,一个slave节点升级为master节点,B客户端也可以获取同个key的锁,但客户端A也已经拿到锁了,这就导致多个客户端都拿到锁。
所以针对Redis集群这种情况,还有其他方案
Redis作者 antirez基于分布式环境下提出了一种更高级的分布式锁的实现Redlock,原理如下:
假设有5个独立的Redis节点(注意这里的节点可以是5个Redis单master实例,也可以是5个Redis Cluster集群,但并不是有5个主节点的cluster集群):
对于Java用户而言,我们经常使用Jedis,Jedis是Redis的Java客户端,除了Jedis之外,Redisson也是Java的客户端,Jedis是阻塞式I/O,而Redisson底层使用Netty可以实现非阻塞I/O,该客户端封装了锁的,继承了J.U.C的Lock接口,所以我们可以像使用ReentrantLock一样使用Redisson,具体使用过程如下。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.6</version> </dependency>
// 1. 配置文件 Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword(RedisConfig.PASSWORD) .setDatabase(0); //2. 构造RedissonClient RedissonClient redissonClient = Redisson.create(config); //3. 设置锁定资源名称 RLock lock = redissonClient.getLock("redlock"); lock.lock(); try { System.out.println("获取锁成功,实现业务逻辑"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }
关于Redlock算法的实现,在Redisson中我们可以使用RedissonRedLock来完成,具体使用细节可以参考大佬的文章: mp.weixin.qq.com/s/8uhYult2h…
下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。
自定义一个注解,被注解的方法会执行获取分布式锁的逻辑
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RedisLock { /** * 业务键 * * @return */ String key(); /** * 锁的过期秒数,默认是5秒 * * @return */ int expire() default 5; /** * 尝试加锁,最多等待时间 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 锁的超时时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:
@Aspect @Component public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper; @Autowired private JedisUtil jedisUtil; private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class); @Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("获取锁失败"); throw new RuntimeException("获取锁失败"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } finally { logger.info("释放锁"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } } }
@Component public class RedisLockHelper { private long sleepTime = 100; /** * 直接使用setnx + expire方式获取分布式锁 * 非原子性 * * @param key * @param value * @param timeout * @return */ public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) { Long result = jedis.setnx(key, value); // result = 1时,设置成功,否则设置失败 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } } /** * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call(‘setnx‘,KEYS[1],ARGV[1]) == 1 then" + "redis.call(‘expire‘,KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判断是否成功 return result.equals(1L); } /** * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); } /** * 自定义获取锁的超时时间 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 错误的解锁方法—直接删除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); } /** * 使用Lua脚本进行解锁操纵,解锁的时候验证value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call(‘get‘,KEYS[1]) == ARGV[1] then " + "return redis.call(‘del‘,KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); } }
定义一个TestController来测试我们实现的分布式锁
@RestController public class TestController { @RedisLock(key = "redis_lock") @GetMapping("/index") public String index() { return "index"; } }
分布式锁重点在于互斥性,在任意一个时刻,只有一个客户端获取了锁。在实际的生产环境中,分布式锁的实现可能会更复杂,而我这里的讲述主要针对的是单机环境下的基于Redis的分布式锁实现,至于Redis集群环境并没有过多涉及,有兴趣的朋友可以查阅相关资料。
项目源码地址:github.com/pjmike/redi…
作者:pjmike_pj
链接:https://juejin.im/post/5cc165816fb9a03202221dd5
来源:掘金
标签:作者 doc 为什么 sync 存在 当前时间 部署 情况 等于
原文地址:https://www.cnblogs.com/zl1991/p/12983781.html