码迷,mamicode.com
首页 > Web开发 > 详细

使用WebSocket帮助应用程序群集节点间通信

时间:2018-03-27 20:57:31      阅读:220      评论:0      收藏:0      [点我收藏+]

标签:web   byte   end   机制   ror   binary   cluster   bsp   nbsp   

在一个标准群集场景中,节点通过一个数据包发送到协定好的多播IP地址:Port上,建立起通信。比如使用TCP插头。

【使用Servlet模拟群集场景】

 【1.连接上@ServerEndPoint】

  【节点做的事】

//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query
URI uri = new URI("ws", "localhost:8080", path, null, null);

//连接上websocket
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri);

【Server做的事】
 2     public void onOpen(Session session, @PathParam("nodeId") String nodeId)
 3     {
 8         ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster.");11             //通知所有节点 有新的节点加入  因为这是在onOpen发生的,也就是终端连接上的代表加入
12         byte[] bytes = ClusterNodeEndpoint.toByteArray(message);
13         for(Session node : ClusterNodeEndpoint.nodes)
14                 //发送ByteBuffer
15                 node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));22         ClusterNodeEndpoint.nodes.add(session);
23     }

 

【2.Servlet负责路由请求和接收消息、Server负责传递给其他节点消息】

【节点处理get请求】

 2     protected void doGet(HttpServletRequest request, HttpServletResponse response)
 3             throws ServletException, IOException
 4     {
 6         //构造Message准备发给节点
 7         ClusterMessage message = new ClusterMessage(this.nodeId,
 8                 "request:{ip:\"" + request.getRemoteAddr() +
 9                 "\",queryString:\"" + request.getQueryString() + "\"}");
10 
11         //使用序列化机制发送消息
12         try(OutputStream output = this.session.getBasicRemote().getSendStream();
13             ObjectOutputStream stream = new ObjectOutputStream(output))
14         {
15             stream.writeObject(message);
16         }
17         response.getWriter().append("OK");
18     }

【节点接收消息】

 1  @OnMessage
 2     public void onMessage(InputStream input)
 3     {
 4         try(ObjectInputStream stream = new ObjectInputStream(input))
 5         {
 6             ClusterMessage message = (ClusterMessage)stream.readObject();
 7             System.out.println("INFO (Node " + this.nodeId +
 8                     "): Message received from cluster; node = " +
 9                     message.getNodeId() + ", message = " + message.getMessage());
10         }
11         catch(IOException | ClassNotFoundException e)
12         {
13             e.printStackTrace();
14         }
15     }

【Server传递给其他节点消息】

 1 @OnMessage
 2     public void onMessage(Session session, byte[] message)
 3     {
 4         try
 5         {
 6             for(Session node : ClusterNodeEndpoint.nodes)
 7             {
 8                 //向其他节点发送消息(消息来自当前节点)
 9                 if(node != session)
11                     node.getBasicRemote().sendBinary(ByteBuffer.wrap(message));
12             }
13         }
14         catch(IOException e)
15         {
16             System.err.println("ERROR: Exception when handling message on server");
17             e.printStackTrace();
18         }
19     }

 

使用WebSocket帮助应用程序群集节点间通信

标签:web   byte   end   机制   ror   binary   cluster   bsp   nbsp   

原文地址:https://www.cnblogs.com/chenhui7373/p/8654592.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!