在一个标准群集场景中,节点通过一个数据包发送到协定好的多播IP地址:Port上,建立起通信。比如使用TCP插头。
【使用Servlet模拟群集场景】
【1.连接上@ServerEndPoint】
【节点做的事】
//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query
URI uri = new URI("ws", "localhost:8080", path, null, null);
//连接上websocket
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri);
【Server做的事】
2 public void onOpen(Session session, @PathParam("nodeId") String nodeId) 3 { 8 ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster.");11 //通知所有节点 有新的节点加入 因为这是在onOpen发生的,也就是终端连接上的代表加入 12 byte[] bytes = ClusterNodeEndpoint.toByteArray(message); 13 for(Session node : ClusterNodeEndpoint.nodes) 14 //发送ByteBuffer 15 node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));22 ClusterNodeEndpoint.nodes.add(session); 23 }
【2.Servlet负责路由请求和接收消息、Server负责传递给其他节点消息】
【节点处理get请求】
2 protected void doGet(HttpServletRequest request, HttpServletResponse response) 3 throws ServletException, IOException 4 { 6 //构造Message准备发给节点 7 ClusterMessage message = new ClusterMessage(this.nodeId, 8 "request:{ip:\"" + request.getRemoteAddr() + 9 "\",queryString:\"" + request.getQueryString() + "\"}"); 10 11 //使用序列化机制发送消息 12 try(OutputStream output = this.session.getBasicRemote().getSendStream(); 13 ObjectOutputStream stream = new ObjectOutputStream(output)) 14 { 15 stream.writeObject(message); 16 } 17 response.getWriter().append("OK"); 18 }
【节点接收消息】
1 @OnMessage 2 public void onMessage(InputStream input) 3 { 4 try(ObjectInputStream stream = new ObjectInputStream(input)) 5 { 6 ClusterMessage message = (ClusterMessage)stream.readObject(); 7 System.out.println("INFO (Node " + this.nodeId + 8 "): Message received from cluster; node = " + 9 message.getNodeId() + ", message = " + message.getMessage()); 10 } 11 catch(IOException | ClassNotFoundException e) 12 { 13 e.printStackTrace(); 14 } 15 }
【Server传递给其他节点消息】
1 @OnMessage 2 public void onMessage(Session session, byte[] message) 3 { 4 try 5 { 6 for(Session node : ClusterNodeEndpoint.nodes) 7 { 8 //向其他节点发送消息(消息来自当前节点) 9 if(node != session) 11 node.getBasicRemote().sendBinary(ByteBuffer.wrap(message)); 12 } 13 } 14 catch(IOException e) 15 { 16 System.err.println("ERROR: Exception when handling message on server"); 17 e.printStackTrace(); 18 } 19 }