标签:nts def pre oop new 并且 ble 可靠 注销
Netty 是由 JBOSS 提供的一个 Java 开源框架。 Netty 提供异步的、事件驱动的网络应用程序框架和工具 ,用以快速开发高性能 、 高可靠性的网络服务器和客户端程序。
Netty 框架是对 Java BIO 、 Java NIO 框架的再次封装。 Netty 框架是一个面向上层业务实现进行封装的“业务层”框架。而Java Socket 框架、 Java NIO 框架、 Java AIO 框架更偏向于对下层技术实现的封装,是面向“技术层” 的框架。
1.工作原理
1) Boss 使用的线程池:Boss 线程池实际上就是 Java NIO 框架中的 Selector 工作角色,针对一个本地 IP 的端口,Boss 线程池中有一条线程工作,工作内容也相对简单,就是发现新的连接: Ne时支持同时监听多个端口,所以 Boss 线程池的大小按照需要监昕的服务器端口数量进行设置。
2) Work 线程池中的线程:(如果封装的是 Java NIO,那么具体的线程实现类就是 NIOEventLoop )都固定负责指派给它的网络连接的事件监听,并根据状态调用不同的Channe!Handler 事件方法。而最后一个参数 SelectorProvider 说明了这个 EventLoop 所使用的 多路复用I/O模型的具体实现由操作系统决定。
option 方法可以设置这个 ServerChannel 相应的各种属性(在代码中使用的是NIOServerSocketChannel) ; childOption 方法用于设置这个 ServerChannel 收到客户端事件后,所生成的新的 Channel 的各种属性(代码中生成的是 NIOSocketChannel ) 。
2. Netty 线程中几制
在 Netty 中,原来Java NIO中的Selector 的工作就交给 Boss 线程完成,而且建议使用线程池技术 。 Boss 线程负责发现连接到服务器的新的 Channel ( SocketServerChannel的 ACCEPT 事件),并且将这个 Channel 经过检查后注册到 Work 连接池的某个 EventLoop 线程中 。 而当 Work 线程发现操作系统有一个它感兴趣的I/O事件时(例如 SocketChannel 的 READ 事件 ) ,则调用相应的 ChannelHandler事件 。当某 个 Channel 失效后(例如显示调用 ctx.close() ) ,这个 Channel 将从绑定的EventLoop 中被剔除 。
在 Netty 中,如果我们使用的是 Java NIO 框架实现的对多路复用I/O模型的支持,那么进行这个循环的是 NIOEventLoop 类 (可参见该类中的 processSelectedKeysPlain 方法和processSelectedKey 方法) 。
一个 Work 线程池的线程将按照底层封装 Java NIO 框架中 Selector 的事件状态, 决定执行ChannelHandler 中的哪一个事件方法( Netty 中包括了 channelRegistered 、 channelUnregistered 、channelActive 、 channellnactive 等事件方法)。执行完成后, Work 线程将一直轮询直到操作系统回复下一个它所管理的 Channel 发生了新的 I/O 事件。
3.Netty 的几个概念: Channel 、 Buffer、 ChannelPipeline 、 ChanneHandler 、 ChannelHandlerContext等
1) ByteBuf
Netty 重写了 Java NIO 框架中的缓存结构,井将这个结构应用在更上层的封装ByteBuf(其实现有:EmptyByteBuf、 ReadOnlyByteBuf、 UnpooledDirectByteBuf、 PooledByteBuf)中 。
2) Channel
Netty中的 Channel 专门代表网络通信,这个和 Java NIO 框架中的 Channel 不一样,它是专门代表网络通信,所以它是由客户端地址+服务器地址+网络操作状态构成的。
在 Netty中,不止封装了多路复用I/O 模型,还封装了 Java BIO 支持的同步网络I/O通信模型。将它们在表现上都抽象成 Channel了。而我们知道在 Java BIO 支持的同步网络I/O模型
中,原来是不存在 Channel 这个概念的 。
3) ChannelPipeline 幸日 ChannelHandler
Netty 中的每一个 Channel ,都有一个独立的 ChannelPipeline , 它是双向的,数据可以通过这个它流入到服务器,也可以通过它从服务器流出。
在 ChannelPipeline 中, 有若干个过滤器,我们称之为“ ChannelHandler ” (也可以称为过滤器) 。 同“流入”和“流出”的概念相对应:用于处理/过滤“流入数据”的 ChannelHandler,被称为“ ChannellnboundHandler ”: 用于处理/过滤“流出数据”的 ChannelHandler,被称为“ ChannelOutboundHandler”,
(1) 责任链和适配器的应用
数据在 ChannelPipeline 中由 一个一个的 Handler 进行处理,井形成一个新的数据状态 。这是典型的“责任链”模式。
虽然数据管道中的 Handler 是按照顺序执行的, 但不代表某一个 Handler 会处理任何一种由“上一个 Handler”发送过来的数据。某些 Handler 会检查传来的数据是否符合要求,如果不符合自己的处理要求,则不进行处理 。
(2) ChannellnboundHandler 类举例
HttpRequestDecoder、ByteArrayDecoder、DelimiterBasedFrameDecoder、ProtobuIDecoder 和 ProtobufVarint32FrameDecoder等等
(3) ChannelOutboundHandler 类举例
HttpResponseEncoder、ByteArrayEncoder、ProtobutEncoder 、 ProtobufVarint32LengthFieldPrepender 、 MarshallingEncoder 、JZlibEncoder 等等
4. 信息格式
对数据信息格式的封装: Protobuf 数据协议的集成、JBoss Marshalling 数据协议的集成、HTTP Request/HTTP Response 协议的集成
5. 解决半包问题和粘包问题
1)MSS: MSS 属性是 TCP 连接双方在三次握手时所确认的每一个 TCP 报文段中数据宇段的最大长度。
2)半包是指,接收方应用程序在接收信息时,没有接收到 一个完整的信息格式块;
3)粘包是指,接收方应用程序在接收信息时,除了接收到发送方应用程序发送的某一个完整数据信息描述,还接收到了 一下发送方应用程序发送的下一个数据信息的一部分 。
4)半包和粘包是针对应用程序来说的,这个问题只 会发生在 TCP 协议进行连续发送数据时 ( TCP 长连接 )。
5)半包/粘包是一个应用层问题,在应用程序层面上、在业务层面上,我们自行定义的“数据块”,但在 TCP 层面上并不被协议认可。
6)常见的有两种方式 :
一是消息定长,即保证每一个完整的信息描述的长度都是一定的,这样无论 TCP/IP 协议如何进行分片,数据接收方都可以按照固定长度进行消息的还原 ;
二是在完整的一块数据结束后增加协商一致的分隔符 (例如增加一个回车符,再例如我们之前示例中一直使用的“over”关键字。
7)在Netty中可以应用FixedLengthFrameDecoder 、 DelimiterBasedFrameDecoder 、 LineBasedFrameDecoder 解决半包/粘包问题
6.实例
1.TestTCPNettyServer1
package testNetty; import java.net.InetSocketAddress; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.ThreadFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.util.concurrent.DefaultThreadFactory; public class TestTCPNettyServer1 { public static void main(String[] args) throws Exception { // 这就是主要的服务启动器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // =======================下面设置线程池 // Boss 线程池 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); // Work 线程池:这样的申明方式,主要是为了向读者说明 Netty 的线程组是怎样工作的 ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool"); // CPU 个数 int processorsNumber = Runtime.getRuntime().availableProcessors(); EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber, threadFactory, SelectorProvider.provider()); // 指定 Netty 的 Boss 线程和 Work 线程 serverBootstrap.group(bossLoopGroup, workLoogGroup); // 如果是以下的申明方式,则说明 Boss 线程和 Work 线程共享一个线程池 // serverBootstrap . group(workLoogGroup); // ========================下面我们设置服务的通道类型 // 只能是实现了 ServerChannel 接口的 “ 服务器”通道类 serverBootstrap.channel(NioServerSocketChannel.class); // 当然也可以这样创建( SelectorProvider 是不是感觉很熟悉〉 /* * serverBootstrap.channelFactory(new ChannelFactory<NioServerSocketChannel>() { * * @Override public NioServerSocketChannel newChannel() { return new * NioServerSocketChannel(SelectorProvider.provider()); * * } }); */ // ========================设置处理器 // 这里设置了一组简单的 ByteArrayDecoder 和 ByteArrayEncoder // Netty 的特色就在这一连串“通道水管 ” 中的“处理器 ” serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ByteArrayEncoder()); ch.pipeline().addLast(new TcpServerHandler()); ch.pipeline().addLast(new ByteArrayDecoder()); } }); // === =====================设置 Netty 服务器绑起的 IP 和端口 serverBootstrap.option(ChannelOption.SO_BACKLOG, 128); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 8888)); // 还可以监控多个端口 serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 8080)); } }
2.TcpServerHandler
package testNetty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.AttributeKey; @Sharable public class TcpServerHandler extends ChannelInboundHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(TcpServerHandler.class); // 每一个 Channel ,都有独立的 handler 、 ChannelHandlerContext 、 ChannelPipeline 、 Attribute // 所以不需要担心多个 Channel 中的这些对象相互影响 // 这里我们使用 Content 这个 Key ,记录这个 handler 中已经接收到的客户端信息 private static AttributeKey<StringBuffer> content = AttributeKey.valueOf("content"); @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { TcpServerHandler.LOGGER.info("super.channelRegistered(ctx)"); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { TcpServerHandler.LOGGER.info("super.channelUnregistered(ctx)"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { TcpServerHandler.LOGGER.info("super.channelActive(ctx) = " + ctx.toString()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { TcpServerHandler.LOGGER.info("super.channelinactive(ctx)"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { TcpServerHandler.LOGGER.info("channelRead(ChannelHandlerContext ctx, Object msg ) "); // 我们使用 IDE 工具模拟长连接中的数据缓慢提交 // 由 read 方法负责接收数据,但只是进行数据累加,不进行任何处理 ByteBuf byteBuf = (ByteBuf) msg; try { StringBuffer contextBuffer = new StringBuffer(); while (byteBuf.isReadable()) { contextBuffer.append((char) byteBuf.readByte()); } // 加入临时区域 StringBuffer content = ctx.attr(TcpServerHandler.content).get(); if (content == null) { content = new StringBuffer(); ctx.attr(TcpServerHandler.content).set(content); } content.append(contextBuffer); } catch (Exception e) { throw e; } finally { byteBuf.release(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { TcpServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContextctx)"); // 由 readComplete 方法负责检查数据是否接收完了 StringBuffer content = ctx.attr(TcpServerHandler.content).get(); // 如果条件成立,则说明还没有接收到完整的客户端信息 if (content.indexOf("ver") == -1) { return; } // 当接收到信息后,首先要做的是清空原来的历史信息 ctx.attr(TcpServerHandler.content).set(new StringBuffer()); // 准备向客户端发送响应 ByteBuf byteBuf = ctx.alloc().buffer(1024); byteBuf.writeBytes("回发响应信息!".getBytes()); ctx.writeAndFlush(byteBuf); // 正常终止这个通道上下文,就可以关闭通道了 // 如果不关闭,这个通道的会话将一直存在, // 只要网络是稳定的,服务器就可以随时通过这个会话向客户端发送信息 // 关闭通道意味着 TCP 将正常新开,其中所有的 // handler 、 ChannelHandlerContext 、 ChannelPipeline 、 Attribute 等信息 // 都将被注销 ctx.close(); } }
3.TestTCPNettyClient
package testNetty; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URLEncoder; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestTCPNettyClient implements Runnable { private final static Logger LOGGER = LoggerFactory.getLogger(TestTCPNettyClient.class); private CountDownLatch countDownLatch; private Integer clientindex; public TestTCPNettyClient(CountDownLatch countDownLatch, Integer clientindex) { this.countDownLatch = countDownLatch; this.clientindex = clientindex; } @Override public void run() { Socket socket = null; OutputStream clientRequest = null; InputStream clientResponse = null; try { socket = new Socket("localhost", 8888); clientRequest = socket.getOutputStream(); clientResponse = socket.getInputStream(); this.countDownLatch.await(); clientRequest.write(URLEncoder.encode("第" + this.clientindex + "个客户端请求 11 。", "UTF-8").getBytes()); clientRequest.flush(); clientRequest.write(URLEncoder.encode("第" + this.clientindex + "个客户端请求 22 。 over。", "UTF-8").getBytes()); TestTCPNettyClient.LOGGER.info("第" + this.clientindex + "个客户端请求发送完成 , 等待服务器返回 "); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; int realLen; String message = ""; while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { message += new String(contextBytes, 0, realLen); } TestTCPNettyClient.LOGGER.info("接收到来自服务器的信息:" + message); } catch (Exception e) { TestTCPNettyClient.LOGGER.error(e.getMessage(), e); } finally { // 试图关闭连接 try { clientRequest.close(); clientResponse.close(); socket.close(); } catch (Exception e) { TestTCPNettyClient.LOGGER.error(e.getMessage(), e); } } } }
7.使用 Netty 的 HTTP 编码/解码处理器设计的一个简单的 Web 服务器
标签:nts def pre oop new 并且 ble 可靠 注销
原文地址:https://www.cnblogs.com/gispathfinder/p/9032997.html