码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ学习笔记

时间:2015-08-05 18:22:18      阅读:162      评论:0      收藏:0      [点我收藏+]

标签:erlang   rabbitmq   java   



**************************************rabbitmq 安装配置************************************
参考:http://blog.csdn.net/boonya/article/details/37879739


1.下载erlang支持包:http://www.erlang.org/download.html  安装otp_win32_18.0.exe
2.下载rabbitmq server:http://www.rabbitmq.com/releases/rabbitmq-server/  安装rabbitmq-server-3.5.4.exe
3.下载RabbitMQ client:http://www.rabbitmq.com/releases/rabbitmq-java-client/或者在http://www.rabbitmq.com/java-client.html 安装rabbitmq-java-client-3.5.0.tar.gz
4.配置环境变量
1>新建变量:ERLANG_HOME=D:\Program Files\erl6.1,然后再path中添加%ERLANG_HOME%\bin;
2>新建变量:RABBITMQ_SERVER=D:\Program Files\rabbitmq_server-3.3.4,然后再path中添加%RABBITMQ_SERVER%\sbin;
3>运行sbin/rabbitmq-server.bat,启动RabbitMQ服务器
5.客户端测试




**************************************实现网页端控制台***************************************
参考:http://blog.csdn.net/spyiu/article/details/24697221


1.CMD打开 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.5.4\sbin 目录
2.CMD运行 rabbitmq-plugins.bat list 查看已安装的插件列表
3.CMD运行 rabbitmq-plugins.bat enable rabbitmq_management  开启网页版控制台
4.重启RabbitMQ服务生效
5.在浏览器输入 http://localhost(ip):15672 进入控制台




*************************************rabbitmq 用户配置***************************************
参考:http://my.oschina.net/hncscwc/blog/262246?p=


1.用户管理 
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
相应的命令
1>新增一个用户
  rabbitmqctl  add_user  Username  Password 
2>删除一个用户 
  rabbitmqctl  delete_user  Username 
3>修改用户的密码
  rabbitmqctl  change_password  Username  Newpassword 
4>查看当前用户列表
  rabbitmqctl  list_users 
2.用户角色 
按照个人理解,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。
1>超级管理员(administrator) 
  可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。 
2>监控者(monitoring) 
  可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) 
3>策略制定者(policymaker) 
  可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 
4>普通管理者(management) 
  仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
5>其他(None)
  无法登陆管理控制台,通常就是普通的生产者和消费者。 
3.设置角色
  1>命令为:rabbitmqctl  set_user_tags  User  Tag 
    User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
    也可以给同一用户设置多个角色,例如 rabbitmqctl  set_user_tags  hncscwc  monitoring  policymaker 
4.用户权限([-p  VHostPath] 为虚拟机名称。具体不清楚这种怎么用,我的用法是1.网页端设置 2.使用另一种也就是5设置虚拟机和权限)
  1>用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。 
  2>例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。
  3>相关命令如下:
    1)设置用户权限
      rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP 
    2)查看(指定hostpath)所有用户的权限信息 
      rabbitmqctl  list_permissions  [-p  VHostPath] 
    3)查看指定用户的权限信息 
      rabbitmqctl  list_user_permissions  User 
    4)清除用户的权限信息 
      rabbitmqctl  clear_permissions  [-p VHostPath]  User 
5.设置虚拟机和权限(这里设置完的权限是归属于你创建的虚拟机)
参考:http://www.cnblogs.com/daizhj/archive/2010/10/21/1857374.html


虚拟机(基于sbin目录下):
  1>首先创建vhosts,命令如下:
    rabbitmqctl add_vhost dnt_mq   
  2>删除虚拟机
    rabbitmqctl delete_vhost vhostpath
  3>显示出所有虚拟主机信息
    rabbitmqctl list_vhosts
添加用户名和密码:
  1>添加用户和密码(用户名ayf, 密码:pwd):
    rabbitmqctl add_user ayf pwd
  2>修改用户密码
    rabbitmqctl change_password username newpassword
权限设置:
  1>绑定用户权限
    rabbitmqctl set_permissions -p dnt_mq ayf ".*" ".*" ".*"
  2>列出用户权限
    rabbitmqctl list_user_permissions ayf
  3>清除用户权限
    rabbitmqctl clear_permissions [-p vhostpath] username
 
