标签:覆盖 内存 还需 调用 实例 技术 queue logs web
之前的几篇文章介绍了一下RabbitMQ的概念以及环境的搭建和配置,有了RabbitMQ环境就可以基于其实现一些特殊的任务场景了。RabbitMQ官方有个很好的Tutorials基本覆盖了RabbitMQ的各中常见应用场景,现以代码加注释的方式以其Python客户端pika为例简单介绍如下。更详尽的信息可参阅:http://www.rabbitmq.com/getstarted.html 。
之前的几篇文章:
RabbitMQ概念及环境搭建(一)单节点安装与配置
RabbitMQ概念及环境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及环境搭建(三)RabbitMQ cluster
RabbitMQ概念及环境搭建(四)RabbitMQ High Availability
RabbitMQ概念及环境搭建(五)与web的整合
RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。
应用场景1-“Hello Word”
一个P向queue发送一个message,一个C从该queue接收message并打印。
send.py
producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.queue_declare(queue=‘hello‘, durable=True)
-
- channel.basic_publish(exchange=‘‘,
- routing_key=‘hello‘,
- body=‘Hello World!‘)
-
- print " [x] Sent ‘Hello World!‘"
-
- connection.close()
receive.py
consumer,连接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.queue_declare(queue=‘hello‘)
-
- print ‘ [*] Waiting for messages. To exit press CTRL+C‘
-
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
-
- channel.basic_consume(callback,
- queue=‘hello‘,
- no_ack=True)
-
- channel.start_consuming()
测试
- python send.py
- python receive.py
应用场景2-work queues
将耗时的消息处理通过队列分配给多个consumer来处理,我们称此处的consumer为worker,我们将此处的queue称为Task Queue,其目的是为了避免资源密集型的task的同步处理,也即立即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。
new_task.py
建立连接,声明队列,发送可以模拟耗时任务的message,断开连接、退出。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.queue_declare(queue=‘task_queue‘, durable=True)
-
- message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
-
- channel.basic_publish(exchange=‘‘,
- routing_key=‘task_queue‘,
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2,
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
worker.py
建立连接,声明队列,不断的接收message,处理任务,进行确认。
- import pika
- import time
-
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.queue_declare(queue=‘task_queue‘, durable=True)
-
- print ‘ [*] Waiting for messages. To exit press CTRL+C‘
-
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count(‘.‘) )
- print " [x] Done"
-
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
- channel.basic_qos(prefetch_count=1)
-
- channel.basic_consume(callback,
- queue=‘task_queue‘)
-
- channel.start_consuming()
测试
- python new_task.py "A very hard task which takes two seconds.."
- python worker.py
应用场景3-Publish/Subscribe
在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。现在我们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息可以被所有运行的log接收者接收。因此,我们可以运行一个log接收者直接在屏幕上显示log,同时运行另一个log接收者将log写入磁盘文件。
receive_logs.py
日志消息接收者:建立连接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘logs‘,
- type=‘fanout‘)
-
- result = channel.queue_declare(exclusive=True)
-
- queue_name = result.method.queue
-
- channel.queue_bind(exchange=‘logs‘,
- queue=queue_name)
-
- print ‘ [*] Waiting for logs. To exit press CTRL+C‘
-
- def callback(ch, method, properties, body):
- print " [x] %r" % (body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log.py
日志消息发送者:建立连接,声明fanout类型的exchange,通过exchage向queue发送日志消息,消息被广播给所有接收者,关闭连接,退出。
-
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘logs‘,
- type=‘fanout‘)
-
- message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
-
- channel.basic_publish(exchange=‘logs‘,
- routing_key=‘‘,
- body=message)
-
- print " [x] Sent %r" % (message,)
-
- connection.close()
测试
- python receive_logs.py
- python emit_log.py "info: This is the log message"
应用场景4-Routing
应用场景3中构建了简单的log系统,可以将log message广播至多个receiver。现在我们将考虑只把指定的message类型发送给其subscriber,比如,只把error message写到log file而将所有log message显示在控制台。
receive_logs_direct.py
log message接收者:建立连接,声明direct类型的exchange,声明queue,使用提供的参数作为routing_key将queue绑定到exchange,开始循环接收log message并打印。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘direct_logs‘,
- type=‘direct‘)
-
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
-
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
-
- for severity in severities:
-
-
-
-
-
- channel.queue_bind(exchange=‘direct_logs‘,
- queue=queue_name,
- routing_key=severity)
-
- print ‘ [*] Waiting for logs. To exit press CTRL+C‘
-
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log_direct.py
log message发送者:建立连接,声明direct类型的exchange,生成并发送log message到exchange,关闭连接,退出。
-
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘direct_logs‘,
- type=‘direct‘)
-
- severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
- message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
-
- channel.basic_publish(exchange=‘direct_logs‘,
- routing_key=severity,
- body=message)
-
- print " [x] Sent %r:%r" % (severity, message)
- connection.close()
测试:
python receive_logs_direct.py info
python emit_log_direct.py info "The message"
应用场景5-topic
应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不同的log message发送给不同的subscriber(也即分别通过不同的routing_key将queue绑定到exchange,这样exchange便可将不同的message根据message内容路由至不同的queue)。但仍然存在限制,不能根据多个规则路由消息,比如接收者要么只能收error类型的log message要么只能收info类型的message。如果我们不仅想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,需要topic类型的exchange。topic类型的exchange中routing_key中可以包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。
receive_logs_topic.py
log message接收者:建立连接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘direct_logs‘,
- type=‘direct‘)
-
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
-
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
-
- for severity in severities:
-
-
-
-
-
- channel.queue_bind(exchange=‘direct_logs‘,
- queue=queue_name,
- routing_key=severity)
-
- print ‘ [*] Waiting for logs. To exit press CTRL+C‘
-
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
-
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
-
- channel.start_consuming()
emit_log_topic.py
log message发送者:建立连接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭连接,退出。
- import pika
- import sys
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=‘topic_logs‘,
- type=‘topic‘)
-
- routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
- message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
-
- channel.basic_publish(exchange=‘topic_logs‘,
- routing_key=routing_key,
- body=message)
-
- print " [x] Sent %r:%r" % (routing_key, message)
- connection.close()
测试:
- python receive_logs_topic.py "*.rabbit"
- python emit_log_topic.py red.rabbit Hello
应用场景6-PRC
在应用场景2中描述了如何使用work queue将耗时的task分配到不同的worker中。但是,如果我们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个完全不同的故事。这一模式被称为远程过程调用。现在,我们将构建一个RPC系统,包含一个client和可扩展的RPC server,通过返回斐波那契数来模拟RPC service。
rpc_server.py
RPC server:建立连接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包含参数的调用请求后调用自己的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。
- import pika
-
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
- channel = connection.channel()
-
- channel.queue_declare(queue=‘rpc_queue‘)
-
- def fib(n):
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return fib(n-1) + fib(n-2)
-
- def on_request(ch, method, props, body):
-
- n = int(body)
-
- print " [.] fib(%s)" % (n,)
-
- response = fib(n)
-
-
-
-
-
-
- ch.basic_publish(exchange=‘‘,
- routing_key=props.reply_to,
- properties=pika.BasicProperties(correlation_id = \
- props.correlation_id),
- body=str(response))
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
- channel.basic_qos(prefetch_count=1)
-
- channel.basic_consume(on_request, queue=‘rpc_queue‘)
-
- print " [x] Awaiting RPC requests"
-
- channel.start_consuming()
rpc_client.py
RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的连接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性作出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。
- import pika
- import uuid
-
- class FibonacciRpcClient(object):
- def __init__(self):
-
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=‘localhost‘))
-
- self.channel = self.connection.channel()
-
-
- result = self.channel.queue_declare(exclusive=True)
- self.callback_queue = result.method.queue
-
-
-
-
-
- self.channel.basic_consume(self.on_response, no_ack=True,
- queue=self.callback_queue)
-
-
-
-
- def on_response(self, ch, method, props, body):
- if self.corr_id == props.correlation_id:
- self.response = body
-
- def call(self, n):
-
- self.response = None
- self.corr_id = str(uuid.uuid4())
-
-
-
-
- self.channel.basic_publish(exchange=‘‘,
- routing_key=‘rpc_queue‘,
- properties=pika.BasicProperties(
- reply_to = self.callback_queue,
- correlation_id = self.corr_id,
- ),
-
- body=str(n))
-
- while self.response is None:
- self.connection.process_data_events()
-
- return int(self.response)
-
- fibonacci_rpc = FibonacciRpcClient()
-
- print " [x] Requesting fib(30)"
- response = fibonacci_rpc.call(30)
- print " [.] Got %r" % (response,)
测试:
- python rpc_server.py
- python rpc_client.py
RabbitMQ的几种应用场景
标签:覆盖 内存 还需 调用 实例 技术 queue logs web
原文地址:http://www.cnblogs.com/syuee/p/7580166.html