标签:存在 style queue mave str 简介 点击 ati 完成
Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,我们先不考虑那么多
在IDE中新建一个Maven项目,并在pom.xml中贴入如下依赖,RabbitMQ的最新版本依赖可以在这里找到
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MessageProducer { private Logger logger = LoggerFactory.getLogger(MessageSender.class); //声明一个队列名字 private final static String QUEUE_NAME = "hello"; public boolean sendMessage(String message){ //new一个RabbitMQ的连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置需要连接的RabbitMQ地址,这里指向本机 factory.setHost("127.0.0.1"); Connection connection = null; Channel channel = null; try { //尝试获取一个连接 connection = factory.newConnection(); //尝试创建一个channel channel = connection.createChannel(); //队列名,是否持久化,是否是排他性队列,是否自动删除,其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); logger.info("Sent ‘" + message + "‘"); //关闭channel和连接 channel.close(); connection.close(); } catch (IOException | TimeoutException e) { //失败后记录日志,返回false,代表发送失败 logger.error("send message faild!",e); return false; } return true; } }
package com.liyang.ticktock.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageConsumer { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); public boolean consume(String queueName){ //连接RabbitMQ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); //这里声明queue是为了取消息的时候,queue肯定会存在 //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue channel.queueDeclare(queueName, false, false, false, null); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); logger.info("Received ‘" + message + "‘"); } }; //上面是声明消费者,这里用声明的消费者消费掉队列中的消息 channel.basicConsume(queueName, true, consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (IOException | TimeoutException e) { //失败后记录日志,返回false,代表消费失败 logger.error("send message faild!",e); return false; } return true; } }
RabbitMQ中,有4种类型的Exchange
opic与direct的重要区别就是,它有两个关键字
转载自:http://www.cnblogs.com/4----/p/6518801.html
标签:存在 style queue mave str 简介 点击 ati 完成
原文地址:https://www.cnblogs.com/mingyao123/p/10072529.html