标签:
在上篇《Netty之引题》中,分别对AIO,BIO,PIO,NIO进行了简单的阐述,并写了简单的demo。但是这里说的简单,我也只能呵呵了,特别是NIO、AIO(我全手打的,好麻烦)。
在开始netty开发TimeServer之前,先回顾下NIO进行服务端开发的步骤:
一个简单的NIO程序,需要经过繁琐的十多步操作才能完成最基本的消息读取和发送,这也是我学netty的原因,下面就看看使用netty是如何轻松写服务器的。
在这里,我使用IDEA 14 + Maven用netty写上篇中TimeServer的程序。这里我直接用Maven的pom.xml来直接下载netty的包(Maven是对依赖进行管理,支持自动化的测试、编译、构建的项目管理工具,具体的Maven请读者自行百度、google搜索)。
/* TimeServer */
1 public class TimeServer { 2 public void bind(int port)throws Exception{ 3 /* 配置服务端的NIO线程组 */ 4 // NioEventLoopGroup类 是个线程组,包含一组NIO线程,用于网络事件的处理 5 // (实际上它就是Reactor线程组)。 6 // 创建的2个线程组,1个是服务端接收客户端的连接,另一个是进行SocketChannel的 7 // 网络读写 8 EventLoopGroup bossGroup = new NioEventLoopGroup(); 9 EventLoopGroup WorkerGroup = new NioEventLoopGroup(); 10 11 try { 12 // ServerBootstrap 类,是启动NIO服务器的辅助启动类 13 ServerBootstrap b = new ServerBootstrap(); 14 b.group(bossGroup,WorkerGroup) 15 .channel(NioServerSocketChannel.class) 16 .option(ChannelOption.SO_BACKLOG,1024) 17 .childHandler(new ChildChannelHandler()); 18 19 // 绑定端口,同步等待成功 20 ChannelFuture f= b.bind(port).sync(); 21 22 // 等待服务端监听端口关闭 23 f.channel().closeFuture().sync(); 24 }finally { 25 // 释放线程池资源 26 bossGroup.shutdownGracefully(); 27 WorkerGroup.shutdownGracefully(); 28 } 29 } 30 31 private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ 32 @Override 33 protected void initChannel(SocketChannel arg0)throws Exception{ 34 arg0.pipeline().addLast(new TimeServerHandler()); 35 } 36 } 37 38 public static void main(String[]args)throws Exception{ 39 int port = 8080; 40 if(args!=null && args.length>0){ 41 try { 42 port = Integer.valueOf(args[0]); 43 } 44 catch (NumberFormatException ex){} 45 } 46 new TimeServer().bind(port); 47 } 48 }
1 public class TimeServerHandler extends ChannelHandlerAdapter{ 2 // 用于网络的读写操作 3 @Override 4 public void channelRead(ChannelHandlerContext ctx,Object msg) 5 throws Exception{ 6 ByteBuf buf = (ByteBuf)msg; 7 byte[]req = new byte[buf.readableBytes()]; 8 buf.readBytes(req); 9 String body = new String(req,"UTF-8"); 10 System.out.println("the time server order : " + body); 11 12 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date( 13 System.currentTimeMillis()).toString():"BAD ORDER"; 14 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 15 ctx.write(resp); 16 } 17 18 @Override 19 public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{ 20 ctx.flush(); // 它的作用是把消息发送队列中的消息写入SocketChannel中发送给对方 21 // 为了防止频繁的唤醒Selector进行消息发送,Netty的write方法,并不直接将消息写入SocketChannel中 22 // 调用write方法只是把待发送的消息发到缓冲区中,再调用flush,将发送缓冲区中的消息 23 // 全部写到SocketChannel中。 24 } 25 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 28 ctx.close(); 29 } 30 }
/* TimeClient */
1 public class TimeClient { 2 public void connect(String host,int port)throws Exception{ 3 // 配置服务端的NIO线程组 4 EventLoopGroup group = new NioEventLoopGroup(); 5 6 try { 7 // Bootstrap 类,是启动NIO服务器的辅助启动类 8 Bootstrap b = new Bootstrap(); 9 b.group(group).channel(NioSocketChannel.class) 10 .option(ChannelOption.TCP_NODELAY,true) 11 .handler(new ChannelInitializer<SocketChannel>() { 12 @Override 13 public void initChannel(SocketChannel ch) 14 throws Exception{ 15 ch.pipeline().addLast(new TimeClientHandler()); 16 } 17 }); 18 19 // 发起异步连接操作 20 ChannelFuture f= b.connect(host,port).sync(); 21 22 // 等待客服端链路关闭 23 f.channel().closeFuture().sync(); 24 }finally { 25 group.shutdownGracefully(); 26 } 27 } 28 29 public static void main(String[]args)throws Exception{ 30 int port = 8080; 31 if(args!=null && args.length>0){ 32 try { 33 port = Integer.valueOf(args[0]); 34 } 35 catch (NumberFormatException ex){} 36 } 37 new TimeClient().connect("127.0.0.1",port); 38 } 39 }
1 public class TimeClientHandler extends ChannelHandlerAdapter{ 2 3 // 写日志 4 private static final Logger logger = 5 Logger.getLogger(TimeClientHandler.class.getName()); 6 7 private final ByteBuf firstMessage; 8 9 public TimeClientHandler(){ 10 byte[] req = "QUERY TIME ORDER".getBytes(); 11 firstMessage = Unpooled.buffer(req.length); 12 firstMessage.writeBytes(req); 13 } 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx,Object msg) 17 throws Exception{ 18 ByteBuf buf = (ByteBuf)msg; 19 byte[]req = new byte[buf.readableBytes()]; 20 buf.readBytes(req); 21 String body = new String(req,"UTF-8"); 22 System.out.println("Now is : " + body); 23 } 24 25 @Override 26 public void channelActive(ChannelHandlerContext ctx){ 27 // 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive 28 // 发送查询时间的指令给服务端。 29 // 调用ChannelHandlerContext的writeAndFlush方法,将请求消息发送给服务端 30 // 当服务端应答时,channelRead方法被调用 31 ctx.writeAndFlush(firstMessage); 32 } 33 34 @Override 35 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 36 logger.warning("message from:"+cause.getMessage()); 37 ctx.close(); 38 } 39 }
本例子没有考虑读半包的处理,对于功能演示和测试,本例子没问题,但是如果进行性能或者压力测试,就不能正常工作了。在下一节会弄正确处理半包消息的例子。
项目在源码在src/main/java/Netty/下,分为客户端和服务端。
源码下载:GitHub地址:https://github.com/orange1438/Netty_Course
题外话:虽然文章全是我纯手打,没任何复制,但是文章大多数内容来自《Netty权威指南》,我也是顺便学习的。之前我做C++服务端,因为狗血的面试C++,结果公司系统居然是java的,无耐我所在的重庆,C++少得可怜,所以只有在公司里学java了。当然,有epoll,select,事件驱动,TCP/IP概念的小伙伴来说,学这个netty,还是挺简单的。
标签:
原文地址:http://www.cnblogs.com/orange1438/p/5003080.html