标签:
TransportContext用来创建TransportServer和TransportClientFactory,并通过TransportChannelHandler来配置channel的pipeline。TransportClient提供了两种传输协议:一种是数据层(fetch chunk),一种是控制层(rpc)。rpc的处理需要用户提供一个RpcHandler,由这个RpcHandler负责建立用于传输的流,流中的数据再使用zero-copy以块(chunk)的形式进行传输。TransportServer和TransportClientFactory会为每个channel创建一个TransportChannelHandler,每个TransportChannelHandler都包含一个TransportClient,这样服务端就可以使用该client向客户端发送消息。
/**
 * Assembles the handler set for a single channel.
 *
 * <p>A response handler and a client are created for the channel, then a request handler
 * that is given the channel, that same client and the supplied {@code rpcHandler}. All three
 * are wrapped in one {@link TransportChannelHandler}, together with the configured connection
 * timeout and the {@code closeIdleConnections} flag.
 *
 * @param channel    the channel the handlers are bound to
 * @param rpcHandler the RPC handler passed to the request handler
 * @return the channel handler wrapping the client, response handler and request handler
 */
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  final TransportResponseHandler inboundResponses = new TransportResponseHandler(channel);
  final TransportClient channelClient = new TransportClient(channel, inboundResponses);
  final TransportRequestHandler inboundRequests =
      new TransportRequestHandler(channel, channelClient, rpcHandler);

  return new TransportChannelHandler(
      channelClient,
      inboundResponses,
      inboundRequests,
      conf.connectionTimeoutMs(),
      closeIdleConnections);
}
public TransportChannelHandler initializePipeline(SocketChannel channel,RpcHandler channelRpcHandler) {try {TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);channel.pipeline().addLast("encoder", encoder).addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()).addLast("decoder", decoder).addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this// would require more logic to guarantee if this were not part of the same event loop..addLast("handler", channelHandler);return channelHandler;} catch (RuntimeException e) {logger.error("Error while initializing Netty pipeline", e);throw e;}}
/**
 * Configures the Netty server bootstrap, binds it and records the bound port.
 *
 * <p>A single event loop group is used for both the boss and worker roles, and one pooled
 * allocator is shared by the server channel and its child channels. The backlog, receive
 * buffer and send buffer options are applied only when their configured value is positive.
 * Each accepted channel has its RPC handler passed through every entry in {@code bootstraps}
 * before the pipeline is initialized through {@code context}.
 *
 * <p>The method waits (uninterruptibly) for the bind to finish, then reads the port back
 * from the bound channel's local address.
 *
 * @param hostToBind host name to bind to; when {@code null} the address is built from the
 *                   port alone
 * @param portToBind port to bind to
 */
private void init(String hostToBind, int portToBind) {
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
  // The same group handles both accepting connections and servicing them.
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  // Non-positive values leave the corresponding socket option untouched.
  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      // Each bootstrap may replace the RPC handler; the result of one feeds the next.
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap channelBootstrap : bootstraps) {
        rpcHandler = channelBootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null
    ? new InetSocketAddress(portToBind)
    : new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  // Read the port back from the channel rather than reusing portToBind.
  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port :" + port);
}
标签:
原文地址:http://www.cnblogs.com/gaoxing/p/4985665.html