码迷,mamicode.com
首页 > 编程语言 > 详细

Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁

时间:2016-07-06 11:57:12      阅读:535      评论:0      收藏:0      [点我收藏+]

标签:

Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁

    最近项目中使用redis,学习了一下,client端使用jedis-2.1.0 

首先是一个redis实现的跨jvm的lock, 
接着是一个简单封装的工具类,也对pipeline处理进行了几个常用的封装 
然后是对应Spring的相关配置 

Java代码  技术分享
  1. public class RedisLock {  
  2.   
  3.     /** 加锁标志 */  
  4.     public static final String LOCKED = "TRUE";  
  5.     /** 毫秒与毫微秒的换算单位 1毫秒 = 1000000毫微秒 */  
  6.     public static final long MILLI_NANO_CONVERSION = 1000 * 1000L;  
  7.     /** 默认超时时间(毫秒) */  
  8.     public static final long DEFAULT_TIME_OUT = 1000;  
  9.     public static final Random RANDOM = new Random();  
  10.     /** 锁的超时时间(秒),过期删除 */  
  11.     public static final int EXPIRE = 3 * 60;  
  12.   
  13.     private ShardedJedisPool shardedJedisPool;  
  14.     private ShardedJedis jedis;  
  15.     private String key;  
  16.     // 锁状态标志  
  17.     private boolean locked = false;  
  18.   
  19.     /** 
  20.      * This creates a RedisLock 
  21.      * @param key key 
  22.      * @param shardedJedisPool 数据源 
  23.      */  
  24.     public RedisLock(String key, ShardedJedisPool shardedJedisPool) {  
  25.         this.key = key + "_lock";  
  26.         this.shardedJedisPool = shardedJedisPool;  
  27.         this.jedis = this.shardedJedisPool.getResource();  
  28.     }  
  29.   
  30.     /** 
  31.      * 加锁 
  32.      * 应该以: 
  33.      * lock(); 
  34.      * try { 
  35.      *      doSomething(); 
  36.      * } finally { 
  37.      *      unlock(); 
  38.      * } 
  39.      * 的方式调用  
  40.      * @param timeout 超时时间 
  41.      * @return 成功或失败标志 
  42.      */  
  43.     public boolean lock(long timeout) {  
  44.         long nano = System.nanoTime();  
  45.         timeout *= MILLI_NANO_CONVERSION;  
  46.         try {  
  47.             while ((System.nanoTime() - nano) < timeout) {  
  48.                 if (this.jedis.setnx(this.key, LOCKED) == 1) {  
  49.                     this.jedis.expire(this.key, EXPIRE);  
  50.                     this.locked = true;  
  51.                     return this.locked;  
  52.                 }  
  53.                 // 短暂休眠,避免出现活锁  
  54.                 Thread.sleep(3, RANDOM.nextInt(500));  
  55.             }  
  56.         } catch (Exception e) {  
  57.             throw new RuntimeException("Locking error", e);  
  58.         }  
  59.         return false;  
  60.     }  
  61.   
  62.     /** 
  63.      * 加锁 
  64.      * 应该以: 
  65.      * lock(); 
  66.      * try { 
  67.      *      doSomething(); 
  68.      * } finally { 
  69.      *      unlock(); 
  70.      * } 
  71.      * 的方式调用 
  72.      * @param timeout 超时时间 
  73.      * @param expire 锁的超时时间(秒),过期删除 
  74.      * @return 成功或失败标志 
  75.      */  
  76.     public boolean lock(long timeout, int expire) {  
  77.         long nano = System.nanoTime();  
  78.         timeout *= MILLI_NANO_CONVERSION;  
  79.         try {  
  80.             while ((System.nanoTime() - nano) < timeout) {  
  81.                 if (this.jedis.setnx(this.key, LOCKED) == 1) {  
  82.                     this.jedis.expire(this.key, expire);  
  83.                     this.locked = true;  
  84.                     return this.locked;  
  85.                 }  
  86.                 // 短暂休眠,避免出现活锁  
  87.                 Thread.sleep(3, RANDOM.nextInt(500));  
  88.             }  
  89.         } catch (Exception e) {  
  90.             throw new RuntimeException("Locking error", e);  
  91.         }  
  92.         return false;  
  93.     }  
  94.   
  95.     /** 
  96.      * 加锁 
  97.      * 应该以: 
  98.      * lock(); 
  99.      * try { 
  100.      *      doSomething(); 
  101.      * } finally { 
  102.      *      unlock(); 
  103.      * } 
  104.      * 的方式调用 
  105.      * @return 成功或失败标志 
  106.      */  
  107.     public boolean lock() {  
  108.         return lock(DEFAULT_TIME_OUT);  
  109.     }  
  110.   
  111.     /** 
  112.      * 解锁 
  113.      * 无论是否加锁成功,都需要调用unlock 
  114.      * 应该以: 
  115.      * lock(); 
  116.      * try { 
  117.      *      doSomething(); 
  118.      * } finally { 
  119.      *      unlock(); 
  120.      * } 
  121.      * 的方式调用 
  122.      */  
  123.     public void unlock() {  
  124.         try {  
  125.             if (this.locked) {  
  126.                 this.jedis.del(this.key);  
  127.             }  
  128.         } finally {  
  129.             this.shardedJedisPool.returnResource(this.jedis);  
  130.         }  
  131.     }  
  132. }  


