首先是一个基于Redis实现的跨JVM的锁(RedisLock),
接着是一个对Jedis做了简单封装的工具类(RedisUtil),其中也对几种常用的pipeline处理进行了封装,
最后是对应的Spring相关配置。
-
public class RedisLock {
-
-
-
public static final String LOCKED = "TRUE";
-
-
public static final long MILLI_NANO_CONVERSION = 1000 * 1000L;
-
-
public static final long DEFAULT_TIME_OUT = 1000;
-
public static final Random RANDOM = new Random();
-
-
public static final int EXPIRE = 3 * 60;
-
-
private ShardedJedisPool shardedJedisPool;
-
private ShardedJedis jedis;
-
private String key;
-
-
private boolean locked = false;
-
-
-
-
-
-
-
public RedisLock(String key, ShardedJedisPool shardedJedisPool) {
-
this.key = key + "_lock";
-
this.shardedJedisPool = shardedJedisPool;
-
this.jedis = this.shardedJedisPool.getResource();
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public boolean lock(long timeout) {
-
long nano = System.nanoTime();
-
timeout *= MILLI_NANO_CONVERSION;
-
try {
-
while ((System.nanoTime() - nano) < timeout) {
-
if (this.jedis.setnx(this.key, LOCKED) == 1) {
-
this.jedis.expire(this.key, EXPIRE);
-
this.locked = true;
-
return this.locked;
-
}
-
-
Thread.sleep(3, RANDOM.nextInt(500));
-
}
-
} catch (Exception e) {
-
throw new RuntimeException("Locking error", e);
-
}
-
return false;
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public boolean lock(long timeout, int expire) {
-
long nano = System.nanoTime();
-
timeout *= MILLI_NANO_CONVERSION;
-
try {
-
while ((System.nanoTime() - nano) < timeout) {
-
if (this.jedis.setnx(this.key, LOCKED) == 1) {
-
this.jedis.expire(this.key, expire);
-
this.locked = true;
-
return this.locked;
-
}
-
-
Thread.sleep(3, RANDOM.nextInt(500));
-
}
-
} catch (Exception e) {
-
throw new RuntimeException("Locking error", e);
-
}
-
return false;
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public boolean lock() {
-
return lock(DEFAULT_TIME_OUT);
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public void unlock() {
-
try {
-
if (this.locked) {
-
this.jedis.del(this.key);
-
}
-
} finally {
-
this.shardedJedisPool.returnResource(this.jedis);
-
}
-
}
-
}
-
-
-
-
-
-
public class RedisUtil {
-
-
-
private ShardedJedisPool shardedJedisPool;
-
-
-
-
-
-
-
-
-
abstract class Executor<T> {
-
-
ShardedJedis jedis;
-
ShardedJedisPool shardedJedisPool;
-
-
public Executor(ShardedJedisPool shardedJedisPool) {
-
this.shardedJedisPool = shardedJedisPool;
-
jedis = this.shardedJedisPool.getResource();
-
}
-
-
-
-
-
-
abstract T execute();
-
-
-
-
-
-
-
public T getResult() {
-
T result = null;
-
try {
-
result = execute();
-
} catch (Throwable e) {
-
throw new RuntimeException("Redis execute exception", e);
-
} finally {
-
if (jedis != null) {
-
shardedJedisPool.returnResource(jedis);
-
}
-
}
-
return result;
-
}
-
}
-
-
-
-
-
-
-
public long delKeysLike(final String likeKey) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
Collection<Jedis> jedisC = jedis.getAllShards();
-
Iterator<Jedis> iter = jedisC.iterator();
-
long count = 0;
-
while (iter.hasNext()) {
-
Jedis _jedis = iter.next();
-
Set<String> keys = _jedis.keys(likeKey + "*");
-
count += _jedis.del(keys.toArray(new String[keys.size()]));
-
}
-
return count;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public Long delKey(final String key) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.del(key);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public Long delKeys(final String[] keys) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
Collection<Jedis> jedisC = jedis.getAllShards();
-
Iterator<Jedis> iter = jedisC.iterator();
-
long count = 0;
-
while (iter.hasNext()) {
-
Jedis _jedis = iter.next();
-
count += _jedis.del(keys);
-
}
-
return count;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public Long expire(final String key, final int expire) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.expire(key, expire);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public long makeId(final String key) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
long id = jedis.incr(key);
-
if ((id + 75807) >= Long.MAX_VALUE) {
-
-
jedis.getSet(key, "0");
-
}
-
return id;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
public String setString(final String key, final String value) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
return jedis.set(key, value);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public String setString(final String key, final String value, final int expire) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
return jedis.setex(key, expire, value);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public Long setStringIfNotExists(final String key, final String value) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.setnx(key, value);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public String getString(final String key) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
return jedis.get(key);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<Object> batchSetString(final List<Pair<String, String>> pairs) {
-
return new Executor<List<Object>>(shardedJedisPool) {
-
-
@Override
-
List<Object> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
for (Pair<String, String> pair : pairs) {
-
pipeline.set(pair.getKey(), pair.getValue());
-
}
-
return pipeline.syncAndReturnAll();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<String> batchGetString(final String[] keys) {
-
return new Executor<List<String>>(shardedJedisPool) {
-
-
@Override
-
List<String> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
List<String> result = new ArrayList<String>(keys.length);
-
List<Response<String>> responses = new ArrayList<Response<String>>(keys.length);
-
for (String key : keys) {
-
responses.add(pipeline.get(key));
-
}
-
pipeline.sync();
-
for (Response<String> resp : responses) {
-
result.add(resp.get());
-
}
-
return result;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
public Long hashSet(final String key, final String field, final String value) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.hset(key, field, value);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
-
-
public Long hashSet(final String key, final String field, final String value, final int expire) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<Long> result = pipeline.hset(key, field, value);
-
pipeline.expire(key, expire);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public String hashGet(final String key, final String field) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
return jedis.hget(key, field);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public String hashGet(final String key, final String field, final int expire) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<String> result = pipeline.hget(key, field);
-
pipeline.expire(key, expire);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public String hashMultipleSet(final String key, final Map<String, String> hash) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
return jedis.hmset(key, hash);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public String hashMultipleSet(final String key, final Map<String, String> hash, final int expire) {
-
return new Executor<String>(shardedJedisPool) {
-
-
@Override
-
String execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<String> result = pipeline.hmset(key, hash);
-
pipeline.expire(key, expire);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public List<String> hashMultipleGet(final String key, final String... fields) {
-
return new Executor<List<String>>(shardedJedisPool) {
-
-
@Override
-
List<String> execute() {
-
return jedis.hmget(key, fields);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
public List<String> hashMultipleGet(final String key, final int expire, final String... fields) {
-
return new Executor<List<String>>(shardedJedisPool) {
-
-
@Override
-
List<String> execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<List<String>> result = pipeline.hmget(key, fields);
-
pipeline.expire(key, expire);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<Object> batchHashMultipleSet(final List<Pair<String, Map<String, String>>> pairs) {
-
return new Executor<List<Object>>(shardedJedisPool) {
-
-
@Override
-
List<Object> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
for (Pair<String, Map<String, String>> pair : pairs) {
-
pipeline.hmset(pair.getKey(), pair.getValue());
-
}
-
return pipeline.syncAndReturnAll();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<Object> batchHashMultipleSet(final Map<String, Map<String, String>> data) {
-
return new Executor<List<Object>>(shardedJedisPool) {
-
-
@Override
-
List<Object> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
for (Map.Entry<String, Map<String, String>> iter : data.entrySet()) {
-
pipeline.hmset(iter.getKey(), iter.getValue());
-
}
-
return pipeline.syncAndReturnAll();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<List<String>> batchHashMultipleGet(final List<Pair<String, String[]>> pairs) {
-
return new Executor<List<List<String>>>(shardedJedisPool) {
-
-
@Override
-
List<List<String>> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
List<List<String>> result = new ArrayList<List<String>>(pairs.size());
-
List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(pairs.size());
-
for (Pair<String, String[]> pair : pairs) {
-
responses.add(pipeline.hmget(pair.getKey(), pair.getValue()));
-
}
-
pipeline.sync();
-
for (Response<List<String>> resp : responses) {
-
result.add(resp.get());
-
}
-
return result;
-
}
-
}.getResult();
-
-
}
-
-
-
-
-
-
-
-
public Map<String, String> hashGetAll(final String key) {
-
return new Executor<Map<String, String>>(shardedJedisPool) {
-
-
@Override
-
Map<String, String> execute() {
-
return jedis.hgetAll(key);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public Map<String, String> hashGetAll(final String key, final int expire) {
-
return new Executor<Map<String, String>>(shardedJedisPool) {
-
-
@Override
-
Map<String, String> execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<Map<String, String>> result = pipeline.hgetAll(key);
-
pipeline.expire(key, expire);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<Map<String, String>> batchHashGetAll(final String... keys) {
-
return new Executor<List<Map<String, String>>>(shardedJedisPool) {
-
-
@Override
-
List<Map<String, String>> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
List<Map<String, String>> result = new ArrayList<Map<String, String>>(keys.length);
-
List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);
-
for (String key : keys) {
-
responses.add(pipeline.hgetAll(key));
-
}
-
pipeline.sync();
-
for (Response<Map<String, String>> resp : responses) {
-
result.add(resp.get());
-
}
-
return result;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public Map<String, Map<String, String>> batchHashGetAllForMap(final String... keys) {
-
return new Executor<Map<String, Map<String, String>>>(shardedJedisPool) {
-
-
@Override
-
Map<String, Map<String, String>> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
-
-
int capacity = 1;
-
while ((int) (capacity * 0.75) <= keys.length) {
-
capacity <<= 1;
-
}
-
Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(capacity);
-
List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);
-
for (String key : keys) {
-
responses.add(pipeline.hgetAll(key));
-
}
-
pipeline.sync();
-
for (int i = 0; i < keys.length; ++i) {
-
result.put(keys[i], responses.get(i).get());
-
}
-
return result;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
public Long listPushTail(final String key, final String... values) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.rpush(key, values);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public Long listPushHead(final String key, final String value) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.lpush(key, value);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public Long listPushHeadAndTrim(final String key, final String value, final long size) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
Response<Long> result = pipeline.lpush(key, value);
-
-
pipeline.ltrim(key, 0, size - 1);
-
pipeline.sync();
-
return result.get();
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public void batchListPushTail(final String key, final String[] values, final boolean delOld) {
-
new Executor<Object>(shardedJedisPool) {
-
-
@Override
-
Object execute() {
-
if (delOld) {
-
RedisLock lock = new RedisLock(key, shardedJedisPool);
-
lock.lock();
-
try {
-
Pipeline pipeline = jedis.getShard(key).pipelined();
-
pipeline.del(key);
-
for (String value : values) {
-
pipeline.rpush(key, value);
-
}
-
pipeline.sync();
-
} finally {
-
lock.unlock();
-
}
-
} else {
-
jedis.rpush(key, values);
-
}
-
return null;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public Object updateListInTransaction(final String key, final List<String> values) {
-
return new Executor<Object>(shardedJedisPool) {
-
-
@Override
-
Object execute() {
-
Transaction transaction = jedis.getShard(key).multi();
-
transaction.del(key);
-
for (String value : values) {
-
transaction.rpush(key, value);
-
}
-
transaction.exec();
-
return null;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public Long insertListIfNotExists(final String key, final String[] values) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
RedisLock lock = new RedisLock(key, shardedJedisPool);
-
lock.lock();
-
try {
-
if (!jedis.exists(key)) {
-
return jedis.rpush(key, values);
-
}
-
} finally {
-
lock.unlock();
-
}
-
return 0L;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public List<String> listGetAll(final String key) {
-
return new Executor<List<String>>(shardedJedisPool) {
-
-
@Override
-
List<String> execute() {
-
return jedis.lrange(key, 0, -1);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
public List<String> listRange(final String key, final long beginIndex, final long endIndex) {
-
return new Executor<List<String>>(shardedJedisPool) {
-
-
@Override
-
List<String> execute() {
-
return jedis.lrange(key, beginIndex, endIndex - 1);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
public Map<String, List<String>> batchGetAllList(final List<String> keys) {
-
return new Executor<Map<String, List<String>>>(shardedJedisPool) {
-
-
@Override
-
Map<String, List<String>> execute() {
-
ShardedJedisPipeline pipeline = jedis.pipelined();
-
Map<String, List<String>> result = new HashMap<String, List<String>>();
-
List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(keys.size());
-
for (String key : keys) {
-
responses.add(pipeline.lrange(key, 0, -1));
-
}
-
pipeline.sync();
-
for (int i = 0; i < keys.size(); ++i) {
-
result.put(keys.get(i), responses.get(i).get());
-
}
-
return result;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
-
public Long publish(final String channel, final String message) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
Jedis _jedis = jedis.getShard(channel);
-
return _jedis.publish(channel, message);
-
}
-
-
}.getResult();
-
}
-
-
-
-
-
-
-
public void subscribe(final JedisPubSub jedisPubSub, final String channel) {
-
new Executor<Object>(shardedJedisPool) {
-
-
@Override
-
Object execute() {
-
Jedis _jedis = jedis.getShard(channel);
-
-
_jedis.subscribe(jedisPubSub, channel);
-
return null;
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
public void unSubscribe(final JedisPubSub jedisPubSub) {
-
jedisPubSub.unsubscribe();
-
}
-
-
-
-
-
-
-
-
-
-
-
public Long addWithSortedSet(final String key, final double score, final String member) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.zadd(key, score, member);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public Long addWithSortedSet(final String key, final Map<Double, String> scoreMembers) {
-
return new Executor<Long>(shardedJedisPool) {
-
-
@Override
-
Long execute() {
-
return jedis.zadd(key, scoreMembers);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
-
-
public Set<String> revrangeByScoreWithSortedSet(final String key, final double max, final double min) {
-
return new Executor<Set<String>>(shardedJedisPool) {
-
-
@Override
-
Set<String> execute() {
-
return jedis.zrevrangeByScore(key, max, min);
-
}
-
}.getResult();
-
}
-
-
-
-
-
-
-
-
public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) {
-
this.shardedJedisPool = shardedJedisPool;
-
}
-
-
-
-
-
-
-
-
public <K, V> Pair<K, V> makePair(K key, V value) {
-
return new Pair<K, V>(key, value);
-
}
-
-
-
-
-
-
-
-
-
public class Pair<K, V> {
-
-
private K key;
-
private V value;
-
-
public Pair(K key, V value) {
-
this.key = key;
-
this.value = value;
-
}
-
-
public K getKey() {
-
return key;
-
}
-
-
public void setKey(K key) {
-
this.key = key;
-
}
-
-
public V getValue() {
-
return value;
-
}
-
-
public void setValue(V value) {
-
this.value = value;
-
}
-
}
-
}
Spring配置文件:
-
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  Spring wiring for the Redis helpers: pool settings, two shard definitions,
  the sharded pool itself and the RedisUtil bean that uses it.
  The ${...} placeholders are not resolved in this file; presumably a property
  placeholder configurer is declared elsewhere (TODO confirm).
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Connection pool settings -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxActive" value="${redis.jedisPoolConfig.maxActive}" />
        <property name="maxIdle" value="${redis.jedisPoolConfig.maxIdle}" />
        <property name="maxWait" value="${redis.jedisPoolConfig.maxWait}" />
        <property name="testOnBorrow" value="${redis.jedisPoolConfig.testOnBorrow}" />
    </bean>

    <!-- Jedis shard definitions (host and port of each Redis instance) -->
    <bean id="jedis.shardInfoCache1" class="redis.clients.jedis.JedisShardInfo">
        <constructor-arg index="0" value="${redis.jedis.shardInfoCache1.host}" />
        <constructor-arg index="1" type="int" value="${redis.jedis.shardInfoCache1.port}" />
    </bean>
    <bean id="jedis.shardInfoCache2" class="redis.clients.jedis.JedisShardInfo">
        <constructor-arg index="0" value="${redis.jedis.shardInfoCache2.host}" />
        <constructor-arg index="1" type="int" value="${redis.jedis.shardInfoCache2.port}" />
    </bean>

    <!-- Sharded Jedis pool built from the pool settings and the shard list -->
    <bean id="shardedJedisPoolCache" class="redis.clients.jedis.ShardedJedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1">
            <list>
                <ref bean="jedis.shardInfoCache1" />
                <ref bean="jedis.shardInfoCache2" />
            </list>
        </constructor-arg>
    </bean>

    <!-- RedisUtil bean; "com.**" is a placeholder for the real package name -->
    <bean id="redisCache" class="com.**.RedisUtil">
        <property name="shardedJedisPool" ref="shardedJedisPoolCache" />
    </bean>
</beans>