标签:
一、RabbitMQ
python的Queue与RabbitMQ之间的理解:
python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。
1、简单的实现生产者与消费者
生产者
(1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)
消费者
(1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika #建立socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #声明一个管道 channel = connection.channel() #声明一个队列 channel.queue_declare(queue=‘hello‘) #通过管道发消息,routing_key 队列queue名字 ,body发送内容 channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World! 1 2‘) print("[x] send ‘Hello World! 1 2 ‘") connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika,time #建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #声明一个管道 channel = connection.channel() #声明队列,防止生产者(发送端)没开启,消费者端报错 channel.queue_declare(queue=‘hello‘) #ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息 def callbak(ch,method,properties,body): print("[x] Received %r " % body) # time.sleep(30) #消费消息 channel.basic_consume(callbak, queue=‘hello‘, no_ack=True #消息有没处理,都不给生产者发确认消息 ) print(‘[*] Waitting for messages TO exit press ctrl+c‘) channel.start_consuming() #开始
2、消费者对生产者,可以1对多,而且默认是轮询机制
no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除
如下图注释了no_ack,加了一个时间,
开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息
3、队列查询
清除队列消息
4、消息持久化
(1)durable只是队列持久化
channel.queue_declare(queue=‘hello‘,durable=True)
生产者和消费者都需要添加durable=True
(2)要实现消息持久化,还需要
5、消息(1对多)实现权重功能
消费者端添加在消费消息之前
channel.basic_qos(prefetch_count=1)
6、广播消息fanout(纯广播)订阅发布
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) #message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!2" channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("random queuename",queue_name) channel.queue_bind(exchange=‘logs‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
7、direct广播模式(有选择性的发送接收消息)
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) print(severities) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
8、更细致的消息判断 type = topic
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
python运维开发之第十一天(RabbitMQ,redis)
标签:
原文地址:http://www.cnblogs.com/willpower-chen/p/5977633.html