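Many of the links and demos you find online are not for Netty 5 but for earlier versions, and quite a few of them keep throwing errors at runtime. I will skip the beginner-level demo, since those all follow the same old pattern. Many companies have their own best practices, so today I will talk about how to define a custom protocol. Custom protocols are usually defined by individual departments inside a company; mine, of course, is a fairly simple one.
Note:
This tutorial uses netty-all-5.0.0.Alpha2.jar, i.e. Netty 5, unlike many examples online, which use older versions.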
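Custom protocol:
protocol {
    header
    body
}
header format {
    fixed start tag,
    command code,
    version,
    length
}
It is a custom protocol, so you can define and implement it however you like, as long as everyone involved agrees on it.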
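To make the header concrete, here is a minimal sketch of its byte layout using only the JDK. The field sizes and order are read off the encoder shown later in this post (1-byte start tag, 4-byte command code, 4-byte length, 2-byte version, so the length actually precedes the version on the wire). The class name HeaderLayoutSketch and its helper methods are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration; not one of the classes in this post.
final class HeaderLayoutSketch {
    static final int START_TAG_OFFSET = 0; // 1 byte
    static final int CMD_CODE_OFFSET = 1;  // 4 bytes
    static final int LENGTH_OFFSET = 5;    // 4 bytes
    static final int VERSION_OFFSET = 9;   // 2 bytes
    static final int HEADER_SIZE = 11;     // 1 + 4 + 4 + 2

    // Copies one header field out of a frame.
    static byte[] field(byte[] frame, int offset, int size) {
        return Arrays.copyOfRange(frame, offset, offset + size);
    }

    // Builds a sample header; the length field is left as four zero bytes.
    static byte[] sampleHeader() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.put((byte) 0);                                   // start tag
        buf.put("1234".getBytes(StandardCharsets.US_ASCII)); // command code
        buf.put(new byte[4]);                                // length placeholder
        buf.put("11".getBytes(StandardCharsets.US_ASCII));   // version
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = sampleHeader();
        System.out.println(new String(field(header, CMD_CODE_OFFSET, 4), StandardCharsets.US_ASCII)); // 1234
        System.out.println(new String(field(header, VERSION_OFFSET, 2), StandardCharsets.US_ASCII));  // 11
    }
}
```

Keeping the offsets in one place makes it easier for both sides of the connection to agree on the layout.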
package com.nio.netty;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable {

    // Fixed start tag
    private byte startTag;

    // Command code, 4 bytes
    private byte[] cmdCode;

    // Version, 2 bytes
    private byte[] version;

    // Length of the body
    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                '}';
    }
}
package com.nio.netty;

/**
 * Created by sdc on 2017/8/26.
 */
public class Message {

    private MsgHeader header;

    private Object body;

    // Checksum (disabled for now)
    // private byte crcCode;

    // public byte getCrcCode() {
    //     return crcCode;
    // }
    //
    // public void setCrcCode(byte crcCode) {
    //     this.crcCode = crcCode;
    // }

    public MsgHeader getHeader() {
        return header;
    }

    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Message{" +
                "header=" + header +
                ", body=" + body +
                // ", crcCode=" + crcCode +
                '}';
    }
}
With the message format defined, the messages need to be encoded and decoded. This example uses two codec base classes, ByteToMessageDecoder and MessageToByteEncoder.
package com.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgEncoder extends MessageToByteEncoder<Message> {

    // Returns the decimal digit of i at position index (0 = ones, 1 = tens, ...).
    public static byte getIndexToByte(int i, int index) {
        if (index == 0) {
            return (byte) (i % 10);
        } else {
            int num = (int) Math.pow(10, index);
            return (byte) ((i / num) % 10);
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        try {
            if (msg == null || msg.getHeader() == null) {
                throw new Exception("The encode message is null");
            }

            out.writeByte(msg.getHeader().getStartTag());
            out.writeBytes(msg.getHeader().getCmdCode());

            // Placeholder for the length; remember its position so it can be patched below.
            int lengthIndex = out.writerIndex();
            out.writeBytes(new byte[]{0, 0, 0, 0});

            out.writeBytes(msg.getHeader().getVersion());

            String body = (String) msg.getBody();
            int length = 0;
            if (body != null) {
                // Use UTF-8 explicitly so the bytes match what the decoder expects.
                byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
                out.writeBytes(bodyBytes);
                length = bodyBytes.length;

                // if (Constants.CRCCODE_DEFAULT != msg.getCrcCode()) {
                //     msg.setCrcCode(CRC8.calcCrc8(bodyBytes));
                // }
            }

            // Convert the length from int to byte[4], one decimal digit per byte
            // (so the body can be at most 9999 bytes long).
            byte l1 = getIndexToByte(length, 3);
            byte l2 = getIndexToByte(length, 2);
            byte l3 = getIndexToByte(length, 1);
            byte l4 = getIndexToByte(length, 0);
            byte[] lengthBytes = new byte[]{l1, l2, l3, l4};
            out.setBytes(lengthIndex, lengthBytes);

            System.out.println("encoder:" + msg.getBody());
            // out.writeByte(msg.getCrcCode());
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
}
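The encoder above stores the body length as four decimal digits, one digit per byte, rather than as a binary int. The following sketch, which uses only the JDK, shows that encoding and its inverse in isolation. The class DecimalLengthSketch and its method names are hypothetical; the big-endian variant at the end is only an alternative that some protocols use, not what this post's encoder does.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; mirrors the digit-per-byte length field.
final class DecimalLengthSketch {

    // Splits a length into four decimal digits, most significant first.
    static byte[] toDigits(int length) {
        if (length < 0 || length > 9999) {
            throw new IllegalArgumentException("length must fit in four decimal digits");
        }
        return new byte[]{
                (byte) (length / 1000 % 10),
                (byte) (length / 100 % 10),
                (byte) (length / 10 % 10),
                (byte) (length % 10)
        };
    }

    // Rebuilds the length from its four decimal digits.
    static int fromDigits(byte[] digits) {
        int value = 0;
        for (byte d : digits) {
            value = value * 10 + d;
        }
        return value;
    }

    // Alternative (assumption, not used in this post): a plain big-endian int.
    static byte[] toBigEndian(int length) {
        return ByteBuffer.allocate(4).putInt(length).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toDigits(24)));    // [0, 0, 2, 4]
        System.out.println(fromDigits(toDigits(24)));         // 24
        System.out.println(Arrays.toString(toBigEndian(24))); // [0, 0, 0, 24]
    }
}
```

One consequence of the digit-per-byte form is that four bytes can only describe bodies up to 9999 bytes, so any maximum length enforced on the receiving side should not exceed that.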
package com.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgDecoder extends ByteToMessageDecoder {

    // Header size: startTag (1) + cmdCode (4) + length (4) + version (2)
    private static final int HEADER_LENGTH = 11;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            // Wait until the whole header has arrived.
            // The internal protocol can also agree on a maximum message length here.
            if (in.readableBytes() < HEADER_LENGTH) {
                return;
            }

            System.out.println("Start decoding message, message length:" + in.readableBytes());

            in.markReaderIndex();

            // Fill in the message fields
            Message message = new Message();
            MsgHeader header = new MsgHeader();
            header.setStartTag(in.readByte());

            byte[] cmdCode = new byte[4];
            in.readBytes(cmdCode);
            header.setCmdCode(cmdCode);
            System.out.println(new String(cmdCode, "UTF-8"));

            // Convert the length from byte[4] to int
            byte[] lengthBytes = new byte[4];
            in.readBytes(lengthBytes);
            int length = toInt(lengthBytes);
            System.out.println("header:" + length);
            header.setLength(length);
            if (length < 0 || length > 10240) { // message too long or invalid
                throw new IllegalArgumentException("wrong message length");
            }

            byte[] version = new byte[2];
            in.readBytes(version);
            header.setVersion(version);
            System.out.println("version:" + new String(version, "UTF-8"));

            if (header.getLength() > 0) {
                System.out.println("bytebuffer readable range" + in.readableBytes());
                // The body has not fully arrived yet: rewind and wait for more data.
                if (in.readableBytes() < length) {
                    in.resetReaderIndex();
                    System.out.println("returned");
                    return;
                }
                // Read the body
                byte[] bodyBytes = new byte[header.getLength()];
                in.readBytes(bodyBytes);
                message.setBody(new String(bodyBytes, "UTF-8"));
            }

            // crcCode is left out for now
            // message.setCrcCode(in.readByte());

            // Set the header
            message.setHeader(header);
            System.out.println("body:" + message.getBody());
            out.add(message);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    // Rebuilds an int from one decimal digit per byte, most significant first.
    public static int toInt(byte[] bytes) {
        int value = 0;
        for (int i = 0; i < bytes.length; i++) {
            int num = (int) Math.pow(10, bytes.length - 1 - i);
            value += num * bytes[i];
        }
        return value;
    }
}
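TCP delivers a byte stream, so a decoder may be called when only part of a frame has arrived. The usual pattern is to mark the read position, parse the header, and rewind if the body is not complete yet. The sketch below reproduces that pattern with java.nio.ByteBuffer so it runs without Netty; FrameCheckSketch and tryDecode are hypothetical names, and the 11-byte header with a digit-per-byte length follows the encoder in this post.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; a Netty-free analogue of the decode step.
final class FrameCheckSketch {
    static final int HEADER_SIZE = 11; // startTag 1 + cmdCode 4 + length 4 + version 2

    // Returns the body if a full frame is available, or null (with the
    // position restored) if more bytes are needed.
    static String tryDecode(ByteBuffer in) {
        if (in.remaining() < HEADER_SIZE) {
            return null; // header incomplete
        }
        in.mark();
        in.get();                       // start tag
        in.position(in.position() + 4); // skip command code
        int length = 0;
        for (int i = 0; i < 4; i++) {   // four decimal digits, one per byte
            length = length * 10 + in.get();
        }
        in.position(in.position() + 2); // skip version
        if (length < 0 || length > 9999) {
            throw new IllegalArgumentException("wrong message length");
        }
        if (in.remaining() < length) {
            in.reset();                 // body incomplete: rewind and wait
            return null;
        }
        byte[] body = new byte[length];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Builds a sample frame whose body is "hello" (5 bytes).
    static byte[] sampleFrame() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + 5);
        buf.put((byte) 0);
        buf.put("1234".getBytes(StandardCharsets.US_ASCII));
        buf.put(new byte[]{0, 0, 0, 5});
        buf.put("11".getBytes(StandardCharsets.US_ASCII));
        buf.put("hello".getBytes(StandardCharsets.UTF_8));
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = sampleFrame();
        System.out.println(tryDecode(ByteBuffer.wrap(frame, 0, 13))); // null
        System.out.println(tryDecode(ByteBuffer.wrap(frame)));        // hello
    }
}
```

In Netty the same roles are played by markReaderIndex(), resetReaderIndex() and readableBytes() on the ByteBuf.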
The message classes are done. Next come the client and the server.
Client code:
package com.nio.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.StandardCharsets;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgClient {

    public void connect(String ip, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        Message message = new Message();
        String msgStr = "我想发送一条消息"; // "I want to send a message"
        MsgHeader header = new MsgHeader();
        header.setStartTag((byte) 0);
        header.setCmdCode("1234".getBytes(StandardCharsets.UTF_8));
        // Length in bytes, not characters; the encoder recomputes it from the body anyway.
        header.setLength(msgStr.getBytes(StandardCharsets.UTF_8).length);
        header.setVersion("11".getBytes(StandardCharsets.UTF_8));

        message.setBody(msgStr);
        message.setHeader(header);

        try {
            Bootstrap bs = new Bootstrap();
            bs.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChildChannelHandler(message));

            ChannelFuture f = bs.connect(ip, port).sync();

            // Write the message
            f.channel().writeAndFlush(message).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        Message message;

        public ChildChannelHandler(Message message) {
            this.message = message;
        }

        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline()
                    .addLast(new MsgDecoder())
                    .addLast(new MsgEncoder());
        }
    }

    public static void main(String[] args) {
        try {
            new MsgClient().connect("127.0.0.1", 9080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Server code:
package com.nio.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            ChannelFuture cf = sb.bind(port).sync();

            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline()
                    .addLast(new MsgDecoder())
                    .addLast(new MsgEncoder());
        }
    }

    public static void main(String[] args) {
        try {
            new MsgServer().bind(9080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Start the server first when running the example; otherwise the client reports the following error:
-all-5.0.0.Alpha2.jar com.nio.netty.MsgClient
java.net.ConnectException: Connection refused: no further information: /127.0.0.1:9080
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:223)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:276)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:531)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
    at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
    at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
    at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

Process finished with exit code 0
On success, the following is printed:
Start decoding message, message length:35
Start decoding message, message length2222:34
1234
header:24
version:11
readable range24
body:我想发送一条消息

Process finished with exit code 1
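The numbers 35 and 24 in this output can be checked by hand: the body 我想发送一条消息 is 8 characters, each of which takes 3 bytes in UTF-8, so the body is 24 bytes, and the header adds 1 + 4 + 4 + 2 = 11 bytes. A small sketch of that arithmetic, assuming UTF-8 on both ends (FrameSizeSketch is a hypothetical name used only here):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; computes the frame size for a body.
final class FrameSizeSketch {
    static final int HEADER_SIZE = 1 + 4 + 4 + 2; // startTag + cmdCode + length + version

    // Number of bytes the body occupies on the wire when encoded as UTF-8.
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    // Total frame size: fixed header plus UTF-8 body.
    static int frameSize(String body) {
        return HEADER_SIZE + utf8Length(body);
    }

    public static void main(String[] args) {
        String body = "我想发送一条消息";
        System.out.println(body.length());    // 8 characters
        System.out.println(utf8Length(body)); // 24 bytes
        System.out.println(frameSize(body));  // 35 bytes
    }
}
```

This is also why a length field should count bytes rather than characters: for non-ASCII text, String.length() and the encoded byte length differ.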
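That covers the basic flow. When learning a framework, I prefer to write an example first and study the concepts afterwards; that way they sink in more deeply.
I referred to the article below, but I could not get its example to produce a result, so I modified it and wrote this up myself.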
http://www.blogs8.cn/posts/Wx8Sd43
This article comes from the blog “不积跬步无以至千里”; please keep this attribution: http://shangdc.blog.51cto.com/10093778/1959680
Original article: http://shangdc.blog.51cto.com/10093778/1959680