生产方(Fanout_Publisher.py)
1 # __author__ = ‘STEVEN‘ 2 import pika 3 #开启socket 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) 5 #开启一个通道 6 channel = connection.channel() 7 #这里不用再创建队列 8 channel.exchange_declare(exchange=‘logs‘,exchange_type=‘fanout‘) 9 #消息内容 10 mes = ‘publisher said hello‘ 11 #发布消息exchange=‘logs‘是给他起了一个名字,随便什么都行 12 channel.basic_publish(exchange=‘logs‘,routing_key=‘‘,body=mes) 13 print(‘[x] send the mes%s to queue‘%mes) 14 #关闭连接 15 connection.close()
消费方(Fanout_Consumer.py)
1 # __author__ = ‘STEVEN‘ 2 import pika 3 #建立socket 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 #开启通道 6 channel = connection.channel() 7 #通道先声明exchange 8 channel.exchange_declare(exchange=‘logs‘,exchange_type=‘fanout‘) 9 #声明queue 10 result = channel.queue_declare(exclusive=True) 11 #获取queue_name 12 queue_name = result.method.queue 13 #绑定queue 14 channel.queue_bind(exchange=‘logs‘,queue=queue_name) 15 #回调函数 16 def callback(ch,method,properties,body): 17 print(‘[x] receive mess%s‘%body.decode()) 18 #指定消费相关参数 19 channel.basic_consume(callback,queue=queue_name,no_ack=True) 20 print(‘[*] is waiting for the message‘) 21 #开启消费 22 channel.start_consuming()
与上一篇模式的转变:
1.加入了exchange类型,他有如下几种常用方式:
2.模式图: