标签:
在上节的解读中发现spark的源码中大量使用netty的buffer部分的api,该节将看到netty核心的一些api,比如channel:
private static class ClientPool {
TransportClient[] clients;
Object[] locks;
public ClientPool(int size) {
clients = new TransportClient[size];
locks = new Object[size];
for (int i = 0; i < size; i++) {
locks[i] = new Object();
}
}
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
// Create the ClientPool if we don‘t have it yet.
ClientPool clientPool = connectionPool.get(address);
if (clientPool == null) {
connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(address);
}
int clientIndex = rand.nextInt(numConnectionsPerPeer);
TransportClient cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null && cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
return cachedClient;
}
// If we reach here, we don‘t have an existing connection open. Let‘s create a new one.
// Multiple threads might race here to create new connections. Keep only one of them active.
synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) {
if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient);
return cachedClient;
} else {
logger.info("Found inactive connection to {}, creating a new one.", address);
}
}
clientPool.clients[clientIndex] = createClient(address);
return clientPool.clients[clientIndex];
}
}
*/
public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
throws IOException {
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
return createClient(address);
}
/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
logger.debug("Creating new connection to " + address);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle‘s Algorithm since we don‘t want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
TransportClient client = clientRef.get();
Channel channel = channelRef.get();
assert client != null : "Channel future completed successfully with null client";
// Execute any client bootstraps synchronously before marking the Client as successful.
long preBootstrap = System.nanoTime();
logger.debug("Connection to {} successful, running bootstraps...", address);
try {
for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
clientBootstrap.doBootstrap(client, channel);
}
} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
client.close();
throw Throwables.propagate(e);
}
long postBootstrap = System.nanoTime();
logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
return client;
}
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
public void fetchChunk(
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
timeTaken);
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
serverAddr, future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
}
public void stream(final String streamId, final StreamCallback callback) {
final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
// Need to synchronize here so that the callback is added to the queue and the RPC is
// written to the socket atomically, so that callbacks are called in the right order
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
channel.writeAndFlush(new StreamRequest(streamId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
timeTaken);
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
serverAddr, future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
}
}
public byte[] sendRpcSync(byte[] message, long timeoutMs) {
final SettableFuture<byte[]> result = SettableFuture.create();
sendRpc(message, new RpcResponseCallback() {
@Override
public void onSuccess(byte[] response) {
result.set(response);
}
@Override
public void onFailure(Throwable e) {
result.setException(e);
}
});
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
标签:
原文地址:http://www.cnblogs.com/gaoxing/p/4985559.html