码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ 消息队列

时间:2017-08-17 16:25:11      阅读:166      评论:0      收藏:0      [点我收藏+]

标签:无法   ons   报错   receive   1.2   mod   http   routing   one   

一、前提

Python中的队列:

1.线程QUEUE

  线程队列不能跨进程,只是单线程下的多个线程间的数据交互

2.进程QUEUE

  支持父进程于子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。

 

  因此,两个独立的程序之间是不能使用Python中的QUEUE实现交互。(因为每个程序是独立的,是一个独立的进程,所以Python的队列无法实现两个独立的程序之间的交互)。

  所以想要实现两个独立的进程间的通信,可以使用下面几种方法:

  如:

  在独立的程序之间建立socket,实现通信;

  将要通信的内容通过json写到硬盘中,a程序写入到硬盘中,b程序中硬盘中读取(耗时);

  使用broker中间商代理,a程序与代理建立socket,b程序也与代理建立socket,a程序的消息发给代理,代理再发给a程序(易于维护)。

  本节主要介绍broker的使用,目前流行的broker有RabbitMQ,ZeroMQ,ActiveMQ,MSMQ。

 

二、RabbitMQ 消息队列介绍

  RabbitMQ是用erlang开发的,所以RabbitMQ依赖于erlang语言。在windows上安装和使用RabbitMQ的时候,要先装上erlang语言。

 技术分享

  在windows上安装完成erlang和RabbitMQ之后就自动启动,RabbitMQ在任务管理器的“服务”中可以进行查看确认。

  RabbitMQ支持多种语言,如:Java,.NET,Ruby,Python,PHP,JavaScript等,可以在官网 http://www.rabbitmq.com查看相应的语言支持的模块。
  在Python中:

  pika,a pure-Python AMQP 0-9-1 client ( source code ,API reference )  

  Celery ,a distributed task queue for Django and pure Python

  Halgha, an asynchrounous AMQP 0-0-1 client based on libevent ( the source code and docs are on github )

  其中,pika是本文介绍使用的模块。

 

三、RabbitMQ基础信息

在RabbitMQ中,关于在Python中如何使用的官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

3.1 RabbitMQ官方介绍

RabbitMQ是一个消息代理(中间商):它接收并推送消息。

想象一下,有一个邮政局:当你把想要邮寄出去的信放入邮箱,你可以确定邮政员先生最终可以将你的信发送到你的收信方。类似的,RabbitMQ在这种情况下,就是一个邮箱,邮政局和邮政员先生。

RabbitMQ与邮政局最大的区别是,邮政局可以发送纸质的信息,但是RabbitMQ不行。RabbitMQ只接收,存储和发送二进制的数据信息。

RabbitMQ一般情况下使用一些术语来传输信息。

3.1.1 数据定义

技术分享

Producing就是发送。发送信息的程序是生产者。用P来指代。

技术分享

Consuming就是接收。接收信息的程序就是消费者。用C来指代。

技术分享

queue(队列)是在RabbitMQ内的一个邮箱的名称。尽管消息通过RabbitMQ和你的应用程序传输,信息只能被存储在queue中。队列仅有主机的内存和字旁限制绑定,它本质上是一个大的消息缓冲区。许多生产者可以发送信息到指定的队列,许多的消费者可以从指定的队列中收取信息。用queue_name表示。

注意:生产者,消费者和代理不必驻留在同一台主机上。在实际大多数应用中,也是如此。

3.1.2 RabbitMQ库介绍 

RabbitMQ说的是AMQP 0.9.1,它是一个开放的、通用的消息传递协议。有许多不同语言的RabbitMQ客户端。在这个实例中,我们将使用Pika,这是RabbitMQ团队推荐的Python模块。要安装它,您可以使用pip包管理工具。

pip install pika

3.1.3 生产者,消费者和代理之间实现通信的图示 

技术分享

在生产者消费者模型中,生产者产生的信息发送隔离Broker,由Broker发送给相应的消费者。

 

四、在Python中利用pika模块实现不同类型的队列通信

实例1:最简单的使用pika实例——Hello World!

在官方文档中,关于使用pika实现的最简单的Hello World案例中,将用Python写两个小程序。Proceder(发送方)发送信息,Consumer(接收方)接收信息并且打印这些信息。要传输的信息就是‘Hello World!‘

在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。

我们的整体设计将会是:

技术分享

生产者将消息发送到“hello”队列。消费者从该队列接收消息。

 

Sending

 技术分享

第一个程序send.py将会把单条信息‘Hello World!‘发送给队列hello。在send.py文件中,我们需要做的第一个事就是建立和RabbitMQ server的连接。

import pika  #导入模块pika

connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 
channel=connection.channel()  #建立和RabbitMQ server的连接我们 

通过上面步骤,我们的send.py小程序实现了连接到本地机器上的broker(代理)。如果我们想要连接到另一台机器上的代理,我们只需在这里指定它的名称或IP地址。

 

接下来,在发送之前,我们需要确定收件人队列是否存在。如果我们将消息发送到不存在的位置,RabbitMQ将会删除该消息。让我们创建一个名为hello的队列,send发送的消息将会发送到hello队列:

channel.queue_declare(queue=‘hello‘)

在这时,我们准备发送一条消息。我们的第一个消息仅为‘Hello World !‘字符串。我们想把它发送到我们的hello队列。

 

在RabbitMQ中,消息从来不能直接发送到队列中,它总是需要通过交换。但是,让我们不要被细节所拖累——如果想要了解关于exchange的内容,可以在本教程的第三部分阅读更多关于交流exchange的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换(exchange=‘‘)。这个交换exchange是特殊的——它允许我们精确地指定消息应该发送到哪个队列。在routing_key参数中需要指定队列名称:

channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=‘Hello World!‘)
print("[x] Sent ‘Hello World!‘")

在退出程序之前,我们需要确保网络缓冲区已被刷新,并且我们的消息已发送到RabbitMQ。我们可以关闭连接。

connection.close()

 

综上所述:

 1 import pika
 2 #第一步:与RabbitMQ server建立连接
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 4 channel=connection.channel()
 5 
 6 #第二步:生产指定要发送的队列
 7 channel.queue_declare(queue=hello)
 8 
 9 #第三步:通过交换exchange,发送数据到相应的队列;通过routing_key指定queue name;body中存放要传输的信息
10 channel.basic_publish(exchange=‘‘,routing_key=hello,body=Hello World!)
11 print("[x] Sent ‘Hello World!‘")
12 
13 #第四步:关闭连接
14 connection.close()

 

可能出现的问题:Sending doesn‘t work!

如果这是你第一次使用RabbitMQ,而你没有看到“发送”的消息,那么你可能会挠头,想知道到底出了什么问题。可能代理开始时没有足够的空闲磁盘空间(默认情况下,它至少需要200 MB),因此代理拒绝接收消息。检查代理日志文件以确认并减少必要的限制。配置文件文档将向您展示如何设置disk_free_limit。

 

 

Receiving

技术分享

 

 

  

我们的第二个程序receive.py将从队列queue接收消息并在屏幕上打印它们。

首先,我们需要连接到RabbitMQ服务器。连接到RabbitMQ的代码和send.py中一样。

import pika  #导入模块pika

connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 
channel=connection.channel()  #建立和RabbitMQ server的连接我们 

  

第二步,就像send.py一样,确保队列存在。使用queue_declare创建队列是具有幂等性的——我们可以按照我们喜欢的次数运行这个命令,并且只创建一个命令。

channel.queue_declare(queue=‘hello‘)

  您可能会问为什么我们再次声明队列——我们已经在以前的代码中声明了它。如果我们确信队列已经存在,我们可以避免这种情况。例如,如果send.py程序之前运行过。但是我们还不能确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是很好的做法。

 

Listing queues中看RabbitMQ中的queues和messages

您可能希望看到RabbitMQ中有哪些队列,有多少消息在队列中。可以使用rabbitmqctl工具(作为特权用户):

sudo rabbitmqctl list_queues

在Windows上,省略了sudo

rabbitmqctl.bat list_queues

  

从队列接收消息更加复杂。它的工作方式是订阅一个回调函数到一个队列。当我们收到消息时,这个回调函数被Pika库调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。

def callback(ch,method,properties,body):
    print("[x] Received %r"%body)

  

 接下来,我们需要告诉RabbitMQ,这个特定的回调函数应该接收来自我们的hello队列的消息:

channel.basic_consume(callback,queue=‘hello‘,no_ack=True)

Receiving.py要成功运行,我们必须确保我们希望订阅的队列是存在的。幸运的是,我们对此有信心——我们已经创建了一个上面的队列——使用queue_declare。

no_ack参数在之后会进行解释。

 

最后,我们输入一个永无止境的循环,等待数据并在必要时运行回调函数。

print("[x] Waiting for messages.To exit press CTRL")
channel.start_consuming()

 

综上所述:

 1 import pika
 2 #第一步:与RabbitMQ server建立连接
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 4 channel=connection.channel()
 5 
 6 #第二步:生产指定要接收的队列
 7 channel.queue_declare(queue=hello)
 8 
 9 #第三步:用回调函数到队列queue中,接收到信息时回调函数会被Pika库调用
10 def callback(ch,method,properties,body):
11     print("[x] Received %r"%body)
12 
13 #第四步:告诉RabbitMQ,这个特定的回调函数将接收来自hello队列的信息
14 channel.basic_consume(callback,queue=hello,no_ack=True)
15 
16 #第五步:进入死循环等待数据,在必要的时候运行回调函数。
17 print("[x] Waiting for messages.To exit press CTRL + C")
18 channel.start_consuming()

 

现在我们就完成了一个一对一的Producer和Consumer的单条消息队列传输。

首先运行receive.py,它会一直等待数据。再运行send.py之后,Producer会在数据传输完成后停止。Consumer还会继续保持等待数据的状态。(receive.py可以一直运行,但是可能可以被Ctrl+C打断)

消息分发轮询:

先运行N个receive.py,再运行send.py。每个receive.py会按照运行的先后相继收到新触发的send.py的消息。每个Consumer轮询地去接收Producer的信息。

 

实例2:构建一个简单的工作队列 Work queues

技术分享

 

在上面的第一个"Hello World!"的例子中,我们编写了一些程序来从一个已命名的队列发送和接收消息。在这个过程中,我们将创建一个工作队列work queue,用于在多个工作之间分配耗时的任务。

工作队列work queues(即任务队列task queues)背后的主要思想是避免立即执行资源密集型任务,并避免进程必须等待它完成。相反,我们可以把任务安排在以后做。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程会弹出任务并最终执行任务。当你运行多个工作时,任务将在他们之间共享。

Work queues这个概念在web应用程序中特别有用,因为在短HTTP请求窗口中无法处理复杂的任务。

Preparation

