标签:并且 finally sage exit 情况 channel 类型 workqueue 初识
RabbitMQ是一种消息队列,有着类似代理的作用,它允许软件、应用相互连接和扩展。这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接,消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
--- 进一步理解
RabbitMQ如同一个邮局,在其中通过接收寄信人发过来的邮件,并且将这些邮件进行归纳好分别发给各个对应的邮箱,然后再由快递员将各个邮件送至收件人手中。在其中,寄信人相当于生产者,邮件相当于消息,邮箱相当于消息队列,收件人相当于消费者。
--- 结构概念
生产者:指发布消息的应用程序
消息队列:用于存储消息的缓存区,本质上是存储在内存或者磁盘上,负责接收消息、保存消息以及发送消息
消费者:指等待处理消息程序
交换机:生产者发过来的消息需要经过交换机,交换机将决定将消息放到哪些队列当中
RabbitMQ是基于AMQP网络协议的一种消息代理机制,由于AMQP是一个网络协议,所以这个过程中的生产者,消费者,消息代理 可以存在于不同的设备上。如下图:
--- 相关概念
循环分发:即在消息队列中的消息是通过轮流的方式发给消费者的,比如当消息队列中存在a,b,c,d四个消息,同时有两个消费者C1、C2,那么可能a和c消息被C1接收了,b和d消息被C2接收了,这样的好处就是不会让消息都存储着而没有被利用,同时也有个坏处,假如当a和c都是比较耗时的任务,b和d都是比较简单的任务,就会导致C1可能要不停地工作,而C2就会产生空闲,造成资源浪费。
公平发放:指消息队列只会把消息发给比较空闲的消费者,如果它看到原本要发送的消费者正在忙,那么就会自动去寻找下个空闲的消费者。
消息确认:正常情况下,当一个消息发送出去之后,RabbitMQ会将该消息自动从队列中剔除出去,但是,假如处理该消息的时候出现了失误之类的,此时会造成该消息的丢失,所以引入了一个“消息确认”的东西,即消息代理不会完全地把消息从队列中剔除出去,直到接收到消费者返回的确认回执,才会将该消息清除,这样即使消费者挂掉了,消息也不会丢失。
----?相关代码
消费者:
1 package com.maxwell.rabbitdemo.workqueues; 2 3 import java.io.IOException; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 import com.rabbitmq.client.Consumer; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 import com.rabbitmq.client.AMQP.BasicProperties; 11 12 //工作队列,一个生产者分发给多个消费者(循环分发、消息确认、消息持久、公平分发) 13 public class Worker { 14 private static final String TASK_QUEUE_NAME = "task_queue"; 15 16 public static void main(String[] args) throws Exception { 17 ConnectionFactory factory = new ConnectionFactory(); 18 factory.setHost("192.168.153.136"); 19 factory.setPort(5672); 20 21 final Connection connection = factory.newConnection(); 22 final Channel channel = connection.createChannel(); 23 24 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 25 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 26 27 //工作者让rabbitmq不会去轮流地发送消息 而是去寻找空闲的工作者 28 //一次只接收一条确认的消息 29 channel.basicQos(1); 30 31 final Consumer consumer = new DefaultConsumer(channel){ 32 @Override 33 public void handleDelivery(String consumerTag, Envelope envelope, 34 BasicProperties properties, byte[] body) throws IOException { 35 String message = new String(body, "UTF-8"); 36 System.out.println(" [x] Received ‘" + message + "‘"); 37 38 try { 39 doWork(message); 40 } finally { 41 System.out.println(" [x] Done"); 42 channel.basicAck(envelope.getDeliveryTag(), false); 43 } 44 } 45 }; 46 47 //消息确认机制:当消费者接收信息过程中出现了问题,没有接收消费者发出的确认消息,mq会自己将该信息重新加到队列中同时就算消费者挂掉了 48 //信息也会当做没处理过的,重新加回队列,等待重新发送给消费者 49 //默认该机制是开启的,这里将其关了 50 boolean autoAck = false; 51 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); 52 } 53 54 private static void doWork(String task) { 55 for (char ch : task.toCharArray()) { 56 if (ch == ‘.‘) { 57 try { 58 Thread.sleep(1000); 59 } catch (InterruptedException _ignored) { 60 Thread.currentThread().interrupt(); 61 } 62 } 63 } 64 } 65 }
生产者:
1 package com.maxwell.rabbitdemo.workqueues; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 import com.rabbitmq.client.MessageProperties; 7 8 public class NewTask { 9 private static final String TASK_QUEUE_NAME = "task_queue"; 10 11 public static void main(String[] args) throws Exception { 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("192.168.153.136"); 14 factory.setPort(5672); 15 16 Connection con = factory.newConnection(); 17 Channel channel = con.createChannel(); 18 19 //设置队列不丢失 20 boolean qurable = true; 21 channel.queueDeclare(TASK_QUEUE_NAME, qurable, false, false, null); 22 String message = "4"; 23 24 //发布消息 设置MessageProperties的值为PERSISTENT_TEXT_PLAIN使得消息持久化 25 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); 26 System.out.println(" [x] Sent ‘" + message + "‘"); 27 28 channel.close(); 29 con.close(); 30 } 31 }
运行结果:先开启两个消费者程序,再运行一个生产者,之后再修改发送的内容再次运行,以下是两个消费者的运行情况:
--- 交换机类型
首先,生产者发送消息到交换机,同时发送一个 key
,也称之为路由键,通过这个 key
,交换机就知道该把消息发到哪个队列。随后交换机把消息发送到相应的队列中。由队列将消息发送给消费者。消费者监听某些队列,当有消息过来时,就立即处理消息。
在RabbitMQ一共有四种交换机,分别是直连型交换机(direct exchange)、扇型交换机(funout exchange)、主题交换机(topic exchanges)和头部交换机(headers exchange)。
直型交换机是根据消息携带的路由键(routing key)将消息投递给对应队列的,它是将先将一个队列绑定在直连交换机上的时候,同时会赋予一个key,当之后的消息发送过来的时候,也会携带一个key,当只有这个key与交换机和队列之间的那个对应上了,交换机才会让该消息进入队列之中,否则会被丢弃。
如上图所示,只有消息携带的key是小埋或者是康纳的时候才能进入Q1或者Q2队列之中,同时我们可以看到,直连交换机支持多重绑定,即多个队列以相同的key绑定到交换机上,如上边的key为康纳,这样即当消息发送过来携带的key是康纳的话,那么这个消息就会一同地发往Q1和Q2队列中。
----?相关代码
生产者:
1 package com.maxwell.rabbitdemo.direct; 2 import com.rabbitmq.client.BuiltinExchangeType; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class EmitLogDirect { 8 9 private static final String EXCHANGE_NAME = "direct_logs"; 10 11 public static void main(String[] args) throws Exception{ 12 //创建连接 13 ConnectionFactory factory = new ConnectionFactory(); 14 factory.setHost("192.168.153.136"); 15 factory.setPort(5672); 16 17 Connection con = factory.newConnection(); 18 Channel channel = con.createChannel(); 19 20 //声明路由器和类型 21 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 22 23 String severity = "小埋"; 24 String message = ".........i am msg........."; 25 26 //发布消息 27 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); 28 System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); 29 30 channel.close(); 31 con.close(); 32 } 33 34 }
消费者:
1 package com.maxwell.rabbitdemo.direct; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.BuiltinExchangeType; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 import com.rabbitmq.client.AMQP.BasicProperties; 13 14 //实现分别监听不同等级内容的消息 核心思想:在实现基本的分布信息基础上,通过改变路由类型为Direct,并且将要监听的类型进行绑定 15 public class ReceiveLogsDirect { 16 17 private static final String EXCHANGE_NAME = "direct_logs"; 18 public static void main(String[] args) throws Exception{ 19 //创建连接 20 ConnectionFactory factory = new ConnectionFactory(); 21 factory.setHost("192.168.153.136"); 22 factory.setPort(5672); 23 24 Connection con = factory.newConnection(); 25 Channel channel = con.createChannel(); 26 27 //声明路由器以及类型 28 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 29 //声明队列 30 String queueName = channel.queueDeclare().getQueue(); 31 //定义要监听的级别 32 String[] severities = {"康纳"}; 33 34 //根据绑定键进行绑定 35 for(String severity : severities){ 36 channel.queueBind(queueName, EXCHANGE_NAME, severity); 37 } 38 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 39 40 Consumer consumer = new DefaultConsumer(channel) { 41 @Override 42 public void handleDelivery(String consumerTag, Envelope envelope, 43 BasicProperties properties, byte[] body) throws IOException { 44 String message = new String(body, "UTF-8"); 45 System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘"); 46 } 47 }; 48 49 channel.basicConsume(queueName, true, consumer); 50 } 51 }
运行结果:通过先运行route_key为康纳,小埋,然后修改为康纳再次运行,此时就有两个绑定不同的消费者在后台监听,之后分别运行生产者的route_key为康纳和小埋的,结果如下:
扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。类似广播的作用。
----?相关代码
生产者:
1 package com.maxwell.rabbitdemo.fanout; 2 3 import com.rabbitmq.client.BuiltinExchangeType; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 8 //日志发布者 9 public class EmitLog { 10 11 public static final String Exchange_name = "logs"; 12 13 public static void main(String[] args) throws Exception{ 14 //建立通道和连接 15 ConnectionFactory factory = new ConnectionFactory(); 16 factory.setHost("192.168.153.136"); 17 factory.setPort(5672); 18 19 Connection con = factory.newConnection(); 20 Channel channel = con.createChannel(); 21 22 //声明路由器以及路由类型 23 channel.exchangeDeclare(Exchange_name, BuiltinExchangeType.FANOUT); 24 25 String msg = "康纳赛高..."; 26 27 //发布消息 其中第一个参数是代表Exchange的名字,默认情况下是空字符串 28 channel.basicPublish(Exchange_name, "", null, msg.getBytes("UTF-8")); 29 System.out.println("[x] Sent ‘"+msg+"‘"); 30 31 //关闭连接和通道 32 channel.close(); 33 con.close(); 34 } 35 }
消费者:
1 package com.maxwell.rabbitdemo.fanout; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.BuiltinExchangeType; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 import com.rabbitmq.client.AMQP.BasicProperties; 13 14 //一个生产者发布给多个消费者(发布、订阅) 核心思想:定义exchange,定义路由器类型,再将路由与定义的随机队列进行绑定 15 //日志接收者 16 public class ReceiveLogs { 17 //定义exchange的名字,作用于接收生产者发来的消息并且推送到队列中 18 public static final String Exchange_name = "logs"; 19 20 public static void main(String[] args) throws Exception { 21 ConnectionFactory factory = new ConnectionFactory(); 22 factory.setHost("192.168.153.136"); 23 factory.setPort(5672); 24 25 Connection con = factory.newConnection(); 26 Channel channel = con.createChannel(); 27 //声明路由器以及类型 fanout会将接受到的信息广播给其他队列 28 channel.exchangeDeclare(Exchange_name, BuiltinExchangeType.FANOUT); 29 //声明一个随机名字的队列 30 String queueName = channel.queueDeclare().getQueue(); 31 //绑定队列到路由器上 32 channel.queueBind(queueName, Exchange_name, ""); 33 34 System.out.println(" [*]Waiting for messages, To exit press Ctrl+C"); 35 36 //开始监听消息 37 Consumer consumer = new DefaultConsumer(channel){ 38 39 @Override 40 public void handleDelivery(String consumerTag, Envelope envelope, 41 BasicProperties properties, byte[] body) throws IOException { 42 String message = new String(body, "UTF-8"); 43 System.out.println(" [x] Received ‘" + message + "‘"); 44 } 45 }; 46 channel.basicConsume(queueName, true, consumer); 47 } 48 }
运行情况:先运行两次消费者程序,此时后台有两个消费者在同时监听,当运行没运行一次生产者程序,消费者都能够监听到生产者发送的信息。
主题交换机通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列,所以经常用于消息的多播路由。
主题交换机其实和直连交换机有点类似,也是需要路由键将队列和交换机进行绑定,但是唯一不同的点是,路由键的它必须是一些单词的集合,中间用点号.
分割。这些单词可以是任意的,但通常会体现出消息的特征。一些有效的路由键示例:stock.usd.nyse
,nyse.vmw
,quick.orange.rabbit
。这些路由键可以包含很多单词,但路由键总长度不能超过255个字节。其中,一些特殊符号表示如下:
①*(星号)仅代表一个单词
②#(井号)代表任意个单词
按照上图可以看到:
Q1绑定的路由键:"*.康纳"、"#.小埋"
Q2绑定的路由键:“#.康纳”
所以,假如消息携带的路由键为小.康纳,那么Q1、Q2都能接收到该消息,假如携带的是小.小.康纳,那么就只有Q1能接收到该信息,依次类推。
(注意以英文句号划分的都当做一个单词)
----?相关代码
生产者:
1 package com.maxwell.rabbitdemo.topic; 2 3 import com.rabbitmq.client.BuiltinExchangeType; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 8 public class EmitLogTopic { 9 10 private static final String EXCHANGE_NAME = "topic_logs"; 11 12 public static void main(String[] argv) { 13 Connection connection = null; 14 Channel channel = null; 15 try { 16 //建立连接和通道 17 ConnectionFactory factory = new ConnectionFactory(); 18 factory.setHost("192.168.153.136"); 19 factory.setPort(5672); 20 connection = factory.newConnection(); 21 channel = connection.createChannel(); 22 23 //声明路由器和路由器类型 24 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 25 26 //定义路由键和消息 27 String routingKey = "小.小.康纳"; 28 String message = "小恶龙咆哮"; 29 30 //发布消息 31 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); 32 System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘"); 33 34 } catch (Exception e) { 35 e.printStackTrace(); 36 } finally { 37 if (connection != null) { 38 try { 39 connection.close(); 40 } catch (Exception ignore) { 41 } 42 } 43 } 44 } 45 }
消费者:
1 package com.maxwell.rabbitdemo.topic; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.BuiltinExchangeType; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 import com.rabbitmq.client.AMQP.BasicProperties; 13 14 public class ReceiveLogsTopic { 15 private static final String EXCHANGE_NAME = "topic_logs"; 16 17 public static void main(String[] args) throws Exception { 18 //建立连接和通道 19 ConnectionFactory factory = new ConnectionFactory(); 20 factory.setHost("192.168.153.136"); 21 factory.setPort(5672); 22 23 Connection connection = factory.newConnection(); 24 Channel channel = connection.createChannel(); 25 26 //声明路由器和路由器类型 27 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 28 String queueName = channel.queueDeclare().getQueue(); 29 30 //*(星号)仅代表一个单词 #(井号)代表任意个单词 31 String bingingKeys[] = {"#.康纳"}; 32 33 for (String bindingKey : bingingKeys) { 34 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 35 } 36 37 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 38 39 //监听消息 40 Consumer consumer = new DefaultConsumer(channel) { 41 @Override 42 public void handleDelivery(String consumerTag, 43 Envelope envelope, BasicProperties properties, 44 byte[] body) throws IOException { 45 String message = new String(body, "UTF-8"); 46 System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘"); 47 } 48 }; 49 channel.basicConsume(queueName, true, consumer); 50 } 51 }
运行情况:通过修改消费者中路由键的信息分别运行Q1和Q2,然后修改生产者中路由键为小.康纳,携带信息为恶龙咆哮,运行;继续修改生产者路由键为小.小.康纳,携带信息为小恶龙咆哮,运行,结果如下:
参考文档:RabbitMQ总结教程
标签:并且 finally sage exit 情况 channel 类型 workqueue 初识
原文地址:https://www.cnblogs.com/liangyueyuan/p/11304740.html