标签:
与IPC相关的代码在org.apache.hadoop.ipc包下。共七个文件,其中4个辅助类:
RemoteException
Status
VersionedProtocol
ConnectionHeader
主要实现类3个:
Client
Server
RPC
如上图:
与IPC连接相关的
与远程调用Call相关的
与IPC连接相关的
与远程调用Call相关的
RPC是在Server及Client的基础上实现了Hadoop IPC。
与客户端相关的功能:
与服务端相关的功能:
客户端与服务器端对连接的抽象不一样,所以有Server.Connection和Client.Connection。Hadoop远程调用采用TCP协议通信。
1)客户端Client.ConnectionId
连接复用:当多个IPC客户端的ConnectionId相同时,他们共享一个IPC连接。连接复用可以减少Hadoop Server、Client的资源占用,同时节省IPC连接时间。
2)ConnectionHeader
Server与Client间TCP连接建立后交换的第一条信息,包含ConnectionId.ticket(UserGroupInformation)用户信息和IPC接口信息,检验是否实现了IPC接口,以及该用户是否有权使用接口。
建立连接后,即可以进行远程过程调用服务,即对IPC接口方法的调用,源码抽象为Call。
远程调用Client.Call对象和Server.Call对象,是一个IPC调用产生的,存在于IPC客户端(存根)和IPC服务端(骨架)中的实体。
Client.Call对象通过IPC连接到服务器后,自然会构成相应的Server.Call对象。
如上图所示流程:
IPC连接
服务器端的IPC连接代码分散在Listener和Server.Connection中。
Listener.run() 实现了NIO中的选择器循环。如下代码:
//Listener构造函数 public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); }
Listener.run()开启选择器循环,并处理Accept请求,如下:
//Listener运行函数 public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { closeConnection(connectionList.remove(0)); } } }
doAccept()中通过server.accpet获取SocketChannel,并获取一个Reader对象,该对象包含一个Selector:readerSelector,通过reader.registerChannel,将SocketChannel注册到readerSelector下.并新建connection对象。
//Do_Accept void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } }
public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); }
标签:
原文地址:http://www.cnblogs.com/dorothychai/p/4180738.html