码迷,mamicode.com
首页 > 其他好文 > 详细

细水长流话Hadoop(2)Hadoop RPC 客户端

时间:2014-07-19 09:02:01      阅读:248      评论:0      收藏:0      [点我收藏+]

标签:des   style   blog   http   java   color   

声明:个人原创,转载请注明出处。文中引用了一些网上或书里的资料,如有不妥之处请告之。

本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分。Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路。

1 概述... 1

2 相关技术... 3

2.1 序列化... 3

2.2 动态代理... 3

2.3 Java并发控制... 5

2.3.1 synchronized. 5

2.3.2 Object.wait. 6

2.3.3 Object.notify. 7

2.3.4 ConcurrentHashMap. 7

2.3.5 LinkedBlockingQueue vs ConcurrentLinkedQueue. 8

2.3.6 Collections.synchronizedList. 8

2.4 JAAS. 8

2.5 NIO. 8

2.6 反射... 9

3 RPC客户端(org.apache.hadoop.ipc.Client.java)... 9

3.1 客户端启动... 9

3.2 客户端发出RPC调用流程... 13

3.2.1 调用初始化... 13

3.2.2 调用流程... 14

3.3 数据结构... 21

3.4 Client特别处理(异常、错误恢复等)... 23

4 RPC服务器(org.apache.hadoop,ipc.Server)... 24

4.1 服务器初始化... 24

4.2 服务器执行流程... 29

4.2.1 Listener线程... 29

4.2.2 Handler线程... 38

4.2.3 Responder线程... 42

4.3 服务器数据结构... 48

4.4 服务器特别处理... 49

1 概述

Hadoop RPC的实现依赖多方面的技术,这些技术分别是:序列化、动态代理、Java安全框架、Java并发控制、NIO和反射。这些技术单个来讲都是比较容易理解的,结合起来就有点复杂,特别是NIO和并发控制,让RPC的具体处理过程略显模糊。

RPC属于进程间通信(IPC)的一种方式,因此RPC的核心实现代码都在org.apache.hadoop.ipc包内。RPC一般实现为C/S结构,有客户端,有服务器提供服务。客户端和用户代码连接在一起,用户调用客户端提供的API,客户端完成方法和参数的封闭序列化,通过网络发送给服务器,服务器Hadoop源代码中与RPC相关的源代码文件有如下几个:

文件

说明

Client

org.apache.hadoop.ipc

RPC客户端,维护到服务器端的连接(Client.Connection类),每个连接都维护在此连接上的Client.Call列表,每个Call都是一次RPC调用。一个RPC调用有输入参数和返回值,调用过程可能会有异常,一个Connection里不同的Call都有自己的id,以示区别此Connection上的其它Call。

Client.ConnectionId由<remoteAddress,protocol,ticket>唯一标识,也就是说:一个客户端到某一RPC服务器上运行的RPC服务protocol只有一个连接。

Server

org.apache.hadoop.ipc

RPC服务器的抽象类,这个抽象类只有一个call抽象方法,这个call抽象方法在RPC.java里实现,这个call方法会被Server.Handler线程执行,调用Client请求的方法。

RPC

org.apache.hadoop.ipc

PRC是Server类的一个实现,其中主要定义了Client缓存、动态代理实现客户端调用和RPC Server类的call实现。

ConnectionHeader

org.apache.hadoop.ipc

客户端每建立一个同RPC服务器的连接,在发送所有其它数据之前,都会先发送ConnectionHeader,这相当于RPC客户端和服务器的接头信息。

Status

org.apache.hadoop.ipc

定义了SUCCESS、ERROR和FATAL三种调用执行结果状态。

VersionedProtocol

org.apache.hadoop.ipc

由于Hadoop RPC使用了Java动态代理,Java动态代理可以将接口和实例分离,客户端只要知道接口,并拥有实现该接口的实例就可调用实例中的对应方法,所以用来实现RPC调用协议非常方便。

VersionedProtocol为这种调用协议定义了版本控制,所有协议接口(比如DatanodeProtocol)都实现了VersionedProtocol。

协议接口

各个不同的包

bubuko.com,布布扣

2 相关技术

2.1 序列化

Hadoop RPC客户端要将方法调用参数发送给服务器,服务器也要将执行结果发回给客户端,Hadoop里远程调用一般要在网络上传输数据,所以需要将调用参数数据进行序列化,形成标准数据格式,比如JSON或二进制,这就要求一套可对各种数据,包括对象转换成标准数据格式或反之的系统,可以使用通用系统,但这里Hadoop使用的是Hadoop系统内的序列化系统。

2.2 动态代理

Java动态代理使得开发人员不用手工编写代理类,只要简单地指定一组接口及委托类对象,便能动态地获得代理类。代理类会负责将所有的方法调用分派到委托对象上反射执行,在分派执行的过程中,开发人员还可以按需调整委托类对象及其功能。

什么时候该使用代理呢?假设有一个接口I,类C实现了接口I,现在要求在C的对象上调用的任何I接口定义的方法都应该记录到日志里,或者对调用请求进行某些处理,该怎么办呢?如果有类C的实现代码,当然可以修改每一个需要监控的方法,如果没有就会比较麻烦。而使用动态代理,所有对类C对象的调用都首先转发到对InvocationHandler.invoke方法的调用,而invoke方法是用户自己实现的,可以在invoke方法里做任何事情。