*******************************************其它*****************************************


**********详细讲解**********
参考:http://www.cnblogs.com/dubing/p/4017613.html


简介:
1.要了解rabbitMQ 首先要了解AMQP协议 百科上给的很详细 http://baike.baidu.com/view/4023136.htm?fr=aladdin
2.AMQP 有四个非常重要的概念:虚拟机(virtual host),通道(exchange),队列(queue)和绑定(binding)。
  1>虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
  2>交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
  3>队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
  4>绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。
  5>想要进阶学习的可以参考 https://www.rabbitmq.com/tutorials/amqp-concepts.html
3.RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。
4.你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。
5.对于rabbitMQ本身的特点 参考官网 http://www.rabbitmq.com/features.html
  1>可靠性(Reliability)
    RabbitMQ提供很多特性供我们可以在性能和可靠性作出折中的选择,包括持久化、发送确认、发布者确认和高可用性等。
  2>弹性选路(Flexible Routing)
    消息在到达队列前通过交换(exchanges)来被选路。RabbitMQ为典型的选路逻辑设计了几个内置的交换类型。对于更加复杂的选路,我们可以将exchanges绑定在一起或者写属于自己的exchange类型插件。
  3>集群化(Clustering)
    在一个局域网内的几个RabbitMQ服务器可以集群起来,组成一个逻辑的代理人。
    http://www.360doc.com/content/14/0911/17/15077656_408713893.shtml
    所有命令 在 sbin目录下
    1.行命令rabbitmqctl status,查询节点的状态,输出 Status of node ‘rabbit@WORKGROUP-1‘ ...。
    2.假设 192.168.10.111 的节点名称为 rabbit@H1-PC,192.168.10.112的节点名称为 rabbit@H2-PC
    3.分别在两台机器的rabbitmq数据、日志文件所在路径下(本文为C:/Users/Administrator.PC-20131017PAWC/AppData/Roaming/RabbitMQ(可以在网页端查看)),创建集群配置文件rabbitmq.config
      rabbitmq.config是一个标准的erlang配置文件。它必须符合erlang配置文件的标准。它既有默认的目录,也可以在rabbitmq-env.conf文件中配置。文件的内容详见:http://www.rabbitmq.com/configure.html#config-items
      内容为(包括最后的.):[{rabbit,[{cluster_nodes, [‘rabbit@app118‘, ‘rabbit@app119‘]}]}].
    4.分别配置两台机器的host(C:\Windows\System32\drivers\etc\hosts)
      111:
        192.168.10.111 H1-PC  
192.168.10.112 H2-PC    
      112:
        192.168.10.112 H2-PC  
192.168.10.111 H1-PC
    5.分别在两台机器的rabbitmq数据、日志文件所在路径下(本文为C:/Users/Administrator.PC-20131017PAWC/AppData/Roaming/RabbitMQ),创建rabbitmq环境变量的配置文件rabbitmq-env.conf
      192.168.10.111的rabbitmq-env.conf内容为:
      NODENAME=rabbit@H1-PC
      NODE_IP_ADDRESS=192.168.10.111
      NODE_PORT=5672
      RABBITMQ_MNESIA_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\db
      RABBITMQ_LOG_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\log
    6.192.168.10.112的rabbitmq-env.conf内容为:
      NODENAME=rabbit@H2-PC
      NODE_IP_ADDRESS=192.168.10.112
      NODE_PORT=5672
      RABBITMQ_MNESIA_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\db
      RABBITMQ_LOG_BASE=C:\Users\Administrator\AppData\Roaming\RabbitMQ\log
    7.将192.168.10.111的C:\Users\Administrator文件夹下的.erlang.cookie文件替换掉192.168.10.112 C:\Users\Administrator和C:\Windows下的该文件(该文件是集群节点进行通信的验证密钥,所有节点必须一致。反过来亦可)。
    8.重启两台机器的rabbitmq。
      1>192.168.10.112,在控制台D:\RabbitMQ Server\rabbitmq_server-2.8.1\sbin 路径下分别执行以下语句:
        rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
      2>192.168.10.111,在控制台D:\RabbitMQServer\rabbitmq_server-2.8.1\sbin路径下分别执行以下语句:
        rabbitmqctl stop_app
