标签:无法 ons 报错 receive 1.2 mod http routing one
Python中的队列:
1.线程QUEUE
线程队列不能跨进程,只是单线程下的多个线程间的数据交互
2.进程QUEUE
支持父进程于子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。
因此,两个独立的程序之间是不能使用Python中的QUEUE实现交互。(因为每个程序是独立的,是一个独立的进程,所以Python的队列无法实现两个独立的程序之间的交互)。
所以想要实现两个独立的进程间的通信,可以使用下面几种方法:
如:
在独立的程序之间建立socket,实现通信;
将要通信的内容通过json写到硬盘中,a程序写入到硬盘中,b程序中硬盘中读取(耗时);
使用broker中间商代理,a程序与代理建立socket,b程序也与代理建立socket,a程序的消息发给代理,代理再发给a程序(易于维护)。
本节主要介绍broker的使用,目前流行的broker有RabbitMQ,ZeroMQ,ActiveMQ,MSMQ。
RabbitMQ是用erlang开发的,所以RabbitMQ依赖于erlang语言。在windows上安装和使用RabbitMQ的时候,要先装上erlang语言。
在windows上安装完成erlang和RabbitMQ之后就自动启动,RabbitMQ在任务管理器的“服务”中可以进行查看确认。
RabbitMQ支持多种语言,如:Java,.NET,Ruby,Python,PHP,JavaScript等,可以在官网 http://www.rabbitmq.com查看相应的语言支持的模块。
在Python中:
pika,a pure-Python AMQP 0-9-1 client ( source code ,API reference )
Celery ,a distributed task queue for Django and pure Python
Halgha, an asynchrounous AMQP 0-0-1 client based on libevent ( the source code and docs are on github )
其中,pika是本文介绍使用的模块。
在RabbitMQ中,关于在Python中如何使用的官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
RabbitMQ是一个消息代理(中间商):它接收并推送消息。
想象一下,有一个邮政局:当你把想要邮寄出去的信放入邮箱,你可以确定邮政员先生最终可以将你的信发送到你的收信方。类似的,RabbitMQ在这种情况下,就是一个邮箱,邮政局和邮政员先生。
RabbitMQ与邮政局最大的区别是,邮政局可以发送纸质的信息,但是RabbitMQ不行。RabbitMQ只接收,存储和发送二进制的数据信息。
RabbitMQ一般情况下使用一些术语来传输信息。
Producing就是发送。发送信息的程序是生产者。用P来指代。
Consuming就是接收。接收信息的程序就是消费者。用C来指代。
queue(队列)是在RabbitMQ内的一个邮箱的名称。尽管消息通过RabbitMQ和你的应用程序传输,信息只能被存储在queue中。队列仅有主机的内存和字旁限制绑定,它本质上是一个大的消息缓冲区。许多生产者可以发送信息到指定的队列,许多的消费者可以从指定的队列中收取信息。用queue_name表示。
注意:生产者,消费者和代理不必驻留在同一台主机上。在实际大多数应用中,也是如此。
RabbitMQ说的是AMQP 0.9.1,它是一个开放的、通用的消息传递协议。有许多不同语言的RabbitMQ客户端。在这个实例中,我们将使用Pika,这是RabbitMQ团队推荐的Python模块。要安装它,您可以使用pip包管理工具。
pip install pika
在生产者消费者模型中,生产者产生的信息发送隔离Broker,由Broker发送给相应的消费者。
在官方文档中,关于使用pika实现的最简单的Hello World案例中,将用Python写两个小程序。Proceder(发送方)发送信息,Consumer(接收方)接收信息并且打印这些信息。要传输的信息就是‘Hello World!‘
在下面的图表中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。
我们的整体设计将会是:
生产者将消息发送到“hello”队列。消费者从该队列接收消息。
第一个程序send.py将会把单条信息‘Hello World!‘发送给队列hello。在send.py文件中,我们需要做的第一个事就是建立和RabbitMQ server的连接。
import pika #导入模块pika connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) channel=connection.channel() #建立和RabbitMQ server的连接我们
通过上面步骤,我们的send.py小程序实现了连接到本地机器上的broker(代理)。如果我们想要连接到另一台机器上的代理,我们只需在这里指定它的名称或IP地址。
接下来,在发送之前,我们需要确定收件人队列是否存在。如果我们将消息发送到不存在的位置,RabbitMQ将会删除该消息。让我们创建一个名为hello的队列,send发送的消息将会发送到hello队列:
channel.queue_declare(queue=‘hello‘)
在这时,我们准备发送一条消息。我们的第一个消息仅为‘Hello World !‘字符串。我们想把它发送到我们的hello队列。
在RabbitMQ中,消息从来不能直接发送到队列中,它总是需要通过交换。但是,让我们不要被细节所拖累——如果想要了解关于exchange的内容,可以在本教程的第三部分阅读更多关于交流exchange的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换(exchange=‘‘)。这个交换exchange是特殊的——它允许我们精确地指定消息应该发送到哪个队列。在routing_key参数中需要指定队列名称:
channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=‘Hello World!‘) print("[x] Sent ‘Hello World!‘")
在退出程序之前,我们需要确保网络缓冲区已被刷新,并且我们的消息已发送到RabbitMQ。我们可以关闭连接。
connection.close()
综上所述:
1 import pika 2 #第一步:与RabbitMQ server建立连接 3 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel=connection.channel() 5 6 #第二步:生产指定要发送的队列 7 channel.queue_declare(queue=‘hello‘) 8 9 #第三步:通过交换exchange,发送数据到相应的队列;通过routing_key指定queue name;body中存放要传输的信息 10 channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=‘Hello World!‘) 11 print("[x] Sent ‘Hello World!‘") 12 13 #第四步:关闭连接 14 connection.close()
可能出现的问题:Sending doesn‘t work!
如果这是你第一次使用RabbitMQ,而你没有看到“发送”的消息,那么你可能会挠头,想知道到底出了什么问题。可能代理开始时没有足够的空闲磁盘空间(默认情况下,它至少需要200 MB),因此代理拒绝接收消息。检查代理日志文件以确认并减少必要的限制。配置文件文档将向您展示如何设置disk_free_limit。
我们的第二个程序receive.py将从队列queue接收消息并在屏幕上打印它们。
首先,我们需要连接到RabbitMQ服务器。连接到RabbitMQ的代码和send.py中一样。
import pika #导入模块pika connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) channel=connection.channel() #建立和RabbitMQ server的连接我们
第二步,就像send.py一样,确保队列存在。使用queue_declare创建队列是具有幂等性的——我们可以按照我们喜欢的次数运行这个命令,并且只创建一个命令。
channel.queue_declare(queue=‘hello‘)
您可能会问为什么我们再次声明队列——我们已经在以前的代码中声明了它。如果我们确信队列已经存在,我们可以避免这种情况。例如,如果send.py程序之前运行过。但是我们还不能确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是很好的做法。
Listing queues中看RabbitMQ中的queues和messages
您可能希望看到RabbitMQ中有哪些队列,有多少消息在队列中。可以使用rabbitmqctl工具(作为特权用户):
sudo rabbitmqctl list_queues
在Windows上,省略了sudo
rabbitmqctl.bat list_queues
从队列接收消息更加复杂。它的工作方式是订阅一个回调函数到一个队列。当我们收到消息时,这个回调函数被Pika库调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。
def callback(ch,method,properties,body): print("[x] Received %r"%body)
接下来,我们需要告诉RabbitMQ,这个特定的回调函数应该接收来自我们的hello队列的消息:
channel.basic_consume(callback,queue=‘hello‘,no_ack=True)
Receiving.py要成功运行,我们必须确保我们希望订阅的队列是存在的。幸运的是,我们对此有信心——我们已经创建了一个上面的队列——使用queue_declare。
no_ack参数在之后会进行解释。
最后,我们输入一个永无止境的循环,等待数据并在必要时运行回调函数。
print("[x] Waiting for messages.To exit press CTRL") channel.start_consuming()
综上所述:
1 import pika 2 #第一步:与RabbitMQ server建立连接 3 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 4 channel=connection.channel() 5 6 #第二步:生产指定要接收的队列 7 channel.queue_declare(queue=‘hello‘) 8 9 #第三步:用回调函数到队列queue中,接收到信息时回调函数会被Pika库调用 10 def callback(ch,method,properties,body): 11 print("[x] Received %r"%body) 12 13 #第四步:告诉RabbitMQ,这个特定的回调函数将接收来自hello队列的信息 14 channel.basic_consume(callback,queue=‘hello‘,no_ack=True) 15 16 #第五步:进入死循环等待数据,在必要的时候运行回调函数。 17 print("[x] Waiting for messages.To exit press CTRL + C") 18 channel.start_consuming()
现在我们就完成了一个一对一的Producer和Consumer的单条消息队列传输。
首先运行receive.py,它会一直等待数据。再运行send.py之后,Producer会在数据传输完成后停止。Consumer还会继续保持等待数据的状态。(receive.py可以一直运行,但是可能可以被Ctrl+C打断)
消息分发轮询:
先运行N个receive.py,再运行send.py。每个receive.py会按照运行的先后相继收到新触发的send.py的消息。每个Consumer轮询地去接收Producer的信息。
在上面的第一个"Hello World!"的例子中,我们编写了一些程序来从一个已命名的队列发送和接收消息。在这个过程中,我们将创建一个工作队列work queue,用于在多个工作之间分配耗时的任务。
工作队列work queues(即任务队列task queues)背后的主要思想是避免立即执行资源密集型任务,并避免进程必须等待它完成。相反,我们可以把任务安排在以后做。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程会弹出任务并最终执行任务。当你运行多个工作时,任务将在他们之间共享。
Work queues这个概念在web应用程序中特别有用,因为在短HTTP请求窗口中无法处理复杂的任务。
在教程的"Hello World!"例子中,我们发送了一个包含“Hello World !”的消息。现在我们将发送用于复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或者呈现pdf文件,所以让我们通过使用time . sleep()函数来假装我们很忙。我们取字符串中的点dot的个数作为它的复杂度的依据(假设情况);字符串中有多少个dot,就说明处理该条字符串需要多少秒。例如,一个“Hello…”字符串中有三个点,因此假定处理该字符串需要三秒钟。
我们会稍微修改一下send.py,使得Producer允许从命令行发送任意消息。这个修改过的程序将任务调度到我们的工作队列中。我们将其命名为new_task.py:
1 import sys 2 import pika 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=‘hello‘) 10 11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数 12 message=‘‘.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=message) 14 print("[x] Sent %r"%message) 15 16 #step4:close 17 connection.close()
我们也要稍微修改一下receive.py,使得接收的数据不立即在屏幕上打印,而是等待数据处理(依据字符串的dot数作为处理时长)完成后再向屏幕打印数据。新的Consumer将被命名为work.py:
1 import pika 2 import time 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=‘hello‘) 10 11 #step3:define callback function. 12 def callback(ch,method,properties,body): 13 print(‘[x] Received %r‘%body) 14 time.sleep(body.count(b‘.‘)) 15 print(‘[x] Done.‘) 16 17 #step4:when Consumer receive message,the program call callback function to deal with message. 18 channel.basic_consume(callback,queue=‘hello‘,no_ack=True) 19 20 print(‘[x] Waiting for messages. To exit press "CTRL+C"‘) 21 22 #write an endless loop. 23 channel.start_consuming()
使用任务队列的优点之一是能够很容易地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务,这样就可以很容易地扩大规模。
首先,让我们同时运行两个worker.py脚本。他们将从队列中获取消息,但具体如何呢?让我们来看看。
你需要三个控制台。两个将会运行这个worker.py脚本。这些控制台将是我们的两个消费者——C1和C2。
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
在第三个console,可以发布Producer任务:
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
返回结果:
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received ‘First message.‘ # => [x] Received ‘Third message...‘ # => [x] Received ‘Fifth message.....‘
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received ‘Second message..‘ # => [x] Received ‘Fourth message....‘
默认情况下,RabbitMQ将按顺序将每个消息发送到下一个消费者。平均每个消费者将得到相同数量的消息。这种分配消息的方式称为循环。
完成一个任务可能需要几秒钟。如果其中一个消费者开始了一项很长的任务,而只在一定程度上完成了,那么会发生什么呢?使用我们当前的代码,一旦RabbitMQ将消息传递给Consumer,RabbitMQ就会立即从内存中删除队列中的这条信息。在这种情况下,如果Consumer中断,我们将失去这个Consumer正在处理的信息。我们也将丢失所有发送给这个特定Consumer的消息,但是实际上这条信息还没有被Consumer处理。
但我们不想失去任何任务信息。如果一个Consumer中断,我们希望把任务交给另一个Consumer。
为了确保消息不会丢失,RabbitMQ支持消息确认。Consumer在处理完该任务信息后,会给RabbitMQ发送一个ack(全称 acknowledgement),告诉RabbittMQ可以删除它。
如果一个Consumer中断(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而没有发送ack,RabbitMQ将理解一条消息没有被完全处理,并且将重新排队。如果同时有其他Consumer在运行,它会很快将其转递给另一个Consumer。这样我们就可以确信,即使Consumer偶尔死亡,也不会失去任何信息。
其中,没有任何消息超时的限制。RabbitMQ将在Consumer中段后重新传递消息。即使处理消息需要很长时间,也不会由于超时原因导致失败。
默认情况下打开消息确认。在前面的例子中,我们明确地通过no_ack = True标记关闭了它们。
所以,在默认情况下,我们都不用设置no_ack,直接使用默认的no_ack=False即可。这样确保每次Consumer收到处理完消息后给Producer返回ack消息。
def callback(ch, method, properties, body): print ("[x] Received %r" % (body,)) time.sleep( body.count(‘.‘) ) print ("[x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,queue=‘hello‘) #删除了no_ack=True
把worker.py脚本中的代码按照上面的代码进行修改,就能保证Consumer中止,未给RabbitMQ发送acknowlegement,队列中的信息不会被删除。再次运行新的Consumer,信息还是会再次发送给新的在运行中的Consumer。
忽略basic_ack是一个常见的错误。这是一个简单的错误,但后果却是严重的。当您的客户端退出时,消息将会被重新发送(看起来可能是随机的再次发送),但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未加处理的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
如果脚本中忽略了basic_ack,那么队列中的信息会不停地在内存中堆积,队列中已经删除的信息,在内存中还是会放着。
c:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.10\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged Listing queues hello 0 3
通过上面在终端中查看的结果发现,队列中所有的信息已经被Consumer处理完后,messages
_unacknowleged的数量并未随着Consumer处理掉的信息(序列中删除的信息)的结果而减少相应的条数。
加上了basic_ack之后,随着Consumer的处理结束,messages_unacknowleged的数量也随之减少,最后降为0.
我们已经学会了如何确保即使Consumer中断或停止,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列queues和消息messages。为了将队列和消息持久化,我们需要做两件事.。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了这样做,我们需要宣布它是持久的:
channel.queue_declare(queue=‘hello‘,durable=True)
尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您重新定义一个具有不同参数的现有队列,并将对试图执行此操作的任何程序返回一个错误。但是有一个快速的解决方案——让我们用不同的名称来声明一个队列,例如task_queue:
channel.queue_declare(queue=‘task_queue‘, durable=True)
上面的queue_declare的修改在Producer和Consumer之间都需要修改。
在这一点上,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要把我们的信息标记为persisitent持久的。标记信息为persisitent,可以通过下面的代码实现:
channel.basic_publish(exchange=‘‘, routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
将消息标记为持久的并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并保存消息期间,仍然有很短的时间窗口。而且,RabbitMQ对每条消息都不执行fsync(2),它可能只保存到缓存,而不是真正写入磁盘。持久性保证不强,但是对于简单的任务队列来说已经足够了。如果你需要一个更有力的保证,那么你可以使用publisher confirms.
您可能已经注意到,调度仍然不像我们希望的那样工作。例如,在一个有两个Consumer在运行的情况下,当部分消息需要很长时间处理,部分消息只需极短的时间处理时,一个Consumer会不停地在处理,另一个Consumer几乎没有什么计算量。RabbitMQ对此一无所知,并且仍然还是会轮询地平均分配消息。
这是因为当消息进入队列时,RabbitMQ才会发送一条消息。它不考虑Consumer未确认的消息messages_noackownledgement的数量。它只是盲目地向n个Consumer发送n个消息。
为了解决上面的这个问题,我们可以是basic.qos方法,将prefetch_count设置为1。
channel.basic_qos(prefetch_count=1)
这告诉RabbitMQ一次不给一个Consumer发送一个以上的消息。或者,换句话说,在Consumer没有处理并确认之前的消息之前,不要向员工发送新消息。相反,它会把它发送给下一个不在处理的Consumer。
如果所有在运行中的Consumers都在处理中。如果想要对此保持关注,那么可能需要增加更多的Consumer,或者使用message TTL。
最后,下面是实现消息确认+消息和队列持久化+消息公平分发的Work queues:
new_task.py:
1 import sys 2 import pika 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=‘hello‘,durable=True) #durable=True实现队列持久化 10 11 #step3:message的默认值为"Hello World!",如果terminal中传入了参数,则为参数 12 message=‘‘.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=message, 14 properties=pika.BasicProperties(delivery_mode=2,) #make message persisitent实现消息持久化 15 ) 16 print("[x] Sent %r"%message) 17 18 #step4:close 19 connection.close()
work.py:
1 import pika 2 import time 3 4 #step1:create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 6 channel=connection.channel() 7 8 #step2:create a named queue 9 channel.queue_declare(queue=‘hello‘,durable=True) #durable=True实现队列持久化,C和P都要保持一致 10 11 #step3:define callback function. 12 def callback(ch,method,properties,body): 13 print(‘[x] Received %r‘%body) 14 time.sleep(body.count(b‘.‘)) 15 print(‘[x] Done.‘) 16 ch.basic_ack(delivery_tag=method.delivery_tag) #basic_ack是消息确认的不可缺少的一部分,少了它内存中的messages_noacknowledgement不会自动较少 17 18 19 #step4:when Consumer receive message,the program call callback function to deal with message. 20 channel.basic_qos(prefetch_count=1) #消息公平分发Fair dispatch ,设置RabbitMQ只向每个C发送一条message 21 channel.basic_consume(callback,queue=‘hello‘) #使用no_ack=False的默认值,确保消息确认 22 23 print(‘[x] Waiting for messages. To exit press "CTRL+C"‘) 24 25 #write an endless loop. 26 channel.start_consuming()
在第二个实例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都交付给一个worker。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。
为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志系统中,接收程序Consumer的每一个运行副本都将得到消息。这样我们就能运行一个接收器,并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上打印日志。
基本上,发布的日志消息将被所有Consumer接收,这种类型叫做广播broadcast。
在前两个实例中,Producer发送消息到队列,Consumer从队列中接收消息列。现在,我们来介绍一下RabbitMQ的完整消息传递模型。
让我们快速回顾一下前面的教程中介绍的内容:
生产者是发送消息的用户应用程序。
队列是存储消息的缓冲区。
消费者是接收消息的用户应用程序。
RabbitMQ中的消息传递模型的核心思想是,生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到哪个队列。
生产者只能将消息发送到exchange。exchange接收来自生产者的消息,再将消息推送到队列中。
exchange必须知道如何处理它收到的消息:
它是否应该附加到特定的队列?
它应该被附加到许多队列吗?
或者应该被抛弃。
这些规则由交换类型定义。
可用的交换类型有:direct,topic,headers,fanout。
1.fanout 以类似广播的方式向所有的C发送信息:所有bind到类型为fanout的这个exchange的queue都可以接收信息
2.direct 通过routingKey和exchange决定的那个唯一的queue可以接收消息
3.topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
4.headers
在这个实例中,我们将使用fanout类型的exchange来实现。
channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘)
上面的代码声明了类型为fanout,名为logs的exchange。
fanout交换非常简单。它只是将接收到的所有消息广播到它所知道的所有队列。
代码实现:
channel.basic_publish(exchange=‘log‘, #exchange的名称为logs routing_key=‘‘, #不指定序列名称 body=message)
想要内存在RabbitMQ Server中的所有exchanges,也可以使用rabbitmqctl工具:
rabbitmqctl list_exchanges
在返回的列表中会有一些amq.*交换 和 默认的(未命名)交换。这些exchange是默认创建的,但现在我们不太可能需要使用它们。
在本教程的前几部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,我们通过空字符串(“”)来识别它。
交换参数是交换的名称。空字符串表示默认或匿名交换:消息被发送到参数routing_key指定的队列中去。
在前两个实例中,我们使用有指定名称的队列(如:hello)。对我们来说,能够命名队列是至关重要的——我们需要将Consumer指向相同的队列。当您想要在生产者和消费者之间共享队列时,给队列命名是很重要的。
但在这个实例中,命名队列的情况却并非如此。
我们想要听到所有日志信息,而不仅仅是他们的子集。我们也只对当前flowing messages感兴趣,而不是旧消息。要解决这两个问题,我们需要两步。
第一步,当我们连接RabbbitMQ时,我们需要一个新的空队列。我们可以创建一个随机名称的队列或让server选择一个随机队列名称。这个可以通过不在queue_declare()中设置参数queue来实现:
result = channel.queue_declare()
result.method.queue包含了一个随机队列名。比方说“amq.gen-JzTY20BRgKO-HjmUJj0wLg”这样的类似的名称。
第二步,一旦Consumer不和该随机名称的队列连接了,这个队列就可以删除。通过设置exclusive参数实现。
result = channel.queue_declare(exclusive=True)
我们已经创建了一个类型为fanout的exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列中去。exchange和queue之间的联系被称为binding绑定。
channel.queue_bind(exchange=‘logs‘,
queue=result.method.queue #result.method.queue中生成一个随机名的queue )
从现在起,名为logs的exchange将会向我们的队列中添加消息。
我们可以通过rabbitmqctl工具列出现有的bindings。
rabbitmqctl list_bindings
综上所述:
Producer程序负责发送日志信息,大部分实现和前面两个实例中的Producer没有大的区别。最重要的变化是,我们现在想要把消息发布到名为logs,类型为fanout的exchange中去,而不是默认的exchange。我们需要在发送时提供一个routing_key,但是它的值被忽略了。
下面是Producer的代码,emit_log.py:
1 import pika 2 import sys 3 4 #step1:create connection with RabbitMQ server 5 connection =pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 6 channel = connection.channel() 7 8 #step2:declare a fanout exchange 9 channel.exchange_declare(exchange=‘logs‘, 10 type=‘fanout‘) 11 12 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 13 14 #step3:send message to exchange ‘log‘ 15 channel.basic_publish(exchange=‘logs‘, 16 routing_key=‘‘, 17 body=message) 18 print(" [x] Sent %r" % message) 19 20 #step4:close 21 connection.close()
正如您所看到的,在建立连接之后,我们声明了exchange。这个步骤是必要的,因为将信息发送给一个不存在的exchange是不行的。
如果没有队列绑定到交换中,消息将丢失,但这对我们来说是可以的;如果没有消费者在听,我们可以安全地丢弃这个消息。
receive_logs.py的代码:
1 import pika 2 3 #step1:create connection with RabbitMQ server 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 #step2:declare a fanout exchange 8 channel.exchange_declare(exchange=‘logs‘, 9 type=‘fanout‘) 10 11 #step3:declare a random name queue 12 result = channel.queue_declare(exclusive=True) #exclusive=True使得生成的随机名称的queue在使用后在内存中删除。 13 queue_name = result.method.queue #result.method.queue生成一个随机名称的queue 14 15 #step4:create a bind with logs and queue 16 channel.queue_bind(exchange=‘logs‘, 17 queue=queue_name) 18 19 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 20 21 #step5:define a callback function 22 def callback(ch, method, properties, body): 23 print(" [x] %r" % body) 24 25 #step6:call a callback function when C receives a message 26 channel.basic_consume(callback, 27 queue=queue_name, 28 no_ack=True) 29 30 #step7:write a endless loop 31 channel.start_consuming()
Question:为什么在Consumer端需要一个随机名称的queue呢?
因为Producer不会直接把消息发送给queue,而是先发送给exchange。exchange(fanout类型的)收到消息后,会遍历绑定在这个exchange上的所有的queue,一次将消息发送给所有与之bind的queue。再由quque将消息发送给Consumer。
由于在这个过程中,queue在Consumer接收处理完数据后就自动删除了。所以就用了随机名称的queue。
在实例3中,我们构建了一个简单的日志记录系统。通过这个简单的日志记录系统,我们可以像广播的形式将日志信息发送给许多接收端。
在实例4中,我们会给它增加一些特性——我们将使它可以只订阅消息的一部分,而不是全部消息。例如,我们将能够将关键error消息直接发给日志文件(以节省磁盘空间),同时还能够在控制台上打印所有日志消息。
在前面的示例中,我们已经创建了binding。创建binding的代码:
channel.queue_bind(exchange=exchange_name,queue=queue_name)
绑定binding是交换exchange和队列queue之间的关系。换言之,队列对bind的exchange传来的消息感兴趣。
绑定binding可以使用一个额外的参数routing_key。为了避免与basic_publish参数的混淆,我们将把它称为绑定键binding_key。下面是创建一个具有键binding_key的绑定binding:
channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key=‘black‘) #增加了routing_key参数
绑定键binding_key的含义取决于交换类型exchange type。实例3中使用的fanout exchange,只是忽略了它的值。
我们希望扩展实例3中的日志记录系统的功能,依据消息的严重性来过滤消息。
例如,我们可能想要将日志消息写入磁盘的脚本只接收关键错误error,而不是在警告warning或信息info日志消息上浪费磁盘空间。
实例3中使用的是fanout exchange,它不会给我们太多的灵活性——它只会无意识地将信息传递给所有bind它的队列。
想要实现这个扩展的功能,实现日志信息记录的过滤接收,可以使用direct exchange。direct exchange的算法很简单——一条消息传递给队列,queue的绑定键binding_key与publish消息的路由键routing_key完全匹配。
为了说明这一点,请考虑以下设置:
在这个设置中,我们可以看到direct exchange和两个queues绑定。第一个队列与绑定键为orange的exchange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定键位green。
在这样的设置中,通过routing_key=orange的direct exchange发送的消息,将会被Q1接收。通过routing_key=black或routing_key=green的direct exchange发送的消息,将会被Q2接收。所有其他消息将被丢弃。
用相同的绑定键binding key绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键black。在这种情况下,direct exchange表现为fanout,并将消息广播到所有匹配队列。一个带有routing_key=black的消息将被发送到Q1和Q2。
我们将在我们的日志系统中使用这个模型。
我们将发送消息到direct exchange。
我们将提供日志的严重性log serverity作为路由键routing key。
这样Consumer将能够选择它想要接收的信息。让我们先关注一下emiiting logs。
需要创建一个exchange:
channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘)
发送信息:
channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message)
为了简化问题,我们假设“严重性”severity可以是“信息”info、“警告”warning、“错误”error。
接收信息像实例3一样运行,只有一个例外——我们将为每个serverity创建一个新的binding。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity)
综上所述:
emit_log_direct.py的代码:
1 import pika 2 import sys 3 4 #step1: create connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 6 channel = connection.channel() 7 8 #step2: declare a direct exchange 9 channel.exchange_declare(exchange=‘direct_logs‘, 10 type=‘direct‘) 11 12 #step3: aacept message severity 13 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ 14 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 15 16 #step4: send message to exchange with the specific severity 17 channel.basic_publish(exchange=‘direct_logs‘, 18 routing_key=severity, 19 body=message) 20 print(" [x] Sent %r:%r" % (severity, message)) 21 22 #step5:close 23 connection.close()
receive_logs_direct.py的代码:
1 import pika 2 import sys 3 4 #step1: create connection with RabbitMQ server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 6 channel = connection.channel() 7 8 #step2: declare a direct exchange 9 channel.exchange_declare(exchange=‘direct_logs‘, 10 type=‘direct‘) 11 12 #step3:declare a random name queue 13 result = channel.queue_declare(exclusive=True) 14 queue_name = result.method.queue 15 16 #step4: 17 severities = sys.argv[1:] 18 if not severities: 19 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 20 sys.exit(1) 21 22 for severity in severities: 23 channel.queue_bind(exchange=‘direct_logs‘, 24 queue=queue_name, 25 routing_key=severity) 26 27 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 28 29 #step5: define a callback function 30 def callback(ch, method, properties, body): 31 print(" [x] %r:%r" % (method.routing_key, body)) 32 33 #step6: call a callback function when receive a message 34 channel.basic_consume(callback, 35 queue=queue_name, 36 no_ack=True) 37 38 #step7: write a endless loop 39 channel.start_consuming()
在实例4中,我们改进了日志系统。我们使用了一个direct exchange,而不是使用一个仅能进行虚拟广播的fanout exchange。在实例4中实现了选择性地接收日志的可能性。
但是实例4改进过的日志系统还是具有它的局限性——不能基于多个标准进行选择性地接收信息。
在我们的日志系统中,我们可能希望订阅的不仅是基于严重性severity为标准的日志,还应该基于发出日志的来源。我们可能从syslog unix工具中了解这个概念,该工具根据严重性severity(info / warn / crit…)和设施facility(auth / cron / kern…)来发送日志。
这将给我们带来很大的灵活性——我们可以收到来自“cron”和“kern”的关键错误。
要在我们的日志系统中实现这一点,我们需要了解一个更复杂的topic exchange。
发送到topic exchange的消息不能有任意的routing_key——它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定一些与消息相关的特性。一些有效的routing_key示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。在routing_key中可以有任意多个单词,最多可达255个字节。
绑定键binding key也必须以相同的形式出现。top exchange背后的逻辑类似于direct exchange——一个带有特定routing key的消息将被传送到绑定一个匹配绑定键binding_key的所有队列。然而,绑定键有两个重要的特殊情况:
*(star)可以代替一个词。
#(hash)可以替代零个或更多的词。
在一个例子中,最简单的解释是:
在这个例子中,我们将发送所有描述动物的信息。消息将发送一个由三个单词组成的routing key(两个点)。routing key中的第一个词将描述一个celerity,第二个词将描述一种colour,第三个词将描述一种species:“< celerity > . <colour> . <species>”。
我们创建了三个binding key:Q1绑定binding key " * . orange . * ",Q2 绑定binding key" *.*.rabbit "和" lazy.# "。
这些绑定可以概括为:
Q1对所有的orange动物都感兴趣。
Q2对所有的rabbit都敢兴趣,同时也对所有的lazy的动物感兴趣。
一个routing_key=" quick.orange.rabbit "的消息,将会被发送到队列Q1,Q2中去;
一个routing_key=" lazy.orange.elephant "的消息,也将会被发送到队列Q1,Q2中去;
一个routing_key=" quick.orange.fox "的消息,仅会被发送到队列Q1中去;
一个routing_key=" lazy.brown.fox "的消息,仅会被发送到队列Q2中去;
一个routing_key=" lazy.pink.rabbit "的消息,仅会被发送到队列Q2中去,即便它有符合两条binding key " *.*.rabbit "和" lazy.# ";
一个routing_key=" quick.brown.fox "的消息,没有和任何一条binding key匹配上,所以这条消息会被Q1,Q2丢弃。
如果我们违反规定(routing_key只能是以两个dot分隔的三个单词列表),用一个或四个单词发送一条信息,比如一条信息的routing_key="orange"或"quick.orange.male.rabbit"。这些消息不匹配任何binding key,将丢失。
另一方面," lazy.orange.male.rabbit "。虽然它有四个单词,但它将匹配最后一个绑定" lazy.# ",并将被传送到队列Q2 。
topic exchange很强大,它不仅能实现更复杂的数据筛选接收,还能实现fanout exchange和direct exchange一样的效果。
当队列与“#”(hash)binding key绑定时,它将接收所有消息,实现了fanout exchange的功能。
当特殊字符“*”(star)和“#”(hash)没有在binding key中使用时,实现了direct exchange的功能。
综上所述:
我们将用topic exchange实现一个新的日志系统。
我们假设日志的routing key将由两个单词组成:" <facility>.<severity>"。
emit_log_topic.py的代码:
1 import pika 2 import sys 3 4 #step1:create a connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 6 channel = connection.channel() 7 8 #step2:declare a topic exchange 9 channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) 10 11 #step3:receive routing_key message from sys.argv 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ 13 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 14 15 #step4:send message to topic_logs 16 channel.basic_publish(exchange=‘topic_logs‘, 17 routing_key=routing_key, 18 body=message) 19 20 print(" [x] Sent %r:%r" % (routing_key, message)) 21 22 #step5:close. 23 connection.close()
receive_logs_topic.py的代码:
1 import pika 2 import sys 3 4 #step1:create a connection with RabbitMQ Server 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 6 channel = connection.channel() 7 8 #step2:declare a topic exchange 9 channel.exchange_declare(exchange=‘topic_logs‘, 10 type=‘topic‘) 11 12 #step3:declare a random name queue 13 result = channel.queue_declare(exclusive=True) 14 queue_name = result.method.queue 15 16 #step4:receive messages from sys.argv 17 binding_keys = sys.argv[1:] 18 if not binding_keys: 19 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 20 sys.exit(1) 21 22 #step5:bind a topic exchange with a queue 23 for binding_key in binding_keys: 24 channel.queue_bind(exchange=‘topic_logs‘, 25 queue=queue_name, 26 routing_key=binding_key) 27 28 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 29 30 #step6:create a callback function 31 def callback(ch, method, properties, body): 32 print(" [x] %r:%r" % (method.routing_key, body)) 33 34 #step7:call a callback function when receive a message 35 channel.basic_consume(callback, 36 queue=queue_name, 37 no_ack=True) 38 39 #step8:write a endless loop 40 channel.start_consuming()
在实例2中,我们学习了如何使用Work Queues在多个工作进程之间分配耗时的任务。(消息公平分发channel.basic_qos(prefetch_count=1) )
但如果我们需要在远程计算机上运行一个函数并等待结果呢?那是另一回事了。这种模式通常被称为远程过程调用或RPC。
在实例6中,我们将使用RabbitMQ构建一个RPC系统:客户机client和可伸缩的RPC服务器scalable RPC server。由于我们没有任何可分配的耗时任务,因此我们将创建一个能够返回斐波那契数列的虚拟RPC服务。
为了说明RPC服务如何使用,我们将创建一个简单的客户端类。它将公开一个名为call的方法,call方法将发送一个RPC请求和块,直到收到响应:
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)
虽然RPC在计算中是一个很常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是RPC时,问题就出现了。这样的混淆导致了不可预测的系统结果,并增加了调试的复杂性。与简化软件不同,误用的RPC可能会导致代码无法维护。
考虑到这一点,有以下建议:
确保函数调用来源的明确性,调用的函数是本地的还是远程的清晰可判断。
用文件记录系统。使组件之间的依赖关系变得清晰。
处理错误情况。当RPC服务器停机很长时间时,客户机应该如何反应?
当有疑问要时要避免使用RPC。如果可以,应该使用异步管道asynchronous pipeline(而不是像rpc一样的阻塞进程),异步地将结果推进到下一个计算阶段。
一般来说,在RabbitMQ上执行RPC是很容易的。客户端发送请求消息,服务器响应消息。为了接收响应,客户端需要发送一个“callback” 队列地址。让我们试一试:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, properties=pika.BasicProperties( reply_to = callback_queue, ), body=request) # ... and some code to read a response message from the callback_queue ...
AMQP 0 - 9 - 1协议预先定义了一个14个属性集合。除以下情况外,大多数属性都很少使用:
delivery_mode:将消息标记为持久性(值为2)或瞬态transient(任何其他值)。您可能还记得实例2中的这个属性。
content_type:用于描述编码的MIME类型。例如,对于经常使用的JSON编码,将此属性设置为:
应用程序/ JSON是一种很好的做法。
reply_to:通常用于命名回调队列callback queue。
correlationship _id:将RPC响应与请求联系起来.
在上述方法中,我们建议为每个RPC请求创建一个回调队列。这很低效,但幸运的是有更好的方法——让我们为每个客户端创建一个回调队列。
这引发了一个新的问题,在该队列中收到了响应,不清楚响应属于哪个请求。这就是使用correlation_id属性时的情况。我们将为每个请求设置唯一值。稍后,当我们在回调队列中收到消息时,我们将查看correlation_id,并基于correlation_id,我们将能够匹配响应和请求。如果我们看到一个未知的correlationship _id值,我们可以安全地丢弃该消息——它不属于我们的请求。
您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是由于报错而失败呢?这是由于服务器端可能出现race condition。虽然不太可能,但是RPC服务器可能在发送答案后,但未发送请求确认acknowlegement信息前。如果发生这种情况,重新启动的RPC服务器将再次处理此请求。这就是为什么在客户端我们必须优雅地处理重复的响应,由于RPC服务是幂等的idempotent(意味着可以安全地对失败请求进行重试)。
综上所述:
我们的RPC将这样工作:
当客户机启动时,它创建一个匿名回调队列。
对于一个RPC请求,客户机发送一条信息(有两个属性:reply_to和correlationship_id):reply_to是用来设置回调序列的,correlation_id是用来设置每个请求的唯一值的。
请求被发送到一个rpc_queue队列。
RPC客户端在等待从这个队列中接收请求。当一个请求出现时,它处理完请求的工作并且发送处理结果信息给客户端时,通过检查correlation_id属性找到唯一的该请求,通过reply_to回调队列发送回客户端。
客户机在回调队列上等待数据。当消息出现时,它检查correlationship _id属性。如果它与请求的值相匹配,则返回对应用程序的响应。
rpc_server.py的代码:
1 import pika 2 3 #建立连接 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 #声明队列 8 channel.queue_declare(queue=‘rpc_queue‘) 9 10 #声明fibonacci函数,只假设有效的正整数输入。 11 def fib(n): 12 if n == 0: 13 return 0 14 elif n == 1: 15 return 1 16 else: 17 return fib(n-1) + fib(n-2) 18 19 20 def on_request(ch, method, props, body): 21 n = int(body) 22 23 print(" [.] fib(%s)" % n) 24 response = fib(n) 25 26 ch.basic_publish(exchange=‘‘, 27 routing_key=props.reply_to, 28 properties=pika.BasicProperties(correlation_id = 29 props.correlation_id), 30 body=str(response)) 31 ch.basic_ack(delivery_tag = method.delivery_tag) 32 33 channel.basic_qos(prefetch_count=1) #为了在多个服务器上平均分配负载,我们需要设置prefetch_count设置。 34 channel.basic_consume(on_request, queue=‘rpc_queue‘) 35 #我们声明对basic_consume的回调,RPC服务器的核心。在接收请求时执行它。它执行工作并发送响应。 36 37 print(" [x] Awaiting RPC requests") 38 channel.start_consuming()
rpc_client.py的代码:
1 import pika 2 import uuid 3 4 class FibonacciRpcClient(object): 5 def __init__(self): 6 self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 7 #建立连接 8 self.channel = self.connection.channel() 9 10 result = self.channel.queue_declare(exclusive=True) #声明一个排他的回调队列 11 self.callback_queue = result.method.queue 12 13 self.channel.basic_consume(self.on_response, no_ack=True, 14 queue=self.callback_queue) 15 16 def on_response(self, ch, method, props, body): 17 if self.corr_id == props.correlation_id: 18 self.response = body 19 20 def call(self, n): 21 self.response = None 22 self.corr_id = str(uuid.uuid4()) 23 self.channel.basic_publish(exchange=‘‘, 24 routing_key=‘rpc_queue‘, 25 properties=pika.BasicProperties( 26 reply_to = self.callback_queue, 27 correlation_id = self.corr_id, 28 ), 29 body=str(n)) 30 while self.response is None: 31 self.connection.process_data_events() 32 return int(self.response) 33 34 fibonacci_rpc = FibonacciRpcClient() 35 36 print(" [x] Requesting fib(30)") 37 response = fibonacci_rpc.call(30) 38 print(" [.] Got %r" % response)
标签:无法 ons 报错 receive 1.2 mod http routing one
原文地址:http://www.cnblogs.com/zoe233/p/7327770.html