标签:work sel hdf pts 简单 阻塞 http cte NPU
Google大数据技术:MapReduce、BigTable、GFS
Hadoop:一个模仿Google大数据技术的开源实现
数据块
磁盘中的关系:
HDFS同样也有块(block)的概念,但是大很多,默认为64MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。但与其他文件系统不同的是,HDFS中小于 一 个块大小的文件不会占据整个块的空间。
有关二级名称节点更多的介绍:https://blog.csdn.net/x_i_y_u_e/article/details/52430932
在读取的时候,如果客户端在与数据节点通信时遇到一个错误,那么它就会去尝试对这个块来说下一个最近的块。它也会记住那个故障的数据节点,以保证不会再对之后的块进行徒劳无益的尝试。客户端也会确认从数据节点发来的数据的校验和。如果发现一个损坏的块,它就会在客户端试图从别的数据节点中读取一个块的副本之前报告给名称节点。
这个设计的一个重点是,客户端直接联系数据节点去检索数据,并被名称节点指引到每个块中最好的数据节点。因为数据流动在此集群中是在所有数据节点分散进行的,所以这种设计能使HDFS可扩展到最大的并发客户端数量。同时,名称节点只不过是提供块位置请求(存储在内存中,因而非常高效),不是提供数据。否则如果客户端数量增长,名称节点会快速成为一个“瓶颈”。
名称节点如何选择哪个数据节点来保存副本?我们需要在可靠性与写入带宽和读取带宽之间进行权衡。例如,因为副本管线都在单独一个节点上运行,所以把所有副本都放在一个节点基本上不会损失写入带宽,但这并没有实现真的冗余(如果节点发生故障,那么该块中的数据会丢失)。同样,离架读取的带宽是很高的,另一个极端,把副本放在不同的数据中心会最大限度地增大冗余,但会以带宽为代价。即使在相同的数据中心(所有的Hadoop集群到目前为止都运行在同一个数据中心),也有许多不同的放置策略。其实,Hadoop在发布的0.17.0版本中改变了放置策略来帮助保护块在集群间有相对平均的分布。
Hadoop的策略是在与客户端相同的节点上放置第一个副本(若客户端运行在集群之外,就可以随机选择节点,不过系统会避免挑选那些太满或太忙的节点)。第二个副本被放置在第一个不同的随机选择的机架上(离架)。第三个副本被放置在与第二个相同的机架上,但放在不同的节点。更多的副本被放置在集群中的随机节点上,不过系统会尽量避免在相同的机架上放置太多的副本。
一旦选定副本放置的位置,就会生成一个管线,会考虑到网络拓扑。副本数为3的管道看起来如图
总的来说,这样的方法在稳定性(块存储在两个机架中)、写入宽带(写入操作只需要做一个单一网络转换)、读取性能(选择从两个机架中进行读取)和集群中块的分布(客户端只在本地机架写入一个块)之间,进行较好的平衡。
JobClient的runJob()方法用于新建JobClient实例和调用其submitJob()方法的简便方法(如图步骤1)。提交作业后,runJob()将每秒轮询作业的进度,如果发现与上一个记录不同,便把报告显示到控制台。作业完成后,如果成功,就显示作业计数器。否则,导致作业失败的错误指令会被记录到控制台。
JobClient的submitJob()方法所实现的作业提交过程如下:
TaskTracker执行一个简单的循环,定期发送心跳(heartbeat)方法调用JobTracker。心跳方法告诉jobtracker,tasktracker是否还存活,同时也充当两者之间的消息通道。作为心跳方法调用的一部分,tasktracker会指明它是否已经准备运行新的任务,如果是,jobtracker会为它分配一个任务,并使用心跳方法的返回值与tasktracker进行通信(步骤7)。
在jobtracker为tasktracker选择任务之前,jobtracker先选定任务所在的作业,但是默认的方法是简单维护一个作业优先级列表。选择好作业后,jobtracker就可以为该作业选定一个任务。
针对map任务和reduce任务,tasktracker有固定数量的槽。例如,一个tasktracker可能可以同时运行两个map任务和reduce任务。(准确数量由tasktracker核的数量和内存大小来决定)。默认调度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,如果tasktracker至少有一个空闲的map任务槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。
要选择一个reduce任务,jobtracker只是简单地从尚未运行的reduce任务列表中选取下一个来执行,并没有考虑数据的本地化,然而,对于一个map任务,它考虑的是tasktracker的网络位置和选取一个距离其输入划分文件最近的tasktracker。在最理想的情况下,任务是data-local(数据本地化)的,与分割文件所在节点运行在相同的节点上。同样,任务也可能是rack-local(机架本地化的):和分割文件在同一个机架,但不在同一个节点。一些任务既不是数据本地化的,也不是机架本地化,从与他们自身运行的不同机架上检索数据。可以通过查看作业的计数器得知每种类型任务的比例。
现在,tasktracker已经被分配了任务,下一步是运行任务。首先,它本地化作业的JAR文件,将它从共享文件系统复制到tasktracker所在的文件系统。同时,将应用程序所需要的全部文件从分布式缓存复制到本地磁盘(步骤8)。然后,为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下。第三部,新建一个TaskRunner实例来运行任务。
TaskRunner启动一个新的Java虚拟机(步骤9)来运行每个任务(步骤10),使得用户定义的map和reduce函数的任何缺陷都不会影响tasktracker(比如导致它奔溃或者挂起)。但在不同的任务之间重用JVM还是可能的。
子进程通过umbilical接口与父进程进行通信。它每隔几秒便告知父进程它的进度,直到任务完成。
流和管道都运行特殊的map和reduce任务,目的是运行用户提供的可执行程序,并于之通信。
应用流时,流任务使用标准输入和输出流,与进程(可以用任何语言编写)进行通信。另一方面,管道任务则监听套接字,发送其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建立一个与其父java管道任务的持续连接套接字。
在这两种情况下,Java进程都会把输入键/值对传给外部的进程,后者通过用户定义的map或者reduce函数来执行它并把输出的键/值对传回Java进程。从tasktracker的角度看,就像tasktracker的子进程在运行自己的map或者reduce代码。
MapReduce作业是一个长时间运行批量作业,可以运行数秒甚至数小时。由于这是一个很长的时间段,所以对于用户而言,能够得知作业进展是很重要的。一个作业和它的每个任务都有一个状态,包括:作业或者任务的状态(比如,运行成功,失败);map和reduce的进度;作业计数器的值;状态消息或描述(可以由用户代码来设置)。这些状态信息在作业期间不断被改变,那么它们是如何与客户端通信的呢?
任务正在运行时,对任务进度(即任务完成率)保持追踪。对于map任务,这是已处理完输入的百分比。对于reduce任务,则稍微有点复杂,但程序仍然会估计reduce输入已处理的百分比。整个过程分成三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reducer一半的输入,那么任务的进度便是5/6。因为已经完成拷贝和排序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。
任务也有一组计数器可以对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。
如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到tasktracker。在另一个线程中,每隔三秒检查此标志一次,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒发送心跳到jobtracker(5秒间隔是最小值,因为心跳间隔是由集群的大小来决定的:对于一个更大的集群,间隔会更长一些),并且在此调用(指心跳调用)中,所有由tasktracker运行的任务,它们的状态都会被发送至jobtracker。计数器的发送间隔通常大于5秒,因为计数器占的带宽相对较高。
jobtracker将这些更新合并起来,产生一个全局试图,表明正在运行的所有作业及其所含任务的状态。最后,正如前面提到的,JobClient通过每秒查看jobtracker来接收最新状态。客户端也可以使用JobClient的getJob()方法来得到一个RunningJob的实例,后者包含作业的所有状态信息。
jobtracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成功”。然后,在JobClient查询状态时,它将得知任务已成功完成,所有便显示一条消息告知用户,然后从runJob()返回。状态更新在MapReduce系统中的传播如图。
如果jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到回调(指HTTP作业通知的回调)的客户端可以通过job.end.notification.url属性来设置。
最后,jobtracker清空作业的工作状态,指示tasktracker也清空作业的工作状态(比如删除中间输出)。
MapReduce保证每个reduce输入都已按键排序。系统执行的过程——map输出传到reduce作为后者的输入——即称为shuffle(混洗或称洗牌)。
map端开始产生输出结果时,并不是简单地将它写到磁盘。这个过程更复杂,它利用缓冲的方式写到内存,并出于效率的原因预先进行排序。如图
每个map任务都有一个环形内存缓存区,任务会把输出写到此。默认情况下,缓冲区的大小为100MB,此值可以通过io.sort.mb属性来修改。当缓冲内容达到指定大小时(io.sort.spill.percent,默认为0.80,80%),一个后台线程便开始把内容溢写(spill)到磁盘中。在线程工作的同时,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到溢写过程结束。
溢写将按轮询的方式写到mapred.local.dir属性指定的目录,在一个作业相关子目录中。在写到磁盘之前,线程首先根据数据最终被传送到的reducer,将数据划分成相应的分区。在每个分区中,后台线程按键进行内排序(in-memory sort)。此时如果有一个combiner,它将基于排序后输出运行。
一旦内存缓冲区达到溢写阈值,就会新建一个溢写文件,因此在map任务写入其最后一个输出记录之后,会有若干个溢写文件。在任务完成之前,溢写文件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认值是10。
如果已经指定combiner,并且溢写次数至少为3(min.num.spills.for.combiner属性的值)时,combiner就会输出文件被写之前执行。combiner会针对输入反复运行,但不会影响最终结果。运行combiner的意义在于使map输出更紧凑,从而只有较少数据被写到本地磁盘后传给reducer。
map输出被写到磁盘时,对它进行压缩往往是个很好的主意,因为这样会让写入磁盘的速度更快,节约磁盘空间和减少传给reducer的数据量。默认情况下,输出是不压缩的,但是只要将mapred.compress.map.output设置为true,就可以弃用此功能。使用的压缩库由mapred.map.output.compression.codec定义。
reducer通过HTTP得到输出文件的分区。用于服务于文件分区的工作线程,其数量由任务的tracker.http.threads属性来控制。此设置针对的是每个tasktracker,而不是针对每个map任务槽。默认是40,在运行大规模作业的大型集群上,此值可以根据需要而增加。
转到处理过程的reduce这一端。map输出文件位于运行map任务的tasktrack的本地磁盘(注意,尽管map输出经常写到map tasktracker的本地磁盘,但reduce输出并不这样),不过在现在,tasktracker需要它为分区文件运行reduce任务。而且,reduce任务可以在不同时间完成,因此只要一个任务结束,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行地取得map输出。默认是5个线程,但这个默认值可以通过设置mapred.reduce.parallel.copies属性来改变。
如果map输出相当小,则会被复制到reduce tasktracker的内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,它说明用作此用途的堆空间的百分比);否则,被复制到磁盘。内存缓冲区达到阈值大小(由mapred.job.shuffle.merge.percent决定)或达到map输出阈值(由mapred.inmem.merge.threshold控制)时,会被合并,进而被溢写到磁盘中。
随着磁盘上积累的副本增多,后台线程会将它们合并为一个更大的、排好序的文件。这会为后面的合并节省一些时间。注意,任何压缩的map输出(通过map任务)都必须在内存中被解压缩,以便于合并。
所有map输出被复制期间,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其按顺序排序。这将循环进行。比如,如果有50个map输出,而合并系数是10(10是默认设置,右io.sort.factor属性设置,与map的合并类似),合并将进行5轮。每轮将10个文件合并成一个文件,因此最后有5个中间文件。
最后阶段,即reduce阶段,合并直接把数据输入reduce函数,而不是最后还有一轮将5个文件合并成一个已排序的文件。此举省略了一次磁盘往返行程。最后的合并既可来自内存中,也可来自磁盘。
在reduce阶段,对已排序输出中的每个键依次调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。如果采用hdfs,由于tasktracker节点也运行数据节点,所有第一个块副本会被写到本地磁盘。
标签:work sel hdf pts 简单 阻塞 http cte NPU
原文地址:https://www.cnblogs.com/Mayny/p/9368103.html