码迷,mamicode.com
首页 > Web开发 > 详细

Netty in Action Version5

时间:2018-08-29 21:27:56      阅读:260      评论:0      收藏:0      [点我收藏+]

标签:change   2.4   else   enc   bin   main   util   list   site   

1.Netty介绍

  1.1为什么需要Netty

    1.1.1不是所有的网络框架都是一样的

    1.1.2Netty的功能非常丰富

      框架组成

技术分享图片

  1.2异步设计

    1.2.1Callbacks(回调)

      简单的回调

public interface Fetcher {
    void fetchData(FetchCallback callback);
}

public interface FetchCallback {
    void onData(Data data);

    void onError(Throwable cause);
}

public class Worker {
    public void doWork() {
        Fetcher fetcher = ...
        fetcher.fetchData(new FetchCallback() {
            @Override
            public void onData(Data data) { //获取到数据
                System.out.println("Data received: " + data);
            }

            @Override
            public void onError(Throwable cause) { //未获取到数据
                System.err.println("An error accour: " + cause.getMessage());
            }
        });
    }
}

      Fetcher.fetchData()方法需传递一个FetcherCallback类型的参数,当获得数据或发生错误时被回调。对于每种情况都提供了统一的方法:FetcherCallback.onData(),将接收数据时被调用;FetcherCallback.onError(),发生错误时被调用

    1.2.2Futures

    ExecutorService executor = Executors.newCachedThreadPool();
    Runnable task1 = new Runnable() {
        @Override
        public void run() {
            doSomeHeavyWork();
        }
        //...
    }
    Callable<Interger> task2 = new Callable() {
        @Override
        public Integer call() {
            return doSomeHeavyWorkWithResul();
        }
        //...
    }
    Future<?> future1 = executor.submit(task1);
    Future<Integer> future2 = executor.submit(task2);
    while(!future1.isDone()||!future2.isDone()){
        ...
        // do something else
        ...
    }

       Future的未来应用

public interface Fetcher {
    Future<Data> fetchData();
}

public class Worker {
    public void doWork() {
        Fetcher fetcher = ...
        Future<Data> future = fetcher.fetchData();
        try {
            while (!fetcher.isDone()) {
                //...
                // do something else 
            }
            System.out.println("Data received: " + future.get());
        } catch (Throwable cause) {
            System.err.println("An error accour: " + cause.getMessage());
        }
    }
}

  1.3Java中的Blocking和non-blocking IO对比

     1.3.1基于阻塞IO的EchoServer

public class PlainEchoServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);//绑定端口
        try {
            while (true) {
                final Socket clientSocket = socket.accept(); //阻塞,直到接受新的客户端连接为止。
                System.out.println("Accepted connection from " + clientSocket);
                new Thread(new Runnable() { //创建处理客户端连接的新线程
                    @Override
                    public void run() {
                        try {
                            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
                            while (true) { //从客户端读取数据并将其写回
                                writer.println(reader.readLine());
                                writer.flush();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start(); //开始执行程序
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

    1.3.2非阻塞IO基础

      ByteBuffer

         将数据写入ByteBuffer

        调用ByteBuffer.flip()从写模式切换到读取模式

        从ByteBuffer读取数据

        ByteBuffer.clear()清除所有数据

        Bytebuffer.compact()清除已读取数据

    Channel inChannel = ....;
    ByteBuffer buf=ByteBuffer.allocate(48);
    int bytesRead=-1;
    do{
        bytesRead=inChannel.read(buf); //将数据从通道读取到ByteBuffer
        if(bytesRead!=-1){
            buf.flip();//使缓冲区为读做准备
        while(buf.hasRemaining()){
            System.out.print((char)buf.get()); //读取ByteBuffer中的字节;每个get()操作都会将位置更新1
        }
        buf.clear(); //让ByteBuffer准备好再写一遍
        }
    }while(bytesRead!=-1);
    inChannel.close();

      使用NIO选择器

        1.创建一个或多个选择器,其中可以注册打开的通道(套接字)。

        2.注册信道时,指定您感兴趣侦听的事件。以下四个可用事件(或操作/操作)为:接收、连接、读取、等待

        3.在注册通道时,您可以调用Selector.select()方法来阻塞,直到发生这些事件之一。

        4.当该方法解除阻塞时,您可以获得所有SelectionKey实例(这些实例保存对已注册通道和所选操作的引用)并执行一些操作。 您到底做了什么取决于哪个操作已经准备好了。SelectedKey可以在任何给定时间包含多个操作。

    1.3.3基于NIO的EchoServer

public class PlainNioEchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);//将服务器绑定到端口
        serverChannel.configureBlocking(false);//设置为非阻塞
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);//向选择器注册通道,以便对被接受的新客户端连接感兴趣
        while (true) {
            try {
                selector.select();//阻塞,直到选定某物为止。
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle in a proper way
                break;
            }
            Set readyKeys = selector.selectedKeys(); //获取所有SelectedKey实例
            Iterator iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                iterator.remove();//从迭代器中删除SelectedKey
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();//接受客户端连接
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);//设置为非阻塞
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));//注册到选择器的连接并设置ByteBuffer
                    }
                    if (key.isReadable()) {//检查SelectedKey的阅读
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        client.read(output); //读取数据到ByteBuffer
                    }
                    if (key.isWritable()) {//检查SelectedKey的写
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output);
                        output.compact();//将数据从ByteBuffer写入信道
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }
    }
}

    1.3.4基于NIO.2的EchoServer

      与最初的NIO实现不同,NIO.2允许您发出IO操作并提供所谓的完成处理程序