在教程的"Hello World!"例子中,我们发送了一个包含“Hello World !”的消息。现在我们将发送用于复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或者呈现pdf文件,所以让我们通过使用time . sleep()函数来假装我们很忙。我们取字符串中的点dot的个数作为它的复杂度的依据(假设情况);字符串中有多少个dot,就说明处理该条字符串需要多少秒例如,一个“Hello…”字符串中有三个点,因此假定处理该字符串需要三秒钟。

我们会稍微修改一下send.py,使得Producer允许从命令行发送任意消息。这个修改过的程序将任务调度到我们的工作队列中。我们将其命名为new_task.py:

 1 import sys
 2 import pika
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=hello)
10 
11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
12 message=‘‘.join(sys.argv[1:]) or "Hello World!"
13 channel.basic_publish(exchange=‘‘,routing_key=hello,body=message)
14 print("[x] Sent %r"%message)
15 
16 #step4:close
17 connection.close()

 

我们也要稍微修改一下receive.py,使得接收的数据不立即在屏幕上打印,而是等待数据处理(依据字符串的dot数作为处理时长)完成后再向屏幕打印数据。新的Consumer将被命名为work.py:

 1 import pika
 2 import time
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=hello)
10 
11 #step3:define callback function.
12 def callback(ch,method,properties,body):
13     print([x] Received %r%body)
14     time.sleep(body.count(b.))
15     print([x] Done.)
16 
17 #step4:when Consumer receive message,the program call callback function to deal with message.
18 channel.basic_consume(callback,queue=hello,no_ack=True)
19 
20 print([x] Waiting for messages. To exit press "CTRL+C")
21 
22 #write an endless loop.
23 channel.start_consuming()

 

 

Round-robin dispatching循环调度

使用任务队列的优点之一是能够很容易地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务,这样就可以很容易地扩大规模。

首先,让我们同时运行两个worker.py脚本。他们将从队列中获取消息,但具体如何呢?让我们来看看。

你需要三个控制台。两个将会运行这个worker.py脚本。这些控制台将是我们的两个消费者——C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

  

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

在第三个console,可以发布Producer任务:

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

返回结果:

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received ‘First message.‘
# => [x] Received ‘Third message...‘
# => [x] Received ‘Fifth message.....‘

  

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received ‘Second message..‘
# => [x] Received ‘Fourth message....‘

  默认情况下,RabbitMQ将按顺序将每个消息发送到下一个消费者。平均每个消费者将得到相同数量的消息。这种分配消息的方式称为循环。

 

Message acknowledgment消息确认

完成一个任务可能需要几秒钟。如果其中一个消费者开始了一项很长的任务,而只在一定程度上完成了,那么会发生什么呢?使用我们当前的代码,一旦RabbitMQ将消息传递给Consumer,RabbitMQ就会立即从内存中删除队列中的这条信息。在这种情况下,如果Consumer中断,我们将失去这个Consumer正在处理的信息。我们也将丢失所有发送给这个特定Consumer的消息,但是实际上这条信息还没有被Consumer处理。 

但我们不想失去任何任务信息。如果一个Consumer中断,我们希望把任务交给另一个Consumer。

为了确保消息不会丢失,RabbitMQ支持消息确认。Consumer在处理完该任务信息后,会给RabbitMQ发送一个ack(全称 acknowledgement),告诉RabbittMQ可以删除它。

