标签:介绍 客户端 port 删除 highlight header body 必须 tag
python操作rabbitmq的模块叫做pika
1个生产者对应1个消费者
如果生产者发送一条消息,那么消费者也只能接受到1条消息
如果生产者发送两条消息,那么消费者就可以接受到2条消息
1个生产者对应2个消费者
如果生产者发送1条消息,那么只有1个消费者能接受到消息
如果生产者发送2条消息,那么每个消费者都接受到1条消息
如果生产者先启动了,生产者的代码已经执行完成,程序退出,这个时候消费者才启动,消费者也能接受到消息
work queue
如果一个生产者对应多个消费者,那么生产者会依次把消息轮训到多个消费者上,实现一个负载均衡的效果
但是如果rabbitmq server重启后,则队列和消息就会丢失
1、队列持久化
durable=True
2、消息持久化
delivery_mode = 2
如果申明一个queue为持久化,那么就需要在服务端和客户端都需要设置
如果申明一个消息的持久化,加一个
delivery_mode = 2
消息公平分发
确保每个消费者同时只能有固定的数量的任务在处理,比如这个固定的任务是1或者2或者3之类的
这个需要在消费者端设置,设置我这个消费者同时只能处理几个任务
设置方法如下,这个1就是同时只能处理一个任务
channel.basic_qos(prefetch_count=1)
发布订阅
前面的一条消息只能被一个客户端消费,那么发布和订阅就是一条消息可以被
多个客户端消息,这里就需要用到“Exchange”,Exchange在定义的时候是有
类型的,以决定到底哪些queue符合条件,可以接受消息,一个exchange可以绑
定多个queue
fanout:所有绑定在这个exchange上的que都会接受到消息
direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息
可以根据关键字发送,即:队列绑定关键字,发送者将根据关键字发送消息
到指定的队列中
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所绑定
的queue都可以接收到消息
表达式符合说明:#号代表一个或多个字符,*代表人和字符
例如
#a.a会匹配a.a,aa.a,aaa.a
*.a会匹配a.a,b.a,c.a
headers:通过headers来决定把消息发给哪些queue
1、先看下使用rabbitmq实现最简单的通信
先看生产端的代码
import pika #操作rabbitmq的模块,官方的模块叫做pika # 1、先连接到rabbitmq # 2、创建一个管道 # 3、在管道中跑队列 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #生成一个阻塞的连接,连接上rabbitmaq test_channel = connection.channel() #创建一个管道 test_channel.queue_declare(queue="hello2") #在管道中申明一个队列,队列的名字就叫做hello # test_channel.queue_declare(queue="hello",durable=True) #设置hello这个queue为持久化,这个持久化是重启rabbitmq server的服务器重启,这个queue也不会丢失,需要在客户端和服务端都需要做持久化durable=True #一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者 #重新申明一个queue,才能把queue设置为持久化 test_channel.basic_publish(exchange="", routing_key="hello2", body="hello,my first python rabbitmq message 2") # test_channel.basic_publish(exchange="", # routing_key="hello", # body="hello,my first python rabbitmq message 2") #rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息 #routing_key就是往哪个队列中发数据 #body就是往这个队列中发的内容 #上面这3个是规定,必须要按照上面的格式去写 print("[x] send ‘hello,my first python rabbitmq message‘") #打印一下,这里没有特殊意义 test_channel.close() # 关闭这个rabbitmq
在看接收端的代码
import pika client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() #生成一个管道 channel.queue_declare(queue="hello2") #在管道申明一个队列,在服务端已经申明了一个管道,名称为hello,这里为什么又要创建一个管道 #这里主要为了避免代码报错,如果客户端先执行,发现没有hello这个队列,则会报错,所以这里申 # 明一个队列,如果没有这个管道,则创建一个管道,如果有,就什么也不做,直接忽略了。 # def callback(body): def callback(ch,method,properties,body): print("[x] Receive %r" %body) print("----->sh",ch) print("----->method",method) print("----->properties",properties) #回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数 channel.basic_consume(callback, queue="hello2", no_ack=True) #开始接受数据 #callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数 # 从队列hello中接受消息 #no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息 print("[*] waiting for message") channel.start_consuming() #这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞
2、在看如何实现序列化持久化
先看生产端的代码
import pika #操作rabbitmq的模块,官方的模块叫做pika # 1、先连接到rabbitmq # 2、创建一个管道 # 3、在管道中跑队列 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #生成一个阻塞的连接,连接上rabbitmaq test_channel = connection.channel() #创建一个管道 test_channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 #这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。 #一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者 #重新申明一个queue,才能把queue设置为持久化 test_channel.basic_publish(exchange="", routing_key="test_duralbe_queue", body="hello,my first python rabbitmq message 2") # test_channel.basic_publish(exchange="", # routing_key="hello", # body="hello,my first python rabbitmq message 2") #rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息 #routing_key就是往哪个队列中发数据 #body就是往这个队列中发的内容 #上面这3个是规定,必须要按照上面的格式去写 print("[x] send ‘hello,my first python rabbitmq message‘") #打印一下,这里没有特殊意义 test_channel.close() # 关闭这个rabbitmq
在看接收端的代码
import pika client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() #生成一个管道 channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 # def callback(body): def callback(ch,method,properties,body): print("[x] Receive %r" %body) print("----->sh",ch) print("----->method",method) print("----->properties",properties) #回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数 channel.basic_consume(callback, queue="test_duralbe_queue", no_ack=True) #开始接受数据 #callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数 # 从队列hello中接受消息 #no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息 print("[*] waiting for message") channel.start_consuming() #这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞
3、在看如何实现消息持久化
先看生产端的代码
import pika #操作rabbitmq的模块,官方的模块叫做pika # 1、先连接到rabbitmq # 2、创建一个管道 # 3、在管道中跑队列 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #生成一个阻塞的连接,连接上rabbitmaq test_channel = connection.channel() #创建一个管道 test_channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 #这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。 #一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者 #重新申明一个queue,才能把queue设置为持久化 test_channel.basic_publish(exchange="", routing_key="test_duralbe_queue", body="hello,my first python rabbitmq message 3", properties=pika.BasicProperties(delivery_mode=2)) #properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后, #直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化 #rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息 #routing_key就是往哪个队列中发数据 #body就是往这个队列中发的内容 #上面这3个是规定,必须要按照上面的格式去写 print("[x] send ‘hello,my first python rabbitmq message‘") #打印一下,这里没有特殊意义 test_channel.close() # 关闭这个rabbitmq
在看接收端的代码
import pika client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() #生成一个管道 channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 # def callback(body): def callback(ch,method,properties,body): ch.basic_ack(delivery_tag=method.delivery_tag) print("[x] Receive %r" %body) print("----->sh",ch) print("----->method",method) print("----->properties",properties) #如果要实现消息持久化,需要在消费者端加这么一个行 # ch.basic_ack(delivery_tag=method.delivery_tag) #回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数 channel.basic_consume(callback, queue="test_duralbe_queue", no_ack=True) #开始接受数据 #callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数 # 从队列hello中接受消息 #no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息 print("[*] waiting for message") channel.start_consuming() #这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞
4、再看如何用rabbitmaq实现
先看生产端的代码
import pika #操作rabbitmq的模块,官方的模块叫做pika # 1、先连接到rabbitmq # 2、创建一个管道 # 3、在管道中跑队列 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #生成一个阻塞的连接,连接上rabbitmaq test_channel = connection.channel() #创建一个管道 test_channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 #这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。 #一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者 #重新申明一个queue,才能把queue设置为持久化 test_channel.basic_publish(exchange="", routing_key="test_duralbe_queue", body="hello,my first python rabbitmq message 3", properties=pika.BasicProperties(delivery_mode=2)) #properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后, #直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化 #rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息 #routing_key就是往哪个队列中发数据 #body就是往这个队列中发的内容 #上面这3个是规定,必须要按照上面的格式去写 print("[x] send ‘hello,my first python rabbitmq message‘") #打印一下,这里没有特殊意义 test_channel.close() # 关闭这个rabbitmq
在看接收端的代码
import pika client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() #生成一个管道 channel.queue_declare(queue="test_duralbe_queue",durable=True) #durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列 # def callback(body): def callback(ch,method,properties,body): ch.basic_ack(delivery_tag=method.delivery_tag) print("[x] Receive %r" %body) print("----->sh",ch) print("----->method",method) print("----->properties",properties) channel.basic_qos(prefetch_count=1) #如果要实现消息持久化,需要在消费者端加这么一个行 # ch.basic_ack(delivery_tag=method.delivery_tag) #回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数 channel.basic_consume(callback, queue="test_duralbe_queue", no_ack=True) #开始接受数据 #callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数 # 从队列hello中接受消息 #no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息 print("[*] waiting for message") channel.start_consuming() #这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞
5、在看如何使用rabbitmq实现全部广播
先看生产端的代码
import pika import sys # 如果是fanout,则不需要申明queue client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") message = "".join(sys.argv[1:]) or "info:hello world!" channel.basic_publish(exchange="logs", routing_key="", body=message) print("[x] send %r" % message) client_connection.close()
再看接收端的代码
import pika client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") result = channel.queue_declare(exclusive=True) #不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者 #断开后自动被删除 queue_name = result.method.queue channel.queue_bind(exchange="logs", queue=queue_name) #把queue绑定到exchange上,他才能接受消息 print("waiting for message") def callback(ch,method,properties,body): print("receive message %r" %body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
6、在看使用rabbitmap实现指定的广播
先看发送端的代码
import pika import sys #这个的意思是,把队列绑定在一个管道中,不同的客户端把这个队列申明为不同的key,在生产端 #可以指定这个消息发送到队列中的哪些key中 #根据关键字把消息发送到指定的队列中 client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() #创建一个管道 channel.exchange_declare(exchange="direct_logs", exchange_type="direct") #申明管道的类型 severity = sys.argv[1] if len(sys.argv) > 1 else "info" # 指定往哪个队列中发,如果没有指定,则往info里发 message = "".join(sys.argv[1:]) or "hello world" channel.basic_publish(exchange="direct_logs", routing_key=severity, #往队列中的那个routing_key中发 body=message) print("[x] send %r" % message) client_connection.close()
在看接收端的代码
import pika import sys client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) #首先也是要连接上rabbitmq,和生产者端是一样的 channel = client_connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") result = channel.queue_declare(exclusive=True) #不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者 #断开后自动被删除 queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning ] [error]\n" %sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity) #把队列申明为指定的路由key,客户端申明为路由key为1,那么只有服务端发送到队列中路由key为1 # 的客户端,才能收到消息 print("waiting for message") def callback(ch,method,properties,body): print("receive message %r" %body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
标签:介绍 客户端 port 删除 highlight header body 必须 tag
原文地址:http://www.cnblogs.com/bainianminguo/p/7502653.html