标签:打开 lan tin env bytes 重启 prope ann head
/**
* 声明转发器和类型 可用的转发器类型Direct Topic Headers Fanout
* Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
* Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
* 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
* Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
* 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
*/
channel.exchangeDeclare(ex_log, "fanout");
/**
* 指定队列
* queue: 队列名称
* durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
* 保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
* exclusive:是否排外的,有两个作用,
* 1:当连接关闭时connection.close()该队列是否会自动删除;
* 2:该队列是否是私有的private,
* 如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,
* 如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,
* 如果强制访问会报异常,一般等于true的话用于一个队列只能有一个消费者来消费的场景
* autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,
* 可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
* arguments:队列中的消息什么时候会自动被删除?
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Send
/**
* exchange:转发器
* routingKey:指定routingKey
* props:消息为持久化 —— MessageProperties.PERSISTENT_TEXT_PLAIN
* body:msg字节
*/
channel.basicPublish(ex_log, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
Receive
/**
* 应答机制
* ack= true: Round-robin 转发 消费者被杀死,消息会丢失
* ack=false:消息应答 ,为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。
* 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。
* 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。
* 通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。
* 消费者需要耗费特别特别长的时间是允许的。
*/
boolean ack = false ; //打开应答机制
channel.basicConsume(QUEUE_NAME, ack,consumer);
/**
* 公平转发,设置最大服务转发消息数量,只有在消费者空闲的时候会发送下一条信息,同一时间每次发给一个消息给一个worker。
* 解决:一个生产者与多个消费者时,避免RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。
*/
Integer prefetchCount = 1;
channel.basicQos(prefetchCount);
channel.basicConsume(QUEUE_NAME, ack,consumer);
/**
* 发送应答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
标签:打开 lan tin env bytes 重启 prope ann head
原文地址:https://www.cnblogs.com/yifanSJ/p/8986581.html