标签:订阅 通配 代码 exce 主题模式 队列模式 业务 背景 oid
?志采集系统 ELK
发送端
public class Send { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String error = "我是订单服务的error日志"; String info = "我是订单服务的info日志"; String debug = "我是订单服务的debug日志"; channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8)); System.out.println("direct消息发送成功"); } } }
发送端1,注意和下面的发送端2不一样
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, direct交换机需要指定routingkey channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey"); channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey"); channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body=" + new String(body, "utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName, false, consumer); } }
发送端2
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, direct交换机需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
注意
测试,下?的匹配规则是怎样的
quick.orange.rabbit 只会匹配 *.orange.* 和*.*.rabbit ,进到Q1和Q2 lazy.orange.elephant 只会匹配 *.orange.* 和lazy.#,进到Q1和Q2 quick.orange.fox 只会匹配 *.orange.*,进入Q1 lazy.brown.fox 只会匹配azy.#,进入Q2 lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次) quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理 lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2
例?:?志采集系统
?个队列收集订单系统的error?志信息,order.log.error
?个队列收集全部系统的全部级别?志信息, * .log. *
生产端
public class Send { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,topic交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "我是订单服务的error日志"; String info = "我是订单服务的info日志"; String debug = "我是商品服务的debug日志"; channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8)); System.out.println("TOPIC消息发送成功"); } } }
消费端1
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机, channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, 需要指定routingkey channel.queueBind(queueName, EXCHANGE_NAME, "order.log.error"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body=" + new String(body, "utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName, false, consumer); } }
消费端2
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机, channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, 需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
简单模式
?作队列模式
发布订阅模式
路由模式
通配符模式
标签:订阅 通配 代码 exce 主题模式 队列模式 业务 背景 oid
原文地址:https://www.cnblogs.com/jwen1994/p/14362584.html