标签:
目前大数据系统在处理方法上有一些共同特点:
所有大数据计算处理模式都有一个目的,就是使输入和输出并行化,从而提高数据处理性能。
大数据分发到多个节点有两个好处:
大数据无法处理网络过载的问题,如果传输动辄上T的数据,会使网络带宽耗尽,网络拥挤,甚至导致系统故障,为了更好的处理,我们要把数据分布到各个节点上,而且除了把程序要移动到存放数据的节点,程序运行所依赖的函数库也需要移动到节点上,这样大数据可以让我们集中式的部署程序代码,大数据系统后台会在计算任务启动之前把这些程序移动到各个数据处理节点上。
虽然典型的大数据处理系统都希望把数据处理过程放在拥有数据节点的本地完成,但并不是每次都能实现,大数据系统会把计算任务尽量调度到离数据点最近的节点。
假设我们要计算2000年美国各州的总销售量,并按州排序,销售数据已经随机分发到各个计算节点,利用大数据技术主要分为如下步骤:
核心思想是把数据按照某一列或者某一组列的值,按照某种形式划分,以分别处理。但是这系统的缺陷在于需要在算法设计的时候就决定数据如何划分,而划分的准则通常由底层的用例来决定,如此一来,就不适合临时的数据查询需求。
内存数据库系统类似于MPP系统,他们的不同之处在于内存数据库系统的每个节点都拥有巨大的内存,并且大部分数据会被预先加载到内存中。系统的缺陷是采用了大量的硬件和软件,费用高昂。
Hadoop系统对MapReduce框架的实现具有如下几个重要的特征:
下面我们介绍下MapReduce大数据系统定义:
MapReduce编程泛型的一个重要不足是它不适合迭代算法。大量的数据科学计算算法很自然需要使用到迭代,并最终收敛到一个解。当使用这样的算法时候,MapReduce任务每次都要从持久性存储中重新读取数据,所以每次迭代产生的结果需要存到持久性存储中供下次迭代计算使用,这个过程导致了不必要的I/O操作,对系统吞吐量造成重大的影响。
整体同步并行系统和MapReduce过程十分相似,与MapReduce程序不同之处在于,BSP系统程序执行由一系列的超步(这个和Map处理的过程类似)组成,这些超步保持栅栏同步,向主节点发送数据并进行相关的信息交换。每当一次迭代执行完毕,主节点会通知每个数据处理节点进行下一次迭代。
Hadoop系统使用HBase来作为自己的NoSQL数据存储,大多数的RDBMS使用者都要求数据库必须遵守ACID准则,但是遵守这些准则是需要系统代价的。当数据库后台需要处理峰值为每秒数百万次的事务操作时候,要求苛刻的遵守ACID准则对数据库来说是个巨大的挑战。
对苛刻的ACID准则做出妥协是必要的,做出妥协的主要理论依据就是CAP理论:
Hadoop是谷歌以2004年发表的一篇关于MapReduce的论文作为基础开发的,就自身来讲,Hadoop是一个基于Java语言的MapReduce框架。随着Hadoop被越来越多的企业采用,自身不断改进并衍生出很多子项目。
MapReduce模型有两个彼此独立的步骤,这两个步骤可以配置并需要用户在程序中自定义:
Hadoop系统中MapReduce的核心思路是:将输入的数据在逻辑上分割成多个数据块,每个逻辑数据块被Map任务单独地处理。数据块处理后所得结果会被划分到不同的数据集,且将数据集排序完成。每个经过排序的数据集传输到Reduce任务进行处理。
当数据量不是很大的时候,我们计算文档中每个词出现的个数并不是一件难事。
当数据量非常大的时候,我们可以尝试使用MapReduce来解决这个计数问题
Hadoop1.x版本系统的组件:
1.名称节点(NameNode):维护着存储在HDFS上的所有文件的元数据信息。这些元数据信息包括组成文件的数据块信息,及这些数据块在数据节点上的位置。
2.辅助名称节点(SecondaryNameNode):这不是名称节点的备份,实际上是Hadoop平台的一个组件,为名称节点组件执行一些内务处理功能。
3.数据节点(DataNode):把真正的数据块存放在本地硬盘上,这些数据块组成了保存在HDFS上的每个文件。
4.作业跟踪器(JobTracker):负责一个任务的整个执行过程,它的具体功能包括:调度各个子任务(Mapper和Redeuce任务各自的子任务)到各自的计算节点运行,时刻监控任务运行和计算节点的健康状况,对失败的子任务重新调度执行。
5.任务跟踪器(TaskTracker):运行在各个数据节点上,用来启动和管理各个Map/Reduce任务,与作业跟踪器通信。
对于大数据,我们首要解决的就是大数据的存储问题,这里我们采用HDFS分布式存储的解决方案。HDFS是主从架构,运行名称节点进程的服务器为主节点,运行数据节点进行的服务器为从节点。在Hadoop系统中,每个文件都被分隔成多个数据块,每个数据块的大小为64MB,也可以配置成32MB或者128MB,这些数据块存储在数据节点上,为了防止节点故障,这些数据块是有备份的,系统默认的备份数量是3,具有机架感知功能的Hadoop系统把文件的一个数据块存储在本地机架上的一台计算节点上,第二个备份会存放在另外一个远程机架上的计算节点上,第三个备份会存放在第二次数据块备份机架上的另一台计算节点上。Hadoop系统借助一个单独配置的网络拓扑文件实现机架感知功能,这个网络拓扑文件配置了机架到计算节点的域名系统DNS名称之间的映射,该网络拓扑文件路径配置在Hadoop配置文件中。
当客户端想HDFS请求读取或者存储一个文件的时候,它需要知道要访问的数据节点是哪一个,NameNode负责管理所有的文件操作,包括文件/目录的打开、关闭、重命名、移动等等。数据节点就负责存储实际的文件数据,这是一个非常重要的区别,当客户点请求或者发送文件数据,文件的数据在物理上不是经过NameNode传输的,客户端仅仅是简单地从NameNode获取文件的元数据,然后根据其元数据信息直接从数据节点获取文件的数据块。
需要注意的是NameNode并不存储数据节点的身份信息,数据节点的身份信息在集群启动的时候从每个数据节点获取。名称节点维护的信息是:HDFS的文件由哪些数据块(数据节点上每个数据块的文件名组成)
元数据存储在名称节点的本地磁盘上,但是为了快速访问,在集群操作的时候会把这些元信息加载到内存。这个提高了Hadoop系统的操作性能,但是如果Hadoop存储小文件,很容易会使元数据的数据量大幅增长,导致名称节点更大量的内存占用。但是同时成为了Hadoop系统的一个瓶颈,由此衍生出Hadoop2.x
客户端把一个文件写入到HDFS文件那系统需要经过以下几个步骤
1)客户端访问名称节点,名称节点返回组成文件的数据块列表以及数据块的位置(包括备份数据块的位置)
2)客户端会直接访问数据节点以获取数据块中的数据。如果此时其访问数据节点出现古战,就会访问存在备份数据块的数据节点
3)读取数据块的时候会计算该数据块的校验和,并将该校验和与写入文件时候的校验和作比较,如果检验失败,则从其他数据节点获取备份数据块。
1)名称节点仅仅重命名了文件路径,使其移动到/trash目录,需要注意的是,这个操作过程是链接到重命名文件路径的元数据的更新操作。这个执行过程非常迅速,/trash目录中的文件会保存一段时间,这个保存时间是预先确定的(当前设定为6小时而且当前不可配置)在这段时间内,把删除的文件从/trash目录中移动出来即可迅速地恢复该文件。
2)当/trash目录中的文件超过了保存时间,名称节点就会将该文件从HDFS命名空间删除。
3)删除文件就会使得改文件相关的数据块被释放,HDFS系统最后会显示增加了一些空闲的空间。‘
辅助名称节点的作用就是周期性地把edit文件中的内容与fsimage文件中的内容合并,辅助名称节点会周期性地顺序执行下列步骤:
1)辅助名称节点会请求名称节点来结转文件,确保新的更新保存到一个新的文件,这个新的文件名字叫做edits.new
2)辅助名称节点向名称节点请求获取fsimage文件和edits文件
3)辅助名称节点把edits文件和fsimage文件合并,生成一个新的fsimage文件
4)名称节点从辅助名称节点接收到新生成的fsimage文件,并替代旧的fsimage文件。同时将edits文件中的内容替换成步骤1中创建的edit.new文件的内容。
5)更新fstime文件来记录发生的检查点操作
任务跟踪器守护进程在集群中每台计算节点运行,接收诸如Map和Reduce和Shuffle这些操作任务的请求,每个任务跟踪器都会分配一定的槽位数,其槽位数的数量一般与计算节点上可用的CPU核数一致,任务跟踪器接收到一个来自作业跟踪器的请求后,就会启动一个任务,任务跟踪器会为这个任务初始化一个新的JVM。
作业跟踪器守护进程负责启动和监控MapReduce作业,当一个客户端向Hadoop系统提交一个作业,作业的启动流程如图:
1)作业跟踪器收到了作业请求
2)大多数的MapReduce作业都需要一个或多个输入文件目录,任务跟踪器向名称节点发出请求,获得一个数据节点的列表。这个列表的数据节点存储了组成输入文件数据的数据块。
3)作业跟踪器为作业的执行做准备工作。在这个步骤中,任务跟踪器确定执行该作业需要的任务(Mapper和Reducer任务)数量。作业跟踪器尽量把这些任务都调度到离数据块最近的位置进行。
4)作业跟踪器把任务提交到每个任务跟踪器节点去执行。任务跟踪器节点监控任务执行情况。任务跟踪器以预先设定的时间间隔发送心跳信息到作业跟踪器,如果作业跟踪器在预先设定的时间间隔之后,没有收到任务跟踪器发来的心跳信息,那么就认为该任务跟踪节点出现故障,任务就会被调度到另外一个节点去运行。
5)一旦所有任务都执行完毕,作业跟踪器就会更新作业状态为成功,如果任务反复失败达到一定数量,作业跟踪器就会宣布作业运行失败、
6)客户端会轮询作业跟踪器及时地获得作业运行状态、
MapReduce已经进行了全新升级,升级后的版本被称为MapReduce2.0或者YARN,YARN是一套应用编程接口,兼容MRV1,Hadoop1.x中的作业调度器承担两个主要功能:
YARN把这两个功能分为两个守护进程来分别承担,这样的设计使得系统有一个全局的资源管理器以及每个程序有一个应用程序管理器。注意这里我们提到了程序而不是作业,在新的系统中,一个程序既可以指传统概念上的一个单独的MapReduce作业,也可以指一系列作业组成的有向无环图(DAG)
YARN系统由一下几个组成部分:
容器:是YARN框架中的计算单元。一部分CPU内核和一部分内存构成了容器,一个应用程序运行在一组容器当中。它是一个任务进行工作得单元子系统,也可以认为YARN框架中的容器相当于MapReduce v1中的一个任务执行器。集群节点与容器之间的关系是:一个节点可以运行多个容器,但是一个容器只能运行在一个节点之内。应用程序管理器的一个实例会向全局资源管理请求获取资源。调度器会通过每个节点的节点管理器来分配资源(容器)。节点管理器会向全局资源管理器汇报每个容器的使用情况。
节点管理器:运行在集群中的一个节点上,集群中的每一个节点都会运行一个自己的节点管理器,它是一个从属服务,它接收来自资源管理器的请求,然后分配容器给应用程序。它还负责监控和汇报资源使用情况给资源管理器。节点管理器的任务如下:
资源管理器:核心是个调度,当多个应用程序竞争使用集群资源的时候,它来负责资源的调度,确保集群资源的优化合理使用。资源管理有一个插件化的调度器,该调度器按程序队列和集群的处理能力,负责为正在运行的多个应用程序分配其所需的集群资源。
应用程序管理器:是一个特性的框架函数库实例,同资源管理器协调沟通资源,并通过节点管理器来获取这些系统资源,然后执行任务。可以提高拓展性并且框架更加的通用。
YARN的运行图
通过HDFS的系统介绍,在Hadoop1.x的系统中,名称节点会引发系统单点故障,如果运行名称节点的服务器出现故障了,那么整个集群都会处于不可用的状态,除非名称节点在另外一台服务器上重新启动。
Hadoop2.x引入了高可用名称节点的概念,高可用名称节点的背后核心思想是使用两个相同的名称节点,一个处于活动模式,另外一个处于待机模式,处于活动模式的名称节点对系统提供服务,处在待机模式的名称节点需要实时同步活动名称节点的数据,一旦活动名称节点当机,系统可以快速的进行故障切换。在当前的设计中,为了这个目的,两个名称节点必须共享同一个存储设备(通过NFS),活动名称节点的任何修改都会记录到共享存储设备当中的edits日志文件中。待机名称节点将这些修改应用到自己的名称空间中红,一旦活动名称节点发生故障,待机名称节点会确保edits文件中的所有数据都被应用,并接管活动名称节点的职责。
多节点集群模式:从系统逻辑上,其系统运行情况与伪分布模式是一致的。
具体的安装教程可以查看厦门大数据实验室写的教程:Hadoop安装教程
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount{
public static class MyMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable>output,Reporter reporter)throws IOException{
output.collect(new Text(value.toString()),new IntWritable(1));
}
}
public static class MyReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
int sum=0;
while(values.hasNext()){
sum+=values.next().get();
}
output.collect(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception{
JobConf conf=new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReducer.class);
conf.setReducerClass(MyReducer.class);
conf.setNumReduceTasks(1);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
JobClient.runJob(conf);
}
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordTest{
public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String w=value.toString();
context.write(new Text(w),new IntWritable(1));
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key,new IntWritable(sum));
}
}
public static void main(String[] args)throws Exception{
Job job=Job.getInstance(new Configuration());
job.setJarByClass(WordTest.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean status=job.waitForCompletion(true);
if(status){
System.exit(0);
}
else{
System.exit(1);
}
}
}
Hadoop集群中的每台计算节点都有自己的一组配置文件,有两种主要类型的配置文件:-default.xml和-site.xml,site中配置项覆盖default中相同的配置项。
标签:
原文地址:http://blog.csdn.net/ws1233456789/article/details/50960987