Hystrix Summary: Caching

Date: 2018-01-11 15:29:59

  Request caching is enabled by overriding the getCacheKey method of HystrixCommand or HystrixObservableCommand.

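import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
    private final int value;

    protected CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    protected Boolean run() {
        // Returns true for even values.
        return value == 0 || value % 2 == 0;
    }

    // Commands that return the same key within one request share a cached result.
    @Override
    protected String getCacheKey() {
        return String.valueOf(value);
    }
}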

Executing the command

@Test
public void testWithoutCacheHits() {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        assertTrue(new CommandUsingRequestCache(2).execute());
        assertFalse(new CommandUsingRequestCache(1).execute());
        assertTrue(new CommandUsingRequestCache(0).execute());
        assertTrue(new CommandUsingRequestCache(58672).execute());
    } finally {
        context.shutdown();
    }
}

  Hystrix uses the key returned by getCacheKey to look up values in the cache. A cached value lives for the duration of one request.

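  In essence, toObservable wraps execution with cache logic: before executing, it tries to fetch the result from the cache; after setting up the execution, it stores the result in the cache.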

public Observable<R> toObservable() {
...
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
               ...
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    ...
                } else {
                    afterCache = hystrixObservable;
                }
...
            }
        });
    }
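
  The lookup-then-register flow in toObservable can be modelled without Hystrix or RxJava. The sketch below is a simplified stand-in, not Hystrix's implementation: a ConcurrentHashMap of CompletableFuture values takes the place of the request cache and the Observable, a null key is assumed to mean "caching disabled", and the names CacheFlowSketch, execute and runCount are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the cache logic around command execution; not Hystrix code.
class CacheFlowSketch<R> {
    private final ConcurrentHashMap<String, CompletableFuture<R>> requestCache = new ConcurrentHashMap<>();
    private final AtomicInteger runCount = new AtomicInteger();

    CompletableFuture<R> execute(String cacheKey, Supplier<R> run) {
        // Assumption of this sketch: a null key disables caching.
        boolean cachingEnabled = cacheKey != null;

        if (cachingEnabled) {
            CompletableFuture<R> fromCache = requestCache.get(cacheKey);
            if (fromCache != null) {
                return fromCache; // cache hit: skip execution entirely
            }
        }

        CompletableFuture<R> toCache = new CompletableFuture<>();
        if (cachingEnabled) {
            CompletableFuture<R> existing = requestCache.putIfAbsent(cacheKey, toCache);
            if (existing != null) {
                return existing; // another caller registered first: reuse its result
            }
        }

        // Cache miss (or caching disabled): run the work and publish the outcome.
        runCount.incrementAndGet();
        try {
            toCache.complete(run.get());
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache;
    }

    int runCount() {
        return runCount.get();
    }

    public static void main(String[] args) {
        CacheFlowSketch<Boolean> cache = new CacheFlowSketch<>();
        cache.execute("2", () -> 2 % 2 == 0).join();
        cache.execute("2", () -> 2 % 2 == 0).join();
        System.out.println("executions: " + cache.runCount());
    }
}
```

  Using putIfAbsent rather than put means that when two callers miss at the same moment, only one entry wins and both callers end up sharing it.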

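  Internally, HystrixRequestCache identifies a cache by its type (1 for a HystrixCommandKey, 2 for a HystrixCollapserKey), the name of the key, and the HystrixConcurrencyStrategy. This RequestCacheKey is then combined with the value returned by getCacheKey to form a ValueCacheKey, which is the actual key in the map.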

private static class RequestCacheKey {
    private final short type;
    private final String key;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    ...
}

private static class ValueCacheKey {
    private final RequestCacheKey rvKey;
    private final String valueCacheKey;
    ...
}
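
  A two-level key like this only works as a map key if both levels implement equals and hashCode. The following is a minimal sketch of that idea, not the Hystrix classes themselves: the names CompositeKeySketch, CacheId and ValueKey are hypothetical, and the concurrency strategy field is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplification of a two-level cache key; not the Hystrix classes.
final class CompositeKeySketch {
    static final short COMMAND = 1;
    static final short COLLAPSER = 2;

    // Identifies which cache is meant: the kind of key plus its name.
    static final class CacheId {
        final short type;
        final String name;

        CacheId(short type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof CacheId)) return false;
            CacheId other = (CacheId) o;
            return type == other.type && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, name);
        }
    }

    // Combines the cache identity with the per-invocation cache key.
    static final class ValueKey {
        final CacheId cacheId;
        final String valueCacheKey;

        ValueKey(CacheId cacheId, String valueCacheKey) {
            this.cacheId = cacheId;
            this.valueCacheKey = valueCacheKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueKey)) return false;
            ValueKey other = (ValueKey) o;
            return cacheId.equals(other.cacheId) && valueCacheKey.equals(other.valueCacheKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(cacheId, valueCacheKey);
        }
    }

    public static void main(String[] args) {
        Map<ValueKey, String> cache = new HashMap<>();
        cache.put(new ValueKey(new CacheId(COMMAND, "CmdA"), "42"), "cached");
        // Same value key, but a collapser instead of a command: a different entry.
        System.out.println(cache.containsKey(new ValueKey(new CacheId(COLLAPSER, "CmdA"), "42")));
    }
}
```

  Because the type and name are part of the key, two different commands that happen to return the same getCacheKey value do not read each other's entries.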

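  The value stored in the cache is an Observable, wrapped in a HystrixCachedObservable. The wrapper subscribes a ReplaySubject to the original Observable, so every later subscriber receives the same emissions, and it unsubscribes from the original once the last subscriber has gone.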

protected HystrixCachedObservable(final Observable<R> originalObservable) {
        ReplaySubject<R> replaySubject = ReplaySubject.create();
        this.originalSubscription = originalObservable
                .subscribe(replaySubject);

        this.cachedObservable = replaySubject
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions--;
                        if (outstandingSubscriptions == 0) {
                            originalSubscription.unsubscribe();
                        }
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions++;
                    }
                });
    }
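
  The combination of replaying and subscriber counting in this constructor can be illustrated in plain Java. This is a single-threaded toy model under stated assumptions, not RxJava or Hystrix code: the class ReplaySketch and its methods emit, subscribe and isUpstreamCancelled are hypothetical, and a boolean flag stands in for unsubscribing from the original Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded model of replay plus subscriber counting; not RxJava.
class ReplaySketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int outstandingSubscriptions = 0;
    private boolean upstreamCancelled = false;

    // Called by the upstream source for each value it produces.
    void emit(T value) {
        if (upstreamCancelled) {
            return; // the source was cancelled, so nothing more is recorded
        }
        buffer.add(value);
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    // Replays everything seen so far, then delivers later values.
    // Returns a handle that unsubscribes this subscriber.
    Runnable subscribe(Consumer<T> subscriber) {
        outstandingSubscriptions++;
        for (T value : new ArrayList<>(buffer)) {
            subscriber.accept(value);
        }
        subscribers.add(subscriber);

        boolean[] active = {true};
        return () -> {
            if (!active[0]) {
                return; // ignore repeated unsubscribe calls
            }
            active[0] = false;
            subscribers.remove(subscriber);
            outstandingSubscriptions--;
            if (outstandingSubscriptions == 0) {
                upstreamCancelled = true; // last subscriber left: stop the source
            }
        };
    }

    boolean isUpstreamCancelled() {
        return upstreamCancelled;
    }

    public static void main(String[] args) {
        ReplaySketch<String> replay = new ReplaySketch<>();
        replay.emit("a");
        Runnable unsubscribe = replay.subscribe(v -> System.out.println("got " + v));
        replay.emit("b");
        unsubscribe.run();
        System.out.println("cancelled: " + replay.isUpstreamCancelled());
    }
}
```

  A subscriber that arrives late still sees the values emitted before it subscribed, which is what lets a cached command result be handed to every caller in the request.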

  Internally, HystrixRequestCache uses a Map as the cache. The Map is stored in a HystrixRequestVariableHolder, so its lifetime is limited to a single request.

private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache =
        new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(
                new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
                    ...
                });

<T> HystrixCachedObservable<T> get(String cacheKey) {
    ValueCacheKey key = getRequestCacheKey(cacheKey);
    if (key != null) {
        ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
        if (cacheInstance == null) {
            throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
        }
        /* look for the stored value */
        return (HystrixCachedObservable<T>) cacheInstance.get(key);
    }
    return null;
}
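
  Why a request-scoped holder bounds the cache's lifetime can be shown with a small model. The sketch below assumes the request state is kept in a ThreadLocal and uses hypothetical names (RequestScopeSketch, initializeContext, cache, shutdown); it illustrates the lifecycle only and is not how HystrixRequestVariableHolder is written.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a request-scoped cache map; not Hystrix code.
final class RequestScopeSketch {
    // Assumption of this sketch: one map per thread represents one request.
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> CURRENT = new ThreadLocal<>();

    // Starts a request: creates a fresh, empty cache.
    static void initializeContext() {
        CURRENT.set(new ConcurrentHashMap<>());
    }

    // Ends the request: the cache and everything in it is dropped.
    static void shutdown() {
        CURRENT.remove();
    }

    // Returns the cache of the current request, or fails if none was started.
    static ConcurrentHashMap<String, Object> cache() {
        ConcurrentHashMap<String, Object> map = CURRENT.get();
        if (map == null) {
            throw new IllegalStateException("Request context is not initialized");
        }
        return map;
    }

    public static void main(String[] args) {
        initializeContext();
        try {
            cache().put("2", Boolean.TRUE);
            System.out.println("in request: " + cache().containsKey("2"));
        } finally {
            shutdown();
        }
        initializeContext();
        try {
            System.out.println("next request: " + cache().containsKey("2"));
        } finally {
            shutdown();
        }
    }
}
```

  Calling cache() before initializeContext() fails in the same spirit as the IllegalStateException thrown by get above when no HystrixRequestContext has been initialized.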

Are exceptions cached as well?

  Yes. An exception thrown by the fallback is cached too.

What is the scope of the cache?

  It is valid only within a single request.

Original article: https://www.cnblogs.com/zhangwanhua/p/8257496.html
