标签:create local obj 调用 nta statistic ddp duration oid
/** * CacheFilter * group为consumer或者provider 同时 含有cache=的配置的时候 返回此过滤器 */ @Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY) public class CacheFilter implements Filter { /** * CacheFactory$Adaptive 对象。 * <p> * X * 通过 Dubbo SPI 机制,调用 {@link #setCacheFactory(CacheFactory)} 方法,进行注入 */ private CacheFactory cacheFactory; public void setCacheFactory(CacheFactory cacheFactory) { this.cacheFactory = cacheFactory; } @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { //方法开启 Cache 功能 判断method是否配置了cache if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) { //通过cacheFactory获取cache对象 Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation); if (cache != null) { String key = StringUtils.toArgumentString(invocation.getArguments()); Object value = cache.get(key); if (value != null) { return new RpcResult(value); } Result result = invoker.invoke(invocation); //没发生异常 存入cache if (!result.hasException() && result.getValue() != null) { cache.put(key, result.getValue()); } return result; } } return invoker.invoke(invocation); } }
//默认是lru @SPI("lru") public interface CacheFactory { //是否带SPI扩展根据cache参数获取 @Adaptive("cache") Cache getCache(URL url, Invocation invocation); }
/** * AbstractCacheFactory */ public abstract class AbstractCacheFactory implements CacheFactory { private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>(); /** * 模板方法模式 父类通过将cacheKey存入缓存 子类负责创建Cache * @param url * @param invocation * @return */ @Override public Cache getCache(URL url, Invocation invocation) { url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName()); //获取Key 并从缓存获取 String key = url.toFullString(); Cache cache = caches.get(key); if (cache == null) { //交给子类创建缓存 caches.put(key, createCache(url)); cache = caches.get(key); } return cache; } protected abstract Cache createCache(URL url); }
/** * LruCacheFactory */ public class LruCacheFactory extends AbstractCacheFactory { @Override protected Cache createCache(URL url) { return new LruCache(url); } }
public class LruCache implements Cache { /** * 缓存集合 */ private final Map<Object, Object> store; public LruCache(URL url) { // `"cache.size"` 配置项,设置缓存大小 final int max = url.getParameter("cache.size", 1000); // 创建 LRUCache 对象 内部采用likendMap实现 this.store = new LRUCache<Object, Object>(max); } @Override public void put(Object key, Object value) { store.put(key, value); } @Override public Object get(Object key) { return store.get(key); } }
/** * ThreadLocalCacheFactory */ public class ThreadLocalCacheFactory extends AbstractCacheFactory { @Override protected Cache createCache(URL url) { return new ThreadLocalCache(url); } }
/** * ThreadLocalCache */ public class ThreadLocalCache implements Cache { private final ThreadLocal<Map<Object, Object>> store; public ThreadLocalCache(URL url) { this.store = new ThreadLocal<Map<Object, Object>>() { @Override protected Map<Object, Object> initialValue() { return new HashMap<Object, Object>(); } }; } @Override public void put(Object key, Object value) { store.get().put(key, value); } @Override public Object get(Object key) { return store.get().get(key); } }
与 JSR107 集成,可以桥接各种缓存实现。
/** * JCacheFactory */ public class JCacheFactory extends AbstractCacheFactory { @Override protected Cache createCache(URL url) { return new JCache(url); } }
public class JCache implements com.alibaba.dubbo.cache.Cache { private final Cache<Object, Object> store; public JCache(URL url) { String method = url.getParameter(Constants.METHOD_KEY, ""); String key = url.getAddress() + "." + url.getServiceKey() + "." + method; // jcache parameter is the full-qualified class name of SPI implementation String type = url.getParameter("jcache"); CachingProvider provider = type == null || type.length() == 0 ? Caching.getCachingProvider() : Caching.getCachingProvider(type); CacheManager cacheManager = provider.getCacheManager(); Cache<Object, Object> cache = cacheManager.getCache(key); if (cache == null) { try { //configure the cache MutableConfiguration config = new MutableConfiguration<Object, Object>() .setTypes(Object.class, Object.class) .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, url.getMethodParameter(method, "cache.write.expire", 60 * 1000)))) .setStoreByValue(false) .setManagementEnabled(true) .setStatisticsEnabled(true); cache = cacheManager.createCache(key, config); } catch (CacheException e) { // concurrent cache initialization cache = cacheManager.getCache(key); } } this.store = cache; } @Override public void put(Object key, Object value) { store.put(key, value); } @Override public Object get(Object key) { return store.get(key); } }
我们可以通过JCCache扩展我们的缓存 比如redis缓存
标签:create local obj 调用 nta statistic ddp duration oid