public class PlainNio2EchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(port);
        serverChannel.bind(address);
        final CountDownLatch latch = new CountDownLatch(1);
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { //开始接受新的客户端连接。一旦其中一个被接受,CompletionHandler就会被调用。
            @Override
            public void completed(final AsynchronousSocketChannel channel, Object attachment) {
                serverChannel.accept(null, this); //再次接受新的客户端连接
                ByteBuffer buffer = ByteBuffer.allocate(100);
                channel.read(buffer, buffer, new EchoCompletionHandler(channel)); //触发通道上的读取操作,一旦读取某个消息,将通知给定的PrimeTyHand处理程序。
            }

            @Override
            public void failed(Throwable throwable, Object attachment) {
                try {
                    serverChannel.close(); //关闭套接字错误
                } catch (IOException e) {
                    // ingnore on close
                } finally {
                    latch.countDown();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        EchoCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { //触发通道上的写操作,给定的CompletionHandler一写就会被通知

                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        channel.write(buffer, buffer, this); //如果ByteBuffer中有东西,则再次触发写操作。
                    } else {
                        buffer.compact();
                        channel.read(buffer, buffer, EchoCompletionHandler.this); //触发通道上的读取操作,一旦读取某个消息,将通知给定的PrimeTyHand处理程序。
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ingnore on close
                    }
                }
            });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                // ingnore on close
            }
        }
    }
}

  1.4NIO的问题和Netty中是如何解决这些问题的

    1.4.1 跨平台和兼容性问题

    1.4.2扩展ByteBuffer.或者不扩展

    1.4.3散射和聚集可能会泄漏

    1.4.4解决著名的epoll空轮询bug

  1.5小结

2.第一个Netty程序

  2.1搭建开发环境

  2.2Netty客户机和服务器概述

  2.3编写Echo服务器

    2.3.1引导服务器

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); //创建引导服务器
            b.group(group);
            b.channel(NioServerSocketChannel.class);//指定nio传输、本地套接字地址。
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //将处理程序添加到通道管道
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoServerHandler()); //绑定服务器,等待服务器关闭,并释放资源。
                }
            });
            ChannelFuture f = b.bind().sync(); //绑定服务器,然后等待绑定完成,对sync()方法的调用将导致阻塞,直到服务器绑定。
            System.out.println(EchoServer.class.getName() + "ì started and listen on ì" + f.channel().localAddress());//应用程序将等到服务器通道关闭。
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("ìUsage:" + EchoServer.class.getSimpleName() + " < port > ");
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }
}

    2.3.2实现服务器/业务逻辑

