码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ消息队列(五): 主题分发

时间:2016-03-18 19:41:04      阅读:182      评论:0      收藏:0      [点我收藏+]

标签:

1. 主题(Topics):

fanout模式只能进行简单的广播,direct模式虽然在过滤上进行了一定的提升,但是不能支持复杂的条件,

比如我们的日志消息,现在不仅要知道消息级别,也要知道消息来源。在这样的复杂需求下,我们需要使用

主题交换。

 

2. 主题交换:

发送主题交换的的routing_key不是任意的,必须遵循如下格式:使用.分隔的一些字。通常这些字用来表示

消息的某些特性,如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。

注意routing_key的最大长度是255。

绑定routing_key也必须是同样的格式,交换后端形式与直接路由相似,交换匹配消息中的routing_key和绑定队列

所需要接受消息的routing_key,并且将满足条件的消息进行派发。

通配符:

* -- 代表一个字(word)

# -- 代表零个或者多个字

 

如下图模型,我们使用"<celerity>.<colour>.<species>"来形容动物,可见Q1关心所有橘黄的动物,

Q2关心所有兔子或者懒惰的动物。

技术分享

"quick.orange.rabbit" -- 分发到Q1和Q2

"lazy.orange.elephant" -- 分发到Q1和Q2

"quick.orange.fox" -- 分发到Q1

"lazy.brown.fox" -- 分发到Q2

"lazy.pink.rabbit" -- 只分发一次到Q2,尽管匹配两个条件

"quick.brown.fox" -- 无匹配,丢弃

"quick.orange.male.rabbit" -- 无匹配,丢弃

"lazy.orange.male.rabbit" -- 匹配规则3,分发到Q2

 

3. 测试代码:

emit_log_topic.py

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host=localhost))
 7 channel = connection.channel()
 8 
 9 channel.exchange_declare(exchange=topic_logs,
10                          type=topic)
11 
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else anonymous.info
13 message =  .join(sys.argv[2:]) or Hello World!
14 channel.basic_publish(exchange=topic_logs,
15                       routing_key=routing_key,
16                       body=message)
17 print(" [x] Sent %r:%r" % (routing_key, message))
18 connection.close()

 

receive_logs_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=topic_logs,
                         type=topic)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=topic_logs,
                       queue=queue_name,
                       routing_key=binding_key)

print( [*] Waiting for logs. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

RabbitMQ消息队列(五): 主题分发

标签:

原文地址:http://www.cnblogs.com/wanpengcoder/p/5292954.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!