rabbitmqctl reset
rabbtimqctl cluster rabbit@H2-PC
rabbitmqctl start_app
      3>上述命令先停掉rabbitmq应用,reset集群状态,然后调用cluster命令,将H1-PC连接到H2-PC,使两者成为一个集群,最后重启rabbitmq应用。在这个cluster命令下,H1-PC是内存节点,H2-PC是磁盘节点(RabbitMQ启动后,默认是磁盘节点)。
        如果要使H1-PC在集群里也是磁盘节点,那么更改上述第3句如下:
rabbitmqctl cluster rabbit@H2-PC rabbit@H1-PC
只要在节点列表里包含了自己,它就成为一个磁盘节点。在RabbitMQ集群里,必须至少有一个磁盘节点存在。
    9.在H1-PC和H2-PC上,运行rabbitmqctl cluster_status命令查看集群状态.
      Cluster status of node ‘rabbit@H1-PC‘...
      [{nodes,[{disc,[‘rabbit@H2-PC‘]},{ram,[‘rabbit@H1-PC‘]}]},
      {running_nodes,[‘rabbit@H2-PC‘,‘rabbit@H1-PC‘]}]
      ...done.


  4>联盟(Federation)
    对于那些需要比集群更加松散和非可靠连接的服务器来说,RabbitMQ提供一个联盟模型(Federation Model)
  5>高可用队列(High Available Queue)
    可以在一个集群里的几个机器里对队列做镜像,确保即时发生了硬件失效,你的消息也是安全的。
  6>多客户端(Many Clients)
    有各种语言的RabbitMQ客户端
  7>管理UI(Management UI)
    RabbitMQ提供一个易用的管理UI来监控和控制消息代理人的各个方面。
  8>跟踪(Tracing)
    如果你的消息系统行为异常,RabbitMQ提供跟踪支持来找出错误的根源。
  9>插件系统(Plugin System)
    RabbitMQ提供各种方式的插件扩展,我们可以实现自己的插件。
6.使用任务队列一个优点是能够轻易地并行处理任务。当处理大量积压的任务,只要增加工作队列,通过这个方式,能够实现轻易的缩放。
7.exchange常用有三种类型:
  1>Direct :处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
  2>Fanout :不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
  3>Topic : 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。


****************属性介绍**********
8.exchange属性
  1>exchange的类型分为fanout,direct,topic.还有一种不常用的headers。
  2>headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型
    举个例子,发送端定义2个键值{k1,1},{k2,2},接收端绑定队列的时候定义{"x-match", "any"},那么接收端的键值属性里只要存在{k1,1}或{k2,2}都可以获取到消息。
  这样的类型扩展的程度很大,适合非常复杂的业务场景。
  3>Durability
    持久性,这是exchange的可选属性,如果你Durability设置为false,那些当前会话结束的时候,该exchange也会被销毁。 
  4>Auto delete
    当没有队列或者其他exchange绑定到此exchange的时候,该exchange被销毁。这个很简单就不示例了。
  5>Internal 
    表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。
    无法声明2个名称相同 但是类型却不同的exchange
9.Queue属性
  1>Durability 和exchange相同,未持久化的队列,服务重启后销毁。
  2>Auto delete 当没有消费者连接到该队列的时候,队列自动销毁。
  3>Exclusive 使队列成为私有队列,只有当前应用程序可用,当你需要限制队列只有一个消费者,这是很有用的。
  4>扩展属性如下对应源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的参数
  5>Message TTL 当一个消息被推送在该队列的时候 可以存在的时间 单位为ms,(对应扩展参数argument "x-message-ttl" )
  6>Auto expire 在队列自动删除之前可以保留多长时间(对应扩展参数argument "x-expires")
  7>Max length 一个队列可以容纳的已准备消息的数量(对应扩展参数argument "x-max-length")
  8>更多参考 http://www.rabbitmq.com/extensions.html
  9>一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
