标签:
Xmemcached是一个高性能的基于java nio的memcached客户端。在经过三个RC版本后,正式发布1.10-final版本。
xmemcached特性一览:
1、高性能
2、支持完整的memcached文本协议,二进制协议将在1.2版本实现。
3、支持JMX,可以通过MBean调整性能参数、动态添加/移除server、查看统计等。
4、支持客户端统计
5、支持memcached节点的动态增减。
6、支持memcached分布:余数分布和一致性哈希分布。
7、更多的性能调整选项。
<!--xmemcached config -->
<bean name="xmemcachedClient"
class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean"
destroy-method="shutdown">
<!-- Server list, taken from the memcached.servers placeholder. -->
<property name="servers" value="${memcached.servers}" />
<property name="weights">
<!-- Weight entries for the configured hosts.
     NOTE(review): only one entry is listed; if memcached.servers resolves to
     several nodes, confirm whether each node needs its own value here. -->
<list>
<value>1</value>
</list>
</property>
<!-- Connection pool size, taken from the memcached.connectionPoolSize placeholder. -->
<property name="connectionPoolSize" value="${memcached.connectionPoolSize}" />
<!-- Session locator: the Ketama locator selects consistent-hash distribution. -->
<property name="sessionLocator">
<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator" />
</property>
<!-- Transcoder used to encode cached values. -->
<property name="transcoder">
<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" />
</property>
<!-- Buffer allocator. -->
<property name="bufferAllocator">
<bean class="net.rubyeye.xmemcached.buffer.SimpleBufferAllocator" />
</property>
<!-- NOTE(review): failureMode is enabled; confirm its exact semantics against the
     XMemcached documentation before relying on it. -->
<property name="failureMode" value="true" />
</bean>
<!--memcached client -->
<!-- Application-level wrapper (MemCachedUtil); its memcachedClient property is wired
     to the bean named xmemcachedClient. -->
<bean name="memcachedClient" class="com.cl.common.tool.MemCachedUtil">
<property name="memcachedClient" ref="xmemcachedClient" />
</bean>
其中各参数的意义:
| 参数 | 含义 |
| --- | --- |
| servers | 服务器列表,格式:ip:port |
| weights | 主机映射:host1对应1号、host2对应2号.. |
| sessionLocator | Session 分配器,有自带的,影响分布式 |
| transcoder | 通信编码方式 |
| bufferAllocator | 缓冲区分配器 |
注:
默认标准Hash, hash(key) mod server_count (余数分布)
// Build a client over three servers using the builder's default session locator.
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
        AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
MemcachedClient mc = builder.build();
可以改为Consistent Hash(一致性哈希):
// Same three servers, but with the Ketama (consistent-hash) session locator
// installed before the client is built.
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
        AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
