标签:
本节目录:安装配置阿里云镜像
安装erlang
$ yum -y install erlang
安装RabbitMQ
$ yum -y install rabbitmq-server
service rabbitmq-server start/stop
import queue
import threading
message = queue.Queue(10)
def producer(i):
‘‘‘厨师,生产包子放入队列‘‘‘
while True:
message.put(i)
def consumer(i):
‘‘‘消费者,从队列中取包子吃‘‘‘
while True:
mes = message.get()
for i in range(12): #厨师的线程包子
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10): #消费者的线程 \ 吃包子.
t = threading.Thread(target=consumer, args=(i,))
t.start()
#!/usr/bin/env python3
#coding:utf8
import pika
# ######################### 生产者 #########################
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel() #创建频道
channel.queue_declare(queue=‘hello‘) #创建一个队列名叫hello
channel.basic_publish(exchange=‘‘, exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
routing_key=‘hello‘, 队列 routing_key是队列名。
body=‘Hello World!‘) body是要插入的内容
print("队列开始")
connection.close() #缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
#!/usr/bin/env python
import pika
# ########################## 消费者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) #链接rabbit
channel = connection.channel() #创建频道
提示:如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,
所以,消费者也可自行-创建这个队列,,避免报错。
channel.queue_declare(queue=‘hello‘)
#接收消息需要使用callback这个函数来接收,它会被pika库来调用
def callback(ch, method, properties, body):
print(" [x] 接受信息 %r" % body)
channel.basic_consume(callback, #scallback是回调函数 如果拿到数据 那么将执行callback函数
queue=‘hello‘, #选择 操作的队列
no_ack=True) #是否为了保证自己的消费端的数据安全,回复一个ack,当MQ没有接收的话,会再次发送。此参数是不回复。
print(‘ [*] 等待信息.退出 Ctrl+c‘)
channel.start_consuming() #永远循环等待数据处理和callback处理的数据
- 背景:执行一个任务能消耗几秒. 你可能想知道当一个consumer在执行一个艰巨任务或执行到一半是死掉了会发生什么。就我们当前的代码而言,一旦RabbitMQ 的分发完消息给 consumer后 就立即从内存中移除该消息。这样的话,如果一个worker刚启动你就结束掉,那么消息就丢失了。
那么所有发送给这个 worker 的还没有处理完成的消息也将丢失,但是我们不想丢失任何任务,如果worker死掉了,我们希望这个任务能够发送给其它的worker。
- 实现:为了确保一个消息不会丢失,RabbitMQ支持消息的 ack nowlegements , 一个 ack(nowlegement) 是从consumer端发送一个回执去告诉RabbitMQ 消息已经接 收了、处理了,RabbitMQ可以释放并删除掉了。
RabbitMQ 就会知道这个消息没有被完全处理并会重新发送到消息队列中,
- 1、 如果一个consumer【消费者】 死掉了(channel关闭、connection关闭、或者TCP连接断开了)而没有发送ack,
这个是没有超时的,当消费方(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。
- 2、如果同时有另外一个consumer在线,将会很快转发到另外一个consumer中。 那样的话你就能确保虽然worker死掉,但消息不会丢失。
- 消息的 acknowlegments 默认是打开的,在前面的例子中关闭了: no_ack = True . 现在删除这个标识 然后 发送一个 acknowledgment。
#coding:utf-8
#!/usr/bin/env python
import pika
# ######################### 生产者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) #链接rabbit服务器
channel = connection.channel() #创建频道
channel.queue_declare(queue=‘Myqueue‘) #创建一个队列名叫Myqueue
channel.basic_publish(exchange=‘‘, #向队列插入数值 routing_key是队列名 body是要插入的内容
routing_key=‘Myqueue‘,
body=‘Hello World!‘)
print("开始队列")
connection.close()
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) #链接rabbit
channel = connection.channel() #创建频道
channel.queue_declare(queue=‘Myqueue‘) #如果生产者没有运行创建队列,那么消费者创建队列
def callback(ch, method, properties, body):
print(" [x] 接收到 %r" % body)
import time
time.sleep(10)
print (‘ok‘)
ch.basic_ack(delivery_tag = method.delivery_tag) #主要使用此代码
channel.basic_consume(callback,
queue=‘Myqueue‘,
no_ack=False)
print(‘ [*] 等待接受信息... 退出按 ctrl+c ‘)
channel.start_consuming()
我们已经学习了即使客户端死掉了任务也不会丢失。但是如果RabbitMQ服务停止了的话,我们的任务还是会丢失。当RabbitMQ退出或宕掉的话将会丢失queues和消息信息,除非你进行设置告诉服务器队列不能丢失。要确保消息不会丢失需要做两件事:我们需要将队列和消息标记为 durable.
channel.queue_declare(queue=‘hello‘, durable=True尽管此命令本身定义是正确的,但我们设置后还是不会工作。因为我们已经定义了个名为 hello ,但不是durable属性的队列。
channel.queue_declare(queue=‘task_queue‘, durable=True)
channel.basic_publish(exchange=‘‘,routing_key="task_queue", #队列名 body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent ))
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) #链接rabbit服务器
channel = connection.channel() #创建频道
‘‘‘
提示:两处都要声明下,1、在创建队列时候durable=True 2、在队列使用的参数中 delivery_mode=2
‘‘‘
channel.queue_declare(queue=‘Myqueue‘, durable=True)
#创建队列,使用durable方法。
#如果想让队列实现持久化那么加上durable=True 。
channel.basic_publish(exchange=‘‘,
routing_key=‘Myqueue‘,
body=‘Hello World!‘,
properties=pika.BasicProperties(
delivery_mode=2,
#标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2
#这样必须设置,让消息实现持久化
))
#这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:
# 如果存在routing_key, 消息路由到routing_key指定的队列中。
print(" [x] 开始队列‘")
connection.close()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel() #创建频道
channel.queue_declare(queue=‘Myqueue‘, durable=True) #创建队列,使用durable方法
def callback(ch, method, properties, body):
print(" [x] 接受信息 %r" % body)
import time
time.sleep(10)
print (‘ok‘)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue=‘Myqueue‘,
no_ack=False)
print(‘ [*] 等待队列. 退出 CTRL+C‘)
channel.start_consuming()
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘task_queue‘, durable=True) # 设置队列为持久化的队列
message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
routing_key=‘task_queue‘,
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, #设置消息为持久化的
))
print(" [x] Sent %r" % message)
connection.close()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘task_queue‘,durable=True) # 设置队列持久化
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print (‘ok‘)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
#消息未处理完前不要发送信息的消息
#表示谁来谁取,不再按照奇偶数排列
channel.basic_consume(callback,
queue=‘task_queue‘,
no_ack=False)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
标签:
原文地址:http://www.cnblogs.com/zhangju/p/5720224.html