假设RPC客户端是一个方法调用接口,每当通过此方法调用接口向RPC服务器发出调用时,肯定需要将方法名称、方法参数类型和方法参数都要传递给RPC服务器(“调用哪个方法”可以将方法名称(method.getName方法返回)和参数类型(method.getParameterTypes方法返回)传递给服务器,服务器根据class.getMethod(name,parameterTypes)返回方法,然后调用Method.invoke即可执行方法)。所以在调用一个远程方法时,RPC客户端首先将这些参数打包,然后发送给RPC服务器。打包、发送的操作都是在调用方法时发生的,所以客户端发出的RPC调用,实际上是调用代理对象的方法,这个代理对象再转而去调用真正的方法。这里的“转而”之间,就何以做成网络通信,实现远程调用。

Hadoop使用Java动态代理主要有两个类或接口,首先是InvocationHandler的核心方法:

// 该方法负责集中处理动态代理类上的所有方法调用。第一个参数既是代理类实例,第二个参数是被调用的方法对象

// 第三个方法是调用参数。调用处理器根据这三个参数进行预处理或分派到委托类实例上发射执行

Object invoke(Object proxy, Method method, Object[] args)

每次生成动态代理类对象时都需要指定一个实现了指定接口的调用处理器对象。我们现在希望获得这样一个对象:通过这个对象可以调用指定接口定义的方法,而这个调用实际上执行的是InvocationHandler实现类对象的invoke方法,获得这个对象的方法就是调用java.lang.reflect.Proxy.newProxyInstance方法,我们称newProxyInstance返回的这个对象为“代理类实例”:

// 方法 1: 该方法用于获取指定代理对象所关联的调用处理器

static InvocationHandler getInvocationHandler(Object proxy)

// 方法 2:该方法用于获取关联于指定类装载器和一组接口的动态代理类的类对象

static Class getProxyClass(ClassLoader loader, Class[] interfaces)

// 方法 3:该方法用于判断指定类对象是否是一个动态代理类

static boolean isProxyClass(Class cl)

// 方法 4:该方法用于为指定类装载器、一组接口及调用处理器生成动态代理类实例

static Object newProxyInstance(ClassLoader loader, Class[] interfaces,

InvocationHandler h)

newProxyInstance方法的第一个参数是协议接口的ClassLoader,interfaces数组是代理类实例要代理的所有方法定义的接口,h就是实现这些接口InvocationHandler对象。可将代理类实例转换为interfaces中的任何一个接口,通过代理类实例发出的调用实际上执行的都是h.invoke方法。

下面是一个简单的动态代理示例(这些代码不在一个文件):

public interface VersionedProtocol{
    /**
     * 获取此协议的版本号
     */
    int getVersion(String protocolName) throws IOException;
}

public interface MathProtocol extends VersionedProtocol{
    int versionID = 0;
    
    public double add(double a,double b);
}

public class Math implements MathProtocol {
    public int getVersion(String protocolName) throws IOException{
        if(protocolName.equals(MathProtocol.class.getName())){
            return MathProtocol.versionID;
        } else {
            throw new IOException("wrong protocol");
        }
    }
    
    @Override
    public double add(double a,double b){
        return a + b;
    }
}
class Invoker implements InvocationHandler{
    private VersionedProtocol instance;
    
    public Invoker(VersionedProtocol object){
        this.instance = object;
    }
    
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable{
        Object ret = method.invoke(instance, args);
        return ret;
    }
}

public class NDFSProxy{
    public static void main(String[] args){
        Invoker invoker = new Invoker(new Math());
        MathProtocol math = (MathProtocol)Proxy.newProxyInstance(
                MathProtocol.class.getClassLoader(),
                new Class[]{MathProtocol.class}
                invoker);
        double ret = (double)math.add(1, 2);
        System.out.println(ret);
    }
}

这就是Java动态代理的基本原理,在RPC的具体使用处会详细说明Hadoop是怎么应用的。

2.3 Java并发控制

Java安全框架包含相当多的内容,这里只描述synchronized关键字,Object.wait和Object.notify方法。

应该什么时候同步呢?可以运用Brian的同步规则:

如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须同相同的监视器锁同步。

2.3.1 synchronized

共享资源一般是以对象形式存在的内存片段,但也可以是文件、输入/输出端口,或者是打印机。要控制对共享资源的访问,得把所有要访问这个资源的方法标记为synchronized。如果某个线程处于一个对标记为synchronized的方法的调用中,那么在这个线程从该方法返回之前,其他所有要调用类中任何标记为synchronized的方法的线程都会阻塞。

所有对象都自动含有单一的锁(也称监视器)。当在对象上调用其任意synchronized方法的时候,此对象都被加锁,这时该对象上的所有synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被其它线程调用。比如对于类:

class cls{

    synchronized void f(){/*...*/}

    synchronized void g(){/*...*/}

}

如果某个线程对对象调用了f(),对于同一个对象而言,就只能等到f()调用结束并释放了锁之后,其他线程才能调用f()和g()。所以,对于某个特定对象来说,其所有synchronized方法共享同一个锁,这可以被用来防止多个线程同时访问被编码为对象的内存。

在使用并发时,将域设置为private是非常重要的,否则,任意其它线程都可访问公共域,synchronized关键字就不能防止其它任务直接访问域,这样就会产生冲突。

一个线程可以多次获得对象的锁。如果一个synchronized方法在同一个对象上调用了第二个方法,后者又调用了同一对象上的另一个方法,就会发生这种情况。JVM负责跟踪对象被加锁的次数。如果一个对象被解锁(即锁被完全释放),其计数变为0。在线程第一次给对象加锁时,计数变为1。每当这个相同的线程在这个对象上获得锁时,计数都会递增。显然,只有首先获得锁的线程才能允许继续获取多个锁。每当任务离开一个synchronized方法,计数递减,当计数变为零的时候,锁被完全释放,此时别的线程就可以使用此资源。