MemcachedClient mc = builder.build();
使用实例
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
/**
* Memcached客户端工具类
*
* @author cl
*/
public class MemCachedUtil {
private static final Logger LOG = LoggerFactory
.getLogger(MemCachedUtil.class);
private MemcachedClient memcachedClient;
// Operation timeout passed to memcachedClient.delete by remove(String).
// NOTE(review): the previous comment said "10min", but 10000 is presumably
// milliseconds (i.e. 10 seconds) — confirm against the MemcachedClient.delete contract.
private static Long DEFAULT_OP_TIMEOUT = 10000L;
// Default cache lifetime of one day: 24*60*60 = 86400.
public static int defaultExpireTime = 86400;
/**
 * Reports whether the memcached client can currently be used.
 *
 * <p>The client counts as unavailable when it has been shut down, or when the
 * version query fails or returns no entries. Each unavailable outcome is logged
 * at error level.
 *
 * @return {@code true} if the client is running and the version query returned
 *         at least one entry; {@code false} otherwise
 * @author cl
 */
public boolean isAvaliable() {
    if (memcachedClient.isShutdown()) {
        LOG.error("memcached客户端已关闭");
        return false;
    }

    Map<InetSocketAddress, String> versions;
    try {
        versions = memcachedClient.getVersions();
    } catch (Exception e) {
        LOG.error("获取memcached server version时异常", e);
        // A failed query is treated the same as an empty answer below.
        versions = null;
    }

    boolean hasServer = versions != null && !versions.isEmpty();
    if (!hasServer) {
        LOG.error("当前没有可用的memcached server");
    }
    return hasServer;
}
/**
 * Reads the value cached under {@code key}.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and reported to the
 * caller as {@code null}; they are not rethrown.
 *
 * @param key cache key; checked with {@code Assert.notNull}
 * @return the value returned by the client, or {@code null} if the client
 *         returned {@code null} or the call failed
 * @author cl
 */
public Object getValue(String key) {
    Object value = null;
    Assert.notNull(key);
    try {
        value = memcachedClient.get(key);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
    return value;
}
/**
 * Reads the value cached under {@code key}, typed as {@code T}.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and reported to the
 * caller as {@code null}; they are not rethrown.
 *
 * @param key cache key; checked with {@code Assert.notNull}
 * @param t   expected value type; it only drives generic inference at the call
 *            site and is not used for any runtime type check
 * @param <T> type the caller expects the cached value to have
 * @return the value returned by the client, or {@code null} if the client
 *         returned {@code null} or the call failed
 * @author cl
 */
public <T> T getValue(String key, Class<T> t) {
    T value = null;
    Assert.notNull(key);
    try {
        value = this.memcachedClient.get(key);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
    return value;
}
/**
 * Stores {@code value} under {@code key} using {@code defaultExpireTime} as the
 * expiry.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and not rethrown,
 * so the caller gets no signal when the write fails.
 *
 * @param key   cache key; checked with {@code Assert.notNull}
 * @param value value to store
 * @author cl
 */
public void setValue(String key, Object value) {
    Assert.notNull(key, "cache key is null.");
    try {
        memcachedClient.set(key, defaultExpireTime, value);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
}
/**
 * Stores {@code value} under {@code key} with an explicit lifetime.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and not rethrown,
 * so the caller gets no signal when the write fails.
 *
 * @param key   cache key; checked with {@code Assert.notNull}
 * @param exp   how long the value is kept, in seconds; must be greater than zero
 * @param value value to store
 * @author cl
 */
public void setValue(String key, int exp, Object value) {
    Assert.notNull(key, "cache key is null.");
    // Previously Assert.notNull(exp > 0, ...): the boxed Boolean is never null,
    // so the check could not fail. isTrue enforces the documented precondition.
    Assert.isTrue(exp > 0, "exp must greate than zero.");
    try {
        memcachedClient.set(key, exp, value);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
}
/**
 * Deletes the value cached under {@code key}, using {@code DEFAULT_OP_TIMEOUT}
 * as the operation timeout.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and reported as
 * {@code Boolean.FALSE}; they are not rethrown.
 *
 * @param key cache key
 * @return the result of the client's delete call, or {@code Boolean.FALSE} if
 *         the call failed
 * @author cl
 */
public Boolean remove(String key) {
    try {
        return memcachedClient.delete(key, DEFAULT_OP_TIMEOUT);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
    return Boolean.FALSE;
}
/**
 * Deletes the value cached under {@code key} with a caller-supplied operation
 * timeout.
 *
 * <p>Timeouts, interruptions and memcached errors are logged and reported as
 * {@code Boolean.FALSE}; they are not rethrown.
 *
 * @param key       cache key
 * @param opTimeout operation timeout handed to the client; when {@code null},
 *                  {@code DEFAULT_OP_TIMEOUT} is used instead
 * @return the result of the client's delete call, or {@code Boolean.FALSE} if
 *         the call failed
 * @author cl
 */
public Boolean remove(String key, Long opTimeout) {
    // A null timeout used to fail with a NullPointerException while unboxing.
    Long effectiveTimeout = (opTimeout != null) ? opTimeout : DEFAULT_OP_TIMEOUT;
    try {
        return memcachedClient.delete(key, effectiveTimeout);
    } catch (TimeoutException e) {
        LOG.error("Cache TimeoutException", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        LOG.error("Cache InterruptedException", e);
    } catch (MemcachedException e) {
        LOG.error("Cache MemcachedException", e);
    }
    return Boolean.FALSE;
}
/**
 * Returns the client this utility delegates to.
 *
 * @return the current {@code memcachedClient} field; {@code null} if none has been set
 */
public MemcachedClient getMemcachedClient() {
return memcachedClient;
}
/**
 * Sets the client this utility delegates to.
 *
 * @param memcachedClient client to use for all subsequent cache operations
 */
public void setMemcachedClient(MemcachedClient memcachedClient) {
this.memcachedClient = memcachedClient;
}
}

为了将N个前端数据同步,通过Memcached完成数据打通,但带来了一些新问题:

rivate MemcachedClientBuilder createMemcachedClientBuilder(
Properties properties) {
String addresses = properties.getProperty(ADDRESSES).trim();
if (logger.isInfoEnabled()) {
logger.info("Configure Properties:[addresses = " + addresses + "]");
}
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(addresses));
// 使用二进制文件
builder.setCommandFactory(new BinaryCommandFactory());
// 使用一致性哈希算法(Consistent Hash Strategy)
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
// 使用序列化传输编码
builder.setTranscoder(new SerializingTranscoder());
// 进行数据压缩,大于1KB时进行压缩
builder.getTranscoder().setCompressionThreshold(1024);
return builder;
}
为了应对上述情况,做了如下调整:
用Memcached的add方法,就可以很快速地解决问题。不需要很繁琐的开发,也不需要依赖数据库记录,完全内存操作。
以下实现一个判定冲突的方法:
/**
* 冲突延时 1秒
*/
public static final int MUTEX_EXP = 1;
/**
 * Prefix prepended to a key to form its mutex (conflict) key.
 */
public static final String MUTEX_KEY_PREFIX = "MUTEX_";
/**
 * Conflict check using the default window {@code MUTEX_EXP}.
 *
 * @param key key to check; delegates to {@code isMutex(key, MUTEX_EXP)}
 * @return {@code true} if a conflict is detected
 */
public boolean isMutex(String key) {
return isMutex(key, MUTEX_EXP);
}
/**
 * Conflict check based on the client's {@code add} operation.
 *
 * <p>Tries to add the mutex key (prefix plus {@code key}) with lifetime
 * {@code exp}. A successful add means no conflict. Anything else, including an
 * exception (which is logged), is reported as a conflict.
 *
 * @param key key to check
 * @param exp lifetime passed to {@code add} for the mutex key
 * @return {@code true} on conflict, {@code false} when the add succeeded
 */
public boolean isMutex(String key, int exp) {
    try {
        boolean added = memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true");
        return !added;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        // Failures count as a conflict.
        return true;
    }
}
做个说明:
| 选项 | 说明 |
| add | 仅当存储空间中不存在键相同的数据时才保存 |
| replace | 仅当存储空间中存在键相同的数据时才保存 |
| set | 与add和replace不同,无论何时都保存 |
也就是说,如果add操作返回为true,则认为当前不冲突!
回归场景,恶意用户1秒钟操作6次,遇到上述这个方法,只有乖乖地1秒后再来。别小看这1秒钟,一个数据库操作不过几毫秒。1秒延迟,足以降低系统负载,增加恶意用户成本。
附我用到的基于XMemcached实现:
import net.rubyeye.xmemcached.MemcachedClient;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author Snowolf
* @version 1.0
* @since 1.0
*/
@Component
public class MemcachedManager {
/**
 * Cache lifetime: 1 day (3600 * 24).
 */
public static final int CACHE_EXP_DAY = 3600 * 24;
/**
 * Cache lifetime: 1 week (3600 * 24 * 7).
 */
public static final int CACHE_EXP_WEEK = 3600 * 24 * 7;
/**
 * Cache lifetime: 1 month, taken as 30 days (3600 * 24 * 30 seconds).
 *
 * <p>The previous value had a stray {@code * 7} factor (210 days). Besides not
 * being one month, that exceeded the 30-day limit above which memcached treats
 * an expiration as an absolute Unix timestamp rather than a relative lifetime.
 */
public static final int CACHE_EXP_MONTH = 3600 * 24 * 30;
/**
 * Cache lifetime: permanent (expiry value 0).
 */
public static final int CACHE_EXP_FOREVER = 0;
/**
 * Conflict (mutex) window: 1 second.
 */
public static final int MUTEX_EXP = 1;
/**
 * Prefix prepended to a key to form its mutex (conflict) key.
 */
public static final String MUTEX_KEY_PREFIX = "MUTEX_";
/**
* Logger for this class
*/
private static final Logger logger = Logger
.getLogger(MemcachedManager.class);
/**
* Memcached Client
*/
@Autowired
private MemcachedClient memcachedClient;
/**
 * Stores {@code value} under {@code key}.
 *
 * <p>Any exception from the client is logged and not rethrown. The info line is
 * written whether or not the store succeeded.
 *
 * @param key   cache key
 * @param value value to store
 * @param exp   expiry passed to the client's {@code set} call
 */
public void cacheObject(String key, Object value, int exp) {
    try {
        memcachedClient.set(key, exp, value);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interrupt request.
            Thread.currentThread().interrupt();
        }
        logger.error(e.getMessage(), e);
    }
    logger.info("Cache Object: [" + key + "]");
}
/**
 * Shuts down the Memcached client if one is set and it is still running.
 *
 * <p>Any exception raised while checking or shutting down is logged and not
 * rethrown.
 * NOTE(review): this relies on finalization to release the client; confirm that
 * is intended, since the XML configuration shown with this code already
 * declares a destroy-method on the client bean.
 */
public void finalize() {
    if (memcachedClient == null) {
        return;
    }
    try {
        if (memcachedClient.isShutdown()) {
            return;
        }
        memcachedClient.shutdown();
        logger.debug("Shutdown MemcachedManager...");
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Removes the object cached under {@code key} via the client's
 * {@code deleteWithNoReply} call.
 *
 * <p>Any exception from the client is logged and not rethrown. The info line is
 * written whether or not the delete succeeded.
 *
 * @param key cache key to remove
 */
public void flushObject(String key) {
    try {
        memcachedClient.deleteWithNoReply(key);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interrupt request.
            Thread.currentThread().interrupt();
        }
        logger.error(e.getMessage(), e);
    }
    logger.info("Flush Object: [" + key + "]");
}
/**
 * Conflict check using the default window {@code MUTEX_EXP}.
 *
 * @param key key to check; delegates to {@code isMutex(key, MUTEX_EXP)}
 * @return {@code true} if a conflict is detected
 */
public boolean isMutex(String key) {
return isMutex(key, MUTEX_EXP);
}
/**
 * Conflict check based on the client's {@code add} operation.
 *
 * <p>Tries to add the mutex key (prefix plus {@code key}) with lifetime
 * {@code exp}. A successful add means no conflict. Anything else, including an
 * exception (which is logged), is reported as a conflict.
 *
 * @param key key to check
 * @param exp lifetime passed to {@code add} for the mutex key
 * @return {@code true} on conflict, {@code false} when the add succeeded
 */
public boolean isMutex(String key, int exp) {
    boolean status = true;
    try {
        if (memcachedClient.add(MUTEX_KEY_PREFIX + key, exp, "true")) {
            status = false;
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interrupt request.
            Thread.currentThread().interrupt();
        }
        logger.error(e.getMessage(), e);
    }
    return status;
}
/**
 * Loads the object cached under {@code key}.
 *
 * <p>Any exception from the client is logged and not rethrown. The info line is
 * written whether or not the load succeeded.
 *
 * @param key cache key
 * @param <T> type the caller expects the cached value to have; no runtime type
 *            check is performed here
 * @return the value returned by the client, or {@code null} if the client
 *         returned {@code null} or the call failed
 */
public <T> T loadObject(String key) {
    T object = null;
    try {
        object = memcachedClient.<T> get(key);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interrupt request.
            Thread.currentThread().interrupt();
        }
        logger.error(e.getMessage(), e);
    }
    logger.info("Load Object: [" + key + "]");
    return object;
}
}

标签:
原文地址:http://blog.csdn.net/clypm/article/details/51480571