如果一个Consumer中断(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而没有发送ack,RabbitMQ将理解一条消息没有被完全处理,并且将重新排队。如果同时有其他Consumer在运行,它会很快将其转递给另一个Consumer。这样我们就可以确信,即使Consumer偶尔死亡,也不会失去任何信息。 

其中,没有任何消息超时的限制。RabbitMQ将在Consumer中段后重新传递消息。即使处理消息需要很长时间,也不会由于超时原因导致失败。 

默认情况下打开消息确认。在前面的例子中,我们明确地通过no_ack = True标记关闭了它们。

所以,在默认情况下,我们都不用设置no_ack,直接使用默认的no_ack=False即可。这样确保每次Consumer收到处理完消息后给Producer返回ack消息。

def callback(ch, method, properties, body):
    print ("[x] Received %r" % (body,))
    time.sleep( body.count(‘.‘) )
    print ("[x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue=‘hello‘)  #删除了no_ack=True

  把worker.py脚本中的代码按照上面的代码进行修改,就能保证Consumer中止,未给RabbitMQ发送acknowlegement,队列中的信息不会被删除。再次运行新的Consumer,信息还是会再次发送给新的在运行中的Consumer。

 

Forgotten acknowledgment

  忽略basic_ack是一个常见的错误。这是一个简单的错误,但后果却是严重的。当您的客户端退出时,消息将会被重新发送(看起来可能是随机的再次发送),但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未加处理的消息。

  为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

  如果脚本中忽略了basic_ack,那么队列中的信息会不停地在内存中堆积,队列中已经删除的信息,在内存中还是会放着。

c:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.10\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Listing queues
hello   0       3

  通过上面在终端中查看的结果发现,队列中所有的信息已经被Consumer处理完后,messages

_unacknowleged的数量并未随着Consumer处理掉的信息(序列中删除的信息)的结果而减少相应的条数。

  加上了basic_ack之后,随着Consumer的处理结束,messages_unacknowleged的数量也随之减少,最后降为0.

技术分享

 

Message durability 消息持久化 

我们已经学会了如何确保即使Consumer中断或停止,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。 

当RabbitMQ退出或崩溃时,它将忘记队列queues和消息messages。为了将队列和消息持久化,我们需要做两件事.。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要宣布它是持久的:

channel.queue_declare(queue=‘hello‘,durable=True)

尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义一个具有不同参数的现有队列,并将对试图执行此操作的任何程序返回一个错误。但是有一个快速的解决方案——让我们用不同的名称来声明一个队列,例如task_queue:

channel.queue_declare(queue=‘task_queue‘, durable=True)

上面的queue_declare的修改在Producer和Consumer之间都需要修改。

在这一点上,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要把我们的信息标记为persisitent持久的。标记信息为persisitent,可以通过下面的代码实现:

channel.basic_publish(exchange=‘‘,
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

  

Note on message persistence

将消息标记为持久的并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并保存消息期间,仍然有很短的时间窗口。而且,RabbitMQ对每条消息都不执行fsync(2),它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于简单的任务队列来说已经足够了。如果你需要一个更有力的保证,那么你可以使用publisher confirms.

 

Fair dispatch 消息公平分发

您可能已经注意到,调度仍然不像我们希望的那样工作。例如,在一个有两个Consumer在运行的情况下,当部分消息需要很长时间处理,部分消息只需极短的时间处理时,一个Consumer会不停地在处理,另一个Consumer几乎没有什么计算量。RabbitMQ对此一无所知,并且仍然还是会轮询地平均分配消息。

这是因为当消息进入队列时,RabbitMQ才会发送一条消息。它不考虑Consumer未确认的消息messages_noackownledgement的数量。它只是盲目地向n个Consumer发送n个消息。

 技术分享

为了解决上面的这个问题,我们可以是basic.qos方法,将prefetch_count设置为1。

channel.basic_qos(prefetch_count=1)

这告诉RabbitMQ一次不给一个Consumer发送一个以上的消息。或者,换句话说,在Consumer没有处理并确认之前的消息之前,不要向员工发送新消息。相反,它会把它发送给下一个不在处理的Consumer。

Note about queue size

 如果所有在运行中的Consumers都在处理中。如果想要对此保持关注,那么可能需要增加更多的Consumer,或者使用message TTL

 

最后,下面是实现消息确认+消息和队列持久化+消息公平分发的Work queues:

 new_task.py:

 1 import sys
 2 import pika
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=hello,durable=True)  #durable=True实现队列持久化
10 
11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数
12 message=‘‘.join(sys.argv[1:]) or "Hello World!"
13 channel.basic_publish(exchange=‘‘,routing_key=hello,body=message,
14                       properties=pika.BasicProperties(delivery_mode=2,)  #make message persisitent实现消息持久化
15                       )
16 print("[x] Sent %r"%message)
17 
18 #step4:close
19 connection.close()

 

work.py:

 1 import pika
 2 import time
 3 
 4 #step1:create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
 6 channel=connection.channel()
 7 
 8 #step2:create a named queue
 9 channel.queue_declare(queue=hello,durable=True)  #durable=True实现队列持久化,C和P都要保持一致
10 
11 #step3:define callback function.
12 def callback(ch,method,properties,body):
13     print([x] Received %r%body)
14     time.sleep(body.count(b.))
15     print([x] Done.)
16     ch.basic_ack(delivery_tag=method.delivery_tag)  #basic_ack是消息确认的不可缺少的一部分,少了它内存中的messages_noacknowledgement不会自动较少
17 
18 
19 #step4:when Consumer receive message,the program call callback function to deal with message.
20 channel.basic_qos(prefetch_count=1)  #消息公平分发Fair dispatch ,设置RabbitMQ只向每个C发送一条message
21 channel.basic_consume(callback,queue=hello) #使用no_ack=False的默认值,确保消息确认
22 
23 print([x] Waiting for messages. To exit press "CTRL+C")
24 
25 #write an endless loop.
26 channel.start_consuming()

 

实例3:构建一个简单的日志记录系统:消息发布\订阅Publish/Subscribe

在第二个实例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都交付给一个worker。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。

为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,接收程序Consumer的每一个运行副本都将得到消息。这样我们就能运行一个接收器,并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上打印日志。

基本上,发布的日志消息将被所有Consumer接收,这种类型叫做广播broadcast。

 

Exchanges

在前两个实例中,Producer发送消息到队列,Consumer从队列中接收消息列。现在,我们来介绍一下RabbitMQ的完整消息传递模型。

让我们快速回顾一下前面的教程中介绍的内容:

  生产者是发送消息的用户应用程序。

  队列是存储消息的缓冲区。

  消费者是接收消息的用户应用程序。

RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到哪个队列。

生产者只能将消息发送到exchange。exchange接收来自生产者的消息,再将消息推送到队列中。

exchange必须知道如何处理它收到的消息:

  它是否应该附加到特定的队列?

  它应该被附加到许多队列吗?

  或者应该被抛弃。

这些规则由交换类型定义。

 

技术分享

可用的交换类型有:direct,topic,headers,fanout。

1.fanout  以类似广播的方式向所有的C发送信息:所有bind到类型为fanout的这个exchange的queue都可以接收信息

2.direct  通过routingKey和exchange决定的那个唯一的queue可以接收消息

3.topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

4.headers

 

在这个实例中,我们将使用fanout类型的exchange来实现。

channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘)

上面的代码声明了类型为fanout,名为logs的exchange。

 

fanout交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。

代码实现:

channel.basic_publish(exchange=‘log‘,  #exchange的名称为logs
                      routing_key=‘‘,   #不指定序列名称
                      body=message)
                 

  

 

Listing exchanges

想要内存在RabbitMQ Server中的所有exchanges,也可以使用rabbitmqctl工具:

rabbitmqctl list_exchanges

在返回的列表中会有一些amq.*交换 和 默认的(未命名)交换。这些exchange是默认创建的,但现在我们不太可能需要使用它们。

 技术分享

 

The default exchange

在本教程的前几部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,我们通过空字符串(“”)来识别它。

交换参数是交换的名称。空字符串表示默认或匿名交换:消息被发送到参数routing_key指定的队列中去。

 

Temporary queues 临时队列

在前两个实例中,我们使用有指定名称的队列(如:hello)。对我们来说,能够命名队列是至关重要的——我们需要将Consumer指向相同的队列。当您想要在生产者和消费者之间共享队列时,给队列命名是很重要的。

但在这个实例中,命名队列的情况却并非如此。

我们想要听到所有日志信息,而不仅仅是他们的子集。我们也只对当前flowing messages感兴趣,而不是旧消息。要解决这两个问题,我们需要两步。

第一步,当我们连接RabbbitMQ时,我们需要一个新的空队列。我们可以创建一个随机名称的队列或让server选择一个随机队列名称。这个可以通过不在queue_declare()中设置参数queue来实现:

result = channel.queue_declare()

result.method.queue包含了一个随机队列名。比方说“amq.gen-JzTY20BRgKO-HjmUJj0wLg”这样的类似的名称。

第二步,一旦Consumer不和该随机名称的队列连接了,这个队列就可以删除。通过设置exclusive参数实现。

result = channel.queue_declare(exclusive=True)  

 

Bindings

技术分享

 我们已经创建了一个类型为fanout的exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列中去。exchange和queue之间的联系被称为binding绑定。

channel.queue_bind(exchange=‘logs‘,
            queue=result.method.queue #result.method.queue中生成一个随机名的queue )

从现在起,名为logs的exchange将会向我们的队列中添加消息。

 

Listing bindings

 我们可以通过rabbitmqctl工具列出现有的bindings。

rabbitmqctl list_bindings

  

综上所述:

技术分享

Producer程序负责发送日志信息,大部分实现和前面两个实例中的Producer没有大的区别。最重要的变化是,我们现在想要把消息发布到名为logs,类型为fanout的exchange中去,而不是默认的exchange。我们需要在发送时提供一个routing_key,但是它的值被忽略了。

下面是Producer的代码,emit_log.py:

 1 import pika
 2 import sys
 3 
 4 #step1:create connection with RabbitMQ server
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 6 channel = connection.channel()
 7 
 8 #step2:declare a fanout exchange
 9 channel.exchange_declare(exchange=logs,
10                          type=fanout)
11 
12 message =  .join(sys.argv[1:]) or "info: Hello World!"
13 
14 #step3:send message to exchange ‘log‘
15 channel.basic_publish(exchange=logs,
16                       routing_key=‘‘,
17                       body=message)
18 print(" [x] Sent %r" % message)
19 
20 #step4:close
21 connection.close()

正如您所看到的,在建立连接之后,我们声明了exchange。这个步骤是必要的,因为将信息发送给一个不存在的exchange是不行的。 

如果没有队列绑定到交换中,消息将丢失,但这对我们来说是可以的;如果没有消费者在听,我们可以安全地丢弃这个消息。

 

receive_logs.py的代码:

 1 import pika
 2 
 3 #step1:create connection with RabbitMQ server
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 5 channel = connection.channel()
 6 
 7 #step2:declare a fanout exchange
 8 channel.exchange_declare(exchange=logs,
 9                          type=fanout)
10 
11 #step3:declare a random name queue
12 result = channel.queue_declare(exclusive=True)  #exclusive=True使得生成的随机名称的queue在使用后在内存中删除。
13 queue_name = result.method.queue  #result.method.queue生成一个随机名称的queue
14 
15 #step4:create a bind with logs and queue
16 channel.queue_bind(exchange=logs,
17                    queue=queue_name)
18 
19 print( [*] Waiting for logs. To exit press CTRL+C)
20 
21 #step5:define a callback function
22 def callback(ch, method, properties, body):
23     print(" [x] %r" % body)
24 
25 #step6:call a callback function when C receives a message
26 channel.basic_consume(callback,
27                       queue=queue_name,
28                       no_ack=True)
29 
30 #step7:write a endless loop
31 channel.start_consuming()

Question:为什么在Consumer端需要一个随机名称的queue呢?

因为Producer不会直接把消息发送给queue,而是先发送给exchange。exchange(fanout类型的)收到消息后,会遍历绑定在这个exchange上的所有的queue,一次将消息发送给所有与之bind的queue。再由quque将消息发送给Consumer。

由于在这个过程中,queue在Consumer接收处理完数据后就自动删除了。所以就用了随机名称的queue。

 

 

实例4:日志记录系统有选择的接收消息:Routing 

在实例3中,我们构建了一个简单的日志记录系统。通过这个简单的日志记录系统,我们可以像广播的形式将日志信息发送给许多接收端。

在实例4中,我们会给它增加一些特性——我们将使它可以只订阅消息的一部分,而不是全部消息。例如,我们将能够将关键error消息直接发给日志文件(以节省磁盘空间),同时还能够在控制台上打印所有日志消息。

 

Bindings

在前面的示例中,我们已经创建了binding。创建binding的代码:

channel.queue_bind(exchange=exchange_name,queue=queue_name)

绑定binding是交换exchange和队列queue之间的关系。换言之,队列对bind的exchange传来的消息感兴趣。

绑定binding可以使用一个额外的参数routing_key。为了避免与basic_publish参数的混淆,我们将把它称为绑定键binding_key。下面是创建一个具有键binding_key的绑定binding:

channel.queue_bind(exchange=exchange_name,queue=queue_name,
                   routing_key=‘black‘) #增加了routing_key参数

绑定键binding_key的含义取决于交换类型exchange type。实例3中使用的fanout exchange,只是忽略了它的值。

 

Direct exchange

我们希望扩展实例3中的日志记录系统的功能,依据消息的严重性来过滤消息。

例如,我们可能想要将日志消息写入磁盘的脚本只接收关键错误error,而不是在警告warning或信息info日志消息上浪费磁盘空间。

实例3中使用的是fanout exchange,它不会给我们太多的灵活性——它只会无意识地将信息传递给所有bind它的队列。

想要实现这个扩展的功能,实现日志信息记录的过滤接收,可以使用direct exchange。direct exchange的算法很简单——一条消息传递给队列,queue的绑定键binding_key与publish消息的路由键routing_key完全匹配。

 

为了说明这一点,请考虑以下设置:

技术分享

在这个设置中,我们可以看到direct exchange和两个queues绑定。第一个队列与绑定键为orange的exchange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定键位green。

在这样的设置中,通过routing_key=orange的direct exchange发送的消息,将会被Q1接收。通过routing_key=black或routing_key=green的direct exchange发送的消息,将会被Q2接收。所有其他消息将被丢弃。

 

 

Multiple bindings

 技术分享

用相同的绑定键binding key绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键black。在这种情况下,direct exchange表现为fanout,并将消息广播到所有匹配队列。一个带有routing_key=black的消息将被发送到Q1和Q2。

 

Emitting logs

我们将在我们的日志系统中使用这个模型。

我们将发送消息到direct exchange。

我们将提供日志的严重性log serverity作为路由键routing key。

这样Consumer将能够选择它想要接收的信息。让我们先关注一下emiiting logs。

 

需要创建一个exchange:

channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘)

发送信息:

channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)

