标签:
在上节的解读中发现spark的源码中大量使用netty的buffer部分的api,该节将看到netty核心的一些api,比如channel:
private static class ClientPool {TransportClient[] clients;Object[] locks;public ClientPool(int size) {clients = new TransportClient[size];locks = new Object[size];for (int i = 0; i < size; i++) {locks[i] = new Object();}}
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {// Get connection from the connection pool first.// If it is not found or not active, create a new one.final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);// Create the ClientPool if we don‘t have it yet.ClientPool clientPool = connectionPool.get(address);if (clientPool == null) {connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));clientPool = connectionPool.get(address);}int clientIndex = rand.nextInt(numConnectionsPerPeer);TransportClient cachedClient = clientPool.clients[clientIndex];if (cachedClient != null && cachedClient.isActive()) {logger.trace("Returning cached connection to {}: {}", address, cachedClient);return cachedClient;}// If we reach here, we don‘t have an existing connection open. Let‘s create a new one.// Multiple threads might race here to create new connections. Keep only one of them active.synchronized (clientPool.locks[clientIndex]) {cachedClient = clientPool.clients[clientIndex];if (cachedClient != null) {if (cachedClient.isActive()) {logger.trace("Returning cached connection to {}: {}", address, cachedClient);return cachedClient;} else {logger.info("Found inactive connection to {}, creating a new one.", address);}}clientPool.clients[clientIndex] = createClient(address);return clientPool.clients[clientIndex];}}
*/public TransportClient createUnmanagedClient(String remoteHost, int remotePort)throws IOException {final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);return createClient(address);}/** Create a completely new {@link TransportClient} to the remote address. */private TransportClient createClient(InetSocketAddress address) throws IOException {logger.debug("Creating new connection to " + address);Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(socketChannelClass)// Disable Nagle‘s Algorithm since we don‘t want packets to wait.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()).option(ChannelOption.ALLOCATOR, pooledAllocator);final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {TransportChannelHandler clientHandler = context.initializePipeline(ch);clientRef.set(clientHandler.getClient());channelRef.set(ch);}});// Connect to the remote serverlong preConnect = System.nanoTime();ChannelFuture cf = bootstrap.connect(address);if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {throw new IOException(String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));} else if (cf.cause() != null) {throw new IOException(String.format("Failed to connect to %s", address), cf.cause());}TransportClient client = clientRef.get();Channel channel = channelRef.get();assert client != null : "Channel future completed successfully with null client";// Execute any client bootstraps synchronously before marking the Client as successful.long preBootstrap = System.nanoTime();logger.debug("Connection to {} successful, running bootstraps...", address);try {for (TransportClientBootstrap clientBootstrap : clientBootstraps) {clientBootstrap.doBootstrap(client, channel);}} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scalalong bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);client.close();throw Throwables.propagate(e);}long postBootstrap = System.nanoTime();logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);return client;}
private final Channel channel;private final TransportResponseHandler handler;@Nullable private String clientId;
public void fetchChunk(long streamId,final int chunkIndex,final ChunkReceivedCallback callback) {final String serverAddr = NettyUtils.getRemoteAddress(channel);final long startTime = System.currentTimeMillis();logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);handler.addFetchRequest(streamChunkId, callback);channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {long timeTaken = System.currentTimeMillis() - startTime;logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,timeTaken);} else {String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,serverAddr, future.cause());logger.error(errorMsg, future.cause());handler.removeFetchRequest(streamChunkId);channel.close();try {callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));} catch (Exception e) {logger.error("Uncaught exception in RPC response callback handler!", e);}}}});}
public void stream(final String streamId, final StreamCallback callback) {final String serverAddr = NettyUtils.getRemoteAddress(channel);final long startTime = System.currentTimeMillis();logger.debug("Sending stream request for {} to {}", streamId, serverAddr);// Need to synchronize here so that the callback is added to the queue and the RPC is// written to the socket atomically, so that callbacks are called in the right order// when responses arrive.synchronized (this) {handler.addStreamCallback(callback);channel.writeAndFlush(new StreamRequest(streamId)).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {long timeTaken = System.currentTimeMillis() - startTime;logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,timeTaken);} else {String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,serverAddr, future.cause());logger.error(errorMsg, future.cause());channel.close();try {callback.onFailure(streamId, new IOException(errorMsg, future.cause()));} catch (Exception e) {logger.error("Uncaught exception in RPC response callback handler!", e);}}}});}}
public byte[] sendRpcSync(byte[] message, long timeoutMs) {final SettableFuture<byte[]> result = SettableFuture.create();sendRpc(message, new RpcResponseCallback() {@Overridepublic void onSuccess(byte[] response) {result.set(response);}@Overridepublic void onFailure(Throwable e) {result.setException(e);}});try {return result.get(timeoutMs, TimeUnit.MILLISECONDS);} catch (ExecutionException e) {throw Throwables.propagate(e.getCause());} catch (Exception e) {throw Throwables.propagate(e);}}
标签:
原文地址:http://www.cnblogs.com/gaoxing/p/4985559.html