标签:rabbitmq 工作 内存 activemq 解决方案
让我们回顾一下,在上几章里都讲了什么?总结如下:
在第上一个教程中我们写程序从一个命名队列发送和接收消息。在这一次我们将创建一个工作队列,将用于分发耗时的任务在多个工作者(worker)之间。 背后的主要思想工作队列(又名:任务队列)是为了避免立即做一个资源密集型任务,不得不等待它完成。相反,我们安排的任务要做。我们封装任务作为消息并将其发送到一个队列。工作进程在后台运行将流行的任务和最终执行的工作。当您运行许多worker的任务将在他们之间共享。这个概念是特别有用的web应用程序中处理复杂的任务是不可能在一个短的HTTP请求窗口。 为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘");
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
我们的老的Recv.java程序也需要一些变化:
while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ‘" + message + "‘"); doWork(message); System.out.println(" [x] Done"); }
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ‘.‘) Thread.sleep(1000); } }编译它们 (工作目录中的jar文件):
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received ‘First message.‘ [x] Received ‘Third message...‘ [x] Received ‘Fifth message.....‘
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received ‘Second message..‘ [x] Received ‘Fourth message....‘默认情况下,RabbitMQ将每个消息发送到下一个消费者,在序列。平均每个消费者将获得相同数量的信息。这种方式称为循环分配消息。试试这三个或更多的worker。
QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
消息响应
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
int prefetchCount = 1; channel.basicQos(prefetchCount);
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
channel.close();
connection.close();
}
//...
}
And our Worker.java:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
doWork(message);
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
</pre><pre code_snippet_id="552030" snippet_file_name="blog_20141214_2_677810" name="code" class="java">
</pre><pre code_snippet_id="552030" snippet_file_name="blog_20141214_3_2789828" name="code" class="java">
</pre><pre code_snippet_id="552030" snippet_file_name="blog_20141214_3_2789828" name="code" class="java">import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)
标签:rabbitmq 工作 内存 activemq 解决方案
原文地址:http://blog.csdn.net/sun305355024sun/article/details/41927209