码迷,mamicode.com
首页 > 其他好文 > 详细

Tibco RV request/reply 的同步與非同步

时间:2015-02-07 20:16:21      阅读:197      评论:0      收藏:0      [点我收藏+]

标签:

Tibco RV 有提供 request/reply 模式,也有提供 publish/subscribe 模式,這兩種模式的用途分別是,request/reply 用在一對一的狀況下,而 publish/subscribe 則是用在一對多。雖然 request/reply 是用在一對一,Tibco RV 仍提供了同步與非同步兩種模式,在說明同步和非同步之前,先看一下 server 端的程式,如下:

  1 package idv.steven.rv;
  2 
  3 import com.tibco.tibrv.Tibrv;
  4 import com.tibco.tibrv.TibrvException;
  5 import com.tibco.tibrv.TibrvListener;
  6 import com.tibco.tibrv.TibrvMsg;
  7 import com.tibco.tibrv.TibrvMsgCallback;
  8 import com.tibco.tibrv.TibrvMsgField;
  9 import com.tibco.tibrv.TibrvRvdTransport;
 10 import com.tibco.tibrv.TibrvTransport;
 11 
 12 public class Server implements TibrvMsgCallback {
 13     private TibrvTransport transport = null;
 14     
 15     private String service = null;
 16     private String network = null;
 17     private String daemon  = null;
 18     private String subject = null;
 19     
 20     private double server_timeout = 60;
 21        
 22     public void run(String[] args) {
 23         boolean eventReceived = false;
 24         
 25         int i = loadParameters(args);
 26         if (i > args.length-1) {
 27             usage();
 28             return;
 29         }
 30         
 31         try {
 32             Tibrv.open(Tibrv.IMPL_NATIVE);
 33             
 34             transport = new TibrvRvdTransport(service,network,daemon);
 35             subject = args[args.length-1];
 36             new TibrvListener(Tibrv.defaultQueue(), this, transport, subject, null);
 37             
 38             while (!eventReceived) {
 39                 eventReceived = Tibrv.defaultQueue().timedDispatch(server_timeout);
 40                 if (eventReceived) {
 41                     System.out.println("receive a message");
 42                 }
 43                 else {
 44                     System.out.println("timeout");
 45                 }
 46             }
 47         } catch (TibrvException | InterruptedException e) {
 48             e.printStackTrace();
 49         }
 50         finally {
 51             try {
 52                 Tibrv.close();
 53             } catch (TibrvException e) {
 54                 e.printStackTrace();
 55             }
 56         }
 57     }
 58     
 59     @Override
 60     public void onMsg(TibrvListener listener, TibrvMsg msg) {
 61         String replySubject = msg.getReplySubject();
 62         if (replySubject == null) {
 63             System.out.println("no reply subject,discard client‘s request");
 64             return;
 65         }
 66         
 67         try {
 68             TibrvMsgField field = msg.getField("sendData");
 69             String sendData = (String) field.data;
 70             System.out.println("sendData: " + sendData);
 71 
 72             TibrvMsg replyMsg = new TibrvMsg();
 73             replyMsg.setSendSubject(replySubject);
 74             replyMsg.update("replyData", "Nice to meet you.");
 75             transport.send(replyMsg);
 76         } catch (TibrvException e) {
 77             e.printStackTrace();
 78         }
 79     }
 80     
 81     int loadParameters(String[] args)
 82     {
 83         int i=0;
 84         while(i < args.length-1 && args[i].startsWith("-"))
 85         {
 86             if (args[i].equals("-service"))
 87             {
 88                 service = args[i+1];
 89                 i += 2;
 90             }
 91             else
 92             if (args[i].equals("-network"))
 93             {
 94                 network = args[i+1];
 95                 i += 2;
 96             }
 97             else
 98             if (args[i].equals("-daemon"))
 99             {
100                 daemon = args[i+1];
101                 i += 2;
102             }
103             else
104                 usage();
105         }
106         return i;
107     }
108     
109     void usage()
110     {
111         System.err.println("Usage: java idv.steven.rv.Server [-service service] [-network network]");
112         System.err.println("            [-daemon daemon] <subject>");
113         System.exit(-1);
114     }
115 
116     public static void main(String[] args) {
117         new Server().run(args);
118     }
119 }

要執行上面的程式,可於命令列下如下指令:

java idv.steven.rv.Server -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV

這指令指出,UDP 的廣播網址是 225.1.1.1,client 端要連線到 daemon 的話,使用 TCP 7500 port,傾聽的 subject 是 TEST.RV,所以,client 端送出的訊息的 subject 也要是 TEST.RV。關於 service、network、daemon 的詳細說明可以參考 Oracle 官網上的說明 http://docs.oracle.com/cd/E21455_01/common/tutorials/connector_rendezvous_daemon.html

 

server 的程式說明如下:

  1. Line 12: 這個類別實作了 TibcoMsgCallback 介面,當收到訊息時,RV 會呼叫 onMsg method。
  2. Line 39: 訊息分派機制,如果沒有寫這一行,RV 的訊息沒辦法分派,上面是呼叫 timedDispatch(timeout),也就是每隔 timeout 的秒數,就會離開這個 method,另一個用法是使用 dispatch(),這個方法就不會 timeout,會一直停在那一行直到有訊息時進行分派。
  3. Line 73: 將 client 指定的 reply subject 設定給 send subject,這樣 client 端才能收到訊息。
  • 同步

接下來,我們先看一下當 client 端選擇同步的模式時,程式要怎麼寫。

 1 package idv.steven.rv;
 2 
 3 import com.tibco.tibrv.Tibrv;
 4 import com.tibco.tibrv.TibrvException;
 5 import com.tibco.tibrv.TibrvMsg;
 6 import com.tibco.tibrv.TibrvRvdTransport;
 7 import com.tibco.tibrv.TibrvTransport;
 8 
 9 public class SyncClient {
10     private String service = null;
11     private String network = null;
12     private String daemon  = null;
13     
14     private TibrvTransport transport = null;
15     private double timeout = 5; //second
16     
17     public void run(String[] args) {
18         int i = loadParameters(args);
19         if (i > args.length-2) {
20             usage();
21             return;
22         }
23         
24         try {
25             Tibrv.open(Tibrv.IMPL_NATIVE);
26             
27             transport = new TibrvRvdTransport(service,network,daemon);
28             
29             String subject = args[args.length-2];
30             String sendData = args[args.length-1];
31             
32             TibrvMsg msg = new TibrvMsg();
33             msg.setSendSubject(subject);
34             msg.update("sendData", sendData);
35             
36             TibrvMsg replyMsg = null;
37             replyMsg = transport.sendRequest(msg, timeout);
38             
39             if (replyMsg == null)
40                 System.out.println("request time-out");
41             else 
42                 System.out.println("Receive reply msg:" + replyMsg);
43         } catch (TibrvException e) {
44             e.printStackTrace();
45         }
46         finally {
47             try {
48                 Tibrv.close();
49             } catch (TibrvException e) {
50                 e.printStackTrace();
51             }
52         }
53     }
54     
55     int loadParameters(String[] args)
56     {
57         int i=0;
58         while(i < args.length-1 && args[i].startsWith("-"))
59         {
60             if (args[i].equals("-service"))
61             {
62                 service = args[i+1];
63                 i += 2;
64             }
65             else
66             if (args[i].equals("-network"))
67             {
68                 network = args[i+1];
69                 i += 2;
70             }
71             else
72             if (args[i].equals("-daemon"))
73             {
74                 daemon = args[i+1];
75                 i += 2;
76             }
77             else
78                 usage();
79         }
80         return i;
81     }
82     
83     void usage()
84     {
85         System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
86         System.err.println("            [-daemon daemon] <subject> <messages>");
87         System.exit(-1);
88     }
89 
90     public static void main(String[] args) {
91         new SyncClient().run(args);
92     }
93 
94 }

要執行上面的程式,可於命令列下如下指令:

java idv.steven.rv.SyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello

這裡只是簡單的傳送一個 Hello 訊息給 server,server 收到後會透過 System.out.println 印出來,接下說說明一下上面的程式:

  1. Line 37: 送訊息給 server 時,使用 sendRquest,並等待 server 回覆訊息,等待的時間是 timeout 指定的秒數。
  2. Line 48: 程式結束前,記得關閉 Tibco 回收資源,否則程式無法正常結束。
  • 非同步

