RabbitMQ(三)
——发布订阅
(转载请附上本文链接——linhxx)
一、概述
RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理。该模式也符合上一节工作队列中的ack、预取等规则。
发布订阅模式如下图所示:
二、交换机(exchange)
生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系。在没有定义交换机的时候,RabbitMQ会启用内部预定义的交换机。即所有没有定义交换机,直接采用生产者发送消息到队列的,都是将消息发送给默认交换机。
交换机是一个很简单的东西,即将生产者发送的消息,按照预先定好的规则,转发给对应的队列。
1、广播发送
rabbitmq中,交换机的规则有fanout、direct、topic、headers等。本节的发布订阅模式,主要可以采用fanout模式。
fanout,类似网络技术中,子网的广播发送模式。即,fanout模式下,交换机会将信息发送给所有与其绑定的队列,即实现“发布”的功能。通常,再将每个队列给不同的消费者去消费,实现每个消费者都可以取到该消息,并各自进行后续相应的处理。
fanout模式如下图所示:
2、注意事项
1)提前绑定
fanout模式下,需要提前将交换机与队列进行绑定,一个交换机可以绑定多个队列,一个队列可以同多个交换机进行绑定。接受到消息的交换机没有与任何队列绑定,则消息会被抛弃。
2)该模式不需要routing key。
3、使用
1)使用
$channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false);
$channel->basic_publish($msg, ‘logs‘);
其中,第一个参数是交换机的名字,第二个参数,是交换机的模式。如果没有定义交换机的名称、模式,则采用默认的交换机转发生产者的消息。即可以看作,空字符串’’也是交换机的一个名字。
2)查看当前交换机数量
cli模式下,sudo rabbitmqctl list_exchanges
三、临时队列(Temporary queues)
当rabbitmq运行时,队列的名字非常重要,因为其在生产者方,交换机会将信息按照名字发送给队列,而消费者方,消费者需要通过队列的名字去队列取消息。因此,队列是rabbitmq中连接生产者与消费者的桥梁。
当开始使用时,需要的是一个空的、未被使用的队列;当连接断开,需要将队列关闭。
1、队列取名
为了保证队列是一个全新的队列,需要将给队列取一个随机的名字。rabbitmq提供了系统随机给生成队列名的方式,如下:
list($queue_name, ,) = $channel->queue_declare("");
即,当queue_declare时没有指定名字,采用空字符串"",则rabbitmq会给队列取一个随机的名字,形如amq.gen-JzTY20BRgKO-HjmUJj0wLg,则可以使用$queue_name,其就是队列的名字。
2、php的list
上述list是php中的list的用法,上述式子表示$channel->queue_declare("");的结果是一个含有3个元素的数组,将第一个元素赋值给$queue_name。
list()不是一个函数,而是类似array()一样的用法。
PHP官方文档中,list的示例:
$info = array(‘coffee‘, ‘brown‘, ‘caffeine‘);
list( , , $power) = $info;//$power=’caffeine’
四、绑定(binding)
上述提到,交换机必须与队列绑定,如果没有队列和交换机绑定,交换机会丢弃接收到的消息。绑定方式如下:
$channel->queue_bind($queue_name, ‘logs‘);
其中,第一个参数是队列的名字,第二个参数是交换机的名字。
在cli中,查看绑定的方式如下:rabbitmqctl list_bindings。
——written by linhxx
更多最新文章,欢迎关注微信公众号“决胜机器学习”,或扫描右边二维码。