针对每个类,也有一个锁(作为类的Class对象的一部分),所以synchronized static方法可以在类的范围内防止对static数据的并发访问。

如果在类中有超过一个方法在处理临界数据,那么必须同步所有相关的方法。如果只同步一个方法,那么其他方法将会随意地忽略这个对象锁,并可以在无任何惩罚情况下被调用。这是很重要的一点,每个访问临界共享资源的方法都必须被同步,否则它就不会正确地工作。

2.3.2 Object.wait

Thread.sleep方法会让当前线程无条件地睡眠指定时间,而Object.wait方法会导致当前的线程等待,直到其他线程调用此对象的notify方法或notifyAll方法,或者超过指定的时间量。

当前的线程必须拥有此对象的监视器,才能调用wait方法,关于如何拥有此对象的监视器,可参考2.3.3节。

此方法导致当前线程(称之为线程T)将其自身放置在对象的等待线程集中,然后放弃此对象上的所有同步要求。出于线程调度目的,线程T被禁用,且处于休眠状态,直到发生以下四种情况之一:

l 其他某个线程调用此对象的notify方法,并且线程T碰巧被任选为被唤醒的线程;

l 其他某个线程调用此对象的notifyAll方法;

l 其他某个线程中断线程T;

l 已经达到指定的超时时间。但是,如果timeout为零,则不考虑实际时间,该线程将一直等待,直到获得通知(notify或notifyAll)。

然后,从对象的等待集中删除线程T,并重新进行线程调度。然后,该线程以常规方式与其他线程竞争,以获得该对象上同步的权利;一旦获得对该对象的控制权,该对象上的所有其同步声明都将被还原到以前的状态 — 这就是调用wait方法时的情况。然后线程T从wait方法的调用中返回。所以,从wait方法返回时,该对象和线程T的同步状态与调用wait方法时的情况完全相同。

在没有被通知、中断或超时的情况下,线程也可能被唤醒,即所谓的虚假唤醒(spurious wakeup)。虽然这种情况在实践中很少发生,但是应用程序必须通过以下方式防止其发生,即对应该导致该线程被提醒的条件进行测试,如果不满足条件,则继续等待。换句话说,等待应总是发生在循环中,如下面的示例:

synchronized (obj) {
    while (<condition does not hold>){
        obj.wait(timeout);
        ... // Perform action appropriate to condition
    }
}

如果当前线程在等待时被其他线程中断,则会抛出InterruptedException。在按上述形式恢复此对象的锁定状态时才会抛出此异常。

注意,由于wait方法将当前的线程放入对象的等待集中,所以它只能解除此对象的锁定,可以同步当前线程的任何其他对象在线程等待时仍处于锁定状态。也就是说,各对象对线程的锁定是独立的,一个对象对线程解锁,其它对象对线程的锁定还在。

2.3.3 Object.notify

唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程。选择是任意性的,并在对实现做出决定时发生。线程通过调用其中一个wait方法,在对象的监视器上等待。

直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。例如,唤醒的线程在作为锁定此对象的下一个线程方面没有可靠的特权或劣势。

此方法只应由作为此对象监视器的所有者的线程来调用。通过以下三种方法之一,线程可以成为此对象监视器的所有者:

l 通过执行此对象的同步(Synchronized)实例方法;

l 通过执行在此对象上进行同步的synchronized语句块;

l 对于Class类型的对象,可以通过执行该类的同步静态方法(synchronized static方法)。

一次只能有一个线程拥有对象的监视器。

2.3.4 ConcurrentHashMap

ConcurrentHashMap支持检索的完全并发和更新的所期望可调整并发。此类相对于HashMap,实现的核心思想就是细粒度的锁可以提高性能。不同的线程对HashMap的修改操作,可能因为线程执行顺序不同而使结果不一致(比如两个线程put数据到同一桶中,如果没有并发控制,同时进行put得到一个相同的空内存块,那么线程执行顺序不同,这块内存中的值也不同。相反,如果有并发控制,一定会分配两块内存)。ConcurrentHashMap对检索不加锁,而对写操作加细粒度的锁,对ConcurrentHashMap进行写入操作时,ConcurrentHashMap内部尽量对最小部分的数据对象进行加锁,当不同线程写入ConcurrentHashMap的不同部分时,这样做可支持并发写而不违反正确性。

HashMap内部实现为许多桶,每个桶管理许多的HashEntry,那么对某个桶的操作,ConcurrentHashMap会单独对某个桶加锁而不是对整个Map对象加锁。而使用HashMap,要现实同步,必须对整个Map进行加锁,所以ConcurrentHashMap是一种更粒度的锁,显然会提高性能。如下图是ConcurrentHashMap内部的数据结构:

bubuko.com,布布扣

即使ConcurrentHashMap是线程安全的,也不能依赖这种线程安全实现自己的业务逻辑的正确性。比如我们自己有两个线程同时对ConcurrentHashMap进行同一key的put操作,线程执行顺序不一致,最后保存的value也不同。所以ConcurrentHashMap的线程安全更多的是对ConcurrentHashMap内部共享数据结构的保护,而不能对用户的业务逻辑进行保护。

2.3.5 LinkedBlockingQueue vs ConcurrentLinkedQueue

在JDK中对BlockingQueue的描述是BlockingQueue支持两个附加操作,这两个附加操作就是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

这两个队列都是线程安全的,也就是允许多个线程调用同一对象的方法而对其内部数据结构没有破坏。这两个LinkedQueue的区别就是:从队列中取一个元素时,如果LinkedBlockingQueue是空的,调用线程将会一直等待,直到队列非空,而对于ConcurrentLinkedQueue,如果队列为空,则会立即返回空值。