10.Message属性
  1>Durability 
    消息的持久在代码中设置的方法与exchange和queue不同,有2种方法
    1.IBasicProperties properties = channel.CreateBasicProperties();
      properties.SetPersistent(true);
      byte[] payload = Encoding.ASCII.GetBytes(message);
      channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);
    2.IBasicProperties properties = channel.CreateBasicProperties();
      properties.DeliveryMode = 2;
      byte[] payload = Encoding.ASCII.GetBytes(message);
      channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);
    contentType: 标识消息内容的MIME,例如JSON用application/json
    replayTo: 标识回调的queue的地址
    correlationId:用于request和response的关联,确保消息的请求和响应的同一性
  2>Message的2种状态:
    Ready  此状态的消息存在于队列中待处理。
    Unacknowledged  此状态的消息表示已经在处理未确认。
    说到Unacknowledged,这里需要了解一个ack的概念。当Consumer接收到消息、处理任务完成之后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成。RabbitMQ会一直等到处理某个消息的Consumer的链接失去之后,才确定这个消息没有正确处理,从而RabbitMQ重发这个消息。
    Message acknowledgment是默认关闭的。初始化Consumer时有个noAck参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。
    string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)
    一般来说,常用的场景noack一般就是设置成true,但是对于风险要求比较高的项目,例如支付。对于每一条消息我们都需要保证他的完整性和正确性。就需要获取消息后确认执行完正确的业务逻辑后再主动返回一个ack给server。可以通过rabbitmqctl list_queues name message_rady message_unacknowleded 命令来查看队列中的消息情况,也可以通过后台管理界面。
    message的消费还分为consume和baseget 
11.binding相关
  1>如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除
  2>在声明一个队列的同时,server会默认让此队列绑定在默认的exchange上,这个exchange的名称为空。
12.发布订阅
  1>RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要把消息发送给一个exchange。exchange非常简单,它一边从发布者方接收消息,一边把消息推入队列。exchange必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。
  2>发布订阅其实很简单,例如上章我所示例,假设我们一开始没有任何消息,现在有一个生产者P1,他是一个天气预报播放者。然后我们有2个消费者来订阅他的消息。
  3>P1通过广播类型的交换机fEx来发布他的天气消息,c1,c2分别建立一个队列为Q1,Q2. 并且订阅P1的fEx.
  4>发布订阅中发布者所产生的消息通过exchange对所有绑定他的队列队形消息推送,每个队列获取绑定所对应的消息
13.WorkQueue (可用于消费者集群)
  1>区分于发布订阅,消费者集群主要解决横向服务器扩展问题,如果一个队列积压太多,如何均与的让不同的消费者来承担。
  2>默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。
  3>默认地,RabbitMQ会逐一地向下一个Consumer发放消息,每一个Consumer会得到数目相同的消息。文中所示之所以是按照1条一条的轮询,是因为程序中控制了一个队列单次消费的数量。
14.Message的BasicGet于consume的区别
  1>consume的功能上一张介绍过,basicget更偏向于我们平时用过的其他类型的MessageQueue,它就是最基本的接受消息,consume的消费针对basicget来说属于一个长连接于短连接的区别
  2>消费者关系一旦确定,基本上默认它就是在侦听通道的消息是否在生产。而basicget则是由客户端手动来控制。
  3>如果你选择了消费消息,那么基本上代码层面是这样来完成的
    var consumer = new QueueingBasicConsumer(channel);
    channel.BasicQos(0, 1, false);
    channel.BasicConsume(queue.name, rbAckTrue.Checked, consumer);
    while (true){
      var e = consumer.Queue.Dequeue();
      MessageBox.Show(string.Format("队列{0}获取消息{1},线程id为{2}", queue.name, Encoding.ASCII.GetString(e.Body), Process.GetCurrentProcess().Id));
      Thread.Sleep(1000);
    }




***发送接收消息问题
1.设置vhost
  factory.setVirtualHost("dnt_mq");
2.设置用户名密码
  factory.setPort(5672);
  factory.setUsername("abc");
  factory.setPassword("abc");
3.设置地址
  factory.setHost("localhost");
4.如果发送端设置了 vhost 接收端要设置vhost和用户名密码
  factory.setVirtualHost("dnt_mq");
  factory.setHost("localhost");
  factory.setPort(5672);
  factory.setUsername("abc");
  factory.setPassword("abc");
5.关于接收消息
  1>接收完直接告诉服务器接收成功
    channel.basicConsume(QUEUE_NAME, true, consumer);
  2>等处理完再告诉服务器消息接收成功
    channel.basicConsume(QUEUE_NAME, false, consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received ‘" + message + "‘");
      //告诉服务器 消息接收成功
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
    }
