安装 http://www.rabbitmq.com/install-standalone-mac.html
安装python rabbitMQ module
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
1 # 添加用户 2 rabbitmqctl add_user name pass 3 # 分配角色 4 rabbitmqctl set_user_tags name administrator 5 6 # 授权网段 7 rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 6 # 连接Rabbit 7 credentials = pika.PlainCredentials("laiying", "laiying123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters("", 9 credentials=credentials)) 10 channel = connection.channel() # 创建rabbitmq协议通道 11 12 # 生命queue 13 channel.queue_declare(queue="hello") 14 15 channel.basic_publish(exchange=‘‘, 16 routing_key="hello", 17 body="hello world") 18 print("send hello world") 19 connection.close()
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 import time 6 7 credentials = pika.PlainCredentials("laiying", "laiying123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters("", 9 credentials=credentials)) 10 channel = connection.channel() # 创建rabbitmq协议通道 11 12 channel.queue_declare(queue="hello") 13 14 15 def callback(ch, method, properties, body): 16 # print(‘12--‘, ch) 17 # print(method) 18 # print(properties) 19 print("消费者1 %s" % body) 20 time.sleep(20) 21 print("消费者接受%s" % body) 22 23 24 channel.basic_consume(callback, 25 queue="hello", 26 no_ack=True) 27 28 print("rabbitMQ 链接") 29 30 channel.start_consuming()
1 import pika 2 import time 3 4 credentials = pika.PlainCredentials(‘alex‘, ‘alex123‘) 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 ‘‘,credentials=credentials)) 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘task_queue‘,durable=True) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 import sys 14 15 message = ‘ ‘.join(sys.argv[1:]) or "Hello World! %s" % time.time() 16 17 channel.basic_publish(exchange=‘‘, 18 routing_key=‘task_queue‘, 19 body=message, 20 properties=pika.BasicProperties( 21 delivery_mode=2, # make message persistent 22 ) 23 24 ) 25 print(" [x] Sent %r" % message) 26 connection.close()
1 import pika, time 2 3 credentials = pika.PlainCredentials(‘alex‘, ‘alex123‘) 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 ‘‘,credentials=credentials)) 6 channel = connection.channel() 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(20) 12 print(" [x] Done") 13 print("method.delivery_tag", method.delivery_tag) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 #ackownledgement 16 17 18 channel.basic_consume(callback, 19 queue=‘task_queue‘, 20 #no_ack=True 21 ) 22 23 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 24 channel.start_consuming()
1 # 队列持久化 2 channel.queue_declare(queue=‘hello‘, durable=True) 3 #消息持久化 4 channel.basic_publish(exchange=‘‘, 5 routing_key="task_queue", 6 body=message, 7 properties=pika.BasicProperties( 8 delivery_mode = 2, # make message persistent 9 ))
# on_ack = True, 默认为false,该值为false 后,会自动回收消费者没有执行完成的消息,继续发给小一个队列 # ch.basic_ack(delivery_tag=method.delivery_tag) 消费者处理完成队列后返回的队列标识符
1 channel.basic_qos(prefetch_count=1)
1 import pika 2 import time 3 4 credentials = pika.PlainCredentials(‘alex‘, ‘alex123‘) 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 ‘‘,credentials=credentials)) 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘task_queue‘,durable=True) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 import sys 14 15 message = ‘ ‘.join(sys.argv[1:]) or "Hello World! %s" % time.time() 16 17 channel.basic_publish(exchange=‘‘, 18 routing_key=‘task_queue‘, 19 body=message, 20 properties=pika.BasicProperties( 21 delivery_mode=2, # make message persistent 22 ) 23 24 ) 25 print(" [x] Sent %r" % message) 26 connection.close()
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 credentials = pika.PlainCredentials(‘alex‘, ‘alex123‘) 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘‘,credentials=credentials)) 8 channel = connection.channel() 9 10 channel.queue_declare(queue=‘task_queue‘, durable=True) 11 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 12 13 def callback(ch, method, properties, body): 14 print(" [x] Received %r" % body) 15 time.sleep(body.count(b‘.‘)) 16 print(" [x] Done") 17 ch.basic_ack(delivery_tag = method.delivery_tag) 18 19 channel.basic_qos(prefetch_count=1) 20 channel.basic_consume(callback, 21 queue=‘task_queue‘) 22 23 channel.start_consuming()
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