非同步的 client 端寫法很類似 server 端,程式如下:

  1 package idv.steven.rv;
  2 
  3 import com.tibco.tibrv.Tibrv;
  4 import com.tibco.tibrv.TibrvException;
  5 import com.tibco.tibrv.TibrvListener;
  6 import com.tibco.tibrv.TibrvMsg;
  7 import com.tibco.tibrv.TibrvMsgCallback;
  8 import com.tibco.tibrv.TibrvMsgField;
  9 import com.tibco.tibrv.TibrvRvdTransport;
 10 import com.tibco.tibrv.TibrvTransport;
 11 
 12 public class AsyncClient implements TibrvMsgCallback {
 13     private String service = null;
 14     private String network = null;
 15     private String daemon  = null;
 16     
 17     private TibrvTransport transport = null;
 18     private boolean running = true;
 19     
 20     public void run(String[] args) {
 21         int i = loadParameters(args);
 22         if (i > args.length-2) {
 23             usage();
 24             return;
 25         }
 26         
 27         try {
 28             Tibrv.open(Tibrv.IMPL_NATIVE);
 29             
 30             transport = new TibrvRvdTransport(service,network,daemon);
 31             
 32             String subject = args[args.length-2];
 33             String sendData = args[args.length-1];
 34             String replySubject = transport.createInbox();
 35             new TibrvListener(Tibrv.defaultQueue(), this, transport, replySubject, null);
 36             
 37             TibrvMsg msg = new TibrvMsg();
 38             msg.setSendSubject(subject);
 39             msg.setReplySubject(replySubject);
 40             msg.update("sendData", sendData);
 41             transport.send(msg);
 42             
 43             while (running) {
 44                 Tibrv.defaultQueue().dispatch();
 45             }
 46         } catch (TibrvException | InterruptedException e) {
 47             e.printStackTrace();
 48         }
 49         finally {
 50             try {
 51                 Tibrv.close();
 52             } catch (TibrvException e) {
 53                 e.printStackTrace();
 54             }
 55         }
 56     }
 57     
 58     @Override
 59     public void onMsg(TibrvListener listener, TibrvMsg msg) {
 60         System.out.println("subject: " + listener.getSubject());
 61         
 62         try {
 63             TibrvMsgField field;
 64             field = msg.getField("replyData");
 65             String replyData = (String) field.data;
 66             System.out.println("replyData: " + replyData);
 67         } catch (TibrvException e) {
 68             e.printStackTrace();
 69         }
 70         
 71         running = false;
 72     }
 73     
 74     int loadParameters(String[] args)
 75     {
 76         int i=0;
 77         while(i < args.length-1 && args[i].startsWith("-"))
 78         {
 79             if (args[i].equals("-service"))
 80             {
 81                 service = args[i+1];
 82                 i += 2;
 83             }
 84             else
 85             if (args[i].equals("-network"))
 86             {
 87                 network = args[i+1];
 88                 i += 2;
 89             }
 90             else
 91             if (args[i].equals("-daemon"))
 92             {
 93                 daemon = args[i+1];
 94                 i += 2;
 95             }
 96             else
 97                 usage();
 98         }
 99         return i;
100     }
101     
102     void usage()
103     {
104         System.err.println("Usage: java idv.steven.rv.Client [-service service] [-network network]");
105         System.err.println("            [-daemon daemon] <subject> <messages>");
106         System.exit(-1);
107     }
108 
109     public static void main(String[] args) {
110         new AsyncClient().run(args);
111     }
112 }

要執行的話,在命令列打入如下指令:

java idv.steven.rv.AsyncClient -service 7500 -network ;225.1.1.1 -daemon tcp:7500 TEST.RV Hello

程式說明如下:

  1. Line 12: 實作 TibrvMsgCallback,當收到訊息時 RV 會呼叫 onMsg。
  2. Line 34: 產生一個 InBox,這樣的話,當送出訊息時,RV 會讓 client 和 server 直接連線,也就是說,Line 35 看起來 client 端是透過傾聽 inbox 這個 subject 來接收訊息,似乎別的程式如果知道這個 inbox 字串的話,也可以傾聽相同的 subject 得到訊息內容,事實上是沒辦法的! 採用 InBox 的模式,RV 會將 server 的訊息直接送給 client 端,而不是用 UDP 群播的方式。
  3. Line 41: 送出訊息使用的是 send,和同步的方式不同!
  4. Line 44: 因為是非同步,需要等待 RV 將訊息回送,也要有訊息的分派機制。

Tibco RV request/reply 的同步與非同步

标签:

原文地址:http://www.cnblogs.com/stevwn/p/4279198.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!