所以LinkedBlockingQueue适用于严格地生产者消费者线程,因为它就像一个加了大粒度的锁,用它编写灵活的程序性能可能不会很高,而ConcurrentLinkedQueue就像所谓的无锁编程一样,加了细粒度的锁,因此性能可以很高。

2.3.6 Collections.synchronizedList

线程安全集合有ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListMap和ConcurrentSkipListSet,除此之外的其它集合,如LinkedList,就只能由Collections.synchronizedList来构造了。Collections.synchronizedList可经LinkedList添加线程安全的特性,这种特性与上述的ConcurrentHashMap类似,也是只保护List的内部数据对象,用户自己的业务逻辑的同步还是要由用户来保证。

2.4 JAAS

JAAS可分为两部分,认证和授权。认证是为了验证用户身份,授权是为了了控制用户的权限。Hadoop当前版本实现的安全认证和授权框架徒有其表,因此默认并没有开启认证。

更详细的内容,参考“认证&授权&安全”的主题。

2.5 NIO

Java NIO主要构成类有三类:ByteBuffer字节缓冲、Channel和Selector。Channel注册到Selector上得得SelectionKey,可注册四种操作:

操作

触发条件

OP_READ

如果Selector检测到如下几种事件就会设置此操作位:

l 相关通道读取就绪;

l 已经到达流末端;

l 已经被远程关闭而无法进行进一步读取;

l 或者有一个挂起的错误。

OP_WRITE

如果Selector检测到如下几种事件就会设置此操作位:

l 相应的通道已为写入准备就绪;

l 已经被远程关闭而无法进行进一步的写入;

l 有一个挂起的错误。

OP_CONNECT

如果Selector检测到如下几种事件就会设置此操作位:

l 相应的套接字通道已为完成其连接序列而准备就绪;

l 有一个挂起的错误。

OP_ACCEPT

如果Selector检测到如下几种事件就会设置此操作位:

l 相应的服务器套接字通道已为接受另一个连接而准备就绪国

l 有一个挂起的错误。

2.6 反射

Java反射就是Java运行时的类信息。运行时的类、方法、字段、修饰符等等一切都可通过Java.lang.reflect进行获取和改变。该类库包含了Method、Field以及Constructor类。这些类型的对象是由JVM在运行时创建的,因此我们就可以用get和set读取和修改与Field对象关联的字段,用invoke方法调用与Method对象关联的方法。另外,还可以用getFields、getMethod和getConstructors等方法获取这些对象。

RPC的实现要将类和过程调用数据发送给服务器,这些数据一般是将字符串序列化后发送的。Java的反射能力,就是从类名、方法名、参数类型和参数恢复出类,类的方法等,所以RPC的实现严重依赖Java的反射技术。

3 RPC客户端(org.apache.hadoop.ipc.Client.java)

3.1 客户端启动

RPC客户端昌从调用RPC.waitForProxy开始的,比如在DataNode初始化时创建的NamenodeProtocol客户端:

this.namenode = (DatanodeProtocol) 
      RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID,
                       nameNodeAddr, 
                       conf);

org.apache.hadoop.ipc.RPC类中有getProxy和waitForProxy方法,这两个方法都有许多变体,用作返回一个协议接口代理实例对象。这两类方法有如下几种形式的变体:

public static VersionedProtocol getProxy(
Class<?> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
)throws IOException

public static VersionedProtocol getProxy(
Class<?> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory
) throws IOException

public static VersionedProtocol getProxy(
Class<?> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
SocketFactory factory
) throws IOException
public static VersionedProtocol waitForProxy(
Class protocol,
    long clientVersion,
    InetSocketAddress addr,
    Configuration conf
) throws IOException

static VersionedProtocol waitForProxy(
Class protocol,
long clientVersion,
    InetSocketAddress addr,
    Configuration conf,
    long timeout
) throws IOException

waitForProxy两个变体只有一个参数不同,非public变体有timeout参数而public变体没有这个参数,实际上public变体将timeout设为Long.MAX_VALUE了,这个值等于263-1,也就是说,外部代码凡是调用waitForProxy(只能是调用那个public变体)的,如果协议接口代理实例对象不能顺利创建,那就会一直等下去。Hadoop中许多组件的运行都依赖RPC客户端能正常运行,比如DataNode,如果DataNode上的RPC客户端NamennodeProtocol不能正常运作,DataNode就无法于NameNode通信,此DataNode也必定不能正常工作,所以DataNode应调用waitForProxy获取协议接口代理实例对象。

在Hadoop中所有调用waitForProxy的地方有:

bubuko.com,布布扣

而DFSClient等组件则是直接调用getProxy来创建协议接口代理实例对象。getProxy如果创建代理失败,就会抛出异常,这样对于调用它的客户端来说,可以捕获这个异常进行处理。这对客户端编程是有异的,即总是把异常发送给客户端,而不是让客户端无限制地等待。

bubuko.com,布布扣

从上面这个列表里可以发现DataNode也调用getProxy创建代理对象。跟踪DataNode的调用代码,发现这个代理对象用途之一是进行数据恢复,这不是性命攸关的操作,因此如果创建代理对象失败,简单地记录日志就可以了。

协议接口代理实例对象创建过程如下描述。

(1)调用:

publicstatic VersionedProtocol waitForProxy(

    Class protocol,

    long clientVersion,

    InetSocketAddress addr,

    Configuration conf

) throws IOException

此方法调用另一个waitForProxy,将timeout设置为Long.MAX_VALUE;

