标签:业务 print .text etl throw 线程池 override sock orm
需求:
服务端:接收客户端请求,返回当前系统时间
客户端:发起时间请求
服务端
package org.zln.netty.five.timer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 时间服务器服务端 * Created by sherry on 16/11/5. */ public class TimerServer { /** * 服务端绑定端口号 */ private int PORT; public TimerServer(int PORT) { this.PORT = PORT; } /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(TimerServer.class); public void bind() { /* NioEventLoopGroup是线程池组 包含了一组NIO线程,专门用于网络事件的处理 bossGroup:服务端,接收客户端连接 workGroup:进行SocketChannel的网络读写 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { /* ServerBootstrap:用于启动NIO服务的辅助类,目的是降低服务端的开发复杂度 */ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024)//配置TCP参数,能够设置很多,这里就只设置了backlog=1024, .childHandler(new TimerServerInitializer());//绑定I/O事件处理类 logger.debug("绑定端口号:" + PORT + ",等待同步成功"); /* bind:绑定端口 sync:同步阻塞方法,等待绑定完成,完成后返回 ChannelFuture ,主要用于通知回调 */ ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync(); logger.debug("等待服务端监听窗口关闭"); /* closeFuture().sync():为了阻塞,服务端链路关闭后才退出.也是一个同步阻塞方法 */ channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } finally { logger.debug("优雅退出,释放线程池资源"); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
package org.zln.netty.five.timer;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * Sets up the pipeline of each channel it initializes by appending a fresh
 * {@link TimerServerHandler}.
 * Created by sherry on 16/11/5.
 */
public class TimerServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends a new {@link TimerServerHandler} to the end of the channel's pipeline.
     *
     * @param channel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // One handler instance per channel.
        final TimerServerHandler handler = new TimerServerHandler();
        channel.pipeline().addLast(handler);
    }
}
package org.zln.netty.five.timer; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.text.SimpleDateFormat; import java.util.Date; /** * Handler主要用于对网络事件进行读写操作,是真正的业务类 * 通常只需要关注 channelRead 和 exceptionCaught 方法 * Created by sherry on 16/11/5. */ public class TimerServerHandler extends ChannelHandlerAdapter { /** * 日志 */ private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //ByteBuf,类似于NIO中的ByteBuffer,但是更强大 ByteBuf reqBuf = (ByteBuf) msg; //获取请求字符串 String req = getReq(reqBuf); logger.debug("From:"+ctx.channel().remoteAddress()); logger.debug("服务端收到:" + req); if ("GET TIME".equals(req)){ String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date()); String resStr = "当前时间:" + timeNow; //获取发送给客户端的数据 ByteBuf resBuf = getRes(resStr); logger.debug("服务端应答数据:\n" + resStr); ctx.write(resBuf); }else { //丢弃 logger.debug("丢弃"); ReferenceCountUtil.release(msg); } } /** * 获取发送给客户端的数据 * * @param resStr * @return */ private ByteBuf getRes(String resStr) throws UnsupportedEncodingException { byte[] req = resStr.getBytes("UTF-8"); ByteBuf pingMessage = Unpooled.buffer(req.length); //将字节数组信息写入到ByteBuf pingMessage.writeBytes(req); return pingMessage; } /** * 获取请求字符串 * * @param buf * @return */ private String getReq(ByteBuf buf) { byte[] con = new byte[buf.readableBytes()]; //将ByteByf信息写出到字节数组 buf.readBytes(con); try { return new String(con, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //将消息发送队列中的消息写入到SocketChannel中发送给对方 logger.debug("channelReadComplete"); 
ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常时,关闭 ChannelHandlerContext,释放ChannelHandlerContext 相关的句柄等资源 logger.error("exceptionCaught"); ctx.close(); } }
客户端
package org.zln.netty.five.timer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 时间服务器客户端 * Created by sherry on 16/11/5. */ public class TimerClient { /** * 日志 */ private Logger logger = LoggerFactory.getLogger(TimerServer.class); private String HOST; private int PORT; public TimerClient(String HOST, int PORT) { this.HOST = HOST; this.PORT = PORT; } public void connect(){ //配置客户端NIO线程组 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new TimerClientInitializer()); //发起异步连接操作 logger.debug("发起异步连接操作 - start"); ChannelFuture channelFuture = bootstrap.connect(HOST,PORT).sync(); logger.debug("发起异步连接操作 - end"); //等待客户端链路关闭 logger.debug("等待客户端链路关闭 - start"); channelFuture.channel().closeFuture().sync(); logger.debug("等待客户端链路关闭 - end"); } catch (InterruptedException e) { logger.error(e.getMessage(),e); }finally { //优雅的关闭 eventLoopGroup.shutdownGracefully(); } } }
package org.zln.netty.five.timer;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * Sets up the pipeline of the client channel by appending a fresh
 * {@link TimerClientHandler}.
 * Created by sherry on 16/11/5.
 */
public class TimerClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends a new {@link TimerClientHandler} to the end of the channel's pipeline.
     *
     * @param channel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        final TimerClientHandler handler = new TimerClientHandler();
        channel.pipeline().addLast(handler);
    }
}
package org.zln.netty.five.timer; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; /** * Created by sherry on 16/11/5. */ public class TimerClientHandler extends ChannelHandlerAdapter { /** * 日志 */ private Logger logger = LoggerFactory.getLogger(TimerClientHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.debug("客户端连接上了服务端"); //发送请求 ByteBuf reqBuf = getReq("GET TIME"); ctx.writeAndFlush(reqBuf); } /** * 将字符串包装成ByteBuf * @param s * @return */ private ByteBuf getReq(String s) throws UnsupportedEncodingException { byte[] data = s.getBytes("UTF-8"); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); return reqBuf; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; String resStr = getRes(byteBuf); logger.debug("客户端收到:"+resStr); } private String getRes(ByteBuf buf) { byte[] con = new byte[buf.readableBytes()]; buf.readBytes(con); try { return new String(con, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
关于ByteBuf的读写:方法名中的"读"和"写"都是以ByteBuf自身为主体的。readBytes(byte[]) 是从ByteBuf中读出数据,存入字节数组;writeBytes(byte[]) 是把字节数组中的数据写入ByteBuf。
标签:业务 print .text etl throw 线程池 override sock orm
原文地址:http://www.cnblogs.com/sherrykid/p/6034337.html