码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ

时间:2016-10-14 20:46:38      阅读:136      评论:0      收藏:0      [点我收藏+]

标签:

简介

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

安装

# 安装 rabbitMQ 程序
yum -y install rabbitmq-server

启动

# 启动 rabbitMQ
systemctl restart rabbitmq-server

# 启动管理插件,启动后访问 http://localhost:15672,默认用户名密码 guest/guest
rabbitmq-plugins enable rabbitmq-management

使用 API 操作 RabbitMQ

生产者消费者模型是通过队列来实现的

技术分享
import time
import queue
import threading


q = queue.Queue(10)


def producer(msg):
    while True:
        time.sleep(1)
        q.put(msg)


def consumer():
    while True:
        time.sleep(0.5)
        msg = q.get()
        print(msg)


for i in range(5):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer)
    t.start()
基于 queue 模块实现的生产者消费者模型

RabbitMQ 也是一个队列,也可以实现生产者消费者模型,区别在于对象并非存在内存的 queue 中,而是存在于一台服务器基于 RabbitMQ Server 实现的消息队列中

技术分享
import pika

# 创建一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

# 创建一个队列,队列名为 hello
channel.queue_declare(queue=hello)

# 向 hello 队列中添加一条消息 ‘Hello World!‘
channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body=Hello World!)

print(" [x] Sent ‘Hello World!‘")
connection.close()
生产者
技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

channel.queue_declare(queue=hello)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback, queue=hello, no_ack=True)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
消费者

Message acknowledgment 消息确认

no-ack=False 默认值,如果消费者由于故障(its channel is closed, connection is closed, or TCP connection is lost)为发送消息确认,生产者会将数据再重新添加到队列中

技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

channel.queue_declare(queue=hello)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 消息确认,当未执行时,消息会重新添加回队列
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue=hello)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
消费者确认消息

Message durability 消息持久

当 RabbitMQ Server 因故重启,希望数据消息不丢失

技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

# durable=True
channel.queue_declare(queue=task_queue, durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=task_queue,
                      body=Hello World!,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 保持消息持久
                      ))

print(" [x] Sent ‘Hello World!‘")
connection.close()
生产者
技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

# durable=True
channel.queue_declare(queue=task_queue, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue=task_queue)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
消费者

消息获取顺序

消费者在获取消息时默认为平均获取。当有两个消费者获取消息,消费者一处理消息需要2s,消费者二处理消息需要4s,RabbitMQ 按照默认的调度平均分配消息,会导致消费者一很闲,消费者二很忙。修改 basic.qos 为 prefetch_count=1,表示谁来谁去,不在按照顺序调度任务

技术分享
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.5))
channel = connection.channel()

channel.queue_declare(queue=task_queue, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 修改调度顺序
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue=task_queue)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()
消费者

 

RabbitMQ

标签:

原文地址:http://www.cnblogs.com/wenchong/p/5961520.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!