Java代码  技术分享
  1. /** 
  2.  * 内存数据库Redis的辅助类,负责对内存数据库的所有操作 
  3.  * @version V1.0 
  4.  * @author fengjc 
  5.  */  
  6. public class RedisUtil {  
  7.   
  8.     // 数据源  
  9.     private ShardedJedisPool shardedJedisPool;  
  10.   
  11.     /** 
  12.      * 执行器,{@link com.futurefleet.framework.base.redis.RedisUtil}的辅助类, 
  13.      * 它保证在执行操作之后释放数据源returnResource(jedis) 
  14.      * @version V1.0 
  15.      * @author fengjc 
  16.      * @param <T> 
  17.      */  
  18.     abstract class Executor<T> {  
  19.   
  20.         ShardedJedis jedis;  
  21.         ShardedJedisPool shardedJedisPool;  
  22.   
  23.         public Executor(ShardedJedisPool shardedJedisPool) {  
  24.             this.shardedJedisPool = shardedJedisPool;  
  25.             jedis = this.shardedJedisPool.getResource();  
  26.         }  
  27.   
  28.         /** 
  29.          * 回调 
  30.          * @return 执行结果 
  31.          */  
  32.         abstract T execute();  
  33.   
  34.         /** 
  35.          * 调用{@link #execute()}并返回执行结果 
  36.          * 它保证在执行{@link #execute()}之后释放数据源returnResource(jedis) 
  37.          * @return 执行结果 
  38.          */  
  39.         public T getResult() {  
  40.             T result = null;  
  41.             try {  
  42.                 result = execute();  
  43.             } catch (Throwable e) {  
  44.                 throw new RuntimeException("Redis execute exception", e);  
  45.             } finally {  
  46.                 if (jedis != null) {  
  47.                     shardedJedisPool.returnResource(jedis);  
  48.                 }  
  49.             }  
  50.             return result;  
  51.         }  
  52.     }  
  53.   
  54.     /** 
  55.      * 删除模糊匹配的key 
  56.      * @param likeKey 模糊匹配的key 
  57.      * @return 删除成功的条数 
  58.      */  
  59.     public long delKeysLike(final String likeKey) {  
  60.         return new Executor<Long>(shardedJedisPool) {  
  61.   
  62.             @Override  
  63.             Long execute() {  
  64.                 Collection<Jedis> jedisC = jedis.getAllShards();  
  65.                 Iterator<Jedis> iter = jedisC.iterator();  
  66.                 long count = 0;  
  67.                 while (iter.hasNext()) {  
  68.                     Jedis _jedis = iter.next();  
  69.                     Set<String> keys = _jedis.keys(likeKey + "*");  
  70.                     count += _jedis.del(keys.toArray(new String[keys.size()]));  
  71.                 }  
  72.                 return count;  
  73.             }  
  74.         }.getResult();  
  75.     }  
  76.   
  77.     /** 
  78.      * 删除 
  79.      * @param key 匹配的key 
  80.      * @return 删除成功的条数 
  81.      */  
  82.     public Long delKey(final String key) {  
  83.         return new Executor<Long>(shardedJedisPool) {  
  84.   
  85.             @Override  
  86.             Long execute() {  
  87.                 return jedis.del(key);  
  88.             }  
  89.         }.getResult();  
  90.     }  
  91.   
  92.     /** 
  93.      * 删除 
  94.      * @param keys 匹配的key的集合 
  95.      * @return 删除成功的条数 
  96.      */  
  97.     public Long delKeys(final String[] keys) {  
  98.         return new Executor<Long>(shardedJedisPool) {  
  99.   
  100.             @Override  
  101.             Long execute() {  
  102.                 Collection<Jedis> jedisC = jedis.getAllShards();  
  103.                 Iterator<Jedis> iter = jedisC.iterator();  
  104.                 long count = 0;  
  105.                 while (iter.hasNext()) {  
  106.                     Jedis _jedis = iter.next();  
  107.                     count += _jedis.del(keys);  
  108.                 }  
  109.                 return count;  
  110.             }  
  111.         }.getResult();  
  112.     }  
  113.   
  114.     /** 
  115.      * 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。 
  116.      * 在 Redis 中,带有生存时间的 key 被称为『可挥发』(volatile)的。 
  117.      * @param key key 
  118.      * @param expire 生命周期,单位为秒 
  119.      * @return 1: 设置成功 0: 已经超时或key不存在 
  120.      */  
  121.     public Long expire(final String key, final int expire) {  
  122.         return new Executor<Long>(shardedJedisPool) {  
  123.   
  124.             @Override  
  125.             Long execute() {  
  126.                 return jedis.expire(key, expire);  
  127.             }  
  128.         }.getResult();  
  129.     }  
  130.   
  131.     /** 
  132.      * 一个跨jvm的id生成器,利用了redis原子性操作的特点 
  133.      * @param key id的key 
  134.      * @return 返回生成的Id 
  135.      */  
  136.     public long makeId(final String key) {  
  137.         return new Executor<Long>(shardedJedisPool) {  
  138.   
  139.             @Override  
  140.             Long execute() {  
  141.                 long id = jedis.incr(key);  
  142.                 if ((id + 75807) >= Long.MAX_VALUE) {  
  143.                     // 避免溢出,重置,getSet命令之前允许incr插队,75807就是预留的插队空间  
  144.                     jedis.getSet(key, "0");  
  145.                 }  
  146.                 return id;  
  147.             }  
  148.         }.getResult();  
  149.     }  
  150.   
  151.     /* ======================================Strings====================================== */  
  152.   
  153.     /** 
  154.      * 将字符串值 value 关联到 key 。 
  155.      * 如果 key 已经持有其他值, setString 就覆写旧值,无视类型。 
  156.      * 对于某个原本带有生存时间(TTL)的键来说, 当 setString 成功在这个键上执行时, 这个键原有的 TTL 将被清除。 
  157.      * 时间复杂度:O(1) 
  158.      * @param key key 
  159.      * @param value string value 
  160.      * @return 在设置操作成功完成时,才返回 OK 。 
  161.      */  
  162.     public String setString(final String key, final String value) {  
  163.         return new Executor<String>(shardedJedisPool) {  
  164.   
  165.             @Override  
  166.             String execute() {  
  167.                 return jedis.set(key, value);  
  168.             }  
  169.         }.getResult();  
  170.     }  
  171.   
  172.     /** 
  173.      * 将值 value 关联到 key ,并将 key 的生存时间设为 expire (以秒为单位)。 
  174.      * 如果 key 已经存在, 将覆写旧值。 
  175.      * 类似于以下两个命令: 
  176.      * SET key value 
  177.      * EXPIRE key expire # 设置生存时间 
  178.      * 不同之处是这个方法是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成,在 Redis 用作缓存时,非常实用。 
  179.      * 时间复杂度:O(1) 
  180.      * @param key key 
  181.      * @param value string value 
  182.      * @param expire 生命周期 
  183.      * @return 设置成功时返回 OK 。当 expire 参数不合法时,返回一个错误。 
  184.      */  
  185.     public String setString(final String key, final String value, final int expire) {  
  186.         return new Executor<String>(shardedJedisPool) {  
  187.   
  188.             @Override  
  189.             String execute() {  
  190.                 return jedis.setex(key, expire, value);  
  191.             }  
  192.         }.getResult();  
  193.     }  
  194.   
  195.     /** 
  196.      * 将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 setStringIfNotExists 不做任何动作。 
  197.      * 时间复杂度:O(1) 
  198.      * @param key key 
  199.      * @param value string value 
  200.      * @return 设置成功,返回 1 。设置失败,返回 0 。 
  201.      */  
  202.     public Long setStringIfNotExists(final String key, final String value) {  
  203.         return new Executor<Long>(shardedJedisPool) {  
  204.   
  205.             @Override  
  206.             Long execute() {  
  207.                 return jedis.setnx(key, value);  
  208.             }  
  209.         }.getResult();  
  210.     }  
  211.   
  212.     /** 
  213.      * 返回 key 所关联的字符串值。如果 key 不存在那么返回特殊值 nil 。 
  214.      * 假如 key 储存的值不是字符串类型,返回一个错误,因为 getString 只能用于处理字符串值。 
  215.      * 时间复杂度: O(1) 
  216.      * @param key key 
  217.      * @return 当 key 不存在时,返回 nil ,否则,返回 key 的值。如果 key 不是字符串类型,那么返回一个错误。 
  218.      */  
  219.     public String getString(final String key) {  
  220.         return new Executor<String>(shardedJedisPool) {  
  221.   
  222.             @Override  
  223.             String execute() {  
  224.                 return jedis.get(key);  
  225.             }  
  226.         }.getResult();  
  227.     }  
  228.   
  229.     /** 
  230.      * 批量的 {@link #setString(String, String)} 
  231.      * @param pairs 键值对数组{数组第一个元素为key,第二个元素为value} 
  232.      * @return 操作状态的集合 
  233.      */  
  234.     public List<Object> batchSetString(final List<Pair<String, String>> pairs) {  
  235.         return new Executor<List<Object>>(shardedJedisPool) {  
  236.   
  237.             @Override  
  238.             List<Object> execute() {  
  239.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  240.                 for (Pair<String, String> pair : pairs) {  
  241.                     pipeline.set(pair.getKey(), pair.getValue());  
  242.                 }  
  243.                 return pipeline.syncAndReturnAll();  
  244.             }  
  245.         }.getResult();  
  246.     }  
  247.   
  248.     /** 
  249.      * 批量的 {@link #getString(String)} 
  250.      * @param keys key数组 
  251.      * @return value的集合 
  252.      */  
  253.     public List<String> batchGetString(final String[] keys) {  
  254.         return new Executor<List<String>>(shardedJedisPool) {  
  255.   
  256.             @Override  
  257.             List<String> execute() {  
  258.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  259.                 List<String> result = new ArrayList<String>(keys.length);  
  260.                 List<Response<String>> responses = new ArrayList<Response<String>>(keys.length);  
  261.                 for (String key : keys) {  
  262.                     responses.add(pipeline.get(key));  
  263.                 }  
  264.                 pipeline.sync();  
  265.                 for (Response<String> resp : responses) {  
  266.                     result.add(resp.get());  
  267.                 }  
  268.                 return result;  
  269.             }  
  270.         }.getResult();  
  271.     }  
  272.   
  273.     /* ======================================Hashes====================================== */  
  274.   
  275.     /** 
  276.      * 将哈希表 key 中的域 field 的值设为 value 。 
  277.      * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。 
  278.      * 如果域 field 已经存在于哈希表中,旧值将被覆盖。 
  279.      * 时间复杂度: O(1) 
  280.      * @param key key 
  281.      * @param field 域 
  282.      * @param value string value 
  283.      * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。 
  284.      */  
  285.     public Long hashSet(final String key, final String field, final String value) {  
  286.         return new Executor<Long>(shardedJedisPool) {  
  287.   
  288.             @Override  
  289.             Long execute() {  
  290.                 return jedis.hset(key, field, value);  
  291.             }  
  292.         }.getResult();  
  293.     }  
  294.   
  295.     /** 
  296.      * 将哈希表 key 中的域 field 的值设为 value 。 
  297.      * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。 
  298.      * 如果域 field 已经存在于哈希表中,旧值将被覆盖。 
  299.      * @param key key 
  300.      * @param field 域 
  301.      * @param value string value 
  302.      * @param expire 生命周期,单位为秒 
  303.      * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。 
  304.      */  
  305.     public Long hashSet(final String key, final String field, final String value, final int expire) {  
  306.         return new Executor<Long>(shardedJedisPool) {  
  307.   
  308.             @Override  
  309.             Long execute() {  
  310.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  311.                 Response<Long> result = pipeline.hset(key, field, value);  
  312.                 pipeline.expire(key, expire);  
  313.                 pipeline.sync();  
  314.                 return result.get();  
  315.             }  
  316.         }.getResult();  
  317.     }  
  318.   
  319.     /** 
  320.      * 返回哈希表 key 中给定域 field 的值。 
  321.      * 时间复杂度:O(1) 
  322.      * @param key key 
  323.      * @param field 域 
  324.      * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。 
  325.      */  
  326.     public String hashGet(final String key, final String field) {  
  327.         return new Executor<String>(shardedJedisPool) {  
  328.   
  329.             @Override  
  330.             String execute() {  
  331.                 return jedis.hget(key, field);  
  332.             }  
  333.         }.getResult();  
  334.     }  
  335.   
  336.     /** 
  337.      * 返回哈希表 key 中给定域 field 的值。 如果哈希表 key 存在,同时设置这个 key 的生存时间 
  338.      * @param key key 
  339.      * @param field 域 
  340.      * @param expire 生命周期,单位为秒 
  341.      * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。 
  342.      */  
  343.     public String hashGet(final String key, final String field, final int expire) {  
  344.         return new Executor<String>(shardedJedisPool) {  
  345.   
  346.             @Override  
  347.             String execute() {  
  348.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  349.                 Response<String> result = pipeline.hget(key, field);  
  350.                 pipeline.expire(key, expire);  
  351.                 pipeline.sync();  
  352.                 return result.get();  
  353.             }  
  354.         }.getResult();  
  355.     }  
  356.   
  357.     /** 
  358.      * 同时将多个 field-value (域-值)对设置到哈希表 key 中。 
  359.      * 时间复杂度: O(N) (N为fields的数量) 
  360.      * @param key key 
  361.      * @param hash field-value的map 
  362.      * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。 
  363.      */  
  364.     public String hashMultipleSet(final String key, final Map<String, String> hash) {  
  365.         return new Executor<String>(shardedJedisPool) {  
  366.   
  367.             @Override  
  368.             String execute() {  
  369.                 return jedis.hmset(key, hash);  
  370.             }  
  371.         }.getResult();  
  372.     }  
  373.   
  374.     /** 
  375.      * 同时将多个 field-value (域-值)对设置到哈希表 key 中。同时设置这个 key 的生存时间 
  376.      * @param key key 
  377.      * @param hash field-value的map 
  378.      * @param expire 生命周期,单位为秒 
  379.      * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。 
  380.      */  
  381.     public String hashMultipleSet(final String key, final Map<String, String> hash, final int expire) {  
  382.         return new Executor<String>(shardedJedisPool) {  
  383.   
  384.             @Override  
  385.             String execute() {  
  386.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  387.                 Response<String> result = pipeline.hmset(key, hash);  
  388.                 pipeline.expire(key, expire);  
  389.                 pipeline.sync();  
  390.                 return result.get();  
  391.             }  
  392.         }.getResult();  
  393.     }  
  394.   
  395.     /** 
  396.      * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。 
  397.      * 时间复杂度: O(N) (N为fields的数量) 
  398.      * @param key key 
  399.      * @param fields field的数组 
  400.      * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。 
  401.      */  
  402.     public List<String> hashMultipleGet(final String key, final String... fields) {  
  403.         return new Executor<List<String>>(shardedJedisPool) {  
  404.   
  405.             @Override  
  406.             List<String> execute() {  
  407.                 return jedis.hmget(key, fields);  
  408.             }  
  409.         }.getResult();  
  410.     }  
  411.   
  412.     /** 
  413.      * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。 
  414.      * 同时设置这个 key 的生存时间 
  415.      * @param key key 
  416.      * @param fields field的数组 
  417.      * @param expire 生命周期,单位为秒 
  418.      * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。 
  419.      */  
  420.     public List<String> hashMultipleGet(final String key, final int expire, final String... fields) {  
  421.         return new Executor<List<String>>(shardedJedisPool) {  
  422.   
  423.             @Override  
  424.             List<String> execute() {  
  425.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  426.                 Response<List<String>> result = pipeline.hmget(key, fields);  
  427.                 pipeline.expire(key, expire);  
  428.                 pipeline.sync();  
  429.                 return result.get();  
  430.             }  
  431.         }.getResult();  
  432.     }  
  433.   
  434.     /** 
  435.      * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行 
  436.      * @param pairs 多个hash的多个field 
  437.      * @return 操作状态的集合 
  438.      */  
  439.     public List<Object> batchHashMultipleSet(final List<Pair<String, Map<String, String>>> pairs) {  
  440.         return new Executor<List<Object>>(shardedJedisPool) {  
  441.   
  442.             @Override  
  443.             List<Object> execute() {  
  444.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  445.                 for (Pair<String, Map<String, String>> pair : pairs) {  
  446.                     pipeline.hmset(pair.getKey(), pair.getValue());  
  447.                 }  
  448.                 return pipeline.syncAndReturnAll();  
  449.             }  
  450.         }.getResult();  
  451.     }  
  452.   
  453.     /** 
  454.      * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行 
  455.      * @param data Map<String, Map<String, String>>格式的数据 
  456.      * @return 操作状态的集合 
  457.      */  
  458.     public List<Object> batchHashMultipleSet(final Map<String, Map<String, String>> data) {  
  459.         return new Executor<List<Object>>(shardedJedisPool) {  
  460.   
  461.             @Override  
  462.             List<Object> execute() {  
  463.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  464.                 for (Map.Entry<String, Map<String, String>> iter : data.entrySet()) {  
  465.                     pipeline.hmset(iter.getKey(), iter.getValue());  
  466.                 }  
  467.                 return pipeline.syncAndReturnAll();  
  468.             }  
  469.         }.getResult();  
  470.     }  
  471.   
  472.     /** 
  473.      * 批量的{@link #hashMultipleGet(String, String...)},在管道中执行 
  474.      * @param pairs 多个hash的多个field 
  475.      * @return 执行结果的集合 
  476.      */  
  477.     public List<List<String>> batchHashMultipleGet(final List<Pair<String, String[]>> pairs) {  
  478.         return new Executor<List<List<String>>>(shardedJedisPool) {  
  479.   
  480.             @Override  
  481.             List<List<String>> execute() {  
  482.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  483.                 List<List<String>> result = new ArrayList<List<String>>(pairs.size());  
  484.                 List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(pairs.size());  
  485.                 for (Pair<String, String[]> pair : pairs) {  
  486.                     responses.add(pipeline.hmget(pair.getKey(), pair.getValue()));  
  487.                 }  
  488.                 pipeline.sync();  
  489.                 for (Response<List<String>> resp : responses) {  
  490.                     result.add(resp.get());  
  491.                 }  
  492.                 return result;  
  493.             }  
  494.         }.getResult();  
  495.   
  496.     }  
  497.   
  498.     /** 
  499.      * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。 
  500.      * 时间复杂度: O(N) 
  501.      * @param key key 
  502.      * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。 
  503.      */  
  504.     public Map<String, String> hashGetAll(final String key) {  
  505.         return new Executor<Map<String, String>>(shardedJedisPool) {  
  506.   
  507.             @Override  
  508.             Map<String, String> execute() {  
  509.                 return jedis.hgetAll(key);  
  510.             }  
  511.         }.getResult();  
  512.     }  
  513.   
  514.     /** 
  515.      * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。 
  516.      * 同时设置这个 key 的生存时间 
  517.      * @param key key 
  518.      * @param expire 生命周期,单位为秒 
  519.      * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。 
  520.      */  
  521.     public Map<String, String> hashGetAll(final String key, final int expire) {  
  522.         return new Executor<Map<String, String>>(shardedJedisPool) {  
  523.   
  524.             @Override  
  525.             Map<String, String> execute() {  
  526.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  527.                 Response<Map<String, String>> result = pipeline.hgetAll(key);  
  528.                 pipeline.expire(key, expire);  
  529.                 pipeline.sync();  
  530.                 return result.get();  
  531.             }  
  532.         }.getResult();  
  533.     }  
  534.   
  535.     /** 
  536.      * 批量的{@link #hashGetAll(String)} 
  537.      * @param keys key的数组 
  538.      * @return 执行结果的集合 
  539.      */  
  540.     public List<Map<String, String>> batchHashGetAll(final String... keys) {  
  541.         return new Executor<List<Map<String, String>>>(shardedJedisPool) {  
  542.   
  543.             @Override  
  544.             List<Map<String, String>> execute() {  
  545.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  546.                 List<Map<String, String>> result = new ArrayList<Map<String, String>>(keys.length);  
  547.                 List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);  
  548.                 for (String key : keys) {  
  549.                     responses.add(pipeline.hgetAll(key));  
  550.                 }  
  551.                 pipeline.sync();  
  552.                 for (Response<Map<String, String>> resp : responses) {  
  553.                     result.add(resp.get());  
  554.                 }  
  555.                 return result;  
  556.             }  
  557.         }.getResult();  
  558.     }  
  559.   
  560.     /** 
  561.      * 批量的{@link #hashMultipleGet(String, String...)},与{@link #batchHashGetAll(String...)}不同的是,返回值为Map类型 
  562.      * @param keys key的数组 
  563.      * @return 多个hash的所有filed和value 
  564.      */  
  565.     public Map<String, Map<String, String>> batchHashGetAllForMap(final String... keys) {  
  566.         return new Executor<Map<String, Map<String, String>>>(shardedJedisPool) {  
  567.   
  568.             @Override  
  569.             Map<String, Map<String, String>> execute() {  
  570.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  571.   
  572.                 // 设置map容量防止rehash  
  573.                 int capacity = 1;  
  574.                 while ((int) (capacity * 0.75) <= keys.length) {  
  575.                     capacity <<= 1;  
  576.                 }  
  577.                 Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(capacity);  
  578.                 List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);  
  579.                 for (String key : keys) {  
  580.                     responses.add(pipeline.hgetAll(key));  
  581.                 }  
  582.                 pipeline.sync();  
  583.                 for (int i = 0; i < keys.length; ++i) {  
  584.                     result.put(keys[i], responses.get(i).get());  
  585.                 }  
  586.                 return result;  
  587.             }  
  588.         }.getResult();  
  589.     }  
  590.   
  591.     /* ======================================List====================================== */  
  592.   
  593.     /** 
  594.      * 将一个或多个值 value 插入到列表 key 的表尾(最右边)。 
  595.      * @param key key 
  596.      * @param values value的数组 
  597.      * @return 执行 listPushTail 操作后,表的长度 
  598.      */  
  599.     public Long listPushTail(final String key, final String... values) {  
  600.         return new Executor<Long>(shardedJedisPool) {  
  601.   
  602.             @Override  
  603.             Long execute() {  
  604.                 return jedis.rpush(key, values);  
  605.             }  
  606.         }.getResult();  
  607.     }  
  608.   
  609.     /** 
  610.      * 将一个或多个值 value 插入到列表 key 的表头 
  611.      * @param key key 
  612.      * @param value string value 
  613.      * @return 执行 listPushHead 命令后,列表的长度。 
  614.      */  
  615.     public Long listPushHead(final String key, final String value) {  
  616.         return new Executor<Long>(shardedJedisPool) {  
  617.   
  618.             @Override  
  619.             Long execute() {  
  620.                 return jedis.lpush(key, value);  
  621.             }  
  622.         }.getResult();  
  623.     }  
  624.   
  625.     /** 
  626.      * 将一个或多个值 value 插入到列表 key 的表头, 当列表大于指定长度是就对列表进行修剪(trim) 
  627.      * @param key key 
  628.      * @param value string value 
  629.      * @param size 链表超过这个长度就修剪元素 
  630.      * @return 执行 listPushHeadAndTrim 命令后,列表的长度。 
  631.      */  
  632.     public Long listPushHeadAndTrim(final String key, final String value, final long size) {  
  633.         return new Executor<Long>(shardedJedisPool) {  
  634.   
  635.             @Override  
  636.             Long execute() {  
  637.                 Pipeline pipeline = jedis.getShard(key).pipelined();  
  638.                 Response<Long> result = pipeline.lpush(key, value);  
  639.                 // 修剪列表元素, 如果 size - 1 比 end 下标还要大,Redis将 size 的值设置为 end 。  
  640.                 pipeline.ltrim(key, 0, size - 1);  
  641.                 pipeline.sync();  
  642.                 return result.get();  
  643.             }  
  644.         }.getResult();  
  645.     }  
  646.   
  647.     /** 
  648.      * 批量的{@link #listPushTail(String, String...)},以锁的方式实现 
  649.      * @param key key 
  650.      * @param values value的数组 
  651.      * @param delOld 如果key存在,是否删除它。true 删除;false: 不删除,只是在行尾追加 
  652.      */  
  653.     public void batchListPushTail(final String key, final String[] values, final boolean delOld) {  
  654.         new Executor<Object>(shardedJedisPool) {  
  655.   
  656.             @Override  
  657.             Object execute() {  
  658.                 if (delOld) {  
  659.                     RedisLock lock = new RedisLock(key, shardedJedisPool);  
  660.                     lock.lock();  
  661.                     try {  
  662.                         Pipeline pipeline = jedis.getShard(key).pipelined();  
  663.                         pipeline.del(key);  
  664.                         for (String value : values) {  
  665.                             pipeline.rpush(key, value);  
  666.                         }  
  667.                         pipeline.sync();  
  668.                     } finally {  
  669.                         lock.unlock();  
  670.                     }  
  671.                 } else {  
  672.                     jedis.rpush(key, values);  
  673.                 }  
  674.                 return null;  
  675.             }  
  676.         }.getResult();  
  677.     }  
  678.   
  679.     /** 
  680.      * 同{@link #batchListPushTail(String, String[], boolean)},不同的是利用redis的事务特性来实现 
  681.      * @param key key 
  682.      * @param values value的数组 
  683.      * @return null 
  684.      */  
  685.     public Object updateListInTransaction(final String key, final List<String> values) {  
  686.         return new Executor<Object>(shardedJedisPool) {  
  687.   
  688.             @Override  
  689.             Object execute() {  
  690.                 Transaction transaction = jedis.getShard(key).multi();  
  691.                 transaction.del(key);  
  692.                 for (String value : values) {  
  693.                     transaction.rpush(key, value);  
  694.                 }  
  695.                 transaction.exec();  
  696.                 return null;  
  697.             }  
  698.         }.getResult();  
  699.     }  
  700.   
  701.     /** 
  702.      * 在key对应list的尾部部添加字符串元素,如果key存在,什么也不做 
  703.      * @param key key 
  704.      * @param values value的数组 
  705.      * @return 执行insertListIfNotExists后,表的长度 
  706.      */  
  707.     public Long insertListIfNotExists(final String key, final String[] values) {  
  708.         return new Executor<Long>(shardedJedisPool) {  
  709.   
  710.             @Override  
  711.             Long execute() {  
  712.                 RedisLock lock = new RedisLock(key, shardedJedisPool);  
  713.                 lock.lock();  
  714.                 try {  
  715.                     if (!jedis.exists(key)) {  
  716.                         return jedis.rpush(key, values);  
  717.                     }  
  718.                 } finally {  
  719.                     lock.unlock();  
  720.                 }  
  721.                 return 0L;  
  722.             }  
  723.         }.getResult();  
  724.     }  
  725.   
  726.     /** 
  727.      * 返回list所有元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表 
  728.      * @param key key 
  729.      * @return list所有元素 
  730.      */  
  731.     public List<String> listGetAll(final String key) {  
  732.         return new Executor<List<String>>(shardedJedisPool) {  
  733.   
  734.             @Override  
  735.             List<String> execute() {  
  736.                 return jedis.lrange(key, 0, -1);  
  737.             }  
  738.         }.getResult();  
  739.     }  
  740.   
  741.     /** 
  742.      * 返回指定区间内的元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表 
  743.      * @param key key 
  744.      * @param beginIndex 下标开始索引(包含) 
  745.      * @param endIndex 下标结束索引(不包含) 
  746.      * @return 指定区间内的元素 
  747.      */  
  748.     public List<String> listRange(final String key, final long beginIndex, final long endIndex) {  
  749.         return new Executor<List<String>>(shardedJedisPool) {  
  750.   
  751.             @Override  
  752.             List<String> execute() {  
  753.                 return jedis.lrange(key, beginIndex, endIndex - 1);  
  754.             }  
  755.         }.getResult();  
  756.     }  
  757.   
  758.     /** 
  759.      * 一次获得多个链表的数据 
  760.      * @param keys key的数组 
  761.      * @return 执行结果 
  762.      */  
  763.     public Map<String, List<String>> batchGetAllList(final List<String> keys) {  
  764.         return new Executor<Map<String, List<String>>>(shardedJedisPool) {  
  765.   
  766.             @Override  
  767.             Map<String, List<String>> execute() {  
  768.                 ShardedJedisPipeline pipeline = jedis.pipelined();  
  769.                 Map<String, List<String>> result = new HashMap<String, List<String>>();  
  770.                 List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(keys.size());  
  771.                 for (String key : keys) {  
  772.                     responses.add(pipeline.lrange(key, 0, -1));  
  773.                 }  
  774.                 pipeline.sync();  
  775.                 for (int i = 0; i < keys.size(); ++i) {  
  776.                     result.put(keys.get(i), responses.get(i).get());  
  777.                 }  
  778.                 return result;  
  779.             }  
  780.         }.getResult();  
  781.     }  
  782.   
  783.     /* ======================================Pub/Sub====================================== */  
  784.   
  785.     /** 
  786.      * 将信息 message 发送到指定的频道 channel。 
  787.      * 时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。 
  788.      * @param channel 频道 
  789.      * @param message 信息 
  790.      * @return 接收到信息 message 的订阅者数量。 
  791.      */  
  792.     public Long publish(final String channel, final String message) {  
  793.         return new Executor<Long>(shardedJedisPool) {  
  794.   
  795.             @Override  
  796.             Long execute() {  
  797.                 Jedis _jedis = jedis.getShard(channel);  
  798.                 return _jedis.publish(channel, message);  
  799.             }  
  800.               
  801.         }.getResult();  
  802.     }  
  803.   
  804.     /** 
  805.      * 订阅给定的一个频道的信息。 
  806.      * @param jedisPubSub 监听器 
  807.      * @param channel 频道 
  808.      */  
  809.     public void subscribe(final JedisPubSub jedisPubSub, final String channel) {  
  810.         new Executor<Object>(shardedJedisPool) {  
  811.   
  812.             @Override  
  813.             Object execute() {  
  814.                 Jedis _jedis = jedis.getShard(channel);  
  815.                 // 注意subscribe是一个阻塞操作,因为当前线程要轮询Redis的响应然后调用subscribe  
  816.                 _jedis.subscribe(jedisPubSub, channel);  
  817.                 return null;  
  818.             }  
  819.         }.getResult();  
  820.     }  
  821.   
  822.     /** 
  823.      * 取消订阅 
  824.      * @param jedisPubSub 监听器 
  825.      */  
  826.     public void unSubscribe(final JedisPubSub jedisPubSub) {  
  827.         jedisPubSub.unsubscribe();  
  828.     }  
  829.   
  830.     /* ======================================Sorted set================================= */  
  831.   
  832.     /** 
  833.      * 将一个 member 元素及其 score 值加入到有序集 key 当中。 
  834.      * @param key key 
  835.      * @param score score 值可以是整数值或双精度浮点数。 
  836.      * @param member 有序集的成员 
  837.      * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。 
  838.      */  
  839.     public Long addWithSortedSet(final String key, final double score, final String member) {  
  840.         return new Executor<Long>(shardedJedisPool) {  
  841.   
  842.             @Override  
  843.             Long execute() {  
  844.                 return jedis.zadd(key, score, member);  
  845.             }  
  846.         }.getResult();  
  847.     }  
  848.   
  849.     /** 
  850.      * 将多个 member 元素及其 score 值加入到有序集 key 当中。 
  851.      * @param key key 
  852.      * @param scoreMembers score、member的pair 
  853.      * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。 
  854.      */  
  855.     public Long addWithSortedSet(final String key, final Map<Double, String> scoreMembers) {  
  856.         return new Executor<Long>(shardedJedisPool) {  
  857.   
  858.             @Override  
  859.             Long execute() {  
  860.                 return jedis.zadd(key, scoreMembers);  
  861.             }  
  862.         }.getResult();  
  863.     }  
  864.   
  865.     /** 
  866.      * 返回有序集 key 中, score 值介于 max 和 min 之间(默认包括等于 max 或 min )的所有的成员。 
  867.      * 有序集成员按 score 值递减(从大到小)的次序排列。 
  868.      * @param key key 
  869.      * @param max score最大值 
  870.      * @param min score最小值 
  871.      * @return 指定区间内,带有 score 值(可选)的有序集成员的列表 
  872.      */  
  873.     public Set<String> revrangeByScoreWithSortedSet(final String key, final double max, final double min) {  
  874.         return new Executor<Set<String>>(shardedJedisPool) {  
  875.   
  876.             @Override  
  877.             Set<String> execute() {  
  878.                 return jedis.zrevrangeByScore(key, max, min);  
  879.             }  
  880.         }.getResult();  
  881.     }  
  882.   
  883.     /* ======================================Other====================================== */  
  884.   
  885.     /** 
  886.      * 设置数据源 
  887.      * @param shardedJedisPool 数据源 
  888.      */  
  889.     public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) {  
  890.         this.shardedJedisPool = shardedJedisPool;  
  891.     }  
  892.   
  893.     /** 
  894.      * 构造Pair键值对 
  895.      * @param key key 
  896.      * @param value value 
  897.      * @return 键值对 
  898.      */  
  899.     public <K, V> Pair<K, V> makePair(K key, V value) {  
  900.         return new Pair<K, V>(key, value);  
  901.     }  
  902.   
  903.     /** 
  904.      * 键值对 
  905.      * @version V1.0 
  906.      * @author fengjc 
  907.      * @param <K> key 
  908.      * @param <V> value 
  909.      */  
  910.     public class Pair<K, V> {  
  911.   
  912.         private K key;  
  913.         private V value;  
  914.   
  915.         public Pair(K key, V value) {  
  916.             this.key = key;  
  917.             this.value = value;  
  918.         }  
  919.   
  920.         public K getKey() {  
  921.             return key;  
  922.         }  
  923.   
  924.         public void setKey(K key) {  
  925.             this.key = key;  
  926.         }  
  927.   
  928.         public V getValue() {  
  929.             return value;  
  930.         }  
  931.   
  932.         public void setValue(V value) {  
  933.             this.value = value;  
  934.         }  
  935.     }  
  936. }  


