第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。
第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。
第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。
一般情况下我们采用第三种方案
一,客户端类
1 package bhz.netty.runtime; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelInitializer; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.timeout.ReadTimeoutHandler; 12 13 public class Client { 14 15 public static void main(String[] args) throws Exception { 16 //ONE: 17 //1 线程工作组 18 EventLoopGroup work = new NioEventLoopGroup(); 19 20 //TWO: 21 //3 辅助类。用于帮助我们创建NETTY服务 22 Bootstrap b = new Bootstrap(); 23 b.group(work) //绑定工作线程组 24 .channel(NioSocketChannel.class) //设置NIO的模式 25 // 初始化绑定服务通道 26 .handler(new ChannelInitializer<SocketChannel>() { 27 @Override 28 protected void initChannel(SocketChannel sc) throws Exception { 29 // 为通道进行初始化: 数据传输过来的时候会进行拦截和执行 30 sc.pipeline().addLast(new ReadTimeoutHandler(5)); 31 sc.pipeline().addLast(new ClientHandler()); 32 } 33 }); 34 35 ChannelFuture cf = b.connect("127.0.0.1", 8765).syncUninterruptibly(); 36 37 cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-1".getBytes())); 38 39 Thread.sleep(1000); 40 41 cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-2".getBytes())); 42 43 Thread.sleep(1000); 44 cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-3".getBytes())); 45 46 Thread.sleep(1000); 47 cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-4".getBytes())); 48 49 Thread.sleep(1000); 50 cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-5".getBytes())); 51 52 53 //释放连接 54 cf.channel().closeFuture().sync(); 55 work.shutdownGracefully(); 56 } 57 }
二,客户端助手类
1 package bhz.netty.runtime; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 import io.netty.util.ReferenceCountUtil; 7 8 public class ClientHandler extends ChannelInboundHandlerAdapter { 9 10 11 @Override 12 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 13 try { 14 ByteBuf buffer = (ByteBuf)msg; 15 byte[] data = new byte[buffer.readableBytes()]; 16 buffer.readBytes(data); 17 String str = new String(data, "utf-8"); 18 System.err.println("客户端:" + str); 19 } finally { 20 ReferenceCountUtil.release(msg); 21 } 22 } 23 24 @Override 25 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 26 throws Exception { 27 ctx.close(); 28 } 29 30 }
三,服务端类
1 package bhz.netty.runtime; 2 3 4 import io.netty.bootstrap.ServerBootstrap; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelInitializer; 7 import io.netty.channel.ChannelOption; 8 import io.netty.channel.EventLoopGroup; 9 import io.netty.channel.nio.NioEventLoopGroup; 10 import io.netty.channel.socket.SocketChannel; 11 import io.netty.channel.socket.nio.NioServerSocketChannel; 12 import io.netty.handler.timeout.ReadTimeoutHandler; 13 14 public class Server { 15 16 17 public static void main(String[] args) throws Exception { 18 //ONE: 19 //1 用于接受客户端连接的线程工作组 20 EventLoopGroup boss = new NioEventLoopGroup(); 21 //2 用于对接受客户端连接读写操作的线程工作组 22 EventLoopGroup work = new NioEventLoopGroup(); 23 24 //TWO: 25 //3 辅助类。用于帮助我们创建NETTY服务 26 ServerBootstrap b = new ServerBootstrap(); 27 b.group(boss, work) //绑定两个工作线程组 28 .channel(NioServerSocketChannel.class) //设置NIO的模式 29 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区 30 //.option(ChannelOption.SO_SNDBUF, 32*1024) // 设置发送数据的缓存大小 31 .option(ChannelOption.SO_RCVBUF, 32*1024) // 设置接受数据的缓存大小 32 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 设置保持连接 33 .childOption(ChannelOption.SO_SNDBUF, 32*1024) 34 // 初始化绑定服务通道 35 .childHandler(new ChannelInitializer<SocketChannel>() { 36 @Override 37 protected void initChannel(SocketChannel sc) throws Exception { 38 // 为通道进行初始化: 数据传输过来的时候会进行拦截和执行 39 sc.pipeline().addLast(new ReadTimeoutHandler(5)); 40 sc.pipeline().addLast(new ServerHandler()); 41 } 42 }); 43 44 ChannelFuture cf = b.bind(8765).sync(); 45 46 47 48 //释放连接 49 cf.channel().closeFuture().sync(); 50 work.shutdownGracefully(); 51 boss.shutdownGracefully(); 52 } 53 }
四,服务端助手类
1 package bhz.netty.runtime; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelFutureListener; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.channel.ChannelInboundHandlerAdapter; 8 import io.netty.util.ReferenceCountUtil; 9 10 public class ServerHandler extends ChannelInboundHandlerAdapter { 11 12 /** 13 * 当我们通道进行激活的时候 触发的监听方法 14 */ 15 @Override 16 public void channelActive(ChannelHandlerContext ctx) throws Exception { 17 18 System.err.println("--------通道激活------------"); 19 } 20 21 /** 22 * 当我们的通道里有数据进行读取的时候 触发的监听方法 23 */ 24 @Override 25 public void channelRead(ChannelHandlerContext ctx /*NETTY服务上下文*/, Object msg /*实际的传输数据*/) throws Exception { 26 // try{ 27 //do something with msg 28 29 //NIO通信(传输的数据是什么? ----> buffer对象) 30 ByteBuf buf = (ByteBuf)msg; 31 byte[] request = new byte[buf.readableBytes()]; 32 buf.readBytes(request); 33 String body = new String(request, "utf-8"); 34 System.out.println("服务器: " + body); 35 36 //ByteBuf 37 String response = "我是返回的数据!!"; 38 ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); 39 //添加addListener 可以触发关闭通道监听事件 40 //.addListener(ChannelFutureListener.CLOSE); 41 42 // } finally { 43 // ReferenceCountUtil.release(msg); 44 // } 45 46 47 48 49 } 50 51 @Override 52 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 53 System.err.println("--------数据读取完毕----------"); 54 } 55 56 @Override 57 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 58 throws Exception { 59 System.err.println("--------数据读异常----------: "); 60 cause.printStackTrace(); 61 ctx.close(); 62 } 63 64 65 66 67 68 69 70 71 72 73 74 75 76 }
五,结果
客户端结果
1 客户端:我是返回的数据!! 2 客户端:我是返回的数据!! 3 客户端:我是返回的数据!! 4 客户端:我是返回的数据!! 5 客户端:我是返回的数据!!
客户端已近结束了。
服务端结果
1 --------通道激活------------ 2 服务器: hello netty!-1 3 --------数据读取完毕---------- 4 服务器: hello netty!-2 5 --------数据读取完毕---------- 6 服务器: hello netty!-3 7 --------数据读取完毕---------- 8 服务器: hello netty!-4 9 --------数据读取完毕---------- 10 服务器: hello netty!-5 11 --------数据读取完毕---------- 12 --------数据读异常----------: 13 io.netty.handler.timeout.ReadTimeoutException