码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

时间:2015-12-06 14:23:12      阅读:597      评论:0      收藏:0      [点我收藏+]

标签:

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599

rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析。 编程模型伪代码如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel 以下是基于channel上的两种消费方式。

1、Subscribe订阅方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag",      new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag,                                     Envelope envelope,                                     AMQP.BasicProperties properties,                                     byte[] body)              throws IOException          {              String routingKey = envelope.getRoutingKey();              String contentType = properties.contentType;              long deliveryTag = envelope.getDeliveryTag();              // (process the message components here ...)              channel.basicAck(deliveryTag, false);          }      });

订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息, 这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。 参见ChannelN中的方法     public String basicConsume(String queue, boolean autoAck, String consumerTag,                                boolean noLocal, boolean exclusive, Map<String, Object> arguments,                                final Consumer callback)         throws IOException     {     ......         rpc((Method)             new Basic.Consume.Builder()              .queue(queue)              .consumerTag(consumerTag)              .noLocal(noLocal)              .noAck(autoAck)              .exclusive(exclusive)              .arguments(arguments)             .build(),             k);

        try {             return k.getReply();         } catch(ShutdownSignalException ex) {             throw wrap(ex);         }     }

Consumer接收消息的过程: 创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,     public void handleFrame(Frame frame) throws IOException {         AMQCommand command = _command;         if (command.handleFrame(frame)) { // 对消息进行协议assemble             _command = new AMQCommand(); // prepare for the next one             handleCompleteInboundCommand(command);//对消息消费处理         }     } ChannelN.handleCompleteInboundCommand        ---ChannelN.processAsync            ----dispatcher.handleDelivery                  ---QueueingConsumer.handleDelivery                      ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中 每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。 接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()

对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效

2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

标签:

原文地址:http://www.cnblogs.com/changbaishan/p/5023373.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!