Spring配置文件: 
Java代码  技术分享
  1. <?xml version="1.0" encoding="UTF-8" ?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
  4.     xmlns:context="http://www.springframework.org/schema/context"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  7.     http://www.springframework.org/schema/context  
  8.     http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  9.   
  10.     <!-- POOL配置 -->  
  11.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
  12.         <property name="maxActive" value="${redis.jedisPoolConfig.maxActive}" />  
  13.         <property name="maxIdle" value="${redis.jedisPoolConfig.maxIdle}" />  
  14.         <property name="maxWait" value="${redis.jedisPoolConfig.maxWait}" />  
  15.         <property name="testOnBorrow" value="${redis.jedisPoolConfig.testOnBorrow}" />  
  16.     </bean>  
  17.   
  18.     <!-- jedis shard信息配置 -->  
  19.     <bean id="jedis.shardInfoCache1" class="redis.clients.jedis.JedisShardInfo">  
  20.         <constructor-arg index="0" value="${redis.jedis.shardInfoCache1.host}" />  
  21.         <constructor-arg index="1"  type="int" value="${redis.jedis.shardInfoCache1.port}" />  
  22.     </bean>  
  23.     <bean id="jedis.shardInfoCache2" class="redis.clients.jedis.JedisShardInfo">  
  24.         <constructor-arg index="0" value="${redis.jedis.shardInfoCache2.host}" />  
  25.         <constructor-arg index="1"  type="int" value="${redis.jedis.shardInfoCache2.port}" />  
  26.     </bean>  
  27.   
  28.     <!-- jedis shard pool配置 -->  
  29.     <bean id="shardedJedisPoolCache" class="redis.clients.jedis.ShardedJedisPool">  
  30.         <constructor-arg index="0" ref="jedisPoolConfig" />  
  31.         <constructor-arg index="1">  
  32.             <list>  
  33.                 <ref bean="jedis.shardInfoCache1" />  
  34.                 <ref bean="jedis.shardInfoCache2" />  
  35.             </list>  
  36.         </constructor-arg>  
  37.     </bean>  
  38.   
  39.     <bean id="redisCache" class="com.**.RedisUtil">  
  40.         <property name="shardedJedisPool" ref="shardedJedisPoolCache" />  
  41.     </bean>  
  42. </beans>  

Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁

标签:

原文地址:http://blog.csdn.net/bestlove12345/article/details/51836640

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!