(2)另一个waitForProxy调用getProxy方法,并将协议接口类对象protocol、协议在客户端的版本号clientVersion、服务器地址addr和系统配置conf传递给getProxy方法:

publicstatic VersionedProtocol getProxy(

    Class<?> protocol,

    long clientVersion,

    InetSocketAddress addr,

    Configuration conf

)throws IOException

(3)上述getProxy会调用NetUtils.getDefaultSocketFactory(conf)生成一个SocketFactory(Java内部对象)(所以对于每个RPC客户端,都会有其唯一的SocketFactory对象,因此Client缓存RPC.ClientCache维护着Map<SocketFactory,Client>,可以从SocketFactory对象找到对应的Client),然后调用:

publicstatic VersionedProtocol getProxy(

    Class<?> protocol,

    long clientVersion,

    InetSocketAddress addr,

    Configuration conf,

    SocketFactory factory

) throws IOException

除原四个参数之外,将生成的SocketFactory对象factory传递给这个带SocketFactory参数的方法;

(4)上述带SocketFactory参数的方法首先会创建UserGroupInformation对象ugi,然后调用UserGroupInformation.login(conf)登陆系统,login方法返回登陆成功的UserGroupInformation对象,赋值给ugi。然后,将protocol、clientVersion、addr、ugi、conf和factory传递给如下这个getProxy方法:

publicstatic VersionedProtocol getProxy(

    Class<?> protocol,

    long clientVersion,

    InetSocketAddress addr,

    UserGroupInformation ticket,

    Configuration conf,

    SocketFactory factory

) throws IOException

这里ugi变成了ticket;

(5)最后这个getProxy方法才是真正做事的地方,这个getProxy方法的代码如下所示:

/** Construct a client-side proxy object that implements the named protocol,
   * talking to a server at the named address. */
  public static VersionedProtocol getProxy(Class<?> protocol,
      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
      Configuration conf, SocketFactory factory) throws IOException {    

    VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(addr, ticket, conf, factory));
    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                  clientVersion);
    if (serverVersion == clientVersion) {
      return proxy;
    } else {
      throw new VersionMismatch(protocol.getName(), clientVersion, 
                                serverVersion);
    }
  }

这里就开始使用动态代理。调用Proxy.newProxyInstance方法生成一个protocol的协议接口动态代理实例对象,这个代理对象代理protocol接口中的方法,将Invoker作为实例,而Invoker的构造参数仅有addr、ticket、conf和factory。InvocationHandler实现类的构造方法都会传入实现接口的类的实例,以执行接口方法,但这里Invoker对象没有传入实现protocol的实例。这是因为RPC是远程调用,真正的实例在远程服务器上,Invoker的作用是将方法调用数据发送给远程RPC服务器。

至此,获取了协议接口动态代理实例对象proxy,可将proxy转换为protocol类型,proxy对protocol定义方法的调用都会被代理到Invoker对象上去。

比如,之后对proxy.getProtocolVersion(protocol.getName(),clientVersion)的调用实际上就是第一次的RPC调用,这个调用获取RPC服务器端运行的协议接口实例的版本号,然后对比服务器端和客户端RPC版本,不匹配就要抛出异常,匹配,就返回此协议接口动态代理实例对象proxy;

那么,由proxy调用getProtocolVersion方法,Invoker和Client类是如何处理的呢?这就是3.2节的内容。

3.2 客户端发出RPC调用流程

3.2.1 调用初始化

根据上述Java动态代理的原理,当我们根据proxy调用VersionedProtocol.getProtocolVersion方法时,该调用会被代理到Invoker类的invoke方法上。Invoker类实现了InvocationHandler接口,是RPC客户端向服务端发出调用的中转站,中转站的转动机器就是Invoker.invoke方法。

bubuko.com,布布扣

这就是Invoker类的成员,有四个成员变量:address、ticket、client和client是否关闭的标志isClosed。Client就是在这个类的invoke方法生成。

Invoker构造方法有四个参数address(服务器地址IP:Port)、ticket(客户端用户名和所在组)、conf(客户端全局配置)和factory(生成Socket的工厂类)。构造方法用address和ticket初始化自己的对应成员,然后调用CLIENTS(就是ClientCache)的getClient(conf,factory)方法生成一个客户端对象,并赋值给this.client成员。

Client对象实际上是在ClientCache.getClient方法内部创建,ClientCache维护一了张HashMap表CLIENTS,保存从SocketFactory到Client的映射,因此ClientCache.getClient方法首先根据传递进来的factory从CLIENTS查找其对应的Client对象,如果找得到,就调用Client.incCount()增加此Client的引用计数,否则,创建一个新的Client。Client.getClient()代码如下:

private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }

这里主要有Client的构造操作,实际上Client的构造没有进行任何操作,即没有与服务器端建立Socket连接,也没有向服务器端以任何形式发送数据,而仅仅初始化了Client的主要成员valueClass(返回值类型)、socketFactory和conf。至此,RPC Client的构造结束,现在可以通过协议接口动态代理实例对象调用协议接口定义的方法了,所有的调用都会被Client对象进行处理,发送到RPC服务器。

bubuko.com,布布扣

3.2.2 调用流程

以getProxy第一个调用proxy.getProtocolVersion(protocol.getName(),clientVersion)为例,此方法的原型在VersionedProtocol定义,所有协议接口均实现此方法,以明示自己的版本号。

VersionedProtocol.getProtocolVersion第一个参数是String类型,第二个是long类型。这个调用首先会被Java动态代理模块利用反射转发到Invoker.invoke()方法上,即执行此方法,此方法的代码如下:

public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), address, 
                    method.getDeclaringClass(), ticket);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

