标签:
Xmemcached是一个高性能的基于java nio的memcached客户端。在经过三个RC版本后,正式发布1.10-final版本。
xmemcached特性一览:
1、高性能
2、支持完整的memcached文本协议,二进制协议将在1.2版本实现。
3、支持JMX,可以通过MBean调整性能参数、动态添加/移除server、查看统计等。
4、支持客户端统计
5、支持memcached节点的动态增减。
6、支持memcached分布:余数分布和一致性哈希分布。
7、更多的性能调整选项。
<!--xmemcached config --> <bean name="xmemcachedClient" class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean" destroy-method="shutdown"> <!-- multi service node --> <property name="servers" value="${memcached.servers}" /> <property name="weights"> <!-- memcached Aggregate --> <list> <value>1</value> </list> </property> <property name="connectionPoolSize" value="${memcached.connectionPoolSize}" /> <property name="sessionLocator"> <bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator" /> </property> <property name="transcoder"> <bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" /> </property> <property name="bufferAllocator"> <bean class="net.rubyeye.xmemcached.buffer.SimpleBufferAllocator" /> </property> <property name="failureMode" value="true" /> </bean> <!--memcached client --> <bean name="memcachedClient" class="com.cl.common.tool.MemCachedUtil"> <property name="memcachedClient" ref="xmemcachedClient" /> </bean>
其中各参数的意义:
参数 |
含义 |
servers |
服务器列表,格式:ip:port |
weights |
主机映射:host1对应1号、host2对应2号.. |
sessionLocator |
Session 分配器,有自带的,影响分布式 |
transcoder |
通信编码方式 |
bufferAllocator |
缓冲区分配器 |
注:
默认标准Hash, hash(key) mod server_count (余数分布)
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
MemcachedClient mc = builder.build();
可以改为Consistent Hash(一致性哈希):
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
MemcachedClient mc = builder.build();
使用实例
import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.TimeoutException; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.exception.MemcachedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; /** * Memcached客户端工具类 * * @author cl */ public class MemCachedUtil { private static final Logger LOG = LoggerFactory .getLogger(MemCachedUtil.class); private MemcachedClient memcachedClient; private static Long DEFAULT_OP_TIMEOUT = 10000L;// 10min public static int defaultExpireTime = 86400;// 默认缓存1天:24*60*60=86400 /** * Memcached是否可用 * * @return * @author cl */ public boolean isAvaliable() { if (memcachedClient.isShutdown()) { LOG.error("memcached客户端已关闭"); return false; } Map<InetSocketAddress, String> map = null; try { map = memcachedClient.getVersions(); } catch (Exception e) { LOG.error("获取memcached server version时异常", e); } if (map == null || map.size() == 0) { LOG.error("当前没有可用的memcached server"); return false; } return true; } /** * @param key * @return * @author cl */ public Object getValue(String key) { Object value = null; Assert.notNull(key); try { value = memcachedClient.get(key); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } return value; } /** * @param key * @param t * @return * @author cl */ public <T> T getValue(String key, Class<T> t) { T value = null; Assert.notNull(key); try { value = this.memcachedClient.get(key); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } return value; } /** * 在cache中保存value * * @param key * @param value * @author cl */ public void setValue(String key, Object value) { Assert.notNull(key, "cache key is null."); try { memcachedClient.set(key, defaultExpireTime, value); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } } /** * 在cache中保存value * * @param key * @param value * @param exp * 表示被保存的时长,单位:秒 * @author cl */ public void setValue(String key, int exp, Object value) { Assert.notNull(key, "cache key is null."); Assert.notNull(exp > 0, "exp must greate than zero."); try { memcachedClient.set(key, exp, value); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } } /** * 删除cache保存的value * * @param key * @return * @author cl */ public Boolean remove(String key) { try { return memcachedClient.delete(key, DEFAULT_OP_TIMEOUT); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } return Boolean.FALSE; } /** * 删除cache保存的value,设置超时时间 * * @param key * @param opTimeout * @return * @author cl */ public Boolean remove(String key, Long opTimeout) { try { return memcachedClient.delete(key, opTimeout); } catch (TimeoutException e) { LOG.error("Cache TimeoutException", e); } catch (InterruptedException e) { LOG.error("Cache InterruptedException", e); } catch (MemcachedException e) { LOG.error("Cache MemcachedException", e); } return Boolean.FALSE; } public MemcachedClient getMemcachedClient() { return memcachedClient; } public void setMemcachedClient(MemcachedClient memcachedClient) { this.memcachedClient = memcachedClient; } }
为了将N个前端数据同步,通过Memcached完成数据打通,但带来了一些新问题:
rivate MemcachedClientBuilder createMemcachedClientBuilder( Properties properties) { String addresses = properties.getProperty(ADDRESSES).trim(); if (logger.isInfoEnabled()) { logger.info("Configure Properties:[addresses = " + addresses + "]"); } MemcachedClientBuilder builder = new XMemcachedClientBuilder( AddrUtil.getAddresses(addresses)); // 使用二进制文件 builder.setCommandFactory(new BinaryCommandFactory()); // 使用一致性哈希算法(Consistent Hash Strategy) builder.setSessionLocator(new KetamaMemcachedSessionLocator()); // 使用序列化传输编码 builder.setTranscoder(new SerializingTranscoder()); // 进行数据压缩,大于1KB时进行压缩 builder.getTranscoder().setCompressionThreshold(1024); return builder; }
为了应对上述情况,做了如下调整:
用Memcached的add方法,就可以很快速的解决问题。不需要很繁琐的开发,也不需要依赖数据库记录,完全内存操作。
以下实现一个判定冲突的方法:
** * 冲突延时 1秒 */ public static final int MUTEX_EXP = 1; /** * 冲突键 */ public static final String MUTEX_KEY_PREFIX = "MUTEX_"; /** * 冲突判定 * * @param key */ public boolean isMutex(String key) { return isMutex(key, MUTEX_EXP); } /** * 冲突判定 * * @param key * @param exp * @return true 冲突 */ public boolean isMutex(String key, int exp) { boolean status = true; try { if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) { status = false; } } catch (Exception e) { logger.error(e.getMessage(), e); } return status; }
做个说明:
选项 | 说明 |
add | 仅当存储空间中不存在键相同的数据时才保存 |
replace | 仅当存储空间中存在键相同的数据时才保存 |
set | 与add和replace不同,无论何时都保存 |
也就是说,如果add操作返回为true,则认为当前不冲突!
回归场景,恶意用户1秒钟操作6次,遇到上述这个方法,只有乖乖地1秒后再来。别小看这1秒钟,一个数据库操作不过几毫秒。1秒延迟,足以降低系统负载,增加恶意用户成本。
附我用到的基于XMemcached实现:
import net.rubyeye.xmemcached.MemcachedClient; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * * @author Snowolf * @version 1.0 * @since 1.0 */ @Component public class MemcachedManager { /** * 缓存时效 1天 */ public static final int CACHE_EXP_DAY = 3600 * 24; /** * 缓存时效 1周 */ public static final int CACHE_EXP_WEEK = 3600 * 24 * 7; /** * 缓存时效 1月 */ public static final int CACHE_EXP_MONTH = 3600 * 24 * 30 * 7; /** * 缓存时效 永久 */ public static final int CACHE_EXP_FOREVER = 0; /** * 冲突延时 1秒 */ public static final int MUTEX_EXP = 1; /** * 冲突键 */ public static final String MUTEX_KEY_PREFIX = "MUTEX_"; /** * Logger for this class */ private static final Logger logger = Logger .getLogger(MemcachedManager.class); /** * Memcached Client */ @Autowired private MemcachedClient memcachedClient; /** * 缓存 * * @param key * @param value * @param exp * 失效时间 */ public void cacheObject(String key, Object value, int exp) { try { memcachedClient.set(key, exp, value); } catch (Exception e) { logger.error(e.getMessage(), e); } logger.info("Cache Object: [" + key + "]"); } /** * Shut down the Memcached Cilent. */ public void finalize() { if (memcachedClient != null) { try { if (!memcachedClient.isShutdown()) { memcachedClient.shutdown(); logger.debug("Shutdown MemcachedManager..."); } } catch (Exception e) { logger.error(e.getMessage(), e); } } } /** * 清理对象 * * @param key */ public void flushObject(String key) { try { memcachedClient.deleteWithNoReply(key); } catch (Exception e) { logger.error(e.getMessage(), e); } logger.info("Flush Object: [" + key + "]"); } /** * 冲突判定 * * @param key */ public boolean isMutex(String key) { return isMutex(key, MUTEX_EXP); } /** * 冲突判定 * * @param key * @param exp * @return true 冲突 */ public boolean isMutex(String key, int exp) { boolean status = true; try { if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) { status = false; } } catch (Exception e) { logger.error(e.getMessage(), e); } return status; } /** * 加载缓存对象 * * @param key * @return */ public <T> T loadObject(String key) { T object = null; try { object = memcachedClient.<T> get(key); } catch (Exception e) { logger.error(e.getMessage(), e); } logger.info("Load Object: [" + key + "]"); return object; } }
标签:
原文地址:http://blog.csdn.net/clypm/article/details/51480571