/**
* This tells RabbitMQ not to give more than one message to a
* worker at a time. Or, in other words, don‘t dispatch a new
* message to a worker until it has processed and acknowledged
* the previous one. Instead, it will dispatch it to the next
* worker that is not still busy
* 这告诉RabbitMQ不发送多条消息到同一个worker。或者,换句话说,在它处理完成前,不要发送一个新的消息给它。而是将他发送到另一个没有消息的worker
* 也就是 一条一条处理
*/
channel.basicQos(1);
consumer = new QueueingConsumer(channel);
//Use hostname as consume tagname , So that We can monitor who consume this Queue
channel.basicConsume(this.queueName, false, hostName, consumer);
} catch (IOException e) {
e.printStackTrace();
}
while (true) {
try {
//Get next message
delivery = consumer.nextDelivery();
} catch (ShutdownSignalException e) {
//If rabbitmq-server has closed , out of loop
e.printStackTrace();
isSignalBroken = true ;
break;
} catch (ConsumerCancelledException e) {
e.printStackTrace();
log.warn("The consumer has cancelled , Try to re-consume");
//If the channel and conn have closed .
try{
//Sleep 1s and reconnect to rabbitmq-server
Thread.sleep(1000);
conn = qFactory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
//process message
}catch (Exception e) {
//If throw exception when process message , close channel and conn , make sure this message not block . then re-work
e.printStackTrace();
try {
channel.basicCancel(hostName);
channel.close();
conn.close();
} catch (IOException e1) {
e1.printStackTrace();
}
continue;
}
/**
* If a consumer dies without sending an ack, RabbitMQ will
* understand that a message wasn‘t processed fully and will
* redeliver it to another consumer
* There aren‘t any message timeouts;
* RabbitMQ will redeliver the message only when
* the worker connection dies. It‘s fine even if processing
* a message takes a very, very long time
*/
try {
channel.basicAck(delivery.getEnvelope() .getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
//If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s
if(isSignalBroken){
log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds");
try {
Thread.sleep(1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.run();
}
}
*******************************************例子*****************************************
1.发送端
package com.boonya.rabbitmq;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "1234";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
//设置使用哪个vhost 不设置为/ 如果这里设置了 在接收端也需要设置,而且接收端要设置用户名密码
factory.setVirtualHost("dnt_mq");
factory.setHost("localhost");
factory.setPort(5672);
//设置用户名和密码
factory.setUsername("ayf");
factory.setPassword("ayf");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Date date = new Date();
for (int i = 0; i < 500000; i++) {
String message = "Hello world!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
}
System.out.println("1:"+date+"----2:"+new Date());
channel.close();
connection.close();
//1000个 10:36:55 10:36:36 19秒//20个 10:38:07 10:37:48 19秒//5个 10:39:19 10:39:01 18秒//1个 10:40:31 10:40:13 18秒
}
}
2.接收端
package com.boonya.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Receive {
private final static String QUEUE_NAME = "1234";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
//如果发送端设置了vhost 这里也要设置
factory.setVirtualHost("dnt_mq");
factory.setHost("localhost");
factory.setPort(5672);
//如果发送端设置了vhost 这里要设置用户名和密码
factory.setUsername("abc");
factory.setPassword("abc");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//设置每次取一个队列
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
}
}
}