标签:启动 plugin .com false ssi type exit mini 解决
rabbitMQ是消息队列;想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互)、进程queue(父进程与子进程进行交互或者同属于同一父进程下的多个子进程进行交互);如果两个独立的程序,那么之间是不能通过queue进行交互的,这时候我们就需要一个中间代理即rabbitMQ
消息队列:
一. 安装
1.1 安装: sudo apt-get install rabbitmq-server
1.2 启动rabbitmq web服务:
sudo invoke-rc.d rabbitmq-server stop
sudo invoke-rc.d rabbitmq-server start
启动web管理:sudo rabbitmq-plugins enable rabbitmq_management
1.3 远程访问rabbitmq,自己增加一个用户,步骤如下:
之后就能用admin用户远程连接rabbitmq server了。
二. 代码实现
发送端:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import pika credentials = pika.PlainCredentials( ‘admin‘ , ‘123123‘ ) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘ , 5672 , ‘/‘ , credentials)) channel = connection.channel() #声明queue channel.queue_declare(queue = ‘hello‘ ) #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange = ‘‘, routing_key = ‘hello‘ , body = ‘Hello World!‘ ) print ( " [x] Sent ‘Hello World!‘" ) connection.close() |
接收端:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import pika credentials = pika.PlainCredentials( ‘admin‘ , ‘123123‘ ) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘ , 5672 , ‘/‘ , credentials)) channel = connection.channel() channel.queue_declare(queue = ‘hello‘ ) def callback(ch, method, properties, body): print ( " [x] Received %r" % body) channel.basic_consume(callback, queue = ‘hello‘ , no_ack = True ) print ( ‘ [*] Waiting for messages. To exit press CTRL+C‘ ) channel.start_consuming() |
先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
import pika credentials = pika.PlainCredentials(‘admin‘, ‘123123‘) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘, 5672, ‘/‘, credentials)) channel = connection.channel() channel.queue_declare(queue=‘cc‘) def callback(ch,method,properties,body): print(ch,method,properties) #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90> 管道内存对象地址 #methon:<Basic.Deliver([‘consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38‘, #具体信息 # ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=lzl‘])> #properties:<BasicProperties> print("Received %r"%body) channel.basic_consume(callback, #如果收到消息,就调用callback函数处理消息 queue="cc", no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming() #开始收消息 consume.py
通过执行pubulish.py和consume.py可以实现上面的消息公平分发,那假如c1收到消息之后宕机了,会出现什么情况呢?rabbitMQ是如何处理的?现在我们模拟一下:
1 import pika 2 3 credentials = pika.PlainCredentials(‘admin‘, ‘123123‘) 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 ‘192.168.16.82‘, 5672, ‘/‘, credentials)) 6 7 channel = connection.channel() #声明一个管道(管道内发消息) 8 9 channel.queue_declare(queue=‘cc‘) #声明queue队列 10 11 channel.basic_publish(exchange=‘‘, 12 routing_key=‘cc‘, #routing_key 就是queue名 13 body=‘Hello World!‘ 14 ) 15 print("Sent ‘Hello,World!‘") 16 connection.close() #关闭 17 18 publish.py
import pika,time credentials = pika.PlainCredentials(‘admin‘, ‘123123‘) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘, 5672, ‘/‘, credentials)) channel = connection.channel() channel.queue_declare(queue=‘cc‘) def callback(ch,method,properties,body): print("->>",ch,method,properties) time.sleep(15) # 模拟处理时间 print("Received %r"%body) channel.basic_consume(callback, #如果收到消息,就调用callback函数处理消息 queue="cc", no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming() #开始收消息 consume.py
在这种模式下,RabbitMQ会默认把p发的消息公平的依次分发给各个消费者(c),跟负载均衡差不多
如果消息在传输过程中rabbitMQ服务器宕机了,会发现之前的消息队列就不存在了,这时我们就要用到消息持久化,消息持久化会让队列不随着服务器宕机而消失,会永久的保存下去
发送端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import pika credentials = pika.PlainCredentials( ‘admin‘ , ‘123123‘ ) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘ , 5672 , ‘/‘ , credentials)) channel = connection.channel() #声明一个管道(管道内发消息) channel.queue_declare(queue = ‘cc‘ ,durable = True ) #队列持久化 channel.basic_publish(exchange = ‘‘, routing_key = ‘cc‘ , #routing_key 就是queue名 body = ‘Hello World!‘ , properties = pika.BasicProperties( delivery_mode = 2 #消息持久化 ) ) print ( "Sent ‘Hello,World!‘" ) connection.close() #关闭 |
接收端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import pika,time credentials = pika.PlainCredentials( ‘admin‘ , ‘123123‘ ) connection = pika.BlockingConnection(pika.ConnectionParameters( ‘192.168.16.82‘ , 5672 , ‘/‘ , credentials)) channel = connection.channel() channel.queue_declare(queue = ‘cc‘ ,durable = True ) def callback(ch,method,properties,body): print ( "->>" ,ch,method,properties) time.sleep( 15 ) # 模拟处理时间 print ( "Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, #如果收到消息,就调用callback函数处理消息 queue = "cc" , ) print ( ‘ [*] Waiting for messages. To exit press CTRL+C‘ ) channel.start_consuming() #开始收消息 |
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了
1
|
channel.basic_qos(prefetch_count = 1 ) |
带消息持久化+公平分发:
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列
标签:启动 plugin .com false ssi type exit mini 解决
原文地址:http://www.cnblogs.com/zoee/p/5983549.html