码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitMQ学习笔记(七) RPC 远程过程调用

时间:2016-07-19 15:30:16      阅读:196      评论:0      收藏:0      [点我收藏+]

标签:

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务。

其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是  

客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

示例:

 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.QueueingConsumer.Delivery;
12 import com.rabbitmq.client.ShutdownSignalException;
13 
14 public class RPCServer {
15     
16     public static final String RPC_QUEUE_NAME = "rpc_queue";
17     
18     public static String sayHello(String name){
19         return "hello " + name ;
20     }
21     
22     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
23         
24         ConnectionFactory connFac = new ConnectionFactory() ;
25         connFac.setHost("localhost");
26         
27         Connection conn = connFac.newConnection() ;
28         
29         Channel channel = conn.createChannel() ;
30         
31         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
32         
33         QueueingConsumer consumer = new QueueingConsumer(channel);
34         
35         channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
36         
37         while(true){
38             System.out.println("服务端等待接收消息..");  
39             Delivery deliver = consumer.nextDelivery() ;
40             System.out.println("服务端成功收到消息..");
41             BasicProperties props =  deliver.getProperties() ;
42             
43             String message = new String(deliver.getBody() , "UTF-8") ;
44             
45             String responseMessage = sayHello(message) ;
46             
47             BasicProperties responseProps = new BasicProperties.Builder()
48             .correlationId(props.getCorrelationId())  
49             .build() ;
50             
51             //将结果返回到客户端Queue
52             channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
53              
54             //向客户端确认消息
55             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
56             System.out.println("服务端返回消息完成..");
57         }
58         
59     }
60 
61 }
 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 import java.util.UUID;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 import com.rabbitmq.client.QueueingConsumer.Delivery;
13 import com.rabbitmq.client.ShutdownSignalException;
14 
15 public class RPCClient {
16 
17     public static final String RPC_QUEUE_NAME = "rpc_queue";
18 
19     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
20 
21         ConnectionFactory connFac = new ConnectionFactory() ;
22         connFac.setHost("localhost");
23         Connection conn = connFac.newConnection() ;
24         Channel channel = conn.createChannel() ;
25 
26         //响应QueueName ,服务端将会把要返回的信息发送到该Queue
27         String responseQueue = channel.queueDeclare().getQueue() ;
28 
29         String correlationId = UUID.randomUUID().toString() ;
30 
31         BasicProperties props = new BasicProperties.Builder()
32         .replyTo(responseQueue)
33         .correlationId(correlationId)
34         .build();
35 
36         String message = "is_zhoufeng";
37         channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
38 
39         QueueingConsumer consumer = new QueueingConsumer(channel)    ;
40 
41         channel.basicConsume( responseQueue , consumer) ;
42 
43         while(true){
44             
45             Delivery delivery = consumer.nextDelivery() ;
46             
47             if(delivery.getProperties().getCorrelationId().equals(correlationId)){
48                 String result = new String(delivery.getBody()) ;  
49                 System.out.println(result);
50             }
51             
52         }
53     }
54 
55 }

 

rabbitMQ学习笔记(七) RPC 远程过程调用

标签:

原文地址:http://www.cnblogs.com/jianliang-Wu/p/5684893.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!