为了简化问题,我们假设“严重性”severity可以是“信息”info、“警告”warning、“错误”error。

 

Subscribing

 

接收信息像实例3一样运行,只有一个例外——我们将为每个serverity创建一个新的binding。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

  

综上所述:

技术分享

emit_log_direct.py的代码:

 1 import pika
 2 import sys
 3 
 4 #step1: create connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 6 channel = connection.channel()
 7 
 8 #step2: declare a  direct exchange
 9 channel.exchange_declare(exchange=direct_logs,
10                          type=direct)
11 
12 #step3: aacept message severity
13 severity = sys.argv[1] if len(sys.argv) > 1 else info
14 message =  .join(sys.argv[2:]) or Hello World!
15 
16 #step4: send message to exchange with the specific severity
17 channel.basic_publish(exchange=direct_logs,
18                       routing_key=severity,
19                       body=message)
20 print(" [x] Sent %r:%r" % (severity, message))
21 
22 #step5:close
23 connection.close()

 

 

receive_logs_direct.py的代码:

 1 import pika
 2 import sys
 3 
 4 #step1: create connection with RabbitMQ server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 6 channel = connection.channel()
 7 
 8 #step2: declare a direct exchange 
 9 channel.exchange_declare(exchange=direct_logs,
10                          type=direct)
11 
12 #step3:declare a random name queue
13 result = channel.queue_declare(exclusive=True)
14 queue_name = result.method.queue
15 
16 #step4: 
17 severities = sys.argv[1:]
18 if not severities:
19     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
20     sys.exit(1)
21 
22 for severity in severities:
23     channel.queue_bind(exchange=direct_logs,
24                        queue=queue_name,
25                        routing_key=severity)
26 
27 print( [*] Waiting for logs. To exit press CTRL+C)
28 
29 #step5: define a callback function 
30 def callback(ch, method, properties, body):
31     print(" [x] %r:%r" % (method.routing_key, body))
32 
33 #step6: call a callback function when receive a message 
34 channel.basic_consume(callback,
35                       queue=queue_name,
36                       no_ack=True)
37 
38 #step7: write a endless loop
39 channel.start_consuming()

 

 

