标签:持久 方法 pre 对象 back live sage etc block
rabbitmq安装
使用docker搜索、拉取镜像、运行为容器
docker search rabbitmq
docker pull rabbitmq 若不指定版本,默认拉取最新的版本
docker run -d --name rabbit -p 5672:5672 -p 15672:15672 --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 458123c67b79 最后为rabbit的镜像ID
RabbitMQ概述
RabbitMQ整体架构
生产者的实现
import pika
# 1. 获得与rabbitmq代理连接对象
connection_host = ‘192.168.1.38‘
connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
# 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
channel = connection.channel()
# 3. 创建名字my_queue的消息队列,如果存在不创建
channel.queue_declare(‘my_queue‘)
# 4. 发送消息到rabbitmq中名字为my_queue的队列中
channel.basic_publish(exchange=‘‘, routing_key=‘my_queue‘, body=‘hello word‘)
# 5. 关闭和rabbitmq代理的连接
connection.close()
print(‘消息发送完毕!‘)
消费者的实现
import pika
# 1. 获取与rabbitmq的连接对象
connection_host = ‘192.168.1.38‘
connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=connection_host, credentials=connection_credentials))
# 2. 通过连接对象获得channel对象,用于操作rabbitmq
channel = connection.channel()
# 3. 创建名字为my_queue的消息队列,如果不存在就创建
channel.queue_declare(queue=‘my_queue‘)
# 4. 按照rabbitmq要求定义消息处理函数
def callback(ch, method, properties, body):
print(‘接收到的消息是:‘, body)
# 此刻接收到的消息是二进制格式
# 5. 关联队列,并设置队列中的消息处理函数
# channel.basic_consume(callback, queue=‘my_queue‘, no_ack=True) 以前版本的写法,后改为下面方式
channel.basic_consume(‘my_queue‘, callback, False)
# 6. 启动并开始处理消息,该程序会一直运行,进行监听
channel.start_consuming()
任务队列
task.py文件中内容
import time
def send_email():
print(‘开始发送邮件‘)
time.sleep(3)
print(‘邮件发送完毕‘)
def send_message():
print(‘开始发送短信‘)
time.sleep(3)
print(‘短信发送完毕‘)
consumer.py文件中内容
task_list = {
‘email‘: task.send_email,
‘message‘: task.send_message
}
# 任务后不可以加(),否则会立即执行
# 4. 按照rabbitmq要求定义消息处理函数
def callback(ch, method, properties, body):
task_name = body.decode()
# 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
if task_name not in task_list:
print(‘error:{}任务没有注册‘.format(task_name))
return
task_list[task_name]()
producter.py文件中的内容
# 4. 发送消息到rabbitmq中名字为my_queue的队列中
channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘)
只需要改变body中的内容,就可以生产出不同的任务到队列中去
消息确认机制
原因:会出现的意外情况:消费者取到任务以后,并未执行完成任务,就死了。
rabbitmq默认会在将消息发送给消费者以后,会将任务从队列中删掉。
使用消息确认机制,若消费者意外死亡,则不能给队列反馈,队列就不会删除被该消费者取走的任务。
实现方法:
def callback(ch, method, properties, body):
task_name = body.decode()
# 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
if task_name not in task_list:
print(‘error:{}任务没有注册‘.format(task_name))
return
task_list[task_name]()
# 消息处理完成后,确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(‘work_queue‘, callback, True)
循环调度机制
公平调度机制---在consumer文件中设置公平调度
消息确认机制打开
设置公平调度机制,消费者不确认,就不要再给该消费之任务了,因为他目前的耗时任务还在执行,可以把任务给其他已经确认了的消费者。
在消费者中设置公平调度机制
channel.basic_qos(prefetch_count=1)
队列及其中的消息持久化---在product文件中设置队列及消息持久化
重启rabbitmq服务器会使队列和任务消失
解决方法:
在product文件中,生成队列的代码中加参数
# 3. 创建名字my_queue的消息队列,如果存在不创建
channel.queue_declare(‘work_queue‘, durable=True)
durable=True就可以保持,队列的持久化
# 4. 发送消息到rabbitmq中名字为my_queue的队列中
channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘, properties=pika.BasicProperties(delivery_mode=2))
# properties=pika.BasicProperties(delivery_mode=2)可以保持消息持久化
交换机的三种模式
生产者
import pika
# 1. 获得与rabbitmq代理连接对象
connection_host = ‘192.168.1.38‘
connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
# 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
channel = connection.channel()
# 3. 创建名字my_queue的消息队列,如果存在不创建
channel.queue_declare(‘work_queue‘, durable=True)
# 4. 发送消息到rabbitmq中名字为my_queue的队列中
channel.basic_publish(exchange=‘‘, routing_key=‘work_queue‘, body=‘message‘, properties=pika.BasicProperties(delivery_mode=2))
# 只要生产者,发送不同的body,就会消费者中的处理函数,就会调用不同的task
# 5. 关闭和rabbitmq代理的连接
connection.close()
print(‘消息发送完毕!‘)
消费者
import pika
import task
task_list = {
‘email‘: task.send_email,
‘message‘: task.send_message
}
# 任务后不可以加(),否则会立即执行
# 1. 获取与rabbitmq的连接对象
connection_host = ‘192.168.1.38‘
connection_credentials = pika.PlainCredentials(‘root‘, ‘123‘)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=connection_host, credentials=connection_credentials))
# 2. 通过连接对象获得channel对象,用于操作rabbitmq
channel = connection.channel()
# 3. 创建名字为my_queue的消息队列,如果不存在就创建
channel.queue_declare(queue=‘work_queue‘)
# 4. 按照rabbitmq要求定义消息处理函数
def callback(ch, method, properties, body):
task_name = body.decode()
# 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
if task_name not in task_list:
print(‘error:{}任务没有注册‘.format(task_name))
return
task_list[task_name]()
# 消息处理完成后,确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置公平调度机制
channel.basic_qos(prefetch_count=1)
# 5. 关联队列,并设置队列中的消息处理函数
# channel.basic_consume(callback, queue=‘my_queue‘, no_ack=True) 以前版本的写法,后改为下面方式
channel.basic_consume(‘work_queue‘, callback, False)
# 6. 启动并开始处理消息
channel.start_consuming()
任务
import time
def send_email():
print(‘开始发送邮件‘)
time.sleep(1)
print(‘邮件发送完毕‘)
def send_message():
print(‘开始发送短信‘)
time.sleep(1)
print(‘短信发送完毕‘)
标签:持久 方法 pre 对象 back live sage etc block
原文地址:https://www.cnblogs.com/yuyangbk/p/13963363.html