二、Topic(主题) (using the Java client)
为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。
"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
- * 可以匹配一个标识符。
- # 可以匹配0个或多个标识符。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。 当绑定键中不包含任何#与*时,类似direct类型转发器
四、Putting it all together(全部代码)
发送 EmitLogTopic.java:
- public class EmitLogTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv)
- throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String routingKey = getRouting(argv);
- String message = getMessage(argv);
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
- System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");
- connection.close();
- }
- }
- public class ReceiveLogsTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv)
- throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String queueName = channel.queueDeclare().getQueue();
- if (argv.length < 1){
- System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
- System.exit(1);
- }
- for(String bindingKey : argv){
- channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
- }
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
- }
- }
- }
Run the following examples, including the classpath as in Tutorial 1 - on Windows, use %CP%.
To receive all the logs:
$ java -cp $CP ReceiveLogsTopic "#"
To receive all logs from the facility "kern":
$ java -cp $CP ReceiveLogsTopic "kern.*"
Or if you want to hear only about "critical" logs:
$ java -cp $CP ReceiveLogsTopic "*.critical"
You can create multiple bindings:
$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
Have fun playing with these programs. Note that the code doesn‘t make any assumption about the routing or binding keys, you may want to play with more than two routing key parameters.
Some teasers:
- Will "*" binding catch a message sent with an empty routing key?
- Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
- How different is "a.*.#" from "a.#"?
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String routingKeyOne = "logs.error.one";
- for (int i = 0; i <= 1; i++) {
- String messageOne = "this is one error logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne
- .getBytes());
- System.out.println(" [x] Sent ‘" + routingKeyOne + "‘:‘"
- + messageOne + "‘");
- }
- System.out.println("################################");
- String routingKeyTwo = "logs.error.two";
- for (int i = 0; i <= 2; i++) {
- String messageTwo = "this is two error logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo
- .getBytes());
- System.out.println(" [x] Sent ‘" + routingKeyTwo + "‘:‘"
- + messageTwo + "‘");
- }
- System.out.println("################################");
- String routingKeyThree = "logs.info.one";
- for (int i = 0; i <= 3; i++) {
- String messageThree = "this is one info logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,
- messageThree.getBytes());
- System.out.println(" [x] Sent ‘" + routingKeyThree + "‘:‘"
- + messageThree + "‘");
- }
- channel.close();
- connection.close();
- }
- }
‘logs.error.one‘:‘this is one error logs:0‘
- [x] Sent ‘logs.error.one‘:‘this is one error logs:1‘
- ################################
- [x] Sent ‘logs.error.two‘:‘this is two error logs:0‘
- [x] Sent ‘logs.error.two‘:‘this is two error logs:1‘
- [x] Sent ‘logs.error.two‘:‘this is two error logs:2‘
- ################################
- [x] Sent ‘logs.info.one‘:‘this is one info logs:0‘
- [x] Sent ‘logs.info.one‘:‘this is one info logs:1‘
- [x] Sent ‘logs.info.one‘:‘this is one info logs:2‘
- [x] Sent ‘logs.info.one‘:‘this is one info logs:3‘
- package com.abin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";// 定义Exchange名称
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 声明topic类型的Exchange
String queueName = "queue_topic_logs1";// 定义队列名为“queue_topic_logs1”的Queue
channel.queueDeclare(queueName, false, false, false, null);
// String routingKeyOne = "*.error.two";// "error"路由规则
// channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);// 把Queue、Exchange及路由绑定
String routingKeyTwo = "logs.*.one";//通配所有logs下第三词(最后一个)词为one的消息
channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);
System.out.println(" [*] Waiting for messages.");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message
+ "‘");
- [*] Waiting for messages.
[x] Received ‘logs.error.one‘:‘this is one error logs:0‘
[x] Received ‘logs.error.one‘:‘this is one error logs:1‘
[x] Received ‘logs.error.two‘:‘this is two error logs:0‘
[x] Received ‘logs.error.two‘:‘this is two error logs:1‘
[x] Received ‘logs.error.two‘:‘this is two error logs:2‘
[x] Received ‘logs.info.one‘:‘this is one info logs:0‘
[x] Received ‘logs.info.one‘:‘this is one info logs:1‘
[x] Received ‘logs.info.one‘:‘this is one info logs:2‘
[x] Received ‘logs.info.one‘:‘this is one info logs:3‘