标签:不同 logging waiting please logs conf 监听 oid 用户名
RabbitMQ的简单使用
安装步骤可以参考该网址进行安装,此处不再赘述
创建pom工程,并引入下面依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
生产者代码
首先定义一个链接对象
public class ConnectionUtil {
public static Connection getConnection(String host, int port, String vHost, String userName, String passWord) throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务器地址
connectionFactory.setHost(host);
//设置端口号
connectionFactory.setPort(port);
//设置主机,用户名,密码
connectionFactory.setVirtualHost(vHost);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(passWord);
//返回连接
return connectionFactory.newConnection();
}
}
生产者代码
public class ProducerMessage {
private final static String QUEUE_NAME = "MQ";
public static void main(String[] args) throws Exception {
//1、获取连接
Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
//2、声明通道
Channel channel = connection.createChannel();
//3、创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、定义消息内容
String message = "hello MQ";
//发布消息
for (int i = 0; i < 100; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
System.out.println("[x] Sent'" + message+i + "'");
}
//6、关闭通道和连接
channel.close();
connection.close();
}
}
消费者代码
由于本教程是参考其他人的博客进行,且由于MQ的版本不同,所以copy了一段消费者队列代码,如下:
QueueingConsumer需要实现DefaultConsumer
public class QueueingConsumer extends DefaultConsumer {
//定义一个队列
private final BlockingQueue<QueueingConsumer.Delivery> _queue;
// When this is non-null the queue is in shutdown mode and nextDelivery should
// throw a shutdown signal exception.
private volatile ShutdownSignalException _shutdown;
private volatile ConsumerCancelledException _cancelled;
// Marker object used to signal the queue is in shutdown mode.
// It is only there to wake up consumers. The canonical representation
// of shutting down is the presence of _shutdown.
// Invariant: This is never on _queue unless _shutdown != null.
private static final QueueingConsumer.Delivery POISON = new QueueingConsumer.Delivery(null, null, null);
public QueueingConsumer(Channel ch) {
this(ch, new LinkedBlockingQueue<QueueingConsumer.Delivery>());
}
public QueueingConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q) {
super(ch);
this._queue = q;
}
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
_shutdown = sig;
_queue.add(POISON);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
_cancelled = new ConsumerCancelledException();
_queue.add(POISON);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
checkShutdown();
this._queue.add(new QueueingConsumer.Delivery(envelope, properties, body));
}
/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
*/
public static class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
_envelope = envelope;
_properties = properties;
_body = body;
}
/**
* Retrieve the message envelope.
*
* @return the message envelope
*/
public Envelope getEnvelope() {
return _envelope;
}
/**
* Retrieve the message properties.
*
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return _properties;
}
/**
* Retrieve the message body.
*
* @return the message body
*/
public byte[] getBody() {
return _body;
}
}
/**
* Check if we are in shutdown mode and if so throw an exception.
*/
private void checkShutdown() {
if (_shutdown != null)
throw Utility.fixStackTrace(_shutdown);
}
/**
* If delivery is not POISON nor null, return it.
* <p/>
* If delivery, _shutdown and _cancelled are all null, return null.
* <p/>
* If delivery is POISON re-insert POISON into the queue and
* throw an exception if POISONed for no reason.
* <p/>
* Otherwise, if we are in shutdown mode or cancelled,
* throw a corresponding exception.
*/
private QueueingConsumer.Delivery handle(QueueingConsumer.Delivery delivery) {
if (delivery == POISON ||
delivery == null && (_shutdown != null || _cancelled != null)) {
if (delivery == POISON) {
_queue.add(POISON);
if (_shutdown == null && _cancelled == null) {
throw new IllegalStateException(
"POISON in queue, but null _shutdown and null _cancelled. " +
"This should never happen, please report as a BUG");
}
}
if (null != _shutdown)
throw Utility.fixStackTrace(_shutdown);
if (null != _cancelled)
throw Utility.fixStackTrace(_cancelled);
}
return delivery;
}
/**
* 等待消息投递,并将消息返回
*
* @return the next message
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public QueueingConsumer.Delivery nextDelivery()
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return handle(_queue.take());
}
/**
* Main application-side API: wait for the next message delivery and return it.
*
* @param timeout timeout in millisecond
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public QueueingConsumer.Delivery nextDelivery(long timeout)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
}
}
消费者代码
public class ConsumerMessage {
private final static String QUEUE_NAME = "MQ";
//consumer.n
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}public static void main(String[] args) throws Exception {
//1、获取连接
Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
//2、声明通道
Channel channel = connection.createChannel();
//3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、定义队列的消费者
//Consumer consumer=new DefaultConsumer(channel);
QueueingConsumer consumer = new QueueingConsumer(channel);
//5、监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
//6、获取消息
while (true) {
//consumer.n
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
上面完成消费者和生产者代码,运行即可
应用启动类:
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}
生产者代码:
public class Tut1Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
/**
* 使用RabbitTemplate向队列中定时发送消息
*/
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
消费者代码:
@RabbitListener可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler注解一起使用,
@RabbitListener标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler的方法处理,具体找哪个方法处理,需要跟进MessageConverter转换后的java对象。
//表示监听一个hello队列
@RabbitListener(queues = "hello")
public class Tut1Receiver {
@RabbitHandler
public void receive(String message) {
System.out.println(" [x] Received '" + message + "'");
}
}
配置类:
@Configuration
public class Tut1Config {
//声明一个队列
@Bean
public Queue hello() {
return new Queue("hello");
}
@Bean
public Tut1Receiver receiver() {
//实例化消费者
return new Tut1Receiver();
}
//@Profile("sender")
@Bean
public Tut1Sender sender() {
//实例化生产者
return new Tut1Sender();
}
}
application.yml
#spring:
# profiles:
# active: usage_message
logging:
level:
org: ERROR
tutorial:
client:
duration: 10000
spring:
rabbitmq:
host: localhost
username: guest
password: guest
其实使用springboot还是比较简单的,这个主要是因为springboot对RabbitMQ的底层做了很多的封装,隐藏和很多的细节。如果想全面的了解RabbitMQ最好还是到官网上面查看资料,虽然都是英文资料,读起来比较生涩难懂
标签:不同 logging waiting please logs conf 监听 oid 用户名
原文地址:https://www.cnblogs.com/haizhilangzi/p/12301686.html