标签:org lease this RoCE ota ptr spl proc http
背景:我们系统有一个下传单据接口由于上游推送重复单据[产生异步任务],消费任务的时候是多线程并发执行,导致我们的数据库有很多重复的脏数据,数据库由于业务原因无法加唯一性索引。
解决方案:使用redis的setnx命令实现分布式锁。
原理:setnx---> 这种加锁的思路是,如果 key 不存在,将 key 设置为 value,返回true。如果 key 已存在,则 SETNX 不做任何动作,返回false。
首先进行一次setnx命令,尝试获取锁,如果获取成功,则设置锁的最终超时时间(以防在当前进程获取锁后奔溃导致锁无法释放);如果获取锁失败,则检查当前的锁是否超时,如果发现没有超时,则获取锁失败;如果发现锁已经超时(即锁的超时时间小于等于当前时间),则再次尝试获取锁,取到后判断下当前的超时时间和之前的超时时间是否相等,如果相等则说明当前的客户端是排队等待的线程里的第一个尝试获取锁的,让它获取成功即可。
1 package com.atlas.sys.service; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.data.redis.core.RedisTemplate; 6 import org.springframework.stereotype.Component; 7 8 import javax.annotation.Resource; 9 import java.time.Instant; 10 import java.util.concurrent.TimeUnit; 11 12 /** 13 * @program: atlas-parent 14 * @description: 分布式锁 15 * @author: siming.wang 16 * @create: 2018-12-14 14:15 17 **/ 18 @Component 19 public class RedisDistributionLock { 20 21 private static final Logger logger = LoggerFactory.getLogger(RedisDistributionLock.class); 22 23 //key的TTL,一天 24 private static final int finalDefaultTTLwithKey = 100; 25 26 //锁默认超时时间,20秒 27 private static final long defaultExpireTime = 10 * 1000; 28 29 private static final boolean Success = true; 30 31 @Resource( name = "redisTemplate") 32 private RedisTemplate<String, String> redisTemplateForGeneralize; 33 34 /** 35 * 加锁,锁默认超时时间20秒 36 * @param resource 37 * @return 38 */ 39 public boolean lock(String resource) { 40 return this.lock(resource, defaultExpireTime); 41 } 42 43 /** 44 * 加锁,同时设置锁超时时间 45 * @param key 分布式锁的key 46 * @param expireTime 单位是ms 47 * @return 48 */ 49 public boolean lock(String key, long expireTime) { 50 51 logger.debug("redis lock debug, start. key:[{}], expireTime:[{}]",key,expireTime); 52 long now = Instant.now().toEpochMilli(); 53 long lockExpireTime = now + expireTime; 54 55 //setnx 56 boolean executeResult = redisTemplateForGeneralize.opsForValue().setIfAbsent(key,String.valueOf(lockExpireTime)); 57 logger.debug("redis lock debug, setnx. key:[{}], expireTime:[{}], executeResult:[{}]", key, expireTime,executeResult); 58 59 //取锁成功,为key设置expire 60 if (executeResult == Success) { 61 redisTemplateForGeneralize.expire(key,finalDefaultTTLwithKey, TimeUnit.SECONDS); 62 return true; 63 } 64 //没有取到锁,继续流程 65 else { 66 Object valueFromRedis = this.getKeyWithRetry(key, 3); 67 // 避免获取锁失败,同时对方释放锁后,造成NPE 68 if (valueFromRedis != null) { 69 //已存在的锁超时时间 70 long oldExpireTime = Long.parseLong((String) valueFromRedis); 71 logger.debug("redis lock debug, key already seted. key:[{}], oldExpireTime:[{}]", key, oldExpireTime); 72 //锁过期时间小于当前时间,锁已经超时,重新取锁 73 if (oldExpireTime <= now) { 74 logger.debug("redis lock debug, lock time expired. key:[{}], oldExpireTime:[{}], now:[{}]", key, oldExpireTime, now); 75 String valueFromRedis2 = redisTemplateForGeneralize.opsForValue().getAndSet(key, String.valueOf(lockExpireTime)); 76 long currentExpireTime = Long.parseLong(valueFromRedis2); 77 //判断currentExpireTime与oldExpireTime是否相等 78 if (currentExpireTime == oldExpireTime) { 79 //相等,则取锁成功 80 logger.debug("redis lock debug, getSet. key:[{}], currentExpireTime:[{}], oldExpireTime:[{}], lockExpireTime:[{}]", key, currentExpireTime, oldExpireTime, lockExpireTime); 81 redisTemplateForGeneralize.expire(key, finalDefaultTTLwithKey, TimeUnit.SECONDS); 82 return true; 83 } else { 84 //不相等,取锁失败 85 return false; 86 } 87 } 88 } else { 89 logger.warn("redis lock,lock have been release. key:[{}]", key); 90 return false; 91 } 92 } 93 return false; 94 } 95 96 private Object getKeyWithRetry(String key, int retryTimes) { 97 int failTime = 0; 98 while (failTime < retryTimes) { 99 try { 100 return redisTemplateForGeneralize.opsForValue().get(key); 101 } catch (Exception e) { 102 failTime++; 103 if (failTime >= retryTimes) { 104 throw e; 105 } 106 } 107 } 108 return null; 109 } 110 111 /** 112 * 解锁 113 * @param key 114 * @return 115 */ 116 public boolean unlock(String key) { 117 logger.debug("redis unlock debug, start. resource:[{}].",key); 118 redisTemplateForGeneralize.delete(key); 119 return Success; 120 } 121 }
到这里就可以完成分布式锁工具的搭建。
如果需要更优雅的实现方式,可以考虑用aop:
1 package com.atlas.sys.annoation; 2 3 import java.lang.annotation.ElementType; 4 import java.lang.annotation.Retention; 5 import java.lang.annotation.RetentionPolicy; 6 import java.lang.annotation.Target; 7 8 /** 9 * @program: atlas-parent 10 * @description: 11 * @author: siming.wang 12 * @create: 2018-12-14 14:21 13 **/ 14 @Retention(RetentionPolicy.RUNTIME) 15 @Target(ElementType.METHOD) 16 public @interface RedisLockAnnoation { 17 18 String keyPrefix() default ""; 19 20 /** 21 * 要锁定的key中包含的属性 22 */ 23 String[] keys() default {}; 24 25 /** 26 * 是否阻塞锁; 27 * 1. true:获取不到锁,阻塞一定时间; 28 * 2. false:获取不到锁,立即返回 29 */ 30 boolean isSpin() default false; 31 32 /** 33 * 超时时间 34 */ 35 int expireTime() default 10000; 36 37 /** 38 * 等待时间 39 */ 40 int waitTime() default 50; 41 42 /** 43 * 获取不到锁的等待时间 44 */ 45 int retryTimes() default 20; 46 }
以及:
1 package com.atlas.sys.listener; 2 3 import com.atlas.model.download.DownloadReturnOther; 4 import com.atlas.sys.annoation.RedisLockAnnoation; 5 import com.atlas.sys.service.RedisDistributionLock; 6 import org.apache.commons.lang3.StringUtils; 7 import org.apache.commons.lang3.reflect.MethodUtils; 8 import org.aspectj.lang.ProceedingJoinPoint; 9 import org.aspectj.lang.annotation.Around; 10 import org.aspectj.lang.annotation.Aspect; 11 import org.aspectj.lang.reflect.MethodSignature; 12 import org.slf4j.Logger; 13 import org.slf4j.LoggerFactory; 14 import org.springframework.stereotype.Component; 15 16 import javax.annotation.Resource; 17 import java.lang.reflect.Method; 18 import java.lang.reflect.Parameter; 19 20 /** 21 * @program: atlas-parent 22 * @description: 23 * @author: siming.wang 24 * @create: 2018-12-14 14:22 25 **/ 26 @Component 27 @Aspect 28 public class RedisLockAdvice { 29 30 private static final Logger logger = LoggerFactory.getLogger(RedisLockAdvice.class); 31 32 @Resource 33 private RedisDistributionLock redisDistributionLock; 34 35 @Around("@annotation(redisLockAnnoation)") 36 // @Around("execution(* com.atlas.ibd.service.IbdAsnReceiptReturnService.insertIbdAsnReceipt())") 37 public Object processAround(ProceedingJoinPoint pjp,RedisLockAnnoation redisLockAnnoation) throws Throwable { 38 //获取方法上的注解对象 39 String methodName = pjp.getSignature().getName(); 40 Class<?> classTarget = pjp.getTarget().getClass(); 41 Class<?>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes(); 42 Method objMethod = classTarget.getMethod(methodName, par); 43 redisLockAnnoation = objMethod.getDeclaredAnnotation(RedisLockAnnoation.class); 44 45 //拼装分布式锁的key 46 String[] keys = redisLockAnnoation.keys(); 47 Object[] args = pjp.getArgs(); 48 Object arg = args[0]; 49 StringBuilder temp = new StringBuilder(); 50 temp.append(redisLockAnnoation.keyPrefix()); 51 for (String key : keys) { 52 String getMethod = "get" + StringUtils.capitalize(key); 53 temp.append(MethodUtils.invokeExactMethod(arg, getMethod)).append("_"); 54 } 55 String redisKey = StringUtils.removeEnd(temp.toString(), "_"); 56 57 //执行分布式锁的逻辑 58 if (redisLockAnnoation.isSpin()) { 59 //阻塞锁 60 int lockRetryTime = 0; 61 try { 62 while (!redisDistributionLock.lock(redisKey, redisLockAnnoation.expireTime())) { 63 if (lockRetryTime++ > redisLockAnnoation.retryTimes()) { 64 logger.error("lock exception. key:{}, lockRetryTime:{}", redisKey, lockRetryTime); 65 throw new RuntimeException("lock exception. key:{"+redisKey+"}, lockRetryTime:{"+lockRetryTime+"}"); 66 } 67 } 68 return pjp.proceed(); 69 } finally { 70 redisDistributionLock.unlock(redisKey); 71 } 72 } else { 73 //非阻塞锁 74 try { 75 if (!redisDistributionLock.lock(redisKey)) { 76 logger.error("lock exception. key:{}", redisKey); 77 throw new RuntimeException("lock exception. key:{"+redisKey+"}"); 78 } 79 return pjp.proceed(); 80 } finally { 81 // Thread.sleep(2000); 82 // redisDistributionLock.unlock(redisKey); 83 } 84 } 85 } 86 }
这样在你需要用锁的方法上使用这个注解,并根据你的参数值设置需要锁定的key。
后记(自己项目所踩的坑):
由于需要在注解上拿到锁key的值,所以我把注解放在了可以拿到锁值的service层,controller拿不到锁《或者拿锁太繁杂》,这样这个切面的环绕就是我的service,但是commit操作又是在controller层,所以把解锁的代码从aop拿掉了,直接在controller层commit之后再去unlock锁
由于没有实现按照请求来区分锁是哪个线程加的,导致其他线程也可以解锁,所以这里在catch异常处没有unlock锁,这样会导致死锁。
有一个方法(low):判断exception的类型,是业务错误则需要把锁给unlock,不是业务错误则说明是未获取到锁,则不允许释放锁。
还有另一种最优雅的实现方式是:解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。setnx第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
写在最后:最后还是强烈推荐大家使用github上7k多星星的开源好项目:redisson https://github.com/redisson/redisson 关于redis相关的他都帮你实现了??
标签:org lease this RoCE ota ptr spl proc http
原文地址:https://www.cnblogs.com/wangsiming/p/10123368.html