这个方法首先调用client.call执行调用,获取返回值value,然后返回这个值。当invoke方法返回时,协议接口动态代理实例对象proxy.getProtocolVersion也就返回了。

调用proxy.getProtocolVersion方法时,Invoker.invoke方法的参数method=getProtocolVersion,而参数args={protocol.getName(),clientVersion}。假设protocol是NamenodeProtocol,版本号为2,那么args={“NamenodeProtocol”,2}。

client.call的四个参数中,第一个参数是Invocation对象,将method和args传递给Invocation构造方法,Invocation构造方法根据method和args解析出此次方法调用的调用方法名称methodName、调用方法参数类型列表parameterClasses、调用方法参数列表parameters。

bubuko.com,布布扣

client.call第二个参数是服务器地址address,第三个参数是method的接口类,第四个参数是客户端用户名和所在组的UserGroupInformation对象ticket。于是,对于方法proxy.getProtocolVersion的调用转到Client.call方法里了。这个方法就像整个Client的阀门,启动了整个Client的工作。

Client类有四个内部类,而ParallelCall和ParallelResults没被有到,实际上并行调用也不大可行。所以Client只有两个主要内部类:Client.Call和Client.Connection。除了一连用于Socket连接配置、超时的参数之外,Client内部维护如下几个重要的成员:

成员

描述

Hashtable<ConnectionId,Connection> connections

ConnectionId由:

l address(RPC服务器的);

l ticket(客户端名称和所在组);

l protocol(此连接面向的协议接口)。

由于一个Client只有唯一的UserGroupInformation,所以一个客户端可建立到不同地址的RPC服务器,运行不同协议接口的RPC服务器的连接,connections就是维护所有这些连接的列表。

Class<? extends Writable> valueClass;

调用方法的返回值类型,一般就是ObjectWritable

private SocketFactory socketFactory;

用户此客户端建立Socket连接

private int refCount = 1;

在Inovker.invoke方法里,每当客户端根据SocketFactory被检出一次,此计数加1.

一直提到的client.call的代码如下:

public Writable call(Writable param, InetSocketAddress addr, 
                       Class<?> protocol, UserGroupInformation ticket)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(addr, protocol, ticket, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          throw wrapException(addr, call.error);
        }
      } else {
        return call.value;
      }
    }
  }

首先,我们知道这个方法的参数param就是RPC.Invocation对象,里面包装着methodName、parameterClasses和parameters。addr是要连接的RPC服务器的地址,protocol和ticket是协议接口和客户端名称和所属组名称。

从代码执行上看,此方法是一个阻塞型的方法,直到方法返回,否则协议接口动态代理实例对象不能再发出第二个调用。call方法的执行逻辑如下:

1、构造Call对象,封装这一次的RPC调用

首先根据param构造一个Client.Call对象。一个Call表示一个方法调用,Call的成员有五个:id、param、value、error和done,id是在Client内全局唯一的方法调用序号,param就是包括调用方法名称、调用方法参数列表、调用方法参数的Invocation对象,value是可能的返回值,error是可能的错误,done是调用是否完成的标志。每当构造一个Client.Call对象时,在Call的构造方法里都会使Call.id=Client.counter,然后对Client.counter加1,这个操作在Client.this上同步,因此就得到关于同步的一条结论:

更改哪里的成员变量,就对包含这个变量或包含这个变量的域进行同步。最细粒度的同步是直接对成员变量进行同步,或都对变量某一成员同步,同步粒度越细系统性能越好。

Client.Call中所有方法都有synchronized关键字,这也是实现互斥调用的必须,在Call.callComplete方法中,首先将Call.done设为true,然后调用notify()方法Client.Call对象的结构如下:

bubuko.com,布布扣

2、获取到指定address、ticket、protocol的连接,处理远程调用(上一步的call

此操作调用Client.getConnection(addr,protocol,ticket,call)方法获取一个connection。这个方法的源代码如下:

private Connection getConnection(InetSocketAddress addr,
                                   Class<?> protocol,
                                   UserGroupInformation ticket,
                                   Call call)
                                   throws IOException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));
    
    connection.setupIOstreams();
    return connection;
  }

这里首先根据addr(RPC服务器地址)、protocol(协议接口)和ticket(客户端用户名和所属组)构造一个ConnectionId对象remoteId,然后从HashMap列表Client.connections里根据remoteId查找是否已经存在Connection。如果存在,就获取这个Connection,否则,将remoteId传入Connection的构造方法,构造一个新的Connection,并将这个新的Connection加入到connections列表。

我们看一下Connection维护的重要数据结构,再一次提示,每个Connection,由ConnectionId唯一标识,而connectionId就是addr、protocol和ticket的合体:

InetSocketAddress server

RPC服务器地址

ConnectionHeader header

Connection第一次与RPC服务器通信时会发送Connection头,就是这个header,header中有两个成员:protocol类名称和ugi,分别是String和UserGroupInformation类型

ConnectionId remoteId

上文提到的remoteId

Socket socket = null

与RPC服务器的Socket连接

DataInputStream in;

上述Socket输入流

DataOutputStream out;

上述Socket输出流

Hashtable<Integer, Call> calls

call.id -> call的键值对集合,保存所有在此Connection上发送给RPC服务器的要处理的远程调用对象

AtomicBoolean shouldCloseConnection

是否要关闭这个Connection,若:

l 发送远程调用数据、接受RPC服务器回应或获取Socket输入输出流时出现异常;

l calls队列为空。

则将shouldCloseConnection设为true:

bubuko.com,布布扣

