标签:检验 命令 函数 文件读取 lin 角度 工具 数据结构 lstat
HDFS 架构简述
Hadoop分布式文件系统(HDFS)是一个分布式的文件系统,运行在廉价的硬件上。它与现有的分布式文件系统有很多相似之处。然而与其他的分布式文件系统的差异也是显着的。HDFS是高容错的,被设计成在低成本硬件上部署。HDFS为应用数据提供高吞吐量的访问,适用于具有大规模数据集的应用程序。HDFS放松了一些POSIX的要求,以便提供流式方式来访问文件系统数据。
Block是一块磁盘当中最小的单位,HDFS中的Block是一个很大的单元。在HDFS中的文件将会按块大小进行分解,并作为独立的单元进行存储。
Block概念
磁盘有一个Block size的概念,它是磁盘读/写数据的最小单位。构建在这样的磁盘上的文件系统也是通过块来管理数据的,文件系统的块通常是磁盘块的整数倍。文件系统的块一般为几千字节(byte),磁盘块一般为512字节(byte)。
HDFS也有Block的概念,但它的块是一个很大的单元,默认是64MB。像硬盘中的文件系统一样,在HDFS中的文件将会按块大小进行分解,并作为独立的单元进行存储。但和硬盘中的文件系统不一样的是,存储在块中的硬的一个比块小的文件并不会占据一个块大小盘物理空间(HDFS中一个块只存储一个文件的内容)。
那为什么HDFS中的block如此之大呢?
在HDFS学习(一) – HDFS设计中,我们曾说过,对HDFS来说,读取整个数据的时间延迟要比读取到第一条记录的数据延迟更重要,就体现在这里。HDFS的Block设计的如此之大,也就是为了最小化寻道时间。把一个数据块设计的足够大,就能够使得数据传输的时间显著地大于寻找到Block所在时间。这样,传输一个由多个Block组成的文件的时间就取决于磁盘的传输速率。
举一个简单的例子,假设寻道时间大约为10ms,传输速度为100MB/s。为了使得寻道时间仅为传输时间的1%,我们就需要设置块的大小为100MB。HDFS默认的Block size是64MB,但是更多的企业里边,已经设置成128M,而且这个参数将随着新一代硬盘速度的增长而增长。
而Block Size的值也不宜设置过大,通常,Mapreduce中的Map任务一次只处理一个Block中的数据,如果启动太少的Task(少于集群中的节点的数量),作业的速度就会比较慢。
对HDFS进行块抽象有哪些好处呢?
一、一个显而易见的好处是:一个文件的大小,可以大于网络中任意一个硬盘的大小。
文件的块并不需要存储在同一个硬盘上,一个文件的快可以分布在集群中任意一个硬盘上。事实上,虽然实际中并没有,整个集群可以只存储一个文件,该文件的块占满整个集群的硬盘空间。
二、使用抽象块而非整个文件作为存储单元,大大简化了系统的设计。
简化设计,对于故障种类繁多的分布式系统来说尤为重要。以块为单位,一方面简化存储管理,因为块大小是固定的,所以一个硬盘放多少个块是非常容易计算的;另一方面,也消除了元数据的顾虑,因为Block仅仅是存储的一块数据,其文件的元数据,例如权限等就不需要跟数据块一起存储,可以交由另外的其他系统来处理。
三、块更适合于数据备份,进而提供数据容错能力和系统可用性。
为了防止数据块损坏或者磁盘或者机器故障,每一个block都可以被分到少数几天独立的机器上(默认3台)。这样,如果一个block不能用了,就从其他的一处地方,复制过来一份。
HDFS采用master-worker架构。一个HDFS集群是有一个Namenode和一定数目的Datanode组成。
HDFS NameSpace
HDFS 支持传统的层次型文件组织结构。用户或者应用程序可以创建目 录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数 现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前, HDFS 不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但 是 HDFS 架构并不妨碍实现这些特性。
NameNode
Namenode是一个中心服务器,负责管理文件系统的namespace和客户端对文件的访问。Datanode在集群中一般是一个节点一个,负责管理节点上它们附带的存储。在内部,一个文件其实分成一个或多个block,这些block存储在Datanode集合里。Namenode执行文件系统的namespace操作,例如打开、关闭、重命名文件和目录,同时决定block到具体Datanode节点的映射。
DataNode
DataNode在Namenode的指挥下进行block的创建、删除和复制。Namenode和Datanode都是设计成可以跑在普通的廉价的运行linux的机器上。Datanode 将 HDFS 数据以文件的形式存储在本地的文件系统中,它并不知道有 关 HDFS 文件的信息。它把每个 HDFS 数据块存储在本地文件系统的一个单独的文件 中。 Datanode 并不在同一个目录创建所有的文件,实际上,它用试探的方法来确定 每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录中创建所 有的本地文件并不是最优的选择,这是因为本地文件系统可能无法高效地在单个目 录中支持大量的文件。
HDFS采用java语言开发,因此可以部署在很大范围的机器上。一个典型的部署场景是一台机器跑一个单独的Namenode节点,集群中的其他机器各跑一个Datanode实例。这个架构并不排除一台机器上跑多个Datanode,不过这比较少见。
单一节点的Namenode大大简化了系统的架构。Namenode负责保管和管理所有的HDFS元数据,因而用户数据就不需要通过Namenode(也就是说文件数据的读写是直接在Datanode上)。
Secondary NameNode
Secondary NameNode 是用于对 fsimage, edit log进行合并操作。对于 NameNode来说,合并fsimage, edit log的动作只会在NameNode启动时做,在运行过程中是不会做的,是由Secondary NameNode来做的。
Secondary NameNode 会周期性的对fsimage, edit logs 进行合并。相关的配置有:
·dfs.namenode.checkpoint.period,指定合并周期。
·dfs.namenode.checkpoint.txns ,指定最大事务数。如果在一个合并周期内,对有频繁的文件操作,如果操作数超过这个值,也会强制进行合并,不会等到合并周期到达才进行合并。
合并过程,参见:http://www.cnblogs.com/f1194361820/p/6768623.html
Checkpoint Node
Checkpoint Node 与 Secondary NameNode的作用、配置都是一样的,只是启动的命令不同,估计是因为Secondary NameNode 的名字容易引起误解,所以才新加了这个Checkpoint Node。
另外在HDFS集群中,可以配置多个Checkpoint Node。
启动checkpoint node的命令:bin/hdfs namenode -checkpoint
此外,因为Secondary NameNode (或者Checkpoint Node)会定时的合并fsimage, edit logs,所以它们会必须是保留了相对来说最新的 filesystem namespace。那么当出现故障时,就可以使用它们来恢复 HDFS。从这个角度来说,它们其实相当于文件系统的一个冷备份。
Backup Node才是一个真正意义的备份,也可以认为它是文件系统的热备份。因为Backup Node是时刻与NameNode同步的,所以它不需要像Checkpoint Node那样周期性的下载namespace(fsimage, edit logs)来进行合并。
对于每个NameNode,目前版本只允许配置一个Backup Node,以后可能会有多个。而且利用Backup node模式就不允许注册CheckPoint Node了。
此外,因为Backup Node 会在内存和本地文件系统中对namespace进行备份,所以backup node的内存配置要不低于 NameNode的配置。
启动backup node的命令:bin/hdfs namenode -backup
1.使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
2.Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的DataNode地址;
3.客户端开发库Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据.
4.读取完当前block的数据后,关闭与当前的DataNode连接,并为读取下一个block寻找最佳的DataNode;
5.当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。
6.读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。
如果要了解更详细的过程:参见http://shiyanjun.cn/archives/962.html
1.使用HDFS提供的客户端开发库Client,向远程的Namenode发起RPC请求;
2.Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件 创建一个记录,否则会让客户端抛出异常;
3.当客户端开始将数据写入文件的时候,会将数据切分成多个packets,并在内部以数据队列”data queue”的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根据在Namenode中对replication的设置而定。
4.开始以pipeline(管道)的形式将packet写入所有的replicas中。把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。
5.最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着”ack queue”,成功收到datanode返回的ack packet后会从”ack queue”移除相应的packet。
6.如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持replicas设定的数量。
如果要了解更详细的过程,参见:http://shiyanjun.cn/archives/942.html
从上面的写文件的过程,可以看到每个block会被复制到多个datanode上。在写block之前,会先选择将block存放到哪些datanode上。这个选择的策略是:采用机架感知策略来选择合适的datanode。
在选择完毕datanode后,就开始写数据。写数据时,就相关的datanode就开始自动完成数据复制了。
复制的过程:
When a client is writing data to an HDFS file, its data is first written to a local buffer as explained in the previous section. Suppose the HDFS file has a replication factor of three. When the local buffer accumulates a chunk of user data, the client retrieves a list of DataNodes from the NameNode. This list contains the DataNodes that will host a replica of that block.
The client then flushes the data chunk to the first DataNode. The first DataNode starts receiving the data in small portions, writes each portion to its local repository and transfers that portion to the second DataNode in the list. The second DataNode, in turn starts receiving each portion of the data block, writes that portion to its repository and then flushes that portion to the third DataNode. Finally, the third DataNode writes the data to its local repository. Thus, a DataNode can be receiving data from the previous one in the pipeline and at the same time forwarding data to the next one in the pipeline. Thus, the data is pipelined from one DataNode to the next.
修改副本数
集群只有3个Datanode,hadoop系统replication=4时,会出现什么情况?
对于上传文件到hdfs上时,当时hadoop的副本系数是几,这个文件的块数副本数就会有几份,无论以后你怎么更改系统副本系统,这个文件的副本数都不会改变,也就说上传到分布式系统上的文件副本数由当时的系统副本数决定,不会受replication的更改而变化,除非用命令来更改文件的副本数。因为dfs.replication实质上是client参数,在create文件时可以指定具体replication,属性dfs.replication是不指定具体replication时的采用默认备份数。文件上传后,备份数已定,修改dfs.replication是不会影响以前的文件的,也不会影响后面指定备份数的文件。只影响后面采用默认备份数的文件。但可以利用hadoop提供的命令后期改某文件的备份数:hadoop fs -setrep -R 1。如果你是在hdfs-site.xml设置了dfs.replication,这并一定就得了,因为你可能没把conf文件夹加入到你的 project的classpath里,你的程序运行时取的dfs.replication可能是hdfs-default.xml里的 dfs.replication,默认是3。可能这个就是造成你为什么dfs.replication老是3的原因。你可以试试在创建文件时,显式设定replication。replication一般到3就可以了,大了意义也不大。
当用户或应用程序删除某个文件时,这个文件并没有立刻从 HDFS 中删除。实际上, HDFS 会将这个文件重命名转移到 /trash 目录。只要文件还在 /trash 目录中,该文件就可以被迅速地恢复。文件在 /trash 中保存的时间是可 配置的,当超过这个时间时, NameNode 就会将该文件从名字空间中删除。 删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到 HDFS 空闲空间的增加之间会有一定时间的延迟。
只要被删除的文件还在 /trash 目录中,用户就可以恢复这个文件。如果 用户想恢复被删除的文件,他 / 她可以浏览 /trash 目录找回该文件。 /trash 目录仅仅保存被删除文件的最后副本。 /trash 目录与其他的目录没有什么区别 ,除了一点:在该目录上 HDFS 会应用一个特殊策略来自动删除文件。目前 的默认策略是删除 /trash 中保留时间超过 6 小时的文件。将来,这个策略可以 通过一个被良好定义的接口配置。
通过之前的对HDFS的基础架构的了解,很容易发现NameNode有单点故障的风险。
虽然有了Checkpoint Node、Backup Node等,保证了文件系统namespace数据不丢失,但是NameNode出现故障时,仍然是需要人为干预来保证文件系统正常工作,并且这种人为干预操作在启动一个NameNode后需要很长时间(加载fsimage,replay edit log,接收足够的block report后)后,文件系统才能正常工作。所以就急需一种HA机制来解决这个问题。
如果要实现HDFS的HA,对HDFS的基础架构稍作调整就可以了:
1)NameNode必须得有一个HA的共享存储(用于存储edit log)。当一个standby namenode启动后,它要从共享存储里读取edit log,并以此与active NameNode进行同步。随后standby NameNodeg还要一直持续的从共享存储里读取edit log来进行同步。
2)DataNodes必须发送block 报告给active NameNode和standby NameNode。
3)Client要采用一种对用于透明的机制来处理namenode故障转移
4)secondary namenode的角色被Standby NameNode取代。
从上面来看,主要就是引入了Standby NameNode来取代Secondary NameNode,并引入共享存储来存储edit log。
从上图中,我们可以看出 NameNode 的高可用架构主要分为下面几个部分:
NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务。
NameNode 主备切换主要由 ZKFailoverController、HealthMonitor 和 ActiveStandbyElector 这 3 个组件来协同实现:
ZKFailoverController 作为 NameNode 机器上一个独立的进程启动 (在 hdfs 启动脚本之中的进程名为 zkfc),启动的时候会创建 HealthMonitor 和 ActiveStandbyElector 这两个主要的内部组件,ZKFailoverController 在创建 HealthMonitor 和 ActiveStandbyElector 的同时,也会向 HealthMonitor 和 ActiveStandbyElector 注册相应的回调方法。
HealthMonitor 主要负责检测 NameNode 的健康状态,如果检测到 NameNode 的状态发生变化,会回调 ZKFailoverController 的相应方法进行自动的主备选举。
ActiveStandbyElector 主要负责完成自动的主备选举,内部封装了 Zookeeper 的处理逻辑,一旦 Zookeeper 主备选举完成,会回调 ZKFailoverController 的相应方法来进行 NameNode 的主备状态切换。
NameNode 实现主备切换的流程(图2)有下面几步:
1、HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
2、HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
3、如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
4、ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
5、ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
6、ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。
下面分别对 HealthMonitor、ActiveStandbyElector 和 ZKFailoverController 的实现细节进行分析:
ZKFailoverController 在初始化的时候会创建 HealthMonitor,HealthMonitor 在内部会启动一个线程来循环调用 NameNode 的 HAServiceProtocol RPC 接口的方法来检测 NameNode 的状态,并将状态的变化通过回调的方式来通知 ZKFailoverController。
HealthMonitor主要检测NameNode的两类状态,分别是 HealthMonitor.State 和 HAServiceStatus。HealthMonitor.State 是通过 HAServiceProtocol RPC 接口的 monitorHealth 方法来获取的,反映了 NameNode 节点的健康状况,主要是磁盘存储资源是否充足。HealthMonitor.State 包括下面几种状态:
·INITIALIZING:HealthMonitor 在初始化过程中,还没有开始进行健康状况检测;
·SERVICE_HEALTHY:NameNode 状态正常;
·SERVICE_NOT_RESPONDING:调用 NameNode 的 monitorHealth 方法调用无响应或响应超时;
·SERVICE_UNHEALTHY:NameNode 还在运行,但是 monitorHealth 方法返回状态不正常,磁盘存储资源不足;
·HEALTH_MONITOR_FAILED:HealthMonitor 自己在运行过程中发生了异常,不能继续检测 NameNode 的健康状况,会导致 ZKFailoverController 进程退出;
HealthMonitor.State 在状态检测之中起主要的作用,在 HealthMonitor.State 发生变化的时候,HealthMonitor 会回调 ZKFailoverController 的相应方法来进行处理,具体处理见后文 ZKFailoverController 部分所述。
而 HAServiceStatus 则是通过 HAServiceProtocol RPC 接口的 getServiceStatus 方法来获取的,主要反映的是 NameNode 的 HA 状态,包括:
·INITIALIZING:NameNode 在初始化过程中;
·ACTIVE:当前 NameNode 为主 NameNode;
·STANDBY:当前 NameNode 为备 NameNode;
·STOPPING:当前 NameNode 已停止;
HAServiceStatus 在状态检测之中只是起辅助的作用,在 HAServiceStatus 发生变化时,HealthMonitor 也会回调 ZKFailoverController 的相应方法来进行处理,具体处理见后文 ZKFailoverController 部分所述。
Namenode(包括 YARN ResourceManager) 的主备选举是通过 ActiveStandbyElector 来完成的,ActiveStandbyElector 主要是利用了 Zookeeper 的写一致性和临时节点机制,具体的主备选举实现如下:
创建锁节点
如果 HealthMonitor 检测到对应的 NameNode 的状态正常,那么表示这个 NameNode 有资格参加 Zookeeper 的主备选举。如果目前还没有进行过主备选举的话,那么相应的 ActiveStandbyElector 就会发起一次主备选举,尝试在 Zookeeper 上创建一个路径为/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 的临时节点 (${dfs.nameservices} 为 Hadoop 的配置参数 dfs.nameservices 的值,下同),Zookeeper 的写一致性会保证最终只会有一个 ActiveStandbyElector 创建成功,那么创建成功的 ActiveStandbyElector 对应的 NameNode 就会成为主 NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的方法进一步将对应的 NameNode 切换为 Active 状态。而创建失败的 ActiveStandbyElector 对应的 NameNode 成为备 NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的方法进一步将对应的 NameNode 切换为 Standby 状态。
注册 Watcher 监听
不管创建/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点是否成功,ActiveStandbyElector 随后都会向 Zookeeper 注册一个 Watcher 来监听这个节点的状态变化事件,ActiveStandbyElector 主要关注这个节点的 NodeDeleted 事件。
自动触发主备选举
如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时, ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock,这样处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件。收到这个事件之后,会马上再次进入到创建/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点的流程,如果创建成功,这个本来处于 Standby 状态的 NameNode 就选举为主 NameNode 并随后开始切换为 Active 状态。
当然,如果是 Active 状态的 NameNode 所在的机器整个宕掉的话,那么根据 Zookeeper 的临时节点特性,/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换。
使用隔离(fencing)防止脑裂
Zookeeper 在工程实践的过程中经常会发生的一个现象就是 Zookeeper 客户端“假死”,所谓的“假死”是指如果 Zookeeper 客户端机器负载过高或者正在进行 JVM Full GC,那么可能会导致 Zookeeper 客户端到 Zookeeper 服务端的心跳不能正常发出,一旦这个时间持续较长,超过了配置的 Zookeeper Session Timeout 参数的话,Zookeeper 服务端就会认为客户端的 session 已经过期从而将客户端的 Session 关闭。“假死”有可能引起分布式系统常说的双主或脑裂 (brain-split) 现象。具体到本文所述的 NameNode,假设 NameNode1 当前为 Active 状态,NameNode2 当前为 Standby 状态。如果某一时刻 NameNode1 对应的 ZKFailoverController 进程发生了“假死”现象,那么 Zookeeper 服务端会认为 NameNode1 挂掉了,根据前面的主备切换逻辑,NameNode2 会替代 NameNode1 进入 Active 状态。但是此时 NameNode1 可能仍然处于 Active 状态正常运行,即使随后 NameNode1 对应的 ZKFailoverController 因为负载下降或者 Full GC 结束而恢复了正常,感知到自己和 Zookeeper 的 Session 已经关闭,但是由于网络的延迟以及 CPU 线程调度的不确定性,仍然有可能会在接下来的一段时间窗口内 NameNode1 认为自己还是处于 Active 状态。这样 NameNode1 和 NameNode2 都处于 Active 状态,都可以对外提供服务。这种情况对于 NameNode 这类对数据一致性要求非常高的系统来说是灾难性的,数据会发生错乱且无法恢复。Zookeeper 社区对这种问题的解决方法叫做 fencing,中文翻译为隔离,也就是想办法把旧的 Active NameNode 隔离起来,使它不能正常对外提供服务。
ActiveStandbyElector 为了实现 fencing,会在成功创建 Zookeeper 节点 hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 从而成为 Active NameNode 之后,创建另外一个路径为/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 的持久节点,这个节点里面保存了这个 Active NameNode 的地址信息。Active NameNode 的 ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候 (注意由于/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 是临时节点,也会随之删除),会一起删除节点/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb。但是如果 ActiveStandbyElector 在异常的状态下 Zookeeper Session 关闭 (比如前述的 Zookeeper 假死),那么由于/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来。后面当另一个 NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行 fencing,具体处理见后文 ZKFailoverController 部分所述。
ZKFailoverController 在创建 HealthMonitor 和 ActiveStandbyElector 的同时,会向 HealthMonitor 和 ActiveStandbyElector 注册相应的回调函数,ZKFailoverController 的处理逻辑主要靠 HealthMonitor 和 ActiveStandbyElector 的回调函数来驱动。
对 HealthMonitor 状态变化的处理
如前所述,HealthMonitor 会检测 NameNode 的两类状态,HealthMonitor.State 在状态检测之中起主要的作用,ZKFailoverController 注册到 HealthMonitor 上的处理 HealthMonitor.State 状态变化的回调函数主要关注 SERVICE_HEALTHY、SERVICE_NOT_RESPONDING 和 SERVICE_UNHEALTHY 这 3 种状态:
如果检测到状态为 SERVICE_HEALTHY,表示当前的 NameNode 有资格参加 Zookeeper 的主备选举,如果目前还没有进行过主备选举的话,ZKFailoverController 会调用 ActiveStandbyElector 的 joinElection 方法发起一次主备选举。
如果检测到状态为 SERVICE_NOT_RESPONDING 或者是 SERVICE_UNHEALTHY,就表示当前的 NameNode 出现问题了,ZKFailoverController 会调用 ActiveStandbyElector 的 quitElection 方法删除当前已经在 Zookeeper 上建立的临时节点退出主备选举,这样其它的 NameNode 就有机会成为主 NameNode。
而 HAServiceStatus 在状态检测之中仅起辅助的作用,在 HAServiceStatus 发生变化时,ZKFailoverController 注册到 HealthMonitor 上的处理 HAServiceStatus 状态变化的回调函数会判断 NameNode 返回的 HAServiceStatus 和 ZKFailoverController 所期望的是否一致,如果不一致的话,ZKFailoverController 也会调用 ActiveStandbyElector 的 quitElection 方法删除当前已经在 Zookeeper 上建立的临时节点退出主备选举。
对 ActiveStandbyElector 主备选举状态变化的处理
在 ActiveStandbyElector 的主备选举状态发生变化时,会回调 ZKFailoverController 注册的回调函数来进行相应的处理:
如果 ActiveStandbyElector 选主成功,那么 ActiveStandbyElector 对应的 NameNode 成为主 NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的 becomeActive 方法,这个方法通过调用对应的 NameNode 的 HAServiceProtocol RPC 接口的 transitionToActive 方法,将 NameNode 转换为 Active 状态。
如果 ActiveStandbyElector 选主失败,那么 ActiveStandbyElector 对应的 NameNode 成为备 NameNode,ActiveStandbyElector 会回调 ZKFailoverController 的 becomeStandby 方法,这个方法通过调用对应的 NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,将 NameNode 转换为 Standby 状态。
如果 ActiveStandbyElector 选主成功之后,发现了上一个 Active NameNode 遗留下来的/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 节点 (见“ActiveStandbyElector 实现分析”一节“防止脑裂”部分所述),那么 ActiveStandbyElector 会首先回调 ZKFailoverController 注册的 fenceOldActive 方法,尝试对旧的 Active NameNode 进行 fencing,在进行 fencing 的时候,会执行以下的操作:
首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态。
如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施,Hadoop 目前主要提供两种隔离措施,通常会选择 sshfence:
sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死;
shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离;
只有在成功地执行完成 fencing 之后,选主成功的 ActiveStandbyElector 才会回调 ZKFailoverController 的 becomeActive 方法将对应的 NameNode 转换为 Active 状态,开始对外提供服务。
过去几年中 Hadoop 社区涌现过很多的 NameNode 共享存储方案,比如 shared NAS+NFS、BookKeeper、BackupNode 和 QJM(Quorum Journal Manager) 等等。目前社区已经把由 Clouderea 公司实现的基于 QJM 的方案合并到 HDFS 的 trunk 之中并且作为默认的共享存储实现,本部分只针对基于 QJM 的共享存储方案的内部实现原理进行分析。为了理解 QJM 的设计和实现,首先要对 NameNode 的元数据存储结构有所了解。
一种推荐的方式共享存储方式是采用QJM(Quorum Journal Manager)。在这个方案下,会运行一组的 Journal Nodes( 简称JNs)。
基于 QJM 的共享存储系统主要用于保存 EditLog,并不保存 FSImage 文件。FSImage 文件还是在 NameNode 的本地磁盘上。QJM 共享存储的基本思想来自于 Paxos 算法 (参见参考文献 [3]),采用多个称为 JournalNode 的节点组成的 JournalNode 集群来存储 EditLog。每个 JournalNode 保存同样的 EditLog 副本。每次 NameNode 写 EditLog 的时候,除了向本地磁盘写入 EditLog 之外,也会并行地向 JournalNode 集群之中的每一个 JournalNode 发送写请求,只要大多数 (majority) 的 JournalNode 节点返回成功就认为向 JournalNode 集群写入 EditLog 成功。如果有 2N+1 台 JournalNode,那么根据大多数的原则,最多可以容忍有 N 台 JournalNode 节点挂掉。
Active NameNode 和 StandbyNameNode 使用 JouranlNode 集群来进行数据同步的过程如图 5 所示,Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog:
1)Active NameNode 提交 EditLog 到 JournalNode 集群
当处于 Active 状态的 NameNode 调用 FSEditLog 类的 logSync 方法来提交 EditLog 的时候,会通过 JouranlSet 同时向本地磁盘目录和 JournalNode 集群上的共享存储目录写入 EditLog。写入 JournalNode 集群是通过并行调用每一个 JournalNode 的 QJournalProtocol RPC 接口的 journal 方法实现的,如果对大多数 JournalNode 的 journal 方法调用成功,那么就认为提交 EditLog 成功,否则 NameNode 就会认为这次提交 EditLog 失败。提交 EditLog 失败会导致 Active NameNode 关闭 JournalSet 之后退出进程,留待处于 Standby 状态的 NameNode 接管之后进行数据恢复。
从上面的叙述可以看出,Active NameNode 提交 EditLog 到 JournalNode 集群的过程实际上是同步阻塞的,但是并不需要所有的 JournalNode 都调用成功,只要大多数 JournalNode 调用成功就可以了。如果无法形成大多数,那么就认为提交 EditLog 失败,NameNode 停止服务退出进程。如果对应到分布式系统的 CAP 理论的话,虽然采用了 Paxos 的“大多数”思想对 C(consistency,一致性) 和 A(availability,可用性) 进行了折衷,但还是可以认为 NameNode 选择了 C 而放弃了 A,这也符合 NameNode 对数据一致性的要求。
2)Standby NameNode 从 JournalNode 集群同步 EditLog
当 NameNode 进入 Standby 状态之后,会启动一个 EditLogTailer 线程。这个线程会定期调用 EditLogTailer 类的 doTailEdits 方法从 JournalNode 集群上同步 EditLog,然后把同步的 EditLog 回放到内存之中的文件系统镜像上 (并不会同时把 EditLog 写入到本地磁盘上)。
这里需要关注的是:从 JournalNode 集群上同步的 EditLog 都是处于 finalized 状态的 EditLog Segment。“NameNode 的元数据存储概述”一节说过 EditLog Segment 实际上有两种状态,处于 in-progress 状态的 Edit Log 当前正在被写入,被认为是处于不稳定的中间态,有可能会在后续的过程之中发生修改,比如被截断。Active NameNode 在完成一个 EditLog Segment 的写入之后,就会向 JournalNode 集群发送 finalizeLogSegment RPC 请求,将完成写入的 EditLog Segment finalized,然后开始下一个新的 EditLog Segment。一旦 finalizeLogSegment 方法在大多数的 JournalNode 上调用成功,表明这个 EditLog Segment 已经在大多数的 JournalNode 上达成一致。一个 EditLog Segment 处于 finalized 状态之后,可以保证它再也不会变化。
从上面描述的过程可以看出,虽然 Active NameNode 向 JournalNode 集群提交 EditLog 是同步的,但 Standby NameNode 采用的是定时从 JournalNode 集群上同步 EditLog 的方式,那么 Standby NameNode 内存中文件系统镜像有很大的可能是落后于 Active NameNode 的,所以 Standby NameNode 在转换为 Active NameNode 的时候需要把落后的 EditLog 补上来。
处于Standby 状态的 NameNode 转换为 Active 状态的时候,有可能上一个 Active NameNode 发生了异常退出,那么 JournalNode 集群中各个 JournalNode 上的 EditLog 就可能会处于不一致的状态,所以首先要做的事情就是让 JournalNode 集群中各个节点上的 EditLog 恢复为一致。另外如前所述,当前处于 Standby 状态的 NameNode 的内存中的文件系统镜像有很大的可能是落后于旧的 Active NameNode 的,所以在 JournalNode 集群中各个节点上的 EditLog 达成一致之后,接下来要做的事情就是从 JournalNode 集群上补齐落后的 EditLog。只有在这两步完成之后,当前新的 Active NameNode 才能安全地对外提供服务。
补齐落后的 EditLog 的过程复用了前面描述的 Standby NameNode 从 JournalNode 集群同步 EditLog 的逻辑和代码,最终调用 EditLogTailer 类的 doTailEdits 方法来完成 EditLog 的补齐。使 JournalNode 集群上的 EditLog 达成一致的过程是一致性算法 Paxos 的典型应用场景,QJM 对这部分的处理可以看做是 Single Instance Paxos(参见参考文献 [3]) 算法的一个实现,在达成一致的过程中,Active NameNode 和 JournalNode 集群之间的交互流程如图 6 所示,具体描述如下:
图 6.Active NameNode 和 JournalNode 集群的交互流程图
1)生成一个新的 Epoch
Epoch 是一个单调递增的整数,用来标识每一次 Active NameNode 的生命周期,每发生一次 NameNode 的主备切换,Epoch 就会加 1。这实际上是一种 fencing 机制,为什么需要 fencing 已经在前面“ActiveStandbyElector 实现分析”一节的“防止脑裂”部分进行了说明。产生新 Epoch 的流程与 Zookeeper 的 ZAB(Zookeeper Atomic Broadcast) 协议在进行数据恢复之前产生新 Epoch 的过程完全类似:
Active NameNode 首先向 JournalNode 集群发送 getJournalState RPC 请求,每个 JournalNode 会返回自己保存的最近的那个 Epoch(代码中叫 lastPromisedEpoch)。
NameNode 收到大多数的 JournalNode 返回的 Epoch 之后,在其中选择最大的一个加 1 作为当前的新 Epoch,然后向各个 JournalNode 发送 newEpoch RPC 请求,把这个新的 Epoch 发给各个 JournalNode。
每一个 JournalNode 在收到新的 Epoch 之后,首先检查这个新的 Epoch 是否比它本地保存的 lastPromisedEpoch 大,如果大的话就把 lastPromisedEpoch 更新为这个新的 Epoch,并且向 NameNode 返回它自己的本地磁盘上最新的一个 EditLogSegment 的起始事务 id,为后面的数据恢复过程做好准备。如果小于或等于的话就向 NameNode 返回错误。
NameNode 收到大多数 JournalNode 对 newEpoch 的成功响应之后,就会认为生成新的 Epoch 成功。
在生成新的 Epoch 之后,每次 NameNode 在向 JournalNode 集群提交 EditLog 的时候,都会把这个 Epoch 作为参数传递过去。每个 JournalNode 会比较传过来的 Epoch 和它自己保存的 lastPromisedEpoch 的大小,如果传过来的 epoch 的值比它自己保存的 lastPromisedEpoch 小的话,那么这次写相关操作会被拒绝。一旦大多数 JournalNode 都拒绝了这次写操作,那么这次写操作就失败了。如果原来的 Active NameNode 恢复正常之后再向 JournalNode 写 EditLog,那么因为它的 Epoch 肯定比新生成的 Epoch 小,并且大多数的 JournalNode 都接受了这个新生成的 Epoch,所以拒绝写入的 JournalNode 数目至少是大多数,这样原来的 Active NameNode 写 EditLog 就肯定会失败,失败之后这个 NameNode 进程会直接退出,这样就实现了对原来的 Active NameNode 的隔离了。
2)选择需要数据恢复的 EditLog Segment 的 id
需要恢复的 Edit Log 只可能是各个 JournalNode 上的最后一个 Edit Log Segment,如前所述,JournalNode 在处理完 newEpoch RPC 请求之后,会向 NameNode 返回它自己的本地磁盘上最新的一个 EditLog Segment 的起始事务 id,这个起始事务 id 实际上也作为这个 EditLog Segment 的 id。NameNode 会在所有这些 id 之中选择一个最大的 id 作为要进行数据恢复的 EditLog Segment 的 id。
3)向 JournalNode 集群发送 prepareRecovery RPC 请求
NameNode 接下来向 JournalNode 集群发送 prepareRecovery RPC 请求,请求的参数就是选出的 EditLog Segment 的 id。JournalNode 收到请求后返回本地磁盘上这个 Segment 的起始事务 id、结束事务 id 和状态 (in-progress 或 finalized)。
这一步对应于 Paxos 算法的 Phase 1a 和 Phase 1b(参见参考文献 [3]) 两步。Paxos 算法的 Phase1 是 prepare 阶段,这也与方法名 prepareRecovery 相对应。并且这里以前面产生的新的 Epoch 作为 Paxos 算法中的提案编号 (proposal number)。只要大多数的 JournalNode 的 prepareRecovery RPC 调用成功返回,NameNode 就认为成功。
选择进行同步的基准数据源,向 JournalNode 集群发送 acceptRecovery RPC 请求 NameNode 根据 prepareRecovery 的返回结果,选择一个 JournalNode 上的 EditLog Segment 作为同步的基准数据源。选择基准数据源的原则大致是:在 in-progress 状态和 finalized 状态的 Segment 之间优先选择 finalized 状态的 Segment。如果都是 in-progress 状态的话,那么优先选择 Epoch 比较高的 Segment(也就是优先选择更新的),如果 Epoch 也一样,那么优先选择包含的事务数更多的 Segment。
在选定了同步的基准数据源之后,NameNode 向 JournalNode 集群发送 acceptRecovery RPC 请求,将选定的基准数据源作为参数。JournalNode 接收到 acceptRecovery RPC 请求之后,从基准数据源 JournalNode 的 JournalNodeHttpServer 上下载 EditLog Segment,将本地的 EditLog Segment 替换为下载的 EditLog Segment。
这一步对应于 Paxos 算法的 Phase 2a 和 Phase 2b(参见参考文献 [3]) 两步。Paxos 算法的 Phase2 是 accept 阶段,这也与方法名 acceptRecovery 相对应。只要大多数 JournalNode 的 acceptRecovery RPC 调用成功返回,NameNode 就认为成功。
4)向 JournalNode 集群发送 finalizeLogSegment RPC 请求,数据恢复完成
上一步执行完成之后,NameNode 确认大多数 JournalNode 上的 EditLog Segment 已经从基准数据源进行了同步。接下来,NameNode 向 JournalNode 集群发送 finalizeLogSegment RPC 请求,JournalNode 接收到请求之后,将对应的 EditLog Segment 从 in-progress 状态转换为 finalized 状态,实际上就是将文件名从 edits_inprogress_${startTxid} 重命名为 edits_${startTxid}-${endTxid},见“NameNode 的元数据存储概述”一节的描述。
只要大多数 JournalNode 的 finalizeLogSegment RPC 调用成功返回,NameNode 就认为成功。此时可以保证 JournalNode 集群的大多数节点上的 EditLog 已经处于一致的状态,这样 NameNode 才能安全地从 JournalNode 集群上补齐落后的 EditLog 数据。
需要注意的是,尽管基于 QJM 的共享存储方案看起来理论完备,设计精巧,但是仍然无法保证数据的绝对强一致,下面选取参考文献 [2] 中的一个例子来说明:假设有 3 个 JournalNode:JN1、JN2 和 JN3,Active NameNode 发送了事务 id 为 151、152 和 153 的 3 个事务到 JournalNode 集群,这 3 个事务成功地写入了 JN2,但是在还没能写入 JN1 和 JN3 之前,Active NameNode 就宕机了。同时,JN3 在整个写入的过程中延迟较大,落后于 JN1 和 JN2。最终成功写入 JN1 的事务 id 为 150,成功写入 JN2 的事务 id 为 153,而写入到 JN3 的事务 id 仅为 125,如图 7 所示 (图片来源于参考文献 [2])。按照前面描述的只有成功地写入了大多数的 JournalNode 才认为写入成功的原则,显然事务 id 为 151、152 和 153 的这 3 个事务只能算作写入失败。在进行数据恢复的过程中,会发生下面两种情况:
图 7.JournalNode 集群写入的事务 id 情况
a)如果随后的 Active NameNode 进行数据恢复时在 prepareRecovery 阶段收到了 JN2 的回复,那么肯定会以 JN2 对应的 EditLog Segment 为基准来进行数据恢复,这样最后在多数 JournalNode 上的 EditLog Segment 会恢复到事务 153。从恢复的结果来看,实际上可以认为前面宕机的 Active NameNode 对事务 id 为 151、152 和 153 的这 3 个事务的写入成功了。但是如果从 NameNode 自身的角度来看,这显然就发生了数据不一致的情况。
b)如果随后的 Active NameNode 进行数据恢复时在 prepareRecovery 阶段没有收到 JN2 的回复,那么肯定会以 JN1 对应的 EditLog Segment 为基准来进行数据恢复,这样最后在多数 JournalNode 上的 EditLog Segment 会恢复到事务 150。在这种情况下,如果从 NameNode 自身的角度来看的话,数据就是一致的了。
事实上不光本文描述的基于 QJM 的共享存储方案无法保证数据的绝对一致,大家通常认为的一致性程度非常高的 Zookeeper 也会发生类似的情况,这也从侧面说明了要实现一个数据绝对一致的分布式存储系统的确非常困难。
在理解了上面的ha架构后,以及基于qjm的共享存储方案后,想来基于nfs的共享存储也同样可以解决类似问题。
具体详情参见:
http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
下面对 NameNode 在进行状态转换的过程中对共享存储的处理进行描述,使得大家对基于 QJM 的共享存储方案有一个完整的了解,同时也作为本部分的总结。
1)NameNode 初始化启动,进入 Standby 状态
在 NameNode 以 HA 模式启动的时候,NameNode 会认为自己处于 Standby 模式,在 NameNode 的构造函数中会加载 FSImage 文件和 EditLog Segment 文件来恢复自己的内存文件系统镜像。在加载 EditLog Segment 的时候,调用 FSEditLog 类的 initSharedJournalsForRead 方法来创建只包含了在 JournalNode 集群上的共享目录的 JournalSet,也就是说,这个时候只会从 JournalNode 集群之中加载 EditLog,而不会加载本地磁盘上的 EditLog。另外值得注意的是,加载的 EditLog Segment 只是处于 finalized 状态的 EditLog Segment,而处于 in-progress 状态的 Segment 需要后续在切换为 Active 状态的时候,进行一次数据恢复过程,将 in-progress 状态的 Segment 转换为 finalized 状态的 Segment 之后再进行读取。
加载完 FSImage 文件和共享目录上的 EditLog Segment 文件之后,NameNode 会启动 EditLogTailer 线程和 StandbyCheckpointer 线程,正式进入 Standby 模式。如前所述,EditLogTailer 线程的作用是定时从 JournalNode 集群上同步 EditLog。而 StandbyCheckpointer 线程的作用其实是为了替代 Hadoop 1.x 版本之中的 Secondary NameNode 的功能,StandbyCheckpointer 线程会在 Standby NameNode 节点上定期进行 Checkpoint,将 Checkpoint 之后的 FSImage 文件上传到 Active NameNode 节点。
2)NameNode 从 Standby 状态切换为 Active 状态
当 NameNode 从 Standby 状态切换为 Active 状态的时候,首先需要做的就是停止它在 Standby 状态的时候启动的线程和相关的服务,包括上面提到的 EditLogTailer 线程和 StandbyCheckpointer 线程,然后关闭用于读取 JournalNode 集群的共享目录上的 EditLog 的 JournalSet,接下来会调用 FSEditLog 的 initJournalSetForWrite 方法重新打开 JournalSet。不同的是,这个 JournalSet 内部同时包含了本地磁盘目录和 JournalNode 集群上的共享目录。这些工作完成之后,就开始执行“基于 QJM 的共享存储系统的数据恢复机制分析”一节所描述的流程,调用 FSEditLog 类的 recoverUnclosedStreams 方法让 JournalNode 集群中各个节点上的 EditLog 达成一致。然后调用 EditLogTailer 类的 catchupDuringFailover 方法从 JournalNode 集群上补齐落后的 EditLog。最后打开一个新的 EditLog Segment 用于新写入数据,同时启动 Active NameNode 所需要的线程和服务。
3)NameNode 从 Active 状态切换为 Standby 状态
当 NameNode 从 Active 状态切换为 Standby 状态的时候,首先需要做的就是停止它在 Active 状态的时候启动的线程和服务,然后关闭用于读取本地磁盘目录和 JournalNode 集群上的共享目录的 EditLog 的 JournalSet。接下来会调用 FSEditLog 的 initSharedJournalsForRead 方法重新打开用于读取 JournalNode 集群上的共享目录的 JournalSet。这些工作完成之后,就会启动 EditLogTailer 线程和 StandbyCheckpointer 线程,EditLogTailer 线程会定时从 JournalNode 集群上同步 Edit Log。
http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/Federation.html
联邦模式随后说明
HDFS 的主要目标就是即使在出错的情况下也要保证数据存储的可靠性。 常见的三种出错情况是: Namenode 出错 , Datanode 出错和网络割裂 ( network partitions) 。
磁盘数据错误,心跳检测和重新复制
每个 Datanode 节点周期性地向 Namenode 发送心跳信号。网络割裂可能 导致一部分 Datanode 跟 Namenode 失去联系。 Namenode 通过心跳信号的缺 失来检测这一情况,并将这些近期不再发送心跳信号 Datanode 标记为宕机 ,不会再将新的 IO 请求发给它们。任何存储在宕机 Datanode 上的数据将不 再有效。 Datanode 的宕机可能会引起一些数据块的副本系数低于指定值, Namenode 不断地检测这些需要复制的数据块,一旦发现就启动复制操作。
在下列情况下,可能需要重新复制:某个 Datanode 节点失效,某个副本遭 到损坏, Datanode 上的硬盘错误,或者文件的副本系数增大。
数据完整性
从某个 Datanode 获取的数据块有可能是损坏的,损坏可能是由 Datanode 的存储设备错误、网络错误或者软件 bug 造成的。
HDFS 客户端软 件实现了对 HDFS 文件内容的校验和 (checksum) 检查。当客户端创建一个新 的 HDFS 文件,会计算这个文件每个数据块的校验和,并将校验和作为一个 单独的隐藏文件保存在同一个 HDFS 名字空间下。当客户端获取文件内容后 ,它会检验从 Datanode 获取的数据跟相应的校验和文件中的校验和是否匹 配,如果不匹配,客户端可以选择从其他 Datanode 获取该数据块的副本。
元数据磁盘错误
FsImage 和 Editlog 是 HDFS 的核心数据结构。如果这些文件损坏了,整个 HDFS 实例都将失效。因而, Namenode 可以配置成支持维护多个 FsImage 和 Editlog 的副本。任何对 FsImage 或者 Editlog 的修改,都将同步到它们的副 本上。这种多副本的同步操作可能会降低 Namenode 每秒处理的名字空间事 务数量。然而这个代价是可以接受的,因为即使 HDFS 的应用是数据密集的 ,它们也非元数据密集的。当 Namenode 重启的时候,它会选取最近的完整 的 FsImage 和 Editlog 来使用。
Namenode 是 HDFS 集群中的单点故障 (single point of failure) 所在。如果 Namenode 机器故障,是需要手工干预的。目前,自动重启或在另一台机器 上做 Namenode 故障转移的功能还没实现。
快照
快照支持某一特定时刻的数据的复制备份。利用快照,可以让 HDFS 在 数据损坏时恢复到过去一个已知正确的时间点。 HDFS 目前还不支持快照功 能,但计划在将来的版本进行支持。
本文参考:
http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-name-node/#N1008F
标签:检验 命令 函数 文件读取 lin 角度 工具 数据结构 lstat
原文地址:http://www.cnblogs.com/f1194361820/p/6856292.html