/**
 * Marks a method whose result should be kept in the local (in-process) cache.
 *
 * <p>The cache lifetime can be set per method; it defaults to one hour.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LocalCacheOperation {

    /** Lifetime of a cached result in milliseconds; one hour unless overridden. */
    long localCacheInterval() default 60L * 60L * 1000L;

    /** Explicit cache key; the empty string means no key was specified. */
    String localCacheKey() default "";
}
/**
 * Wraps a cached value together with the timestamp at which it was stored.
 *
 * <p>The accessors below are the ones the caching aspect calls
 * ({@code getTimestamp}, {@code setTimestamp}, {@code getObject},
 * {@code setObject}); they replace the elided part of the original listing.
 */
public class CacheObject implements Serializable {

    private static final long serialVersionUID = 4873268779348802945L;

    /** Time the value was stored, in milliseconds (as set by the caller). */
    private long timestamp;

    /** The cached value; may be {@code null}. */
    private Object object;

    /** Returns the time the value was stored, in milliseconds. */
    public long getTimestamp() {
        return timestamp;
    }

    /** Sets the time the value was stored, in milliseconds. */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** Returns the cached value, or {@code null} if none is set. */
    public Object getObject() {
        return object;
    }

    /** Sets the cached value; {@code null} clears it. */
    public void setObject(Object object) {
        this.object = object;
    }
}
/** Configuration lookups whose results are exposed as Bloom filters. */
public interface ConfigService {

    /** Returns the Bloom filter of passenger names. */
    BloomFilter<String> getAllPassengerNames();

    /** Returns the Bloom filter of trades. */
    BloomFilter<Long> getAllTrades();
}

/**
 * {@link ConfigService} implementation that delegates to the passenger and
 * trade managers; both methods are annotated for local caching.
 */
public class ConfigServiceImpl implements ConfigService {

    private PassengerManager passengerManager;

    private TradeManager tradeManager;

    /** Delegates to the passenger manager; cached locally for 24 hours. */
    @Override
    @LocalCacheOperation(localCacheInterval = 24 * 60 * 60 * 1000)
    public BloomFilter<String> getAllPassengerNames() {
        return passengerManager.getAllPassengerNames();
    }

    /** Delegates to the trade manager; cached with the annotation's default interval. */
    @Override
    @LocalCacheOperation
    public BloomFilter<Long> getAllTrades() {
        return tradeManager.getAllTrades();
    }
}
/** * 本地缓存切面实现 */ @Aspect public class LocalCacheAspect { private static final Log log = LogFactory.getLog(LocalCacheAspect.class); private final ConcurrentMap<String, Future<CacheObject>> localCache = new ConcurrentHashMap<String, Future<CacheObject>>(); //控制台 private ConsoleBean consoleBean; private final ConcurrentMap<String, SoftReference<CacheObject>> concurrentLocalCache = new ConcurrentHashMap<String, SoftReference<CacheObject>>(); /** * 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程 * 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null */ private final static Map<String, Lock> localCacheLocks = new HashMap<String, Lock>(); static { localCacheLocks.put("getAllTrades", new ReentrantLock()); } /** * Advice aound audit operations * * @param pjpParam * @return */ @Around("execution(@LocalCacheOperation * *(..))") public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable { if(!getConsoleBean().isLocalCacheSwitchOn()) { log.warn("localCache not switch on, please pay attention"); return pjpParam.proceed(pjpParam.getArgs()); } final ProceedingJoinPoint pjp = pjpParam; Signature sig = pjp.getSignature(); if (sig instanceof MethodSignature) { MethodSignature mSig = (MethodSignature) sig; LocalCacheOperation co = mSig.getMethod().getAnnotation( LocalCacheOperation.class); long localCacheInterval = 0; String localCacheKey = null; /** * AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空 * @author chenlei.cl */ if( co == null ){ localCacheInterval = consoleBean.getLocalCacheInterval(); localCacheKey = mSig.getName(); } else { localCacheInterval = co.localCacheInterval(); localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ? 
co.localCacheKey() : mSig.getName(); } if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁 return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey); } while (true) {// 等待某个线程将数据获取到本地缓存 Future<CacheObject> f = localCache.get(localCacheKey); try { long currentTime = System.currentTimeMillis(); if (f != null && f.get() != null && currentTime - f.get().getTimestamp() > localCacheInterval) { localCache.remove(localCacheKey, f); f = null; } if (f == null) { Callable<CacheObject> eval = new Callable<CacheObject>() { public CacheObject call() throws InterruptedException { Object res; try { res = pjp.proceed(pjp.getArgs()); } catch (Throwable e) { log.error("Fail to process method", e); throw new ServiceException(e.getMessage()); } CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); return cacheObject; } }; FutureTask<CacheObject> ft = new FutureTask<CacheObject>(eval); f = localCache.putIfAbsent(localCacheKey, ft); if (f == null) { f = ft; ft.run(); } } CacheObject obj = f.get(); if (obj != null) return obj.getObject(); } catch (CancellationException e) { localCache.remove(localCacheKey, f); } catch (ExecutionException e) { throw new ServiceException(e.getMessage()); } } } return pjp.proceed(pjp.getArgs()); } @SuppressWarnings("static-access") public Object doConcurrentLocalCache(ProceedingJoinPoint pjp, long localCacheInterval, String localCacheKey) throws Throwable { try { long currentTime = System.currentTimeMillis(); SoftReference<CacheObject> weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null && currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) { // 缓存过期 weakRefCacheObj.get().setObject(null); // 清空引用 concurrentLocalCache.remove(localCacheKey, weakRefCacheObj); weakRefCacheObj = null; } else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { return 
weakRefCacheObj.get().getObject(); } if (this.localCacheLocks.get(localCacheKey).tryLock()) { weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { // double check return weakRefCacheObj.get().getObject(); } try { Object res = pjp.proceed(pjp.getArgs()); CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); weakRefCacheObj = new SoftReference<CacheObject>( cacheObject); concurrentLocalCache.put(localCacheKey, weakRefCacheObj); return res; } finally { this.localCacheLocks.get(localCacheKey).unlock(); } } else { return null; // make the other part wait } } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } } }
// Original article: http://blog.csdn.net/troy__/article/details/39320699