标签:start sage 输入 types 结果 queue 阻塞 python 用户
RabbitMQ
关键在于消息的发布与消费、消息的路由。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,可以视作Queue的name,
消费者将消息发送给Exchange时,一般会指定一个routing key
当binding key 与 routing key 相匹配时,消息就会被路由到对应的Queue中。
Exchange Types
fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
direct direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic 与direct类似,但是是模糊匹配,*”用于匹配一个单词,“#”用于匹配多个单词
binding key 类似 *.*.rabbit,routing key 为quick.orange.rabbit的消息会被路由到该Queue
headers headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
参考:http://www.diggerplus.org/archives/3110
开启RabbitMQ后台管理:
1.在rabbitMQ安装目录下的sbin目录,打开终端执行:rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ
2.浏览器中输入http://localhost:15672/
3.输入用户名和密码(默认为guest)
生产者
import pika ######### 生产者 ######### # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址) connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) # 创建频道 channel = connection.channel() # 创建一个队列名叫test channel.queue_declare(queue=‘test‘) # channel.basic_publish向队列中发送信息 # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 # routing_key 指定向哪个队列中发送消息 # body是要插入的内容, 字符串格式 while True: # 循环向队列中发送信息,quit退出程序 inp = input(">>>").strip() if inp == ‘quit‘: break channel.basic_publish(exchange=‘‘, routing_key=‘test‘, body=inp) print("生产者向队列发送信息%s" % inp) # 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接 connection.close() # 输出结果 # >> > python # 生产者向队列发送信息python # >> > quit
消费者
#!/usr/bin/env python 3 import pika ######### 消费者 ######### # 链接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) # 创建频道 channel = connection.channel() # 如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,所有消费者也创建这个队列,如果队列已经存在,则这条无效 channel.queue_declare(queue=‘test‘) # 接收消息需要使用callback这个函数来接收,他会被pika库来调用,接受到的数据都是字节类型的 def callback(ch, method, properties, body): """ ch : 代表 channel method :队列名 properties : 连接rabbitmq时设置的属性 body : 从队列中取到的内容,获取到的数据时字节类型 """ print(" [x] Received %r" % body) # channel.basic_consume 表示从队列中取数据,如果拿到数据 那么将执行callback函数,callback是回调函数 # no_ack=True 表示消费完这个消息以后不主动把完成状态通知rabbitmq channel.basic_consume(callback, queue=‘test‘, no_ack=True) print(‘ [*] 等待信息. To exit press CTRL+C‘) # 永远循环等待数据处理和callback处理的数据,start_consuming方法会阻塞循环执行 channel.start_consuming() # 输出结果,一直等待处理队列中的消息,不知终止,除非人为ctrl+c # [*]等待消息,To exit press CTRL+C # [x] Received b‘python‘
消费者acknowledgement消息不丢失的方法
# no_ack = False , 如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。在消费者端做设定条件。 # 生产者,代码同上,未改变 # 消费者代码 import pika import time # 链接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) # 创建频道 channel = connection.channel() # 如果生产者没有运行创建队列,那么消费者创建队列,如果队列已存在,创建队列操作会被忽略 channel.queue_declare(queue=‘test‘) # 回调函数 def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(10) print(‘ok‘) ch.basic_ack(delivery_tag=method.delivery_tag) # 当上面消息处理完成后,通知rabbitmq,消息处理完成,不要在发送了 channel.basic_consume(callback, queue=‘test‘, no_ack=False) # 表示消费完这个消息后,主动通知rabbitmq完成状态,如果不通知,rabbitmq会把这条消息重新放回队列中,避免丢失 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
标签:start sage 输入 types 结果 queue 阻塞 python 用户
原文地址:https://www.cnblogs.com/jec1999/p/9410841.html