6.Server端的Queue持久化
  1>如果已经声明了同名非持久化的Queue,则再次声明无效。
  2>发送方和接收方都需要指定该参数(第二个参数)
    boolean durable = true;  
    channel.queueDeclare("task_queue", durable, false, false, null);  
7.Message持久化(第三个参数)
  1>channel.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());  
8.负载分配
  1>为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-robin
  2>int prefetchCount = 1; channel.basicQos(prefetchCount); 
  3>意思为,最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。


**********路由模式**********
9.固定关键词路由:Routing
  1>使用类型为direct的exchange,发送特定关键词(RoutingKey)的消息给订阅该关键词的Queue。
  2>场景示例:消息发送方发送了类型为[error][info]的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印的接收两者
  3>发送方
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost");  
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel();  
    /*exchange类型为direct
     *第一个为交换机名称  第二个为 exchange类型
    */
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //绑定队列
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
    /*关键词=info*/
    channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes()); 
    channel.close(); 
    connection.close();  
  4>接收方
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost");  
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel();  
    /*exchange类型为direct 和发送方统一*/
    channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 
    // 创建匿名Queue  
    String queueName = channel.queueDeclare().getQueue(); 
    // 订阅某个关键词,绑定到匿名Queue中  第一个为队列名称 第二个为交换机名称 第三个为关键词
    channel.queueBind(quueName,EXCHANGE_NAME,"error"); 
    channel.queueBind(quueName,EXCHANGE_NAME,"info"); 
    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(queueName, true, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... 
    String message = new String(delivery.getBody());  
    String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词
10.关键词模式路由:Topics
  1>这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅模糊关键词。
  2>关键词必须是一组word,由点号分割。例如"xxx.yyy.zzz",限定255bytes。
    * 表示一个word; # 表示0个或者多个word;
  3>发送方
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost");  
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel();  
    /*exchange类型 为 topic */
    channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
    //绑定队列
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
    /*关键词routingKey*/
    channel.basicPublish(EXCHANGE_NAME, "xxx.yyy", null, message.getBytes());  
    channel.close(); 
    connection.close();  
  4>接收方
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost");  
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel();  
    /*exchange类型为topic 和发送方统一*/
    channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
    // 创建匿名Queue  
    String queueName = channel.queueDeclare().getQueue(); 
    // 订阅某个关键词,绑定到匿名Queue中  第一个为队列名称 第二个为交换机名称 第三个为关键词
    channel.queueBind(quueName,EXCHANGE_NAME,"*.yyy");
    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(queueName, true, consumer); 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // Blocking... 
    String message = new String(delivery.getBody());  
    String routingKey = delivery.getEnvelope().getRoutingKey(); // 可获取路由关键词




*******************************************消息阻塞*************************************
参考:http://www.cnblogs.com/lyz-pro/archive/2012/07/11/2587014.html


1.消息会处于阻塞状态,可以通过(man rabbitmqctl 可以获得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)list_channels pid connection ; close_connection [connection] "" ,查看和解决阻塞 
rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged
解决阻塞的办法,可以在subscribe消息队列是设置autoAck=true,这样会避免消息队列中消息阻塞,这种情况是worker接到消息后,就会把消息从消息队列删除,不管消息是否被正确处理,另一种是设置autoAck=false,这样worker在接受消息后,必须给予服务端一个ack响应,该消息才会从消息队列中删除,这样会防止消息的意外丢失,但要注意的是,消息队列如果没有接收到ack响应,该消息对了的消息就会一直阻塞,对于rabbitmq-server来说,他是没有超时存在的,即除非重启rabbitmq,否则该消息队列会一直阻塞,直到收到响应,但如果与该消息队列的subscirbe断开的话,则表明过期,即该消息队列中消息会尝试重新发消息给一个订阅者进行处理。
2.关于健壮的消息处理
当rabbitmq server重启,或意外当掉的话,所用消息的订阅都会跟着坏掉(当然也可以设置持久化的消息队列设置),解决办法是捕获ShutdownSignalException异常(对rabbitmq)出现该异常说明消息服务无法连到,故可以进行相应的处理,另每次消息发送,消息订阅之前都要进行一次消息队列,exchange,绑定的重定义,防止消息对了重启后改消息队列/exchang已消失。
组合命令
list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections 
close_connection
3.关于消息处理的管理
每个worker都应该有一个可标识的tag,尽量不使用系统生成的,这样便于以后的debug
具体代码例子:
Connection conn = null;
Channel channel = null;
QueueingConsumer consumer = null;
try {
    conn = qFactory.newConnection();
    channel = conn.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);


    /**
    * This tells RabbitMQ not to give more than one message to a
    * worker at a time. Or, in other words, don‘t dispatch a new
    * message to a worker until it has processed and acknowledged
    * the previous one. Instead, it will dispatch it to the next
    * worker that is not still busy
    * 这告诉RabbitMQ不发送多条消息到同一个worker。或者,换句话说,在它处理完成前,不要发送一个新的消息给它。而是将他发送到另一个没有消息的worker
    * 也就是 一条一条处理
    */
    channel.basicQos(1);


    consumer = new QueueingConsumer(channel);
    //Use hostname as consume tagname , So that We can monitor who consume this Queue
                 channel.basicConsume(this.queueName, false, hostName, consumer);
    } catch (IOException e) {
        e.printStackTrace();
    }


    while (true) {
        try {
            //Get next message
            delivery = consumer.nextDelivery();
        } catch (ShutdownSignalException e) {
        //If rabbitmq-server has closed , out of loop
            e.printStackTrace();
            isSignalBroken = true ;
            break;
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
            log.warn("The consumer has cancelled , Try to re-consume");
            //If the channel and conn have closed .
            try{
            //Sleep 1s and reconnect to rabbitmq-server
                Thread.sleep(1000);
                conn = qFactory.newConnection();
                channel = conn.createChannel();
                            channel.queueDeclare(queueName, false, false, false, null);
                            
                channel.basicQos(1);
                            
                consumer = new  QueueingConsumer(channel);
                        channel.basicConsume(this.queueName, false, consumer);
                continue;
            }catch (IOException e1) {
                e1.printStackTrace();
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
                        
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        
        try{     
            //process message
        }catch (Exception e) {
            //If throw exception when process message , close channel and conn , make sure this message not block . then re-work
            e.printStackTrace();
            try {
                 channel.basicCancel(hostName);
                channel.close();
                conn.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            continue;
        }     
    /**
    * If a consumer dies without sending an ack, RabbitMQ will
    * understand that a message wasn‘t processed fully and will
    * redeliver it to another consumer 
    * There aren‘t any message timeouts; 
    * RabbitMQ will redeliver the message only when
    * the worker connection dies. It‘s fine even if processing
    * a message takes a very, very long time
    */
        try {
            channel.basicAck(delivery.getEnvelope() .getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s
    if(isSignalBroken){
        log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds");
        try {
            Thread.sleep(1000*60);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.run();
    }
}




*******************************************例子*****************************************
1.发送端
package com.boonya.rabbitmq;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "1234";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
//设置使用哪个vhost 不设置为/  如果这里设置了 在接收端也需要设置,而且接收端要设置用户名密码
factory.setVirtualHost("dnt_mq");
factory.setHost("localhost");
factory.setPort(5672);
//设置用户名和密码
factory.setUsername("ayf");
factory.setPassword("ayf");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Date date = new Date();
for (int i = 0; i < 500000; i++) {
String message = "Hello world!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
}
System.out.println("1:"+date+"----2:"+new Date());
channel.close();
connection.close();
//1000个 10:36:55  10:36:36  19秒//20个      10:38:07  10:37:48  19秒//5个         10:39:19  10:39:01  18秒//1个         10:40:31  10:40:13  18秒
}
}
2.接收端
package com.boonya.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.ShutdownSignalException;
public class Receive {
private final static String QUEUE_NAME = "1234";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
//如果发送端设置了vhost 这里也要设置
factory.setVirtualHost("dnt_mq");
factory.setHost("localhost");
factory.setPort(5672);
//如果发送端设置了vhost 这里要设置用户名和密码
factory.setUsername("abc");
factory.setPassword("abc");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//设置每次取一个队列
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
}
}
}













版权声明:本文为博主原创文章,未经博主允许不得转载。

RabbitMQ学习笔记

标签:erlang   rabbitmq   java   

原文地址:http://blog.csdn.net/u010838555/article/details/47300879

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!