让我们回顾一下,在上几章里都讲了什么?总结如下:
事先,先普及一下图标(我们会在下面的事例中,会大量用到,所以先普及一下,便于识别,最终更好理解事例的含义)
① producting(生产者):在程序中 发送消息的一端,我们暂且称之为 生产者,在这里用“p”表示
② queue(队列):队列是一个邮箱的名字。它住在RabbitMQ。尽管消息流经RabbitMQ和您的应用程序,他们只可以存储在一个队列中。队列是不受任何限制,它可以储存尽可能多的信息(按你兴趣来了),它本质上是一个无限缓冲区。许多生产商可以发送消息到一个队列,许多消费者可以尝试接收数据从一个队列。
③ consuming(消费者):消费者和生产者是对应的,较为相似的意思;在这里,我用“C”表示
我们需要下载(Download) client library package,并要核实每个jar包,解压到相应位置,如下图所示:
第一步:点击 http://www.rabbitmq.com/java-client.html,然后找到相应的lib下载位置
第二步:选择合适的下载,比如我下载了zip包,如图所示:
第三步:Unzip it(解压它) 到你的working directory(工作目录)中 and grab (并且获得)你的jar包文件
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
onnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘");
channel.close(); connection.close();
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer;
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ‘" + message + "‘"); }
QueueingConsumer.nextDelivery()块,直到另一个来自服务器的消息交付。
下面是Recv.java 源代码:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }待续....
柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)
原文地址:http://blog.csdn.net/sun305355024sun/article/details/41925469