上述成员中,server、remoteId可以从参数remoteId获得。由remoteId获取其中的ticket和protocol,然后由protocol.getName()和ticket构造ConnectionHeader对象header。根据我们的假设:server=localhost:9000,header={“NamenodeProtocol”,”userName=lychee-pc/lychee&groupNames=None,root,Administrators,Remote,Desktop,Users,Users”}。

通过这样的构造,就得到一个Connection对象。当我们得到一个Client.Connection对象connection时,就调用Connection.addCall方法,将远程方法调用对象call加入到此connection,这个addCall方法会一直调用下去,直到call对象成功加入了connection。Connection.addCall方法的代码如下:

private synchronized boolean addCall(Call call) {
      if (shouldCloseConnection.get())
        return false;
      calls.put(call.id, call);
      notify();
      return true;
    }

如果Connection因为队列为空或者在处理Call时出现异常而使shouldCloseConnection为true时,就要返回false。这是有必要的操作,主要有两方面的原因:

1、如果Connection出现异常,就会执行Connection的清理程序,将Connection.calls里的每一个调用遍历一遍,并调用call.setException设置这些call的返回结果为“异常”以完成这个远程调用。所以很可能一个新的属于此线程的远程调用加入进来时这些清理工作还没完成;

2、当addCall返回false时,Client.getConnection会在下一次循环里再次调用addCall,直到addCall成功为止,这实际上是为了等待当前Connection清理完成,以便清除其错误状态,重新建立与远程服务器的连接。这实在是一个错误恢复过程。

关于addCall的第二点是,在将call.id -> call键值对加入Connection的calls列表后,会调用notify方法唤醒在此Connection对象等待的线程。Connection线程会不断循环从Connection.calls里取出远程调用进行处理,如果calls队列为空并且没有错误,那么就在当前对象上调用wait等待。直到其它线程调用了addCall方法,然后addCall方法调用notify方法,唤醒了这个Connection线程。也就是此Connection线程。遍历Connection类的代码,发现Connection里只有两处进行了notify和wait操作:

1、addCall方法里的notify;

2、waitForWork方法里的wait。

所以,这是典型的生产生/消费者线程调度情形。

将call对象加入connection之后,调用connection.setupIOStreams方法。这个方法首先会判断Connection.shouldCloseConnection是否为true,Connection.socket是否为null,如果不为null,说明null已创建,不用再获取输入输出流、发送头操作了,就会直接返回。否则,会打开到服务器的Socket连接Connection.socket,设置超时值,然后获取Connection.socket上的输入流in和输出流out。最后,向输出流写入服务器头信息,头信息是这种格式,其中header就是ConnectionHeader的序列化形式:

bubuko.com,布布扣

写完这个之后,调用Connection.start方法,启动Connection线程。Connection就开始异步处理其负责的所有远程调用了。

无论是RPC.Invoker.invoke方法,还是Client.call方法,都没对一个远程调用对象call进行同步,没对将此call对象加入其所属的Connection进行同步。这就意味着,只要来自于同一客户端(ticket)、连接同一RPC服务器(addr)上的同一协议接口(protocol)的远程调用对象call,都会添加到同一个Connection.calls队列里。当客户端是多线程,每个线程都向同一服务器发送远程调用时,这种情况就有可能发生。

一般情况下,对于这种提供调用接口,跨线程的调用同步(一个线程的远程调用必须等待另一线程调用结束)是不合常理的。线程的执行是顺序的,这种顺序执行是正确性的保证,因此可要求线程发出的远程调用是同步阻塞式调用,而不同线程发出的调用总体来说是可以异步的,也就是调用方式应该对同一线程同步,不同线程异步,因为各个线程相互独立,没有理由,也不应该让远程调用在不同的线程上进行同步。

因此,如果在RPC.Invoker.invoke上或Client.call上对call阻塞,就会使不同线程的远程调用在RPC处被同步,一个线程只能等待另一个线程远程调用结果返回后才能执行自己的远程调用,这是不合理的。避免这种远程调用同步的方式就是在RPC客户端层采用队列,来自同一线程的调用都是顺序执行的,来自不同线程的调用都会放入Connection.calls列表。

在Client.call代码里一个很奇怪的地方就是当调用getConnection方法返回一个连接connection后,还要调用connection.sendParam(call)将远程调用数据发送给RPC服务器。为什么不在Connection线程内部处理呢?我觉得这是一种性能优化方式。如果远程调用的数据发送、返回值接收、异常处理等等都在Connection线程处理,Connection只是一个线程,当成百上千个线程同时发出远程调用时,Connection线程很可能就会成为瓶颈。在操作系统或Java的线程调度里,地位相等的线程只要不结束,每个线程都会得到大致相同的CPU时间,所以将任务都丢给单一线程处理,会拖慢系统。因此,如果一个客户端有多个线程在使用,多个线程发出远程调用时,线程直接向RPC服务器发送远程调用数据,而不是由Connection来发送。

bubuko.com,布布扣

如上图所示,由客户端线程向RPC服务器直接发送方法调用数据,而不是Connection线程,使Connection线程解脱出来,只专心接收RPC服务器方法执行结果,并将结果发送给客户端即可。

同一RPC客户端的不同线程没对connection.sendParam(call)进行同步(Connection.sendParam方法没有加synchronized关键字),是不是意味着可以由许多线程异步地通过同一Connection.out输出流向RPC服务器发送数据呢?这显然是不行的,异步发送数据很容易使Socket输出流混淆各个方法调用数据,所以在Connection.sendParam方法里对out进行了同步,这也体现了同步应尽量“细粒度”以提高性能的原则。

每个远程方法调用向RPC输务器发送的数据如下:

bubuko.com,布布扣