@ChannelHandler.Sharable //使用@Sharable注释,以便在各通道之间共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Server received: " + msg);
        ctx.write(msg);//把收到的消息写回去。请注意,这将不会将消息刷新到远程对等端。
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); //将所有以前的书面消息(挂起)刷新到远程对等端,并在操作完成后关闭通道。
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace(); //异常日志
        ctx.close(); //异常关闭通道
    }
}

    2.3.3捕获异常

  2.4编写回送客户端

    2.4.1引导客户端

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap(); //为客户端创建引导程序
            b.group(group); //指定EventLoopGroup来处理客户端事件。使用NioEventLoopGroup,因为应该使用NIO-传输
            b.channel(NioSocketChannel.class);//指定通道类型;为NIO-传输使用正确的通道类型
            b.remoteAddress(new InetSocketAddress(host, port));//设置客户端连接的InetSocketAddress
            b.handler(new ChannelInitializer<SocketChannel>() { //使用ChannelInitiators指定ChannelHandler,一旦连接建立并创建通道,就调用它
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoClientHandler());//将EchoClientHandler添加到属于通道的Channel管道。管道拥有所有通道的通道处理器
                }
            });
            ChannelFuture f = b.connect().sync(); //将客户端连接到远程对等端;等待sync()完成连接
            f.channel().closeFuture().sync(); //等到ClientChannel关闭。这会挡住。
        } finally {
            group.shutdownGracefully().sync(); //关闭引导程序和线程池;释放所有资源
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
            return;
        }
        
        // Parse options.
        final String host = args[0];
        final int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

    2.4.2实现客户端逻辑

@ChannelHandler.Sharable //使用@Sharable注释,因为它可以在通道之间共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); //现在写入通道连接的消息
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx,
                             ByteBuf in) {
        System.out.println("Client received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); //以己转储的形式记录接收到的消息
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//日志异常和关闭通道
        cause.printStackTrace();
        ctx.close();
    }
}

  2.5编译和运行回送客户端和服务器

    2.5.1编译服务器和客户端

    2.5.2运行服务器和客户端

  2.6小结

3.Netty核心概念

  3.1Netty速成班

  3.2通道、事件和输入/输出(IO)

    EventLoops与EventLoopGroups的关系。

技术分享图片

  3.3引导:什么和为什么

  3.4通道处理程序和数据流

    3.4.1把它拼凑在一起,管道和处理程序

      管道安排的示例。

技术分享图片

  3.5编码器、解码器和域逻辑:对处理程序的深入观察

    3.5.1Encodes,Deodes

    3.5.2域逻辑

4.Transports(传输)

  4.1案例研究:运输迁移

    4.1.1使用无网络的I/O和NIO

public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            while (true) {
                final Socket clientSocket = socket.accept(); 
                System.out.println("Accepted connection from " +
                        clientSocket);
                //创建新线程来处理连接
                new Thread(() -> {
                    OutputStream out;
                    try {
                        out = clientSocket.getOutputStream();
                        out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //向连接的客户端写入消息
                        out.flush();
                        clientSocket.close(); //一旦消息被写入并刷新,就关闭连接。
                    } catch (IOException e) {
                        e.printStackTrace();
                        try {
                            clientSocket.close();
                        } catch (IOException ex) {
                            // ignore on close
                        }
                    }
                }).start(); //启动线程开始处理
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

    4.1.2没有Netty的异步网络

public class PlainNioServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        ServerSocketChannel serverChannel;
        Selector selector;
        serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        serverChannel.configureBlocking(false);
        selector = Selector.open();//打开处理通道的选择器
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); //将erverSocket注册到选择器,并指定它对新接受的客户端感兴趣。
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        while (true) {
            try {
                selector.select(); //等待已准备好进行处理的新事件。这将阻止直到发生什么事情
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle in a proper way
                break;
            }
            Set<SelectedKey> readyKeys = selector.selectedKeys(); //获取接收事件的所有SelectionKey实例
            Iterator<SelectedKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) { //检查事件是否是因为新客户端准备接受
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //接受客户端并将其注册到选择器
                    }
                    if (key.isWritable()) { //检查事件是否因为套接字已准备好写入数据
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) { //将数据写入连接的客户端。如果网络饱和,这可能不会写入所有数据。如果是这样的话,它将捡起未写入的数据,并在网络再次可写时将其写入。
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }
    }
}

    4.1.3在Netty中使用I/O和NIO

public class NettyOioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group);
            b.channel(OioServerSocketChannel.class);//使用OioEventLoopGroupIto允许阻塞模式(旧-IO)
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //指定将为每个接受的连接调用的信道初始化器
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler来拦截事件并允许对它们作出反应
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客户端写入消息,并在消息写入后添加ChannelFutureListener以关闭连接
                        }
                    });
                }
            });
            ChannelFuture f = b.bind().sync(); //绑定服务器以接受连接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync(); //释放所有资源
        }
    }
}

    4.1.4实现异步支持

