标签:tst 字符 对象 localhost 资源 技术 处理 后台 证明
在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面做记录。
list操作参考:https://www.cnblogs.com/qlqwjy/p/7789125.html
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。(类似于java的ArrayList)
redis对list的操作命令中。L表示从左边(头部)开始插与弹出,R表示从右边(尾部)开始插与弹出。
(1)从左向右插入,从右向左弹出:
127.0.0.1:6379> lpush mylist a b c d (integer) 4 127.0.0.1:6379> lrange mylist 0 -1 1) "d" 2) "c" 3) "b" 4) "a" 127.0.0.1:6379> rpop mylist "a" 127.0.0.1:6379> rpop mylist "b"
执行完 lpush mylist a b c d 之后数据结构如下:(满足先进先出的队列模式)
执行完第一次:rpop mylist之后数据结构如下:
(2)从右向左插入,从左向右弹出:
127.0.0.1:6379> rpush mylist2 a b c d (integer) 4 127.0.0.1:6379> lrange mylist2 0 -1 1) "a" 2) "b" 3) "c" 4) "d" 127.0.0.1:6379> lpop mylist2 "a" 127.0.0.1:6379> lpop mylist2 "b"
执行完:rpush mylist2 a b c d之后的数据结构如下
第一次执行完 lpop mylist2 之后数据结构如下:(满足先进先出的队列模式)
redis.properties
redis.url=localhost
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=10000
获取连接的工具类:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author: qlq * @Description * @Date: 21:32 2018/10/9 */ public class JedisPoolUtils { private static JedisPool pool = null; static { //加载配置文件 InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties"); Properties pro = new Properties(); try { pro.load(in); } catch (IOException e) { e.printStackTrace(); } //获得池子对象 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大闲置个数 poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数 poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数 poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数 pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString())); } //获得jedis资源的方法 public static Jedis getJedis() { return pool.getResource(); } public static void main(String[] args) { Jedis jedis = getJedis(); System.out.println(jedis); } }
(1)消息生产者:(开启5个线程生产消息)
import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description * @Date: 21:29 2018/10/9 */ public class MessageProducer extends Thread { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; public void putMessage(String message) { Jedis jedis = JedisPoolUtils.getJedis(); Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(Thread.currentThread().getName() + " put message,size=" + size + ",count=" + count); count++; } @Override public synchronized void run() { for (int i = 0; i < 5; i++) { putMessage("message" + count); } } public static void main(String[] args) { MessageProducer messageProducer = new MessageProducer(); Thread t1 = new Thread(messageProducer, "thread1"); Thread t2 = new Thread(messageProducer, "thread2"); Thread t3 = new Thread(messageProducer, "thread3"); Thread t4 = new Thread(messageProducer, "thread4"); Thread t5 = new Thread(messageProducer, "thread5"); t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); } }
结果:(证明了redis是单线程操作,只能一个一个操作)
thread1 put message,size=1,count=0 thread1 put message,size=2,count=1 thread1 put message,size=3,count=2 thread1 put message,size=4,count=3 thread1 put message,size=5,count=4 thread3 put message,size=6,count=5 thread3 put message,size=7,count=6 thread3 put message,size=8,count=7 thread3 put message,size=9,count=8 thread3 put message,size=10,count=9 thread4 put message,size=11,count=10 thread4 put message,size=12,count=11 thread4 put message,size=13,count=12 thread4 put message,size=14,count=13 thread4 put message,size=15,count=14 thread5 put message,size=16,count=15 thread5 put message,size=17,count=16 thread5 put message,size=18,count=17 thread5 put message,size=19,count=18 thread5 put message,size=20,count=19 thread2 put message,size=21,count=20 thread2 put message,size=22,count=21 thread2 put message,size=23,count=22 thread2 put message,size=24,count=23 thread2 put message,size=25,count=24
redis后台查看:
127.0.0.1:6379> lrange message:queue 0 -1 1) "message24" 2) "message23" 3) "message22" 4) "message21" 5) "message20" 6) "message19" 7) "message18" 8) "message17" 9) "message16" 10) "message15" 11) "message14" 12) "message13" 13) "message12" 14) "message11" 15) "message10" 16) "message9" 17) "message8" 18) "message7" 19) "message6" 20) "message5" 21) "message4" 22) "message3" 23) "message2" 24) "message1" 25) "message0"
(2)消息消费者:(开启两个线程消费消息)
import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description * @Date: 22:34 2018/10/9 */ public class MessageConsumer implements Runnable { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); String message = jedis.rpop(MESSAGE_KEY); System.out.println(Thread.currentThread().getName() + " consumer message,message=" + message + ",count=" + count); count++; } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread6"); Thread t2 = new Thread(messageConsumer, "thread7"); t1.start(); t2.start(); } }
结果:(满足先进先出的规则)--虽然消息已经消费完了,但是仍然在不停的rpop,所以造成浪费
thread6 consumer message,message=message0,count=0 thread6 consumer message,message=message1,count=1 thread6 consumer message,message=message2,count=2 thread6 consumer message,message=message3,count=3 thread7 consumer message,message=message4,count=4 thread6 consumer message,message=message5,count=5 thread7 consumer message,message=message6,count=6 thread6 consumer message,message=message7,count=7 thread7 consumer message,message=message8,count=8 thread6 consumer message,message=message9,count=9 thread7 consumer message,message=message10,count=10 thread6 consumer message,message=message11,count=11 thread7 consumer message,message=message12,count=12 thread6 consumer message,message=message13,count=13 thread7 consumer message,message=message14,count=14 thread6 consumer message,message=message15,count=15 thread7 consumer message,message=message16,count=16 thread6 consumer message,message=message17,count=16 thread7 consumer message,message=message18,count=18 thread6 consumer message,message=message19,count=19 thread7 consumer message,message=message20,count=20 thread6 consumer message,message=message21,count=20 thread7 consumer message,message=message22,count=22 thread6 consumer message,message=message23,count=22 thread7 consumer message,message=message24,count=24 thread6 consumer message,message=null,count=25 thread7 consumer message,message=null,count=26 thread6 consumer message,message=null,count=27 thread7 consumer message,message=null,count=28 thread6 consumer message,message=null,count=28 thread7 consumer message,message=null,count=30 thread6 consumer message,message=null,count=31
...
但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:
1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。
所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,
import redis.clients.jedis.Jedis; import java.util.List; /** * @Author: qlq * @Description * @Date: 22:34 2018/10/9 */ public class MessageConsumer implements Runnable { public static final String MESSAGE_KEY = "message:queue"; private volatile int count; private Jedis jedis = JedisPoolUtils.getJedis(); public void consumerMessage() { List<String> brpop = jedis.brpop(0, MESSAGE_KEY); System.out.println(brpop); } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread6"); Thread t2 = new Thread(messageConsumer, "thread7"); t1.start(); t2.start(); } }
然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前的连接个数。
当启动生产者生产消息之后,消费者会自动消费消息,而且消费者线程不会停止。
[message:queue, message0]
[message:queue, message1]
[message:queue, message2]
[message:queue, message3]
[message:queue, message4]
[message:queue, message5]
[message:queue, message6]
[message:queue, message7]
[message:queue, message8]
[message:queue, message9]
[message:queue, message10]
[message:queue, message11]
[message:queue, message12]
[message:queue, message13]
[message:queue, message14]
[message:queue, message15]
[message:queue, message16]
[message:queue, message17]
[message:queue, message18]
[message:queue, message19]
[message:queue, message20]
[message:queue, message21]
[message:queue, message22]
[message:queue, message23]
[message:queue, message24]
标签:tst 字符 对象 localhost 资源 技术 处理 后台 证明
原文地址:https://www.cnblogs.com/qlqwjy/p/9763754.html