码迷,mamicode.com
首页 > Web开发 > 详细

Netty总结

时间:2020-03-11 19:46:47      阅读:83      评论:0      收藏:0      [点我收藏+]

标签:协议   异常   保持活动   nio   职责   弹性   情况下   监听   int   

------------恢复内容开始------------

Netty概述

netty是JBoss提供的开源网络编程框架,提供异步的、基于事件驱动的网络应用程序框架和工具。

架构

使用了典型的三层网络架构,Reactor 通信调度层 -> 职责链 PipeLine -> 业务逻辑处理层

技术图片

 Reactor层主要监听网络的读写和连接操作,负责将网络层的数据 读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事 件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来 进行后续的处理。涉及到的类包括:Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父 类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe 以及其衍生出的各种内 部类等

Pipeline层负责事件在职责链中有序的传播,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向 后 / 向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部 的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层 的协议差异和线程模型差异,实现了架构层面的分层隔离。

Service层有纯粹的业务逻辑 处理,例如订单处理;也有应用层协议管理,例如 HTTP 协议、FTP 协议等。

 

Netty的IO模型

netty的io线程就是NioEventLoop,它聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端连接。

当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。

一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

当然,实际中可以使用NioEventLoopGroup线程组,可以并发处理的客户端就非常多了。

 

Netty的线程模型

基于主从 Reactors 多线程模型,如下图

技术图片

 

 MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。

SubReactor 负责相应通道的 IO 读写请求。

非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理

具体中,可以申请一个bossGroup和一个workerGroup

 

异步设计

异步的概念:在同步中一个线程请求一种资源,如果没有请求到,就会一直等待,直到请求到为止;但异步的话,如果没有请求到,就会转手去做其他的事,直到等有了资源才会继续进行。

CallBack

CallBack是异步编程常用的一种技术。 CallBack被传入某个方法中,该方法执行结束,则调用CallBack。类似的 技术,在javascript中被广泛使用

这些回调方法,可以从一个调用者线程放入到其他的不同线程中。因此,无法保证究竟在什么时候会回调用到 FetchCallback中的哪个方法。 回调方式面临一个这样的问题,即 当具有不同回调方法的异步方法,链式的放在一起时(即), 很容易导致代 码的混淆,易读性很差。 当然,代码易用性和可读性是两码事。例如,基于Javascript的Node.js,虽然大量的使用 回调方式;但是,却能很方便的使用它去写应用,代码可读性也很好。

Futures

第二中方式就是使用Futures。 Future是一种抽象,它表示在某个条件下,这个值变得有效或者可用。 Future对象 要么表示某个计算结果,要么就表示计算失败的某种异常

某些时候,使用Future会显得不太优雅,因为你不得不隔一段时间就去检查一下Future的状态以观察它是否执行完成;相比之下, callback则不会这样,它实在某种执行完成后,直接去触发某种相应的操作。

 

Netty的执行流程

服务端

步骤1:创建ServerBootStrap 实例

步骤2:设置并绑定 Reactor 线程池:EventLoopGroup,EventLoop 就是处理所有注册到本线程的 Selector 上面的 Channel,会有主从两个EventLoopGroup。

步骤3:设置并绑定服务端的 channel

步骤4和5:创建处理网络事件的 ChannelPipeline 和 handler,网络时间以流的形式在其中流转,handler 完成多数的功能定制:比如编解码 SSl 安全认证,handle是自己写的。

步骤6:绑定并监听端口

步骤7:轮询准备就绪的channel,由 Reactor 线程:NioEventLoop 执行 pipline 中的方法,最终调度并执行 channelHandler

 

客户端

客户端和服务器端基本一致,但步骤6中,不会绑定端口,而是根据主机名和端口号连接。

 

Netty的功能特性

传输服务,支持 BIO 和 NIO。

容器集成,支持 OSGI、JBossMC、Spring、Guice 容器。

协议支持,HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议。

Core 核心,可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。

 

Netty入门案例

服务器

技术图片
package com.liuxinghang.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer1 {

    public static void main(String[] args) throws InterruptedException {

        //创建一个线程组,接收客户端的连接
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        //创建一个线程组,处理网络操作
        EventLoopGroup workerGroup=new NioEventLoopGroup();

        //创建启动对象来配置参数
        ServerBootstrap b=new ServerBootstrap();
        b.group(bossGroup,workerGroup) //设置两个线程组
        .channel(NioServerSocketChannel.class)  //使用NioServerSocketChannel作为服务器通道的实现
        .option(ChannelOption.SO_BACKLOG,128)  //设置线程队列中等待连接的个数
        .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
        .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个初始化通道的对象
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new NettyServerHandle1());  //往pipeline链中添加自定义的handle类
            }
        });
        System.out.println("Server is ready....");
        ChannelFuture cf=b.bind(9999).sync();//绑定端口,设置非阻塞。
        System.out.println("Server is starting....");

        //关闭通道,关闭线程组
        cf.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

    }
}
NettyServer1
技术图片
package com.liuxinghang.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandle1  extends ChannelInboundHandlerAdapter{

    //读取数据的事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server:"+ctx);//输出上下文内容
        ByteBuf buf= (ByteBuf) msg;
        System.out.println("客户端发来的消息:"+buf.toString(CharsetUtil.UTF_8));
    }

    //数据读取完毕事件
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //向客户端返回数据
        ctx.writeAndFlush(Unpooled.copiedBuffer("就是没钱",CharsetUtil.UTF_8));
    }

    //处理异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
NettyServerHandle1

 

客户端

技术图片
package com.liuxinghang.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient1 {

    public static void main(String[] args) throws InterruptedException {

        //创建一个线程组
        EventLoopGroup group=new NioEventLoopGroup();

        //创建客户端的启动类
        Bootstrap b=new Bootstrap();

        //设置线程组
        b.group(group)
                .channel(NioSocketChannel.class)//设置客户端通道的实现类
        .handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new NettyClientHandle1());
            }
        });

        System.out.println("Client is ready....");
        ChannelFuture cf=null;
        for(int i=0;i<2;i++) {
             cf = b.connect("127.0.0.1", 9999).sync();//连接服务器
        }
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();



    }
}
NettyClient1
技术图片
package com.liuxinghang.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandle1 extends ChannelInboundHandlerAdapter{

    //通道就绪事件,就绪就可以写出消息了
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client:"+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("老板,还钱吧", CharsetUtil.UTF_8));
    }

    //读取数据的时间


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf= (ByteBuf) msg;
        System.out.println("服务器发来的消息:"+buf.toString(CharsetUtil.UTF_8));
    }
}
NettyClientHandle1

 

Netty总结

标签:协议   异常   保持活动   nio   职责   弹性   情况下   监听   int   

原文地址:https://www.cnblogs.com/lovejune/p/12430271.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!