码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ入门_02_HelloWorld

时间:2017-06-04 00:54:45      阅读:260      评论:0      收藏:0      [点我收藏+]

标签:clu   close   exchange   分组   代码   rabbit   获取   相同   匹配   

A. AMQP基础

RabbitMQ 并不是基于 Java 开发人员熟悉的 JMS 规范设计开发的,而是基于一个比 JMS 更新更合理的 AMQP (Advanced Message Queuing Protocol) 协议。所以,在开始 RabbitMQ 之旅前,需要先对 AMQP 有一定的了解。毕竟,RabbitMQ 就是 AMQP 协议的第一个开源实现。

a. AMQP messaging 中的基本概念

以下内容来自 http://www.cnblogs.com/frankyou/p/5283539.html
技术分享

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的TCP连接。断开连接的操作只会在 client 端进行,broker不会断开连接,除非出现网络故障或 broker 服务出现问题
  • Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走。一个 message 可以被同时拷贝到多个 queue 中
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

简而言之,Publisher 发送的消息通过 Connection 中的 Channel 到达 Broker 的某个 Virtual Host,消息经过指定的 Exchange,根据 Binding 提供的分发依据,分发到 0~n 个 Queue 中;Queue 中的消息等待 Consumer 消费。

b. AMQP 模型解释

官网有一篇更详尽的模型解释:https://www.rabbitmq.com/tutorials/amqp-concepts.html
这篇文章一看就知道是好东西,可惜读起来太累了,以后再来回顾。

c. AMQP 规范

官网有 AMQP 规范的快速参考:https://www.rabbitmq.com/amqp-0-9-1-quickref.html
AMQP 规范全部内容可在 AMQP 官网查阅:http://www.amqp.org/
这些内容短期不会接触,留个脚印以后有机会再研究。

B. 你好,世界

参考资料:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
示例代码:https://github.com/gordonklg/study,rabbitmq module

gordon.study.rabbitmq.helloworld.Sender.java

  1. import java.util.concurrent.TimeUnit;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. publicclassSender{
  6. privatestaticfinalString QUEUE_NAME ="hello";
  7. publicstaticvoid main(String[] argv)throwsException{
  8. ConnectionFactory factory =newConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  13. for(int i =0; i <10;){
  14. String message ="NO. "+++i;
  15. TimeUnit.MILLISECONDS.sleep(1000);
  16. channel.basicPublish("", QUEUE_NAME,null, message.getBytes("UTF-8"));
  17. System.out.printf("(%1$s)[===>%2$s ] %3$s\n","Sender",":"+ QUEUE_NAME, message);
  18. }
  19. channel.close();
  20. connection.close();
  21. }
  22. }

代码第12-14行通过 ConnectionFactory 创建了一个 Connection。Broker Host 地址为 localhost,端口是默认的 5672,用户密码是默认的 guest,VHOST 是默认的 "/"。(实际上默认的 Host 就是 localhost,第13行可以省略)

第15行在 Connection 中创建了一个虚拟的 Channel。

第17行申明了一个名字为 hello 的 Queue。不同于 JMS,AMQP 协议允许通过编程方式创建绝大多数的模型,例如 Exchange、Queue 等等。之所以是申明,是因为该方法会先判断当前 VHOST 中是否已存在一个同名的 Queue,如果不存在,则创建 Queue;如果存在,可能会直接使用已存在的 Queue,也可能返回异常(例如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)。

queueDeclare 方法中三个 boolean 型参数指定了 Queue 的属性:

  • durable:队列本身是否需要持久化。如果为 false,则 RabbitMQ 重启后该 Queue 消失。
  • exclusive:排他性队列确保该队列只对申明它的连接可见,注意是连接而不是 Channel。当相应连接关闭时,该队列自动删除。
  • autoDelete:设置为 true 的队列会在所有 Consumer 都断开连接时自动删除(不管是否是 durable)。队列被第一个 Consumer 连接前不会被删除。

第22行发布一个消息。basicPublish 方法第一个参数指定发布该消息所使用的 Exchange 名称,空字符串为系统预先定义的默认 Exchange,类型为 direct。第二个参数指定路由键,对于 direct 类型的 Exchange,会将该消息发送到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中。

gordon.study.rabbitmq.helloworld.Receiver.java

  1. publicclassReceiver{
  2. privatefinalstaticString QUEUE_NAME ="hello";
  3. publicstaticvoid main(String[] argv)throwsException{
  4. ConnectionFactory factory =newConnectionFactory();
  5. factory.setHost("localhost");
  6. Connection connection = factory.newConnection();
  7. Channel channel = connection.createChannel();
  8. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  9. Consumer consumer =newDefaultConsumer(channel){
  10. @Override
  11. publicvoid handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,byte[] body)
  12. throwsIOException{
  13. String message =newString(body,"UTF-8");
  14. System.out.printf(" [ %2$s<===](%1$s) %3$s\n","Receiver", QUEUE_NAME, message);
  15. try{
  16. TimeUnit.MILLISECONDS.sleep(500);
  17. }catch(InterruptedException e){
  18. }
  19. }
  20. };
  21. channel.basicConsume(QUEUE_NAME,true, consumer);
  22. }
  23. }

代码从第13行开始定义一个 Consumer,通过 handleDelivery 回调接口,我们可以处理从队列中获取的消息。

第25行启动一个 Consumer,basicConsume 方法第一个参数指定 Consumer 消费的队列,即 hello 队列;第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认;第三个参数指定 Consumer

现在,可以花式运行两个类,看看发生了什么了。

RabbitMQ入门_02_HelloWorld

标签:clu   close   exchange   分组   代码   rabbit   获取   相同   匹配   

原文地址:http://www.cnblogs.com/gordonkong/p/6938942.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!