public class NettyNioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group);
            b.channel(NioServerSocketChannel.class);//使用OioEventLoopGroupIto允许阻塞模式(旧-IO)
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //指定将为每个接受的连接调用的信道初始化器
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler来拦截事件并允许对它们作出反应
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客户端写入消息,并在消息写入后添加ChannelFutureListener以关闭连接
                        }
                    });
                }
            });
            ChannelFuture f = b.bind().sync(); //绑定服务器以接受连接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync(); //释放所有资源
        }
    }
}

  4.2传输API

    通道接口层次结构

技术分享图片

  

    最重要的信道方法

      eventLoop()返回分配给通道的EVELATIORE

      pipeline()返回分配给通道的通道管道。

      isActive()如果通道处于活动状态,则返回该通道,这意味着它已连接到远程对等端。

      localAddress()返回绑定到本地的SocketAddress

      remoteAddress()返回绑定远程的SocketAddress

      write()将数据写入远程对等程序。这些数据是通过管道传递的。

    写信给频道

    Channel channel = ...
    ByteBuf buf = Unpooled.copiedBuffer(..your data, CharsetUtil.UTF_8);//创建保存要写入的数据的ByteBuf
    ChannelFuture cf = channel.write(buf);//写数据
    cf.addListener(new ChannelFutureListener() { //添加ChannelFutureListener,以便在写入完成后得到通知
        @Override
        public void operationComplete (ChannelFuture future){
            if (future.isSuccess()) { //写入操作完成,没有错误。
                System.out.println("Write successful");
            } else {
                System.err.println("Write error"); //写入操作已完成,但由于错误
                future.cause().printStacktrace();
            }
        }
    });

    使用来自多个线程的通道

    final Channel channel = ...
    final ByteBuf buf = Unpooled.copiedBuffer(..your data", CharsetUtil.UTF_8); //创建保存要写入的数据的ByteBuf
    Runnable writer = new Runnable() { //创建Runnable将数据写入通道
        @Override
        public void run() {
            channel.write(buf.duplicate());
        }
    };
    Executor executor = Executors.newChachedThreadPool();//获取对执行程序的引用,该执行器使用线程执行任务。
    // write in one thread
    executor.execute(writer); //将写任务交给Executor,以便在线程中执行。
    // write in another thread
    executor.execute(writer); //将另一个写任务交给Executor,以便在线程中执行。

  4.3包括运输

    4.3.1NiO非阻塞I/O

      选择操作位集

        OP_ACCEPT一旦新连接被接受并创建了一个通道,就会得到通知。

        OP_CONNECT一旦连接尝试完成,就会收到通知。

        OP_READ一旦数据准备好从通道中读取,就会得到通知。

        OP_WRITE一旦有可能将更多的数据写入通道,就会得到通知。大多数情况下,这是可能的,但可能不是因为OS套接字缓冲区已完全填满。 您编写得更快,远程对等程序就可以处理它。

      选择器逻辑

技术分享图片

 

    4.3.2OIO旧阻塞I/O

    4.3.3VM传输中的局部

    4.3.4嵌入式传输

  4.4何时使用各种运输方式

    低并发连接计数->OIO

    高并发连接计数->NIO

    低延时->OIO

    基本模块代码->OIO

    在同一个JVM中进行通信->Local

    测试ChannelHandler实现->Embedded

5.Buffers(缓冲)

  5.1缓冲API

  5.2字节数据容器

    5.2.1工作原理

    5.2.2不同类型的ByteBuf

      Heap Buffer(堆缓冲区)

    ByteBuf heapBuf = ...;
    if (heapBuf.hasArray()) { //检查ByteBuf是否由数组支持
        byte[] array = heapBuf.array(); //获取对数组的引用
        int offset = heapBuf.arrayOffset() + heapBuf.position(); //计算其中第一个字节的偏移量
        int length = heapBuf.readableBytes(); //获取可读字节的数量
        YourImpl.method(array, offset, length); //使用数组、偏移量、长度作为参数的调用方法
    }

      Direct Buffer(直接缓冲区)

    ByteBuf directBuf = ...;
    if (!directBuf.hasArray()){ //检查ByteBuf是否不受数组支持,对于直接缓冲区,数组为false
        int length = directBuf.readableBytes(); //获取可读字节数
        byte[] array = new byte[length]; //分配具有可读字节长度的新数组
        directBuf.getBytes(array); //将字节读入数组
        YourImpl.method(array, 0, array.length);//以数组、偏移量、长度为参数的Call方法
    }

      Composite Buffer(复合缓冲区)

        编写遗留的JDK ByteBuffer

    //Use an array to composite them
    ByteBuffer[] message = new ByteBuffer[] { header, body }; 
    // Use copy to merge both 
    ByteBuffer message2 = ByteBuffer.allocate(header.remaining()+ body.remaining(); 
    message2.put(header); 
    message2.put(body); 
    message2.flip();

        CompositeByteBuf

    CompositeByteBuf compBuf = ...;
    ByteBuf heapBuf = ...;
    ByteBuf directBuf = ...;
    compBuf.addComponent(heapBuf, directBuf); //将ByteBuf实例追加到复合
            .....
            compBuf.removeComponent(0); //在索引0 bytebuf remove(heapbuf这里)
            for (ByteBuf buf: compBuf) { //循环遍历所有组合的ByteBuf
                System.out.println(buf.toString());
            }

      [计]存取数据

    CompositeBuf compBuf = ...;
    if (!compBuf.hasArray()) { //检查ByteBuf是否不受数组支持,这对于复合缓冲区来说是false
        int length = compBuf.readableBytes(); //获取可读字节的数量
        byte[] array = new byte[length];//分配具有可读字节长度的新数组
        compBuf.getBytes(array); //将字节读入数组
        YourImpl.method(array, 0, array.length);//以数组、偏移量、长度为参数的Call方法
    }

  5.3 ByteBuf的字节操作

    5.3.1 随机访问索引

    ByteBuf buffer = ...; 
    for (int i = 0; i < buffer.capacity(); i ++) {
        byte b = buffer.getByte(i);
        System.out.println((char) b);
    }

    5.3.2 顺序访问索引

    5.3.3Discardable bytes废弃字节

    5.3.4 可读字节(实际内容)

    ByteBuf buffer = ...; 
    while (buffer.readable()) {
        System.out.println(buffer.readByte());
    }

    5.3.5 可写字节Writable bytes

    ByteBuf buffer = ...; 
    while (buffer.writableBytes() >= 4) {
        buffer.writeInt(random.nextInt()); 
    }

    5.3.6 清除缓冲区索引Clearing the buffer indexs

    5.3.7 搜索操作Search operations

    5.3.8 标准和重置Mark and reset

    5.3.9 衍生的缓冲区Derived buffers

    Charset utf8 = Charset.forName("UTF-8");
    ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //创建ByteBuf,它包含给定字符串的字节
    ByteBuf sliced = buf.slice(0, 14); //创建ByteBuf的新片段,从索引0开始,以索引14结束
    System.out.println(sliced.toString(utf8); //包含在行动中的Netty
    buf.setByte(0, (byte) íJí); //更新索引0上的字节
    assert buf.get(0) == sliced.get(0);//不会失败,因为ByteBuf共享相同的内容,因此对其中一个的修改在另一个上也是可见的

    5.3.10 读/写操作以及其他一些操作

  5.4 ByteBufHolder

    5.4.1 ByteBufAllocator

    5.4.2 Unpooled

    5.4.3 ByteBufUtil

6.ChannelHandler

  6.1 ChannelPipeline

    修改ChannelPipeline的方法     

      addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置
      addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
      addAfter(...),在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
      addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler
      remove(...),删除ChannelPipeline中指定的ChannelHandler
      replace(...),替换ChannelPipeline中指定的ChannelHandler

  6.2 ChannelHandlerContext

    6.2.1 通知下一个ChannelHandler

      ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系

技术分享图片

      事件通过渠道

    ChannelHandlerContext ctx = ..;
    Channel channel = ctx.channel(); //获取属于ChannelHandlerContext的通道的引用
    channel.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));//通过通道写入缓冲器

      信道管道事件

    ChannelHandlerContext ctx = ..;
    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.write(Unpooled.copiedBuffer(ìNetty in Actionì,CharsetUtil.UTF_8));

      通过Channel或ChannelPipeline的通知:

技术分享图片

    6.2.2 修改ChannelPipeline

  6.3 状态模型

技术分享图片

  6.4 ChannelHandler和其子类

技术分享图片

    6.4.1 ChannelHandler中的方法

    6.4.2 ChannelInboundHandler

      channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
      channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
      channelActive,ChannelHandlerContext的Channel已激活
      channelInactive,ChannelHanderContxt的Channel结束生命周期
      channelRead,从当前Channel的对端读取消息
      channelReadComplete,消息读取完成后执行
      userEventTriggered,一个用户事件被处罚

      channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
      exceptionCaught,重写父类ChannelHandler的方法,处理异常

    6.4.3 ChannelOutboundHandler

      bind,Channel绑定本地地址
      connect,Channel连接操作
      disconnect,Channel断开连接
      close,关闭Channel
      deregister,注销Channel
      read,读取消息,实际是截获ChannelHandlerContext.read()
      write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
      flush,刷新消息到通道

7.编解码器Codec

  7.1 编解码器Codec

  7.2 解码器

    7.2.1 ByteToMessageDecoder

技术分享图片

    7.2.2 ReplayingDecoder      

      读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查;

    7.2.3 MessageToMessageDecoder

  7.3 编码器

技术分享图片.

    7.3.1 MessageToByteEncoder

    7.3.2 MessageToMessageEncoder

  7.4 编解码器

    7.4.1 byte-to-byte编解码器

    7.4.2 ByteToMessageCodec

    7.4.3 MessageToMessageCodec

  7.5 其他编解码方式

    7.5.1 CombinedChannelDuplexHandler

8.附带的ChannelHandler和Codec

  8.1 使用SSL/TLS创建安全的Netty程序

public class SslChannelInitializer extends ChannelInitializer<Channel> {
    private final SSLContext context;
    private final boolean client;
    private final boolean startTls;

    public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
        this.context = context;
        this.client = client;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(client);
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}   

    setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),设置握手超时时间,ChannelFuture将得到通知
    setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置握手超时时间,ChannelFuture将得到通知
    getHandshakeTimeoutMillis(),获取握手超时时间值
    setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
    setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
    getCloseNotifyTimeoutMillis(),获取关闭通知超时时间
    handshakeFuture(),返回完成握手后的ChannelFuture
    close(),发送关闭通知请求关闭和销毁

  8.2 使用Netty创建HTTP/HTTPS程序

    8.2.1 Netty的HTTP编码器,解码器和编解码器

