标签:clu close exchange 分组 代码 rabbit 获取 相同 匹配
RabbitMQ 并不是基于 Java 开发人员熟悉的 JMS 规范设计开发的,而是基于一个比 JMS 更新更合理的 AMQP (Advanced Message Queuing Protocol) 协议。所以,在开始 RabbitMQ 之旅前,需要先对 AMQP 有一定的了解。毕竟,RabbitMQ 就是 AMQP 协议的第一个开源实现。
以下内容来自 http://www.cnblogs.com/frankyou/p/5283539.html
简而言之,Publisher 发送的消息通过 Connection 中的 Channel 到达 Broker 的某个 Virtual Host,消息经过指定的 Exchange,根据 Binding 提供的分发依据,分发到 0~n 个 Queue 中;Queue 中的消息等待 Consumer 消费。
官网有一篇更详尽的模型解释:https://www.rabbitmq.com/tutorials/amqp-concepts.html
这篇文章一看就知道是好东西,可惜读起来太累了,以后再来回顾。
官网有 AMQP 规范的快速参考:https://www.rabbitmq.com/amqp-0-9-1-quickref.html
AMQP 规范全部内容可在 AMQP 官网查阅:http://www.amqp.org/
这些内容短期不会接触,留个脚印以后有机会再研究。
参考资料:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
示例代码:https://github.com/gordonklg/study,rabbitmq module
gordon.study.rabbitmq.helloworld.Sender.java
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
publicclassSender{
privatestaticfinalString QUEUE_NAME ="hello";
publicstaticvoid main(String[] argv)throwsException{
ConnectionFactory factory =newConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i =0; i <10;){
String message ="NO. "+++i;
TimeUnit.MILLISECONDS.sleep(1000);
channel.basicPublish("", QUEUE_NAME,null, message.getBytes("UTF-8"));
System.out.printf("(%1$s)[===>%2$s ] %3$s\n","Sender",":"+ QUEUE_NAME, message);
}
channel.close();
connection.close();
}
}
代码第12-14行通过 ConnectionFactory 创建了一个 Connection。Broker Host 地址为 localhost,端口是默认的 5672,用户密码是默认的 guest,VHOST 是默认的 "/"。(实际上默认的 Host 就是 localhost,第13行可以省略)
第15行在 Connection 中创建了一个虚拟的 Channel。
第17行申明了一个名字为 hello 的 Queue。不同于 JMS,AMQP 协议允许通过编程方式创建绝大多数的模型,例如 Exchange、Queue 等等。之所以是申明,是因为该方法会先判断当前 VHOST 中是否已存在一个同名的 Queue,如果不存在,则创建 Queue;如果存在,可能会直接使用已存在的 Queue,也可能返回异常(例如申明的 Queue 的属性与已存在的 Queue 的属性不一致时)。
queueDeclare 方法中三个 boolean 型参数指定了 Queue 的属性:
第22行发布一个消息。basicPublish 方法第一个参数指定发布该消息所使用的 Exchange 名称,空字符串为系统预先定义的默认 Exchange,类型为 direct。第二个参数指定路由键,对于 direct 类型的 Exchange,会将该消息发送到所有绑定到该 Exchange 并且也设定了相同的路由键的 Queue 中。
gordon.study.rabbitmq.helloworld.Receiver.java
publicclassReceiver{
privatefinalstaticString QUEUE_NAME ="hello";
publicstaticvoid main(String[] argv)throwsException{
ConnectionFactory factory =newConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer =newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,byte[] body)
throwsIOException{
String message =newString(body,"UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n","Receiver", QUEUE_NAME, message);
try{
TimeUnit.MILLISECONDS.sleep(500);
}catch(InterruptedException e){
}
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
}
代码从第13行开始定义一个 Consumer,通过 handleDelivery 回调接口,我们可以处理从队列中获取的消息。
第25行启动一个 Consumer,basicConsume 方法第一个参数指定 Consumer 消费的队列,即 hello 队列;第二个参数 autoAck 指定消息确认模式,true 表示消息确认是自动完成的(至少在进入 handleDelivery 方法前就已经自动确认了),false 表示必须由 Consumer 自己确认;第三个参数指定 Consumer
现在,可以花式运行两个类,看看发生了什么了。
标签:clu close exchange 分组 代码 rabbit 获取 相同 匹配
原文地址:http://www.cnblogs.com/gordonkong/p/6938942.html