实例5:日志记录系统有选择的接收消息升级版:Topics

在实例4中,我们改进了日志系统。我们使用了一个direct exchange,而不是使用一个仅能进行虚拟广播的fanout exchange。在实例4中实现了选择性地接收日志的可能性。 

但是实例4改进过的日志系统还是具有它的局限性——不能基于多个标准进行选择性地接收信息。

在我们的日志系统中,我们可能希望订阅的不仅是基于严重性severity为标准的日志,还应该基于发出日志的来源。我们可能从syslog unix工具中了解这个概念,该工具根据严重性severity(info / warn / crit…)和设施facility(auth / cron / kern…)来发送日志。 

这将给我们带来很大的灵活性——我们可以收到来自“cron”和“kern”的关键错误。

要在我们的日志系统中实现这一点,我们需要了解一个更复杂的topic exchange。

 

Topic exchange

发送到topic exchange的消息不能有任意的routing_key——它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定一些与消息相关的特性。一些有效的routing_key示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。在routing_key中可以有任意多个单词,最多可达255个字节。

绑定键binding key也必须以相同的形式出现。top exchange背后的逻辑类似于direct exchange——一个带有特定routing key的消息将被传送到绑定一个匹配绑定键binding_key的所有队列。然而,绑定键有两个重要的特殊情况: 

  *(star)可以代替一个词。

  #(hash)可以替代零个或更多的词。

在一个例子中,最简单的解释是:

 技术分享

