码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ系列之二:work queue

时间:2014-10-21 19:29:19      阅读:288      评论:0      收藏:0      [点我收藏+]

标签:style   blog   color   io   os   ar   使用   java   for   

server端代码:

 1 package com.example.workqueue;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.MessageProperties;
 9 
10 public class Send {
11 
12     public static void main(String[] args) throws IOException {
13         
14         // 队列名称
15         String queueName = "task_queue";
16         
17         ConnectionFactory factory = new ConnectionFactory();
18         
19         //远程服务器ip,如果在本地测试可以改成localhost
20         factory.setHost("121.40.151.120");
21         
22         //不是在本地测试,用户名和密码必填
23         factory.setUsername("rabbitmqname");
24         factory.setPassword("rabbitmqpwd");
25         
26         Connection conn = factory.newConnection();
27         Channel channel = conn.createChannel();
28         
29         boolean durable = true;  
30         
31         /** 
32          * 参数说明:
33          * queue:队列名称
34          * durable:队列数据是否可以持久化,true:是,false:否。也就是服务重启后队列数据是否依然存在
35          * exclusive:是否为某一个队列的专用连接
36          * autoDelete:当队列不再被使用也就是没有消费者的时候是否自动删除
37          * arguments:其它参数,比如队列存活时间
38         */
39         channel.queueDeclare(queueName, durable, false, false, null);
40         
41         String[] strs = new String[] { "First message." }; 
42         String message = getMessage(strs);  
43         
44         /** 
45          * 参数说明:
46          * exchange:默认的exchange就是"",是direct类型的,
47          *             任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。
48          * routingKey:指定接收消息的队列
49          * props:其它属性,比如消息路由头信息,持久化信息
50          * body:消息内容
51         */  
52         channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
53         
54         System.out.println("[" + message + "]");  
55           
56         // 最后,我们关闭channel和连接,释放资源。  
57         channel.close();  
58         conn.close(); 
59     }
60     
61     private static String getMessage(String[] strings) {  
62         if (strings.length < 1) {  
63             return "Hello World!";  
64         }  
65         return joinStrings(strings, " ");  
66     }  
67   
68     private static String joinStrings(String[] strings, String delimiter) {  
69         int length = strings.length;  
70         if (length == 0) {  
71             return "";  
72         }  
73         StringBuilder words = new StringBuilder(strings[0]);  
74         for (int i = 1; i < length; i++) {  
75             words.append(delimiter).append(strings[i]);  
76         }  
77         return words.toString();  
78     } 
79 
80 }

 

client端代码:

 1 package com.example.workqueue;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.ConsumerCancelledException;
 9 import com.rabbitmq.client.QueueingConsumer;
10 import com.rabbitmq.client.ShutdownSignalException;
11 
12 public class Recv {
13 
14     public static void main(String[] args) throws IOException, ShutdownSignalException, 
15         ConsumerCancelledException, InterruptedException {
16         
17         // 队列名称
18         String queueName = "task_queue";
19         
20         ConnectionFactory factory = new ConnectionFactory(); 
21         
22         factory.setHost("121.40.151.120");
23         factory.setUsername("rabbitmqname");
24         factory.setPassword("rabbitmqpwd");
25         
26         Connection connection = factory.newConnection();  
27         Channel channel = connection.createChannel();  
28         
29         // 表示在同一时间不要给一个Rev一个以上的消息(只能是一个),也就是说不要将一个新的消息分发给Rev直到它处理完了并且返回了前一个消息的通知标志(acknowledged)
30         channel.basicQos(1);
31           
32         //与服务端一致
33         channel.queueDeclare(queueName, true, false, false, null);  
34         
35         System.out.println("CRTL+C");  
36           
37         // QueueingConsumer:用来缓存服务端推送给我们的消息。  
38         QueueingConsumer consumer = new QueueingConsumer(channel);  
39         
40         boolean autoAck = false;
41         /** 
42          * 参数说明:
43          * queue:队列名称
44          * autoAck:是否自动应答,true:消息一旦被消费者消费,服务端就知道该消息已经投递,从而从队列中将消息剔除;
45          *                      false:需要在消费端显示调用channel.basicAck()方法通知服务端,如果没用显示调用,消息将进入
46          *                            unacknowledged状态,当前消费者连接断开后该消息变成ready状态重新进入队列。
47          * callback:具体消费者类
48         */
49         channel.basicConsume(queueName, autoAck, consumer);  
50           
51         while (true) {  
52             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
53             String message = new String(delivery.getBody());  
54             System.out.println("[" + message + "]");  
55             doWork(message);
56             System.out.println("r[done]");
57             
58             /**
59              * 显示调用通知服务端该消息已经消费并返回了acknowledged
60              * true:通知所有相同tag的untracked,false:只通知当前一个
61              */
62             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
63         } 
64     }
65 
66     private static void doWork(String message) throws InterruptedException {
67         for (char ch : message.toCharArray()) {  
68             if (ch == ‘.‘) {  
69                 Thread.sleep(1000);  
70             }  
71         }
72     }
73 
74 }

 

RabbitMQ系列之二:work queue

标签:style   blog   color   io   os   ar   使用   java   for   

原文地址:http://www.cnblogs.com/weishaohua/p/4041094.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!