标签:name div 策略 界面 访问 cep 示例 dir 能力
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址 connectionFactory.setHost("192.168.47.132"); //连接端口: 默认5672 connectionFactory.setPort(5672); //虚拟主机名: 默认 / connectionFactory.setVirtualHost("/yellowstreak"); //连接用户名: 默认guest connectionFactory.setUsername("yellowstreak"); //连接密码: 默认guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建频道 - 根据频道通信 Channel channel = connection.createChannel(); /** * 参数1: 队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //要发送的消息 String message = "Hello World"; /** * 参数1: 交换机exchange名称: 没有则使用默认Default Exchange * 参数2: 路由key, 简单模式下可以传递队列名称 * 参数3: 消息的其他属性 * 参数4: 参数内容, 要转换成字节数组 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息"); // 关闭资源 channel.close(); connection.close(); } }
public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPort(5672); connectionFactory.setHost("192.168.47.132"); connectionFactory.setVirtualHost("/yellowstreak"); connectionFactory.setUsername("yellowstreak"); connectionFactory.setPassword("123456"); return connectionFactory.newConnection(); } } public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //创建频道 Channel channel = connection.createChannel(); //去连通生产者的队列. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,路由Key,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key (这里是队列名) System.out.println(envelope.getRoutingKey()); //消息id System.out.println(envelope.getDeliveryTag()); //收到的消息 System.out.println(new String(body, "utf-8")); } };
//监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了, mq接收到回复会删除消息, 设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.QUEUE_NAME, true, consumer); //这里不关闭资源, 应该一直监听着.. //这样只要生产者发出消息, 消费者就能收到并消费. } }
标签:name div 策略 界面 访问 cep 示例 dir 能力
原文地址:https://www.cnblogs.com/binwenhome/p/12964526.html