在这个例子中,我们将发送所有描述动物的信息。消息将发送一个由三个单词组成的routing key(两个点)。routing key中的第一个词将描述一个celerity,第二个词将描述一种colour,第三个词将描述一种species:“< celerity > . <colour> . <species>”。

我们创建了三个binding key:Q1绑定binding key " * . orange . * ",Q2 绑定binding key" *.*.rabbit "和" lazy.# "。 

这些绑定可以概括为:

  Q1对所有的orange动物都感兴趣。

  Q2对所有的rabbit都敢兴趣,同时也对所有的lazy的动物感兴趣。

一个routing_key=" quick.orange.rabbit "的消息,将会被发送到队列Q1,Q2中去;

一个routing_key=" lazy.orange.elephant "的消息,也将会被发送到队列Q1,Q2中去;

一个routing_key=" quick.orange.fox "的消息,仅会被发送到队列Q1中去;

一个routing_key=" lazy.brown.fox "的消息,仅会被发送到队列Q2中去;

一个routing_key=" lazy.pink.rabbit "的消息,仅会被发送到队列Q2中去,即便它有符合两条binding key " *.*.rabbit "和" lazy.# ";

一个routing_key=" quick.brown.fox "的消息,没有和任何一条binding key匹配上,所以这条消息会被Q1,Q2丢弃。

 

如果我们违反规定(routing_key只能是以两个dot分隔的三个单词列表),用一个或四个单词发送一条信息,比如一条信息的routing_key="orange"或"quick.orange.male.rabbit"。这些消息不匹配任何binding key,将丢失。

另一方面," lazy.orange.male.rabbit "。虽然它有四个单词,但它将匹配最后一个绑定" lazy.# ",并将被传送到队列Q2 。

 

Topic exchange

topic exchange很强大,它不仅能实现更复杂的数据筛选接收,还能实现fanout exchange和direct exchange一样的效果。

当队列与“#”(hash)binding key绑定时,它将接收所有消息,实现了fanout exchange的功能。

当特殊字符“*”(star)和“#”(hash)没有在binding key中使用时,实现了direct exchange的功能。

 

综上所述:

我们将用topic exchange实现一个新的日志系统。

我们假设日志的routing key将由两个单词组成:" <facility>.<severity>"。

emit_log_topic.py的代码:

 1 import pika
 2 import sys
 3 
 4 #step1:create a connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 6 channel = connection.channel()
 7 
 8 #step2:declare a topic exchange
 9 channel.exchange_declare(exchange=topic_logs, type=topic)
10 
11 #step3:receive routing_key message from sys.argv
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else anonymous.info
13 message =  .join(sys.argv[2:]) or Hello World!
14 
15 #step4:send message to topic_logs
16 channel.basic_publish(exchange=topic_logs,
17                       routing_key=routing_key,
18                       body=message)
19 
20 print(" [x] Sent %r:%r" % (routing_key, message))
21 
22 #step5:close.
23 connection.close()

 

receive_logs_topic.py的代码:

 1 import pika
 2 import sys
 3 
 4 #step1:create a connection with RabbitMQ Server
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 6 channel = connection.channel()
 7 
 8 #step2:declare a topic exchange
 9 channel.exchange_declare(exchange=topic_logs,
10                          type=topic)
11 
12 #step3:declare a random name queue
13 result = channel.queue_declare(exclusive=True)
14 queue_name = result.method.queue
15 
16 #step4:receive messages from sys.argv
17 binding_keys = sys.argv[1:]
18 if not binding_keys:
19     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
20     sys.exit(1)
21 
22 #step5:bind a topic exchange with a queue 
23 for binding_key in binding_keys:
24     channel.queue_bind(exchange=topic_logs,
25                        queue=queue_name,
26                        routing_key=binding_key)
27 
28 print( [*] Waiting for logs. To exit press CTRL+C)
29 
30 #step6:create a callback function
31 def callback(ch, method, properties, body):
32     print(" [x] %r:%r" % (method.routing_key, body))
33 
34 #step7:call a callback function when receive a message
35 channel.basic_consume(callback,
36                       queue=queue_name,
37                       no_ack=True)
38 
39 #step8:write a endless loop
40 channel.start_consuming()

 

 

 

实例6:Remote procedure call (RPC)

在实例2中,我们学习了如何使用Work Queues在多个工作进程之间分配耗时的任务。(消息公平分发channel.basic_qos(prefetch_count=1) )

但如果我们需要在远程计算机上运行一个函数并等待结果呢?那是另一回事了。这种模式通常被称为远程过程调用或RPC。

在实例6中,我们将使用RabbitMQ构建一个RPC系统:客户机client和可伸缩的RPC服务器scalable RPC server。由于我们没有任何可分配的耗时任务,因此我们将创建一个能够返回斐波那契数列的虚拟RPC服务。

 

Client interface客户端接口

为了说明RPC服务如何使用,我们将创建一个简单的客户端类。它将公开一个名为call的方法,call方法将发送一个RPC请求和块,直到收到响应:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

  

A note on RPC

虽然RPC在计算中是一个很常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是RPC时,问题就出现了。这样的混淆导致了不可预测的系统结果,并增加了调试的复杂性。与简化软件不同,误用的RPC可能会导致代码无法维护。

考虑到这一点,有以下建议: 

  确保函数调用来源的明确性,调用的函数是本地的还是远程的清晰可判断。

  用文件记录系统。使组件之间的依赖关系变得清晰。

  处理错误情况。当RPC服务器停机很长时间时,客户机应该如何反应?

