标签:netty nio socket bootstrap 多线程
----Netty 4.X 用户指南 为本人自己翻译,其中不免有差错,还请谅解,也请指正。
Preface、The Problem、The Solution 、Geting started、Before Geting Started 略过。
这个世界上最简单的协议不是“hello world”,而是”DISACRD“。这是一种对于你接收的任何数据都不做任何响应的协议。
为实现DISCARD协议,你需要做的唯一事情是忽略所有你接收到的数据。让我们直接从handler实现开始,handler的实现操控着由Netty生成的I/O事件。
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
<span style="font-size:14px;">@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }</span>
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
祝贺!你刚刚完成了你已经熟练掌握了Netty的第一个server。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
至此,我们对消费数据还没有任何响应。然而,服务器应该对请求做出回应。接下来让我们学习当任何地方收到的数据时返回时,如何通过ECHO协议的实现向一个客户端响应消息。
这和前面提到的的DISCARD SERVER 仅有的不同是,它对接收到的数据做出了响应而不是将接收的数据打印到控制台。因此,很有必要再次修改channelRead()方法。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
Channel ch = ...; ch.writeAndFlush(message); ch.close();因此,你需要在channelFuture结束时调用close()方法,他通过write()方法返回,并且当写操作已经做时,他会通知它的监听者。请注意这里,close()方法也有可能立刻关闭连接,并将返回channelFuture。
f.addListener(ChannelFutureListener.CLOSE);为测试我们的服务端是否如愿工作,你可以用如下UNIX rdate 命令:
$ rdate -o <port> -p <host>这里的<port> 是main方法中的端口,<host> 一般是localhost。
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });如果你是个喜欢挑战的人,你可能会试着将ReplayingDecoder进一步简化。你可以查阅API指南,了解更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); } }另外,Netty提供了开箱即用的解码器,使您可以很容易的实现大多数协议并且帮助你实现避免了整体不可维护的处理程序的问题。请参考如下包,获取更多详细例子:
package io.netty.example.time; import java.util.Date; public class UnixTime { private final int value; public UnixTime() { this((int) (System.currentTimeMillis() / 1000L + 2208988800L)); } public UnixTime(int value) { this.value = value; } public int value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readInt())); }更新decoder的同时, TimeClientHandler不再用bytebuf.
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }更加的简单和优雅了,不是吗? 同样的方法也可以应用与服务端。让我们首先修改下TimeServerHandler。
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }现在,仅剩下的是编码,编码继承自ChannelOutboundHandler,用于将unixtime 转换为bytebuf.这要比写一个解码器更加简单,因为这里没必要当将信息编码时,处理和整合包碎片。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt(m.value()); ctx.write(encoded, promise); // (1) } }
public class TimeEncoder extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt(msg.value()); } }最后剩余的任务是在服务端TimeServerHandler之前将TimeEncoder 插入到ChannelPipeline。
标签:netty nio socket bootstrap 多线程
原文地址:http://blog.csdn.net/hohxil_sun/article/details/38976327