1. 消息定长,例如每个报文固定为100字节。发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据,就自然而然地把每个数据包拆分开来。
2. 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议。即在数据包之间设置边界(如添加特殊符号),这样接收端通过这个边界就可以将不同的数据包拆分开。
3. 将消息分为消息头和消息体。发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。
4. 其它复杂的协议,如RTMP协议等。
public class XNettyServer { public static void main(String[] args) throws Exception { // 1、 线程定义 // accept 处理连接的线程池 EventLoopGroup acceptGroup = new NioEventLoopGroup(); // read io 处理数据的线程池 EventLoopGroup readGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptGroup, readGroup); // 2、 选择TCP协议,NIO的实现方式 b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 3、 职责链定义(请求收到后怎么处理) ChannelPipeline pipeline = ch.pipeline(); // TODO 3.1 增加解码器 // pipeline.addLast(new XDecoder()); // TODO 3.2 打印出内容 handdler pipeline.addLast(new XHandller()); } }); // 4、 绑定端口 System.out.println("启动成功,端口 9999"); b.bind(9999).sync().channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); readGroup.shutdownGracefully(); } } }
// 编解码一定是根据协议~如http public class XDecoder extends ByteToMessageDecoder { static final int PACKET_SIZE = 220; // 每次请求数据大小是220,我们自己定义的协议 // 用来临时保留没有处理过的请求报文,如只传过来了110个字节,先存着 ByteBuf tempMsg = Unpooled.buffer(); // in输入 --- 处理 --- out 输出 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes()); // in 请求的数据 // out 将粘在一起的报文拆分后的结果保留起来 // 1、 合并报文 ByteBuf message = null; int tmpMsgSize = tempMsg.readableBytes(); // 如果暂存有上一次余下的请求报文,则合并 if (tmpMsgSize > 0) { message = Unpooled.buffer(); message.writeBytes(tempMsg); message.writeBytes(in); System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes()); } else { message = in; } // 2、 拆分报文 // 这个场景下,一个请求固定长度为3,可以根据长度来拆分 // i+1 i+1 i+1 i+1 i+1 // 不固定长度,需要应用层协议来约定 如何计算长度 // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并 // dubbo rpc协议 = header(16) + body(不固定) // header最后四个字节来标识body // 长度 = 16 + body长度 // 0xda, 0xbb 魔数 int size = message.readableBytes(); int counter = size / PACKET_SIZE; for (int i = 0; i < counter; i++) { byte[] request = new byte[PACKET_SIZE]; // 每次从总的消息中读取220个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(Unpooled.copiedBuffer(request)); } // 3、多余的报文存起来 // 第一个报文: i+ 暂存 // 第二个报文: 1 与第一次 size = message.readableBytes(); if (size != 0) { System.out.println("多余的数据长度:" + size); // 剩下来的数据放到tempMsg暂存 留到下次再进行合并 tempMsg.clear(); tempMsg.writeBytes(message.readBytes(size)); } } }
public final class WebSocketServer { static int PORT = 9000; public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_REUSEADDR, true) .childHandler(new WebSocketServerInitializer()) .childOption(ChannelOption.SO_REUSEADDR, true); for (int i = 0; i < 100; i++) { // 绑定100个端口 b.bind(++PORT).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if ("true".equals(System.getProperty("netease.debug"))) System.out.println("端口绑定完成:" + future.channel().localAddress()); } }); } // 端口绑定完成,启动消息随机推送(测试) TestCenter.startTest(); System.in.read(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { // 职责链, 数据处理流程 ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); // 转为http请求 pipeline.addLast(new HttpObjectAggregator(65536)); // 最大数据量 pipeline.addLast(new WebSocketServerHandler()); // websocket握手,处理后续消息 pipeline.addLast(new NewConnectHandler()); } }
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final String WEBSOCKET_PATH = "/websocket"; private WebSocketServerHandshaker handshaker; public static final LongAdder counter = new LongAdder(); @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { counter.add(1); if (msg instanceof FullHttpRequest) { // 处理websocket握手 handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { // 处理websocket后续的消息 handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { // Handle a bad request. //如果http解码失败 则返回http异常 并且判断消息头有没有包含Upgrade字段(协议升级) if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } // 构造握手响应返回 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(req), null, true, 5 * 1024 * 1024); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { // 版本不支持 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); ctx.fireChannelRead(req.retain()); // 继续传播 } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame 关闭 if (frame instanceof CloseWebSocketFrame) { Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get(); TestCenter.removeConnection(userId); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { // ping/pong作为心跳 System.out.println("ping: " + frame); ctx.write(new PongWebSocketFrame(frame.content().retain())); return; } if (frame instanceof TextWebSocketFrame) { // Echo the frame // 
TODO 处理具体的数据请求(... 云课堂聊天室,推送给其他的用户) //发送到客户端websocket ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text() + ", 欢迎使用Netty WebSocket服务, 现在时刻:" + new java.util.Date().toString())); return; } // 不处理二进制消息 if (frame instanceof BinaryWebSocketFrame) { // Echo the frame ctx.write(frame.retain()); } } private static void sendHttpResponse( ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // Generate an error page if response getStatus code is not OK (200). if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpUtil.setContentLength(res, res.content().readableBytes()); } // Send the response and close the connection if necessary. ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } private static String getWebSocketLocation(FullHttpRequest req) { String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH; return "ws://" + location; } }
// 新连接建立了 public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 解析请求,判断token,拿到用户ID。 Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters(); // String token = parameters.get("token").get(0); 不是所有人都能连接,比如需要登录之后,发放一个推送的token String userId = parameters.get("userId").get(0); ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId TestCenter.saveConnection(userId, ctx.channel()); // 保存连接 // 结束 } }
// 正常情况是,后台系统通过接口请求,把数据丢到对应的MQ队列,再由推送服务器读取 public class TestCenter { // 此处假设一个用户一台设备,否则用户的通道应该是多个。 // TODO 还应该有一个定时任务,用于检测失效的连接(类似缓存中的LRU算法,长时间不使用,就拿出来检测一下是否断开了); static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>(); // 保存信息 public static void saveConnection(String userId, Channel channel) { userInfos.put(userId, channel); } // 退出的时候移除掉 public static void removeConnection(Object userId) { if (userId != null) { userInfos.remove(userId.toString()); } } final static byte[] JUST_TEST = new byte[1024]; public static void startTest() { // 发一个tony吧 System.arraycopy("tony".getBytes(), 0, JUST_TEST, 0, 4); final String sendmsg = System.getProperty("netease.server.test.sendmsg", "false"); Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> { try { // 压力测试,在用户中随机抽取1/10进行发送 if (userInfos.isEmpty()) { return; } int size = userInfos.size(); ConcurrentHashMap.KeySetView<String, Channel> keySetView = userInfos.keySet(); String[] keys = keySetView.toArray(new String[]{}); System.out.println(WebSocketServerHandler.counter.sum() + " : 当前用户数量" + keys.length); if (Boolean.valueOf(sendmsg)) { // 是否开启发送 for (int i = 0; i < (size > 10 ? size / 10 : size); i++) { // 提交任务给它执行 String key = keys[new Random().nextInt(size)]; Channel channel = userInfos.get(key); if (channel == null) { continue; } if (!channel.isActive()) { userInfos.remove(key); continue; } channel.eventLoop().execute(() -> { channel.writeAndFlush(new TextWebSocketFrame(new String(JUST_TEST))); // 推送1024字节 }); } } } catch (Exception ex) { ex.printStackTrace(); } }, 1000L, 2000L, TimeUnit.MILLISECONDS); } }
<!-- saved from url=(0022) -->
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Web Socket Test</title>
</head>
<body>
<script type="text/javascript">
    // Browser test page: opens a WebSocket, appends every received message to the
    // textarea, and sends the text field's content when the button is clicked.
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        // Random number in the range 10..10000.
        var random = Math.floor(Math.random() * (10000 - 10 + 1) + 10);
        // NOTE(review): the endpoint after "ws://" looks truncated (host, port and path are
        // missing) - confirm the intended URL before using this page.
        socket = new WebSocket("ws://" + random);
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Web Socket opened!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + "Web Socket closed";
        };
    } else {
        alert("Your browser does not support Web Socket.");
    }

    // Sends `message` when the socket is open; otherwise alerts the user.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("The socket is not open.");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Hello, World!"><input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)">
    <h3>Output</h3>
    <textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>
public final class WebSocketClient { public static void main(String[] args) throws Exception { final String host = System.getProperty("netease.pushserver.host", ""); final String maxSize = System.getProperty("netease.client.port.maxSize", "100"); final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000"); int port = 9001; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); b.option(ChannelOption.SO_REUSEADDR, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new HttpClientCodec()); p.addLast(new HttpObjectAggregator(8192)); p.addLast(WebSocketClientCompressionHandler.INSTANCE); p.addLast("webSocketClientHandler", new WebSocketClientHandler()); } }); // tcp 建立连接 for (int i = 0; i < 100; i++) { // 服务端有100个端口,发起对100个端口反复的连接 for (int j = 0; j < 60000; j++) { // 每个端口6万次连接 b.connect(host, port).sync().get(); } port++; } System.in.read(); } finally { group.shutdownGracefully(); } } }
// handler 处理多个事件~ 包括tcp连接建立之后的事件 // open websocket public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; public ChannelFuture handshakeFuture() { return handshakeFuture; } @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise(); } static AtomicInteger counter = new AtomicInteger(0); @Override public void channelActive(ChannelHandlerContext ctx) { if (handshaker == null) { InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); URI uri = null; try { uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet()); } catch (Exception e) { e.printStackTrace(); } handshaker = WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()); } handshaker.handshake(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) { if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!"); } @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { try { handshaker.finishHandshake(ch, (FullHttpResponse) msg); if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client connected!"); handshakeFuture.setSuccess(); } catch (WebSocketHandshakeException e) { if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client failed to connect"); handshakeFuture.setFailure(e); } return; } if (msg instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) msg; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ‘)‘); } WebSocketFrame frame = 
(WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client received message: " + textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client received pong"); } else if (frame instanceof CloseWebSocketFrame) { if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client received closing"); ch.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } ctx.close(); } }
打包上传服务器 服务端6G4核,客户端6G2核
open files(可打开的文件描述符数量上限)太小了,需要调大该参数(例如通过 ulimit -n 调整)。