当有疑问要时要避免使用RPC。如果可以,应该使用异步管道asynchronous pipeline(而不是像rpc一样的阻塞进程),异步地将结果推进到下一个计算阶段。

 

Callback queue

一般来说,在RabbitMQ上执行RPC是很容易的。客户端发送请求消息,服务器响应消息。为了接收响应,客户端需要发送一个“callback” 队列地址。让我们试一试:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange=‘‘,
                      routing_key=‘rpc_queue‘,
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

  

Message properties

AMQP 0 - 9 - 1协议预先定义了一个14个属性集合。除以下情况外,大多数属性都很少使用:

  delivery_mode:将消息标记为持久性(值为2)或瞬态transient(任何其他值)。您可能还记得实例2中的这个属性。

  content_type:用于描述编码的MIME类型。例如,对于经常使用的JSON编码,将此属性设置为:

应用程序/ JSON是一种很好的做法。

  reply_to:通常用于命名回调队列callback queue。

  correlationship _id:将RPC响应与请求联系起来.

 

Correlation id

在上述方法中,我们建议为每个RPC请求创建一个回调队列。这很低效,但幸运的是有更好的方法——让我们为每个客户端创建一个回调队列。 

这引发了一个新的问题,在该队列中收到了响应,不清楚响应属于哪个请求。这就是使用correlation_id属性时的情况。我们将为每个请求设置唯一值。稍后,当我们在回调队列中收到消息时,我们将查看correlation_id,并基于correlation_id,我们将能够匹配响应和请求。如果我们看到一个未知的correlationship _id值,我们可以安全地丢弃该消息——它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是由于报错而失败呢?这是由于服务器端可能出现race condition。虽然不太可能,但是RPC服务器可能在发送答案后,但未发送请求确认acknowlegement信息前。如果发生这种情况,重新启动的RPC服务器将再次处理此请求。这就是为什么在客户端我们必须优雅地处理重复的响应,由于RPC服务是幂等的idempotent(意味着可以安全地对失败请求进行重试)。

 

综上所述:

技术分享

我们的RPC将这样工作: 

当客户机启动时,它创建一个匿名回调队列。

对于一个RPC请求,客户机发送一条信息(有两个属性:reply_to和correlationship_id):reply_to是用来设置回调序列的,correlation_id是用来设置每个请求的唯一值的。

请求被发送到一个rpc_queue队列。

RPC客户端在等待从这个队列中接收请求。当一个请求出现时,它处理完请求的工作并且发送处理结果信息给客户端时,通过检查correlation_id属性找到唯一的该请求,通过reply_to回调队列发送回客户端。

客户机在回调队列上等待数据。当消息出现时,它检查correlationship _id属性。如果它与请求的值相匹配,则返回对应用程序的响应。

 

rpc_server.py的代码:

 1 import pika
 2 
 3 #建立连接
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 5 channel = connection.channel()
 6 
 7 #声明队列
 8 channel.queue_declare(queue=rpc_queue)
 9 
10 #声明fibonacci函数,只假设有效的正整数输入。
11 def fib(n):
12     if n == 0:
13         return 0
14     elif n == 1:
15         return 1
16     else:
17         return fib(n-1) + fib(n-2)
18 
19 
20 def on_request(ch, method, props, body):
21     n = int(body)
22 
23     print(" [.] fib(%s)" % n)
24     response = fib(n)
25 
26     ch.basic_publish(exchange=‘‘,
27                      routing_key=props.reply_to,
28                      properties=pika.BasicProperties(correlation_id = 29                                                          props.correlation_id),
30                      body=str(response))
31     ch.basic_ack(delivery_tag = method.delivery_tag)
32 
33 channel.basic_qos(prefetch_count=1)  #为了在多个服务器上平均分配负载,我们需要设置prefetch_count设置。
34 channel.basic_consume(on_request, queue=rpc_queue)
35 #我们声明对basic_consume的回调,RPC服务器的核心。在接收请求时执行它。它执行工作并发送响应。
36 
37 print(" [x] Awaiting RPC requests")
38 channel.start_consuming()

 

 

 

rpc_client.py的代码:

 1 import pika
 2 import uuid
 3 
 4 class FibonacciRpcClient(object):
 5     def __init__(self):
 6         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
 7       #建立连接
 8         self.channel = self.connection.channel()
 9 
10         result = self.channel.queue_declare(exclusive=True)  #声明一个排他的回调队列
11         self.callback_queue = result.method.queue
12 
13         self.channel.basic_consume(self.on_response, no_ack=True,
14                                    queue=self.callback_queue)
15 
16     def on_response(self, ch, method, props, body):
17         if self.corr_id == props.correlation_id:
18             self.response = body
19 
20     def call(self, n):
21         self.response = None
22         self.corr_id = str(uuid.uuid4())
23         self.channel.basic_publish(exchange=‘‘,
24                                    routing_key=rpc_queue,
25                                    properties=pika.BasicProperties(
26                                          reply_to = self.callback_queue,
27                                          correlation_id = self.corr_id,
28                                          ),
29                                    body=str(n))
30         while self.response is None:
31             self.connection.process_data_events()
32         return int(self.response)
33 
34 fibonacci_rpc = FibonacciRpcClient()
35 
36 print(" [x] Requesting fib(30)")
37 response = fibonacci_rpc.call(30)
38 print(" [.] Got %r" % response)

 

RabbitMQ 消息队列

标签:无法   ons   报错   receive   1.2   mod   http   routing   one   

原文地址:http://www.cnblogs.com/zoe233/p/7327770.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!