标签:oscache 缓存 源码 并发 lru linkedhashmap
在并发量比较大的场景,如果采用直接访问数据库的方式,将会对数据库带来巨大的压力,严重的情况下可能会导致数据库不可用状态,并且时间的消耗也是不能容忍的。在这种情况下,一般采用缓存的方式。将经常访问的热点数据提前加载到内存中,这样能够大大降低数据库的压力。
OSCache是一个开源的缓存框架,虽然现在已经停止维护了,但是对于OSCache的实现还是值得学习和借鉴的。下面通过OSCache的部分源码分析OSCache的设计思想。
缓存数据结构
通常缓存都是通过<K,V>这种数据结构存储,但缓存都是应用在多线程的场景下,需要保证线程安全。Java中可以选择HashTable、ConcurrentHashMap、synchronizedMap等。OSCache本质上使用的HashTable实现的,具体实现代码在com.opensymphony.oscache.base.algorithm.AbstractConcurrentReadCache中。由于HashTable在保证线程安全上采用的加锁整个代码段,因此,适合读多写少(mostly-concurrent reading, but exclusive writing)的场景。如果写很多的话,可以采用ConcurrentHashMap数据结构。
获得缓存内容
//从缓存中获取指定key对应的内容 public Object getFromCache(String key, int refreshPeriod, String cronExpiry) throws NeedsRefreshException { //首先尝试获取内容,如果获取不到,则新建一个CacheEntry对象 CacheEntry cacheEntry = this.getCacheEntry(key, null, null); Object content = cacheEntry.getContent(); CacheMapAccessEventType accessEventType = CacheMapAccessEventType.HIT; boolean reload = false; // 检查缓存是否过期,如果过期分为以下几种情况处理 if (this.isStale(cacheEntry, refreshPeriod, cronExpiry)) { //获取更新状态,如果没有,新建一个同时引用计数默认为1 EntryUpdateState updateState = getUpdateState(key); try { synchronized (updateState) { if (updateState.isAwaitingUpdate() || updateState.isCancelled()) { // 如果状态为等待刷新或者已经取消刷新,说明当前没有其他线程对其进行刷新操作 // 因此这里启动刷新操作(这里会将状态更新为刷新中同时引用计数+1) updateState.startUpdate(); //如果是新建的CacheEntry对象(即之前未缓存该key对应的对象) if (cacheEntry.isNew()) { //设置命中状态为未命中 accessEventType = CacheMapAccessEventType.MISS; } else { //否则说明虽然命中了,但是需要刷新 accessEventType = CacheMapAccessEventType.STALE_HIT; } } else if (updateState.isUpdating()) { // 如果更新状态为刷新中,说明另有一个线程正对该缓存对象执行刷新操作 // 此时如果是新建的CacheEntry对象或者同步模式设置为true,那么该线程将阻塞 //通过putInCache或者cancelUpdate可以让线程继续运行 // 否则获取到的很有可能是脏数据 if (cacheEntry.isNew() || blocking) { do { try { updateState.wait(); } catch (InterruptedException e) { } } while (updateState.isUpdating()); //如果更新状态变成了取消,说明另外一个线程取消了刷新缓存操作,那么让该线程尝试刷新 if (updateState.isCancelled()) { //更新状态设置为更新中并将引用计数+1 updateState.startUpdate(); if (cacheEntry.isNew()) { accessEventType = CacheMapAccessEventType.MISS; } else { accessEventType = CacheMapAccessEventType.STALE_HIT; } } else if (updateState.isComplete()) { reload = true; } else { log.error("Invalid update state for cache entry " + key); } } } else { reload = true; } } } finally { //将引用计数-1同时检查如果引用计数=0,将updateState移除 releaseUpdateState(updateState, key); } } // 如果该标志位为true,说明缓存一定刷新了 if (reload) { cacheEntry = (CacheEntry) cacheMap.get(key); if (cacheEntry != null) { content = cacheEntry.getContent(); } else { log.error("Could not reload cache entry after waiting for it to be rebuilt"); } } dispatchCacheMapAccessEvent(accessEventType, cacheEntry, null); // 如果缓存不存在或者缓存过期将抛出需要刷新的异常 if (accessEventType != CacheMapAccessEventType.HIT) { throw new NeedsRefreshException(content); } return content; }
从上面可以看到EntryUpdateState很关键。EntryUpdateState用来标记某个key对应的缓存的更新状态以及线程引用计数(可以理解为一个计数器),并且每一个key对应一个EntryUpdateState。如果缓存存在并且没有过期,EntryUpdateState为空。这样做的目的就是防止过多的使用synchronize,从而对性能不会造成很大的影响。
查看定义如下:
//默认 public static final int NOT_YET_UPDATING = -1; public static final int UPDATE_IN_PROGRESS = 0; public static final int UPDATE_COMPLETE = 1; public static final int UPDATE_CANCELLED = 2; int state = NOT_YET_UPDATING; //引用计数 private int nbConcurrentUses = 1;
这里的引用计数,代表了当前有多少线程在缓存更新或存入过程中进行访问。
通过上面从缓存获取指定key的代码可以发现一个问题:当缓存不存在或者缓存过期的情况下,都会抛出NeedsRefreshException的异常,在这种情况下,如果blocking设置为true(通常设置为true),其他访问的线程将处于阻塞状态,直到缓存更新完毕才会继续运行,倘若这里处理不当,将会导致死锁的发生。
因此在该异常产生时,需要进行缓存的刷新操作,官方给出了两种方法:
//第一种:with fail over String myKey = "myKey"; String myValue; int myRefreshPeriod = 1000; try { // Get from the cache myValue = (String) admin.getFromCache(myKey, myRefreshPeriod); } catch (NeedsRefreshException nre) { try { // Get the value (probably by calling an EJB) myValue = "This is the content retrieved."; // Store in the cache admin.putInCache(myKey, myValue); } catch (Exception ex) { // We have the current content if we want fail-over. myValue = (String) nre.getCacheContent(); // It is essential that cancelUpdate is called if the // cached content is not rebuilt admin.cancelUpdate(myKey); } }
//第二种:without fail over String myKey = "myKey"; String myValue; int myRefreshPeriod = 1000; try { // Get from the cache myValue = (String) admin.getFromCache(myKey, myRefreshPeriod); } catch (NeedsRefreshException nre) { try { // Get the value (probably by calling an EJB) myValue = "This is the content retrieved."; // Store in the cache admin.putInCache(myKey, myValue); updated = true; } finally { if (!updated) { // It is essential that cancelUpdate is called if the // cached content could not be rebuilt admin.cancelUpdate(myKey); } } }
正如之前的代码,如果这里不调用putInCache或者cancelUpdate,其他访问该缓存的线程将会由于得不到资源始终处于阻塞状态,导致死锁的发生。因此这里是一个非常重要的关注点。只有调用了putInCache或者cancelUpdate方法,阻塞的线程才会开始运行。
下面看一下putInCache和cancelUpdate方法具体做了什么:
public void putInCache(String key, Object content, String[] groups, EntryRefreshPolicy policy, String origin) { CacheEntry cacheEntry = this.getCacheEntry(key, policy, origin); boolean isNewEntry = cacheEntry.isNew(); // 首先判断缓存中是否已经存在 if (!isNewEntry) { cacheEntry = new CacheEntry(key, policy); } cacheEntry.setContent(content); cacheEntry.setGroups(groups); cacheMap.put(key, cacheEntry); // 更新状态及引用计数,通知其它阻塞线程可以获取缓存了 completeUpdate(key); //...... //...... } } protected void completeUpdate(String key) { EntryUpdateState state; synchronized (updateStates) { state = (EntryUpdateState) updateStates.get(key); if (state != null) { synchronized (state) { //更新状态为UPDATE_COMPLETE,引用计数-1 int usageCounter = state.completeUpdate(); //唤醒其它等待该缓存资源的线程 state.notifyAll(); checkEntryStateUpdateUsage(key, state, usageCounter); } } else { //如果putInCache方法直接调用(如不是因NeedRefreshException异常调用)这样EntryUpdateState将为null,不执行操作 } } }
cancelUpdate的逻辑和putInCache基本相同:
public void cancelUpdate(String key) { EntryUpdateState state; if (key != null) { synchronized (updateStates) { state = (EntryUpdateState) updateStates.get(key); if (state != null) { synchronized (state) { //更新状态为UPDATE_CANCELLED,引用计数-1 int usageCounter = state.cancelUpdate(); state.notify(); checkEntryStateUpdateUsage(key, state, usageCounter); } } else { if (log.isErrorEnabled()) { log.error("internal error: expected to get a state from key [" + key + "]"); } } } } }
从上面的代码可以看到,当发生需要刷新缓存(NeedsRefreshException)的异常时,需要通过putInCache()方法进行缓存的更新或者cancelUpdate()方法放弃刷新缓存,从而释放资源,唤醒其它阻塞的线程。
缓存淘汰(替换)策略
因为我们的内存不是无限的,缓存不可能无限的扩大,因此在缓存占满时,我们需要将缓存中一些“不重要”的内容剔除,从而腾出空间缓存新的内容。如何丈量这个“不重要”,就是我们需要考虑的缓存淘汰(替换)策略。
一般有以下策略:
Least Frequently Used(LFU):计算每个缓存对象的使用频率,将频率最低的剔除;
Least Recently User(LRU):最近最少使用,具体是将最近访问的内容始终放在最顶端,一直未访问或者最久未访问的内容放在最底端,当需要替换的时候,只需将最底端的剔除即可,这样可以使得最常访问的内容始终在缓存中,使用比较广泛,OSCache中默认也是采用该方法。LRU的这种特性,在Java中很容易通过LinkedHashMap实现,具体实现方法可以参考下面的介绍。
First in First out(FIFO):先进先出。实现起来最为简单,但是不适用。
Random Cache:随机替换。
当然还有很多替换算法,这里就不一一列举了。仅就最常用的LRU算法进行介绍。
Java中的LinkedHashMap可以保持插入顺序或者访问顺序,对于第二个特性,跟LRU的机制很相似,因此,可以很简单的采用LinkedHashMap来实现LRU算法。
查看LinkedHashMap的定义,有下面一个参数:
final boolean accessOrder;
再看几个构造函数的定义:
public LinkedHashMap(int initialCapacity, float loadFactor) { super(initialCapacity, loadFactor); accessOrder = false; } public LinkedHashMap(int initialCapacity) { super(initialCapacity); accessOrder = false; } public LinkedHashMap() { super(); accessOrder = false; } public LinkedHashMap(Map<? extends K, ? extends V> m) { super(); accessOrder = false; putMapEntries(m, false); } public LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder) { super(initialCapacity, loadFactor); this.accessOrder = accessOrder; }
可以看到,除了最后一个构造函数,其余的accessOrder默认为false。当accessOrder为false时,LinkedHashMap保持插入顺序,而accessOrder如果为true,将保持访问顺序,因此这正是关键点。具体如何保持插入顺序或者访问顺序,可以参考LinkedHashMap的实现代码,并不复杂。
仅仅是保持访问顺序还不行,我们还要淘汰最近最少使用的对象。LinkedHashMap重写了父类HashMap的afterNodeInsertion方法:
void afterNodeInsertion(boolean evict) { // possibly remove eldest LinkedHashMap.Entry<K,V> first; if (evict && (first = head) != null && removeEldestEntry(first)) { K key = first.key; removeNode(hash(key), key, null, false, true); } } /** * Returns <tt>true</tt> if this map should remove its eldest entry. * This method is invoked by <tt>put</tt> and <tt>putAll</tt> after * inserting a new entry into the map. It provides the implementor * with the opportunity to remove the eldest entry each time a new one * is added. This is useful if the map represents a cache: it allows * the map to reduce memory consumption by deleting stale entries. * * <p>Sample use: this override will allow the map to grow up to 100 * entries and then delete the eldest entry each time a new entry is * added, maintaining a steady state of 100 entries. * <pre> * private static final int MAX_ENTRIES = 100; * * protected boolean removeEldestEntry(Map.Entry eldest) { * return size() > MAX_ENTRIES; * } * </pre> * * <p>This method typically does not modify the map in any way, * instead allowing the map to modify itself as directed by its * return value. It <i>is</i> permitted for this method to modify * the map directly, but if it does so, it <i>must</i> return * <tt>false</tt> (indicating that the map should not attempt any * further modification). The effects of returning <tt>true</tt> * after modifying the map from within this method are unspecified. * * <p>This implementation merely returns <tt>false</tt> (so that this * map acts like a normal map - the eldest element is never removed). * * @param eldest The least recently inserted entry in the map, or if * this is an access-ordered map, the least recently accessed * entry. This is the entry that will be removed it this * method returns <tt>true</tt>. If the map was empty prior * to the <tt>put</tt> or <tt>putAll</tt> invocation resulting * in this invocation, this will be the entry that was just * inserted; in other words, if the map contains a single * entry, the eldest entry is also the newest. * @return <tt>true</tt> if the eldest entry should be removed * from the map; <tt>false</tt> if it should be retained. */ protected boolean removeEldestEntry(Map.Entry<K,V> eldest) { return false; }
removeEldestEntry方法默认返回false,即默认不移除。因此我们只要在这里加以判断:如果缓存已经占满,返回true,就可以将最近最少使用的对象移除了。因此,通过使用LinkedHashMap,仅需要非常简单的修改即可实现LRU算法。
下面附上LRU的实现代码:
import java.util.LinkedHashMap; import java.util.Collection; import java.util.Map; import java.util.ArrayList; /** * An LRU cache, based on <code>LinkedHashMap</code>. * * <p> * This cache has a fixed maximum number of elements (<code>cacheSize</code>). * If the cache is full and another entry is added, the LRU (least recently * used) entry is dropped. * * <p> * This class is thread-safe. All methods of this class are synchronized. * * <p> * Author: Christian d‘Heureuse, Inventec Informatik AG, Zurich, Switzerland<br> * Multi-licensed: EPL / LGPL / GPL / AL / BSD. */ public class LRUCache<K, V> { private static final float hashTableLoadFactor = 0.75f; private LinkedHashMap<K, V> map; private int cacheSize; /** * Creates a new LRU cache. 在该方法中,new LinkedHashMap<K,V>(hashTableCapacity, * hashTableLoadFactor, true)中,true代表使用访问顺序 * * @param cacheSize * the maximum number of entries that will be kept in this cache. */ public LRUCache(int cacheSize) { this.cacheSize = cacheSize; int hashTableCapacity = (int) Math .ceil(cacheSize / hashTableLoadFactor) + 1; map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) { // (an anonymous inner class) private static final long serialVersionUID = 1; @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { return size() > LRUCache.this.cacheSize; } }; } /** * Retrieves an entry from the cache.<br> * The retrieved entry becomes the MRU (most recently used) entry. * * @param key * the key whose associated value is to be returned. * @return the value associated to this key, or null if no value with this * key exists in the cache. */ public synchronized V get(K key) { return map.get(key); } /** * Adds an entry to this cache. The new entry becomes the MRU (most recently * used) entry. If an entry with the specified key already exists in the * cache, it is replaced by the new entry. If the cache is full, the LRU * (least recently used) entry is removed from the cache. * * @param key * the key with which the specified value is to be associated. * @param value * a value to be associated with the specified key. */ public synchronized void put(K key, V value) { map.put(key, value); } /** * Clears the cache. */ public synchronized void clear() { map.clear(); } /** * Returns the number of used entries in the cache. * * @return the number of entries currently in the cache. */ public synchronized int usedEntries() { return map.size(); } /** * Returns a <code>Collection</code> that contains a copy of all cache * entries. * * @return a <code>Collection</code> with a copy of the cache content. */ public synchronized Collection<Map.Entry<K, V>> getAll() { return new ArrayList<Map.Entry<K, V>>(map.entrySet()); } }
最后,缓存在使用过程中,需要考虑一致性问题。缓存的刷新就是为了保持一致性。具体如何去刷新,需要根据具体的使用场景进行设计。
本文出自 “一梦一程-记录学习点滴” 博客,请务必保留此出处http://jincheng.blog.51cto.com/4625177/1826160
标签:oscache 缓存 源码 并发 lru linkedhashmap
原文地址:http://jincheng.blog.51cto.com/4625177/1826160