标签:
package time.server.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * TODO * * @description * @author mjorcen * @time 2015年5月25日 下午2:50:57 */ public class NTimeServerImpl { public void bind(int port) { // 创建两个NioEventLoopGroup 实例,NioEventLoopGroup // 是一个线程组,它包含一组NIO线程,专门用于处理网络事件的处理,实际上他们就是Reactor 线程组 // 这里创建两个的原因是一个用于服务端接收用户的链接,另一个用于进行SocketChannel的网络读写 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 ServerBootstrap ,它是netty用于NIO服务端的辅助启动类,目的是降低服务端的开发复杂度. ServerBootstrap bootstrap = new ServerBootstrap(); // 设定 服务端接收用户请求的线程组和用于进行SocketChannel网络读写的线程组 bootstrap.group(bossGroup, workerGroup); // 设置创建的 channel 类型 bootstrap.channel(NioServerSocketChannel.class); // 配置 NioServerSocketChannel 的 tcp 参数, BACKLOG 的大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 绑定io处理类(childChannelHandler).他的作用类似于 reactor 模式中的 handler // 类,主要用于处理网络 I/O 事件,例如对记录日志,对消息进行解码等. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 加入行处理器 ch.pipeline().addLast(new StringDecoder()); // 加入字符串解码器 ch.pipeline().addLast(new TimeServerHandler()); } }); // 绑定端口,随后调用它的同步阻塞方法 sync 等等绑定操作成功,完成之后 Netty 会返回一个 ChannelFuture // 它的功能类似于的 Future,主要用于异步操作的通知回调. ChannelFuture channelFuture = bootstrap.bind(port).sync(); // 等待服务端监听端口关闭,调用 sync 方法进行阻塞,等待服务端链路关闭之后 main 函数才退出. channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅的退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { NTimeServerImpl server = new NTimeServerImpl(); server.bind(9091); } }
ServerHandler
package time.server.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; import time.TimeConfig; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:06:09 */ public class TimeServerHandler extends ChannelHandlerAdapter implements TimeConfig { /* * (non-Javadoc) * * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel. * ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body); String currentTime = QUERY.equalsIgnoreCase(body) ? new Date() .toString() : "BAD ORDER"; currentTime += System.getProperty("line.separator"); System.out.println("currentTime : " + currentTime); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8")); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 当出现异常时,释放资源. ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
Client
package time.client.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:17:29 */ public class NTimeClient { public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步链接操作 ChannelFuture future = bootstrap.connect(host, port).sync(); // 等待客户端链路关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { NTimeClient client = new NTimeClient(); client.connect(9091, "localhost"); } }
ClientHandler
package time.server.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; import time.TimeConfig; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:06:09 */ public class TimeServerHandler extends ChannelHandlerAdapter implements TimeConfig { /* * (non-Javadoc) * * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel. * ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body); String currentTime = QUERY.equalsIgnoreCase(body) ? new Date() .toString() : "BAD ORDER"; currentTime += System.getProperty("line.separator"); System.out.println("currentTime : " + currentTime); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8")); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 当出现异常时,释放资源. ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
LineBasedFrameDecoder 与 StringDecoder
标签:
原文地址:http://www.cnblogs.com/mjorcen/p/4539205.html