# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定 Broker: 192.168.0.xx virtual host: vhosttest Exchange: exchangetest Queue: queuetest Routing key: rkeytest
【Python 环境】
OS: Windows 10 Python: 3.6.3 x64 kombu: 4.1.0
【查看队列状态】
# 通过浏览器查看队列状态 http://192.168.0.xx:15672/api/queues/vhosttest/queuetest # 通过命令行查看队列状态curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | jq # 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged) curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | \ jq '.messages'
【send.py】
#encoding: utf-8 #author: walker #date: 2018-03-09 #summary: 发送方/生产者 import os, sys, time from kombu import Connection def Main(): with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn: with conn.channel() as channel: #producer = Producer(channel) producer = channel.Producer() while True: message = time.strftime('%H:%M:%S', time.localtime()) producer.publish( body=message, retry=True, exchange='exchangetest', routing_key='rkeytest' ) print('send message: %s' % message) while True: # 检查队列,以重新得到消息计数 queue = channel.queue_declare(queue='queuetest', passive=True) messageCount = queue.message_count print('messageCount: %d' % messageCount) if messageCount < 100: break time.sleep(1) if __name__ == '__main__': Main()
【recv.py】
#encoding: utf-8 #author: walker #date: 2018-03-09 #summary: 接收方/消费者 import os, sys, time from kombu import Connection, Queue from kombu.mixins import ConsumerMixin class C(ConsumerMixin): def __init__(self, connection, queueNmae): self.connection = connection self.queues = [Queue(queueNmae, durable=False)] def get_consumers(self, Consumer, channel): return [ Consumer(self.queues, callbacks=[self.on_message]), ] # 接收处理消息的回调函数 def on_message(self, body, message): print("Received %s" % body) message.ack() def Main(): with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn: C(conn, 'queuetest').run() if __name__ == '__main__': Main()
【相关阅读】
*** walker ***
Python3 通过 kombu 连接 RabbitMQ 的基本用法
原文地址:http://blog.51cto.com/walkerqt/2084510