Client.call方法发送完这些方法调用数据给RPC服务器之后,开始在call上同步(synchronized(call)),然后进入循环,这个循环检测如果call没完成,就会在call对象上等待。什么时候call能完成呢,当Connection.receiveResponse方法收到RPC服务器发回的方法调用结果时,就会调用call.setException或call.setValue方法,使调用完成。call.setException和call.setValue方法都会调用call.callComplete方法:

protected synchronized void callComplete() {

    this.done = true;

    notify();

}

对于Client.call方法中的while(!call.done)循环,如果当前正因为执行call.wait方法而使客户端睡眠,则Connection线程对callComplete的notify方法调用会使客户端线程在被唤醒,下一次循环时call.done = true,循环结束,Client.call调用于是结束,整个远程过程调用在客户端阶段就算结束了。

我们发现,Java同步原语wait和notify在整个客户端里被用到了两次:

1、Connection线程调用waitForWork方法时,如果发现当前远程调用队列Connection.calls为空,则调用wait在自己身上等待。RPC客户端线程调用Connection.addCall向Connection.calls加入一个远程调用对象时,addCall主法会调用notify唤醒等待在此Connection对象上的线程,即此Connection线程。这是一个典型的生产生/消费者队列;

2、上文所述的Client.call与Call.callComplete之间的应答控制。

3.3 数据结构

本节描述RPC客户端的数据成员,并归纳记录RPC客户端与RPC服务器之间通信所用的数据结构的图索引。这些数据结构基本全在前文讲述过,这里只是做个列表归纳。

键值对

key

value

说明

SocketFactory

Client

RPC.ClientCache有成员:Map<SocketFactory, Client> clients,就是RPC中缓存客户端的,每个客户端都可以为其生成的SocketFactory检出

ConnectionId

Client.Connection

Client有成员Hashtable<ConnectionId, Connection> connections,可看成Connection连接池。ConnectionId由address、ticket和protocol组成,也就意味着,同一客户端ticket到运行同一protocol的同一台服务器address使用同一个连接

实体类

类名称

成员

Invocation

methodName、parameterClasses、parameters

ConnectionHeader

protocol、ugi

ConnectionId

address、ticket、protocol

Client

成员

描述

Hashtable<ConnectionId,Connection> connections

ConnectionId由:

l address(RPC服务器的);

l ticket(客户端名称和所在组);

l protocol(此连接面向的协议接口)。

由于一个Client只有唯一的UserGroupInformation,所以一个客户端可建立到不同地址的RPC服务器,运行不同协议接口的RPC服务器的连接,connections就是维护所有这些连接的列表。

Class<? extends Writable> valueClass;

调用方法的返回值类型,一般就是ObjectWritable

private SocketFactory socketFactory;

用户此客户端建立Socket连接

private int refCount = 1;

在Inovker.invoke方法里,每当客户端根据SocketFactory被检出一次,此计数加1.

Client.Connection

InetSocketAddress server

RPC服务器地址

ConnectionHeader header

Connection第一次与RPC服务器通信时会发送Connection头,就是这个header,header中有两个成员:protocol类名称和ugi,分别是String和UserGroupInformation类型

ConnectionId remoteId

上文提到的remoteId

Socket socket = null

与RPC服务器的Socket连接

DataInputStream in;

上述Socket输入流

DataOutputStream out;

上述Socket输出流

Hashtable<Integer, Call> calls

call.id -> call的键值对集合,保存所有在此Connection上发送给RPC服务器的要处理的远程调用对象

AtomicBoolean shouldCloseConnection

是否要关闭这个Connection,若:

l 发送远程调用数据、接受RPC服务器回应或获取Socket输入输出流时出现异常;

l calls队列为空。

则将shouldCloseConnection设为true:

bubuko.com,布布扣

当某个Connection第一次与RPC服务器连接时,发送的连接头信息:

bubuko.com,布布扣

每一次sendParam发送的远程过程调用数据:

bubuko.com,布布扣

3.4 Client特别处理(异常、错误恢复等)

Client.Connection对远程调用过程中的任何错误的处理是非常严格的。Client.Connection中的错误分为两种,一种是在建立Socket连接(Client.Connection.setupIOstreams)时发生的连接错误,另一种是在处理远程调用返回值时的错误。

Client.Connection类中由handleConnectionFailure处理连接错误,handleConnectionFailure仅由setupIOstreams方法调用。错误处理流程是,调用socket.close关闭socket,然后将socket设置为null,从而在下一次RPC客户端发出远程调用时,会由setupIOstreams方法重新建立Socket连接。

第二类错误的处理的导火索是Client.Connection.markClosed方法,由如下其它方法调用:

bubuko.com,布布扣

可以看到不论是接收方法调用返回值、发送参数、获取输入输出流还是等待calls列表,只要有错误发生,就会调用markClosed方法,这个方法主要是将shouldCloseConnection变量设置为true,一旦shouldCloseConnection变量设置为true,Connection线程主循环即告结束,最终调用Client.Connection.close方法关闭连接。关闭连接的操作主要有:

1、将此连接从Client.connections列表中移除;

2、关闭Socket输入流(Client.Connection.in)和输出流(Client.Connection.out);

3、遍历Client.Connection.calls中所有的方法调用对象,设置返回结果为异常“Unexpected closed connection”,移除方法调用。

(全文完)

细水长流话Hadoop(2)Hadoop RPC 客户端,布布扣,bubuko.com

细水长流话Hadoop(2)Hadoop RPC 客户端

标签:des   style   blog   http   java   color   

原文地址:http://www.cnblogs.com/ahhuiyang/p/3854366.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!