技术分享图片

      HttpRequestEncoder,将HttpRequest或HttpContent编码成ByteBuf
      HttpRequestDecoder,将ByteBuf解码成HttpRequest和HttpContent
      HttpResponseEncoder,将HttpResponse或HttpContent编码成ByteBuf
      HttpResponseDecoder,将ByteBuf解码成HttpResponse和HttpContent

    8.2.2 HTTP消息聚合

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpAggregatorInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
    }

}

    8.2.3 HTTP压缩

    8.2.4 使用HTTPS

    8.2.5 WebSocket

      BinaryWebSocketFrame,包含二进制数据
      TextWebSocketFrame,包含文本数据
      ContinuationWebSocketFrame,包含二进制数据或文本数据,BinaryWebSocketFrame和TextWebSocketFrame的结合体
      CloseWebSocketFrame,WebSocketFrame代表一个关闭请求,包含关闭状态码和短语
      PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame发送数据
      PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame响应

public class WebSocketServerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new WebSocketServerProtocolHandler("/websocket"),
                new TextFrameHandler(),
                new BinaryFrameHandler(),
                new ContinuationFrameHandler());
    }

    public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            // handler text frame
        }
    }

    public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
            //handler binary frame
        }
    }

    public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
            //handler continuation frame
        }
    }
}

    8.2.6 SPDY

      SPDY(读作“SPeeDY”)是Google开发的基于TCP的应用层协议,用以最小化网络延迟,提升网络速度,优化用户的网络使用体验。

  8.3 处理空闲连接和超时

    IdleStateHandler,当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent
    ReadTimeoutHandler,在指定时间内没有接收到任何数据将抛出ReadTimeoutException
    WriteTimeoutHandler,在指定时间内有写入数据将抛出WriteTimeoutException

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                "HEARTBEAT", CharsetUtil.UTF_8));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

  8.4 解码分隔符和基于长度的协议

    使用LineBasedFrameDecoder提取"\r\n"分隔帧:

