【序列化message传输方式】两种方式都是转成二进制。
1.使用Java序列化器,ObjectXXXputStream
2.使用ByteBuffer.wrap(bytes)。
在一个标准群集场景中,节点通过一个数据包发送到协定好的多播IP地址:Port上,建立起通信。比如使用TCP插头。
【使用Servlet模拟群集场景】
【1.连接上@ServerEndPoint】
【节点做的事】
//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query
URI uri = new URI("ws", "localhost:8080", path, null, null); //连接上websocket
this.session = ContainerProvider.getWebSocketContainer()
.connectToServer(this, uri); 【Server做的事】
public void onOpen(Session session, @PathParam("nodeId") String nodeId)
{
ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster."); //通知所有节点 有新的节点加入 因为这是在onOpen发生的,也就是终端连接上的代表加入
byte[] bytes = ClusterNodeEndpoint.toByteArray(message);
for(Session node : ClusterNodeEndpoint.nodes)
//发送ByteBuffer 因为ClusterMessage实现了序列化,想要在websocket上传送序列化数据,必须做成二进制。
node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes)); ClusterNodeEndpoint.nodes.add(session);
}
【2.Servlet负责路由请求和接收消息、Server负责传递给其他节点消息】
【节点处理get请求】
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
//构造Message准备发给节点
ClusterMessage message = new ClusterMessage(this.nodeId,
"request:{ip:\"" + request.getRemoteAddr() +
"\",queryString:\"" + request.getQueryString() + "\"}"); //使用序列化机制发送消息
try(OutputStream output = this.session.getBasicRemote().getSendStream();
ObjectOutputStream stream = new ObjectOutputStream(output))
{
stream.writeObject(message);
}
response.getWriter().append("OK");
}
【节点接收消息】
@OnMessage
public void onMessage(InputStream input)
{
try(ObjectInputStream stream = new ObjectInputStream(input))
{
ClusterMessage message = (ClusterMessage)stream.readObject();
System.out.println("INFO (Node " + this.nodeId +
"): Message received from cluster; node = " +
message.getNodeId() + ", message = " + message.getMessage());
}
catch(IOException | ClassNotFoundException e)
{
e.printStackTrace();
}
}
【Server传递给其他节点消息】
@OnMessage
public void onMessage(Session session, byte[] message)
{
try
{
for(Session node : ClusterNodeEndpoint.nodes)
{
//向其他节点发送消息(消息来自当前节点)
if(node != session)
node.getBasicRemote().sendBinary(ByteBuffer.wrap(message));
}
}
catch(IOException e)
{
System.err.println("ERROR: Exception when handling message on server");
e.printStackTrace();
}
}