服务端采用reactor的模式(Java nio)的方式来处理客户端的请求并给予响应。
客户端代码在Hadoop common中的ipc包里,主要类为client.java。负责通信的内部类是Client.Connection,Connection中包括以下几个属性
private InetSocketAddress server;// 连接服务端的地址
private final ConnectionId remoteId;//connection复用,此类是为了复用连接而创建的,在client类中有一个连接池属性Hashtable<ConnectionId, Connection> connections,此属性表示如果多个客户端来自同一个remoteID连接,如果connection没有关闭,那么就复用这个connection。那么如何判断是来自同一个ConnectionId呢,见下面的代码。
/** *ConnectionId类重写了equals方法 * **/ @Override public boolean equals(Object obj) { if (obj == this) { return true; } if (obj instanceof ConnectionId) { ConnectionId that = (ConnectionId) obj; //同一个远端服务地址,即要连接同一个服务端 return isEqual(this.address, that.address) && this.doPing == that.doPing && this.maxIdleTime == that.maxIdleTime && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy) && this.pingInterval == that.pingInterval //同一个远程协议,像datanode与namenode,client与 //namenode等之间通信的时候都各自有自己的协议, //如果不是同一个协议则使用不同的连接 && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout && this.tcpNoDelay == that.tcpNoDelay && isEqual(this.ticket, that.ticket); } return false; }
private DataInputStream in;//输入
private DataOutputStream out;//输出
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//Call类是client的内部类,将客户端的请求,服务端的响应等信息封装成一个call类,在后面我们会详细分析此类。而calls属性是建立连接后进行的多次消息传送,也就是我们每次建立连接可能会在连接有效期间发送了多次请求。
private Connection getConnection(ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { //running是client的一个属性,表示客户端现在是否向服务端进行请求,如果没有running(running是一个AtomicBollean原子布尔类的对象)就是返回false if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } Connection connection; do { synchronized (connections) { //判断是否存在对应的连接没有则新建 connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId, serviceClass); connections.put(remoteId, connection); } } //addCall中判断当获取取得接应该关闭了,则不能将call放到这个关闭的连接中 } while (!connection.addCall(call)); //进行输入输出对象初始化 connection.setupIOstreams(fallbackToSimpleAuth); return connection; } private synchronized boolean addCall(Call call) { //shouldCloseConnection也是connection类的属性,当连接异常,或者客户端要断开连接是,它返回false,说明这个连接正在回收中,不能继续使用。 if (shouldCloseConnection.get()) return false; calls.put(call.id, call); notify(); return true; }
getConnection方法只是初始化了connection对象,并将要发送的请求call对象放入连接connection中,其实还并没有与客户端进行通信。开始通信的方法是setupIOstreams方法,此方法不仅建立与服务端通信的输入输出对象,还进行消息头的发送,判断能否与服务端进行连接,由于Hadoop有很多个版本,而且并不是每个版本之间都能进行完美通信的。所以不同版本是不能通信的,消息头就是负责这个任务的,消息头中也附带了,通信的协议,说明到底是谁和谁之间通信(是client和namenode还是datanode和namenode,还是yarn中的resourceManage 和nodemanage等等)。
//省略了部分代码 private synchronized void setupIOstreams( AtomicBoolean fallbackToSimpleAuth) { if (socket != null || shouldCloseConnection.get()) { return; } try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connecting to " + server); } short numRetries = 0; Random rand = null; while (true) { //connection一些初始化信息,建立socket,初始socket等等操作 setupConnection(); //初始输入 InputStream inStream = NetUtils.getInputStream(socket); //初始输出 OutputStream outStream = NetUtils.getOutputStream(socket); //向服务端写消息头信息 writeConnectionHeader(outStream); . . . . . . writeConnectionContext(remoteId, authMethod); //connection连接有一定的超时限制,touch方法进行时间更新将连接最新时间更新到现在。 touch(); if (Trace.isTracing()) { Trace.addTimelineAnnotation("IPC client connected to " + server); } // connection类继承自thread类,在其run方法中开始接收服务端的返回消息,详见下面run方法 start(); return; } } catch (Throwable t) { if (t instanceof IOException) { markClosed((IOException)t); } else { markClosed(new IOException("Couldn‘t set up IO streams", t)); } //如果出现错误就关闭连接, close(); } }
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { //waitForWork方法判断当前连接是否处于工作状态, while (waitForWork()) {//wait here for work - read or close connection //接受消息 receiveRpcResponse(); } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don‘t leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } //connection已经关闭,进行连接回收,包括输入输出的回收将连接从连接池中清除等 close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
//接收服务端返回的信息 private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { //对返回消息的处理,分布式消息的处理方式有很多种,一种是定长格式,一种是不定长,定长方式很容易理解,不定长中包含了消息的长度,在消息头处,则可以容易的读出消息准确长度,并进行处理。 int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); checkResponse(header); int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); //每个连接中有很多个call,call类中有一个callId的属性,类似于mac地址在对应的集群中是唯一的,从而能让客户端和服务端能够准去的处理请求。 int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); //获取正在处理的call Call call = calls.get(callId); //处理状态,RpcStatusProto是一个枚举类,有三种状态成功,错误,连接关闭。 RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { //通过反射方式获取返回的消息值 Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value //处理完成后将call从calls中删除掉 calls.remove(callId); //将返回值放到client的结果值中 call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc Request failed // Verify that length was correct if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { calls.remove(callId); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection markClosed(re); } } } catch (IOException e) { markClosed(e); } } //此方法是call中的 public synchronized void setRpcResponse(Writable rpcResponse) { //将结果值放到返回值中 this.rpcResponse = rpcResponse; //当前call已处理完毕, callComplete(); } //此方法是call中的 protected synchronized void callComplete() { //done=true表示此call已经处理完成 this.done = true; //在处理call的时候采用的是同步处理方案,所有处理完后要唤醒其中一个还未处理的call notify(); // notify caller }