/**
 * 处理换行分隔符消息
 *
 * @author c.k
 */
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something with the frame 
        }
    }
}

    8.4.2 长度为基础的协议

  8.5 写大数据

  8.6 序列化数据

    8.6.1 普通的JDK序列化

    8.6.2 通过JBoss编组序列化

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider))
                .addLast(new MarshallingEncoder(marshallerProvider))
                .addLast(new ObjectHandler());
    }

    public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
            // do something 
        }
    }
}

    8.6.3 使用ProtoBuf序列化

/**
 * 使用protobuf序列化数据,进行编码解码
 * 注意:使用protobuf需要protobuf-java-jar
 *
 * @author Administrator
 */
public class ProtoBufInitializer extends ChannelInitializer<Channel> {

    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufEncoder())
                .addLast(new ProtobufDecoder(lite))
                .addLast(new ObjectHandler());
    }

    public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
            // do something 
        }
    }
}

9.引导Netty应用程序

  9.1 不同的引导类型

  9.2 引导客户端和无连接协议

  9.2.1 引导客户端的方法    

    group(...),设置EventLoopGroup,EventLoopGroup用来处理所有通道的IO事件
    channel(...),设置通道类型
    channelFactory(...),使用ChannelFactory来设置通道类型
    localAddress(...),设置本地地址,也可以通过bind(...)或connect(...)
    option(ChannelOption<T>, T),设置通道选项,若使用null,则删除上一个设置的ChannelOption
    attr(AttributeKey<T>, T),设置属性到Channel,若值为null,则指定键的属性被删除
    handler(ChannelHandler),设置ChannelHandler用于处理请求事件
    clone(),深度复制Bootstrap,Bootstrap的配置相同
    remoteAddress(...),设置连接地址
    connect(...),连接远程通道
    bind(...),创建一个新的Channel并绑定

  9.2.2 怎么引导客户端

public class BootstrapingClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                System.out.println("Received data");
                msg.clear();
            }
        });
        ChannelFuture f = b.connect("1", 2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("connection finished");
                } else {
                    System.out.println("connection failed");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

    9.2.3 选择兼容通道实现

  9.3 使用ServerBootstrap引导服务器

    9.3.1 引导服务器的方法

      group(...),设置EventLoopGroup事件循环组
      channel(...),设置通道类型
      channelFactory(...),使用ChannelFactory来设置通道类型
      localAddress(...),设置本地地址,也可以通过bind(...)或connect(...)
      option(ChannelOption<T>, T),设置通道选项,若使用null,则删除上一个设置的ChannelOption
      childOption(ChannelOption<T>, T),设置子通道选项
      attr(AttributeKey<T>, T),设置属性到Channel,若值为null,则指定键的属性被删除
      childAttr(AttributeKey<T>, T),设置子通道属性
      handler(ChannelHandler),设置ChannelHandler用于处理请求事件
      childHandler(ChannelHandler),设置子ChannelHandler
      clone(),深度复制ServerBootstrap,且配置相同
      bind(...),创建一个新的Channel并绑定

    9.3.2 怎么引导服务器

public class BootstrapingServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                        System.out.println("Received data");
                        msg.clear();
                    }
                });
        ChannelFuture f = b.bind(2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("bound fail");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

  9.4 从Channel引导客户端

public class BootstrapingFromChannel {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    ChannelFuture connectFuture;

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        Bootstrap b = new Bootstrap();
                        b.channel(NioSocketChannel.class).handler(
                                new SimpleChannelInboundHandler<ByteBuf>() {
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx,
                                                                ByteBuf msg) throws Exception {
                                        System.out.println("Received data");
                                        msg.clear();
                                    }
                                });
                        b.group(ctx.channel().eventLoop());
                        connectFuture = b.connect(new InetSocketAddress("1", 2048));
                    }

                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                            throws Exception {
                        if (connectFuture.isDone()) {
                            // do something with the data 
                        }
                    }
                });
        ChannelFuture f = b.bind(2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("bound fail");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

  9.5 添加多个ChannelHandler

public class InitChannelExample {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializerImpl());
        ChannelFuture f = b.bind(2048).sync();
        f.channel().closeFuture().sync();
    }

    static final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
        }
    }
}

  9.6 使用通道选项和属性

    public static void main(String[] args) {
    //创建属性键对象 
        final AttributeKey<Integer> id = AttributeKey.valueOf("ID");
        //客户端引导对象 
        Bootstrap b = new Bootstrap();
        //设置EventLoop,设置通道类型 
        b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
                //设置ChannelHandler 
                .handler(new SimpleChannelInboundHandler<ByteBuf>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                            throws Exception {
                        System.out.println("Reveived data");
                        msg.clear();
                    }

                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        //通道注册后执行,获取属性值 
                        Integer idValue = ctx.channel().attr(id).get();
                        System.out.println(idValue);
                        //do something with the idValue 
                    }
                });
        //设置通道选项,在通道注册后或被创建后设置 
        b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        //设置通道属性 
        b.attr(id, 123456);
        ChannelFuture f = b.connect("www.manning.com", 80);
        f.syncUninterruptibly();
    }

10.单元测试代码

  10.1 General

    writeInbound(Object...),写一个消息到入站通道
    writeOutbound(Object...),写消息到出站通道
    readInbound(),从EmbeddedChannel读取入站消息,可能返回null
    readOutbound(),从EmbeddedChannel读取出站消息,可能返回null
    finish(),标示EmbeddedChannel已结束,任何写数据都会失败

技术分享图片

  10.2 测试ChannelHandler

    10.2.1 测试处理入站消息的handler

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }

}

    测试

public class FixedLengthFrameDecoderTest {

    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        // write bytes 
        Assert.assertTrue(channel.writeInbound(input));
        Assert.assertTrue(channel.finish());
        // read message 
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertNull(channel.readInbound());
    }

    @Test
    public void testFramesDecoded2() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        Assert.assertFalse(channel.writeInbound(input.readBytes(2)));
        Assert.assertTrue(channel.writeInbound(input.readBytes(7)));
        Assert.assertTrue(channel.finish());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertNull(channel.readInbound());
    }

}

    10.2.2 测试处理出站消息的handler

      解码器

public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg,
                          List<Object> out) throws Exception {
        while (msg.readableBytes() >= 4) {
            int value = Math.abs(msg.readInt());
            out.add(value);
        }
    }
}

      测试

public class AbsIntegerEncoderTest {

    @Test
    public void testEncoded() {
        //创建一个能容纳10个int的ByteBuf 
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1);
        }
        //创建EmbeddedChannel对象 
        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        //将buf数据写入出站EmbeddedChannel 
        Assert.assertTrue(channel.writeOutbound(buf));
        //标示EmbeddedChannel完成 
        Assert.assertTrue(channel.finish());
        //读取出站数据 
        ByteBuf output = (ByteBuf) channel.readOutbound();
        for (int i = 1; i < 10; i++) {
            Assert.assertEquals(i, output.readInt());
        }
        Assert.assertFalse(output.isReadable());
        Assert.assertNull(channel.readOutbound());
    }

}

  10.3 测试异常处理

    解码器

public class FrameChunkDecoder extends ByteToMessageDecoder {

    // 限制大小 
    private final int maxFrameSize;

    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        // 获取可读字节数 
        int readableBytes = in.readableBytes();
        // 若可读字节数大于限制值,清空字节并抛出异常 
        if (readableBytes > maxFrameSize) {
            in.clear();
            throw new TooLongFrameException();
        }
        // 读取ByteBuf并放到List中 
        ByteBuf buf = in.readBytes(readableBytes);
        out.add(buf);
    }

}

    测试代码

public class FrameChunkDecoderTest {

    @Test
    public void testFramesDecoded() {
        //创建ByteBuf并填充9字节数据 
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        //复制一个ByteBuf 
        ByteBuf input = buf.duplicate();
        //创建EmbeddedChannel 
        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
        //读取2个字节写入入站通道 
        Assert.assertTrue(channel.writeInbound(input.readBytes(2)));
        try {
            //读取4个字节写入入站通道 
            channel.writeInbound(input.readBytes(4));
            Assert.fail();
        } catch (TooLongFrameException e) {

        }
        //读取3个字节写入入站通道 
        Assert.assertTrue(channel.writeInbound(input.readBytes(3)));
        //标识完成 
        Assert.assertTrue(channel.finish());
        //从EmbeddedChannel入去入站数据 
        Assert.assertEquals(buf.readBytes(2), channel.readInbound());
        Assert.assertEquals(buf.skipBytes(4).readBytes(3),
                channel.readInbound());
    }

}

11.WebSocket

12.SPDY

13.通过UDP广播事件

14..实现自定义编解码器

  14.1编解码器的范围

  14.2实现memcached编解码器

  14.3了解memcached二进制协议

  14.4 Netty编码器和解码器

15.选择正确的线程模型

  15.1线程模型概述

  15.2事件循环

    15.2.1使用事件循环

    15.2.2 Netty 4的I/O业务

    15.2.3 Netty 3的I/O业务

    15.2.4 Nettys线程模型内部件

  15.3为以后的执行安排任务

    15.3.1使用普通Java API调度任务

    15.3.2使用事件循环调度任务

    15.3.3计划实施内部

    15.4 I/O线程分配的详细情况

16.用Eventloop注销/重新注册

Netty in Action Version5

标签:change   2.4   else   enc   bin   main   util   list   site   

原文地址:https://www.cnblogs.com/plxz/p/9550330.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!