标签:str util 字节 encoder etc 粘包 监听 大于 分享
客户端:
package com.server; import java.net.Socket; import java.nio.ByteBuffer; public class Client { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 10101); String message = "hello"; byte[] bytes = message.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length); buffer.putInt(bytes.length);//netty是write,ByteBuffer是nio的,所以用put。 buffer.put(bytes); byte[] array = buffer.array(); for(int i=0; i<5; i++){ socket.getOutputStream().write(array); } socket.close(); } }
服务端:
package com.server; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss线程监听端口,worker线程负责数据读写 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); //设置niosocket工厂 bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); //设置管道的工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new MyDecoder()); pipeline.addLast("handler1", new HelloHandler()); return pipeline; } }); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start!!!"); } }
package com.server; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class MyDecoder extends FrameDecoder { @Override //FrameDecoder的decode方法 protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { //buffer是netty的ChannelBuffer if(buffer.readableBytes() > 4){ //必须大于基本的最短长度4个字节 if(buffer.readableBytes() > 2048){ buffer.skipBytes(buffer.readableBytes()); } //标记 buffer.markReaderIndex(); //长度 int length = buffer.readInt(); //buffer里面剩余的数据小于长度 if(buffer.readableBytes() < length){ //前面做了标记,这里可以还原 buffer.resetReaderIndex(); //缓存当前剩余的buffer数据,等待剩下数据包到来 return null; } //大于长度,开始读数据 byte[] bytes = new byte[length]; buffer.readBytes(bytes); //往下传递对象给HelloHandler,这次的buffer处理完了,后面在来buffer的时候FrameDecoder会帮我们循环读取, return new String(bytes); } //缓存当前剩余的buffer数据,等待剩下数据包到来(FrameDecoder帮我们实现的), return null; } }
package com.server; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class HelloHandler extends SimpleChannelHandler { private int count = 1;//单线程的没有并发问题 @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { System.out.println(e.getMessage() + " " +count); count++; } }
标签:str util 字节 encoder etc 粘包 监听 大于 分享
原文地址:https://www.cnblogs.com/yaowen/p/9063227.html