标签:理解 分布 other 文件夹 删掉 理论 procedure 远程 随机
服务器只是一堆废铁而已,上面跑了tomcat,我们叫它web服务器;上面跑了mysql,我们叫它数据库服务器。所以不同服务器只是上面跑的进程(或者说程序)不同,我们是根据它们跑的进程来命名它们分别叫什么服务器的。
修改主机名:/etc/sysconfig/network
将主机名和IP绑定:/etc/hosts
修改ip地址:/etc/sysconfig/network-scripts/ifcfg-eth0
关闭防火墙:service iptables stop chkconfig iptables off
防火墙是禁止一些端口启动的,而hadoop运行需要占用很多端口,因此要关闭防火墙
此安装步骤适用于Hadoop 0.x 和 Hadoop 1.x 版本。
Hadoop 2.x 很复杂,建议参考官网的教程,下面的不适用于版本2.x。
分布式模式分两种,伪分布式和完全分布式。
hostname XXX
vi /etc/hosts
vi /etc/hosts
192.168.xx.yyy yourhostname
关闭防火墙
不论是伪分布安装,还是真实的分布式安装,如果不关闭防火墙,会遇到例如如下这种情况:执行
./start-dfs.sh
后,提示说都启动了,也将启动写进日志了,可是去其他机器上执行jps
发现并没有启动对应的Java进程。原因就是防火墙没有关闭。
验证:
service iptables status
chkconfig iptables off
chkconfig --list | grep iptables
SSH(secure shell)的免密码登陆
原理如图:
Q:为什么要设置免密码登录?
A:因为分布式应用程序,每台机器都有不同的进程,而整个分布式应用程序要想能起起来,需要各个机器上的进程都起起来并且还需要它们之间的配合。而每台机器上都有不同的进程需要启动,我们想做到在一台机器上能启动整个分布式应用,就需要达到在一台机器上能启动其他机器上的进程的目的。这个时候就需要免密码登录。
例如在node1上敲例如
start-all.sh
等命令,它为什么能启动整个分布式应用呢?是因为node1远程登录到了其他节点,启动了其他节点的进程,从而达到启动整个分布式应用的目的。其实不配置免密码登录,HDFS一样能启动起来,免密码只是为了运维方便,如果不设置,每次手动去敲会很麻烦。
ssh-keygen -t rsa
:产生密钥,位于~/.ssh
文件夹中cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
验证:
ssh localhost
然后两次exit
宿主机上安装WinSCP(远程传输数据)
rm -rf /usr/local/*
:这些东西用不到,我觉得可以全部删掉tar -zxvf hadoop-x.y.z.tar.gz
mv hadoop-x.y.z hadoop
配置hadoop、JDK
vi /etc/profile
,加下面几行export JAVA_HOME=/usr/local/jdk
export HADOOP_HOME=/usr/local/hadoop
export PATH=$HADOOP\_HOME/bin:$JAVA\_HOME/bin:$PATH
source /etc/profile
验证
java -version
修改$HADOOP\_HOME/conf
下的4个配置文件(在WinSCP上面修改)
hadoop-env.sh
:修改该文件中被#注释掉的JAVA_HOME
改成
export JAVA_HOME=/usr/local/jdk/
core-site.xml:(配置NameNode进程)
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hadoop0:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/tmp</value>
<description>HDFS的工作目录</description>
</property>
</configuration>
hdfs-site.xml:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
mapred-site.xml:(配置JobTracker进程的主机)(TaskTracker不用配,默认每个DateNode上跑一个TaskTracker)
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hadoop0:9001</value>
</property>
</configuration>
对hadoop进行格式化:hadoop namenode -format
启动hadoop: start-all.sh
可以用命令jps查看启动的java进程,
发现有5个。它们分别是:
NameNode、DataNode、SecondaryNameNode、JobTracker、TaskTracker
可以去跟踪NameNode源码查看,它既是RPC服务端(里面有
RPC.getServer(..., ..., ..., ...)
方法调用),又是一个web服务器(里面有一个org.mortbay.jetty.Server
类成员变量)也可以用浏览器来查看,
在浏览器地址栏输入:yourhostname:50070
,
回车即可
yourhostname:50070
中能看到NameNode,
说明NameNode是活着的;
yourhostname:50030
中能看到Map/Reduce,
说明JobTracker是活着的
若NameNode进程没有启动成功,原因可能为:
多次格式化hadoop文件系统也是错误的!!!解决办法为:
/usr/local/hadoop/tmp
文件夹,重新格式化去除hadoop启动过程中的警告信息
/etc/profile
中添加HADOOP_HOME_WARN_SUPRESS=1
/etc/hosts
,在该文件中含有所有结点的ip与hostname的映射信息;ssh-copy-id -i remotehostname
scp /root/.ssh/authorized_keys remotehostname1:/root/.ssh remotehostname2:/root/.ssh ...
scp -r /usr/local/jdk remotehostname1:/usr/local/
/etc/profile
复制到hadoop1和hadoop2结点,在目标结点执行source /etc/profile
cd /usr/local/hadoop/conf/
,编辑hadoop0的配置文件slaves(vi slaves
),改为从结点的主机名,分别是hadoop1和hadoop2,每个主机名各占一行。hadoop namenode -format
start-all.sh
注意: 对于配置文件core-site.xml
和mapred-site.xml
,在所有结点中都是相同的内容。
因此这些配置文件的内容应当在一开始搭建集群时就应该设计好,以后要避免修改。
验证: 以上步骤完成后,执行jps
命令。可以看到:
小细节: 按照上面步骤,SecondaryNameNode存放在主结点,若想把SecondaryNameNode存在其他机器(比如另一台内存较大的机器)上,则可以
stop-all.sh
cd /usr/local/hadoop/conf/
,编辑hadoop0的配置文件masters(vi masters
)。 masters文件中存放SecondaryNameNode的主机名。
start-all.sh
hadoop-daemon.sh start namenode
hadoop-daemon.sh start tasktracker
其它从结点上的hadoop族java进程都要在该新增结点中对应起动。
hadoop dfsadmin -refreshNodes
kill -9 $pid
默认从结点失联10分钟时,主结点就认为该从结点已宕机。
core-site.xml
中配置的是NameNode$HADOOP_HOME/conf/slaves
中配置的是DataNode$HADOOP_HOME/conf/master
中配置的是SecondaryNameNodeNameNode: 是整个文件系统的管理结点。
其主要功能:接受客户端的读写服务。
它维护整个:
1.从操作系统的文件系统分层的角度来看,NameNode可以理解为逻辑文件系统(管理元信息,不包括实际内容)和文件组织系统(知道文件及其逻辑块和物理块)的总和。
2.从Linux的VFS角度去看,NameNode可以看成VFS接口。
NameNode的metadata信息在启动后会加载到内存:
fsimage
fsimage
(这个位置信息在DataNode启动上报后,一直会在内存中)对metadata的修改不是马上由内存写到fsimage中,而是先写到了edits日志中。
例如,现在客户端发出一条删除某数据的请求,那么该请求被写进edits中,而此时fsimage并没有删除该数据的metadata。然后隔一段时间,edits中的日志文件和fsimage中的内容进行合并,此时,才发生真正的删除操作。
它不是NN的备份(但可以备份**一部分** 元数据,也不是实时备份),它的主要工作是帮助NN合并edits-log,减少NN启动时间。 SNN执行合并的时机:(以下两条任意满足一个,就会触发合并)伪分布式:多个节点在同一台机器上。
fs.checkpoint.period
默认3600秒fs.checkpoint.size
规定edits文件的最大值默认是64MB合并过程图示:
DataNode: 提供真实文件数据的存储服务。
它按块(Block)存储数据。HDFS默认Block大小是64M。
不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块的存储空间。
HDFS的DataNode在存储数据时,如果原始文件大小>64MB,则按照64MB大小切分;如果<=64MB,则占用磁盘空间是源文件实际大小。
Replication: 多副本,默认是3个。
HDFS读数据:
注意,图中步骤4和5是并发地去读各个Block
注意,图中步骤4和5是“三副本”的意思。但是client只负责写一个block,将block备份成三副本的任务,是由client所写block所在的DataNode完成的。
此外,还要注意步骤7:complete,即客户端写成功后,会给NameNode发消息让它知道写入成功。
HDFS文件权限:
其哲学是:阻止好人做错事,而不是阻止坏人做坏事。
你告诉我你是谁,我就认为你是谁。
正是这个原因,HDFS 安全性一般,它里面一般存储安全性不是那么高的数据。
HDFS安全模式:
安全模式和数据安全有关。如果强制离开安全模式,很可能会造成数据丢失。
启动时,默认进入安全模式30s,然后退出安全模式。
安全模式时候,不能进行增删改(会报SafeModeException),只能查。
检查是否处在安全模式:hadoop dfsadmin -safemode get
进入安全模式:hadoop dfsadmin -safemode enter
离开安全模式:hadoop dfsadmin -safemode leave
它们之间依赖DatanodeProtocol
接口通信:
The only way a NameNode can communicate with a DataNode is by returning values from these functions(DatanodeProtocol类中的functions).
函数原型:public DatanodeCommand[] sendHeartbeat(...){}
调用者和接收者是DataNode,实现者是NameNode。
在
DatanodeProtocol
通信协议中,DataNode是客户端,NameNode是服务端。
DataNode通过sendHeartbeat(...)
来tells NameNode that the DataNode is still alive and well.Includes some status info,too.
NameNode收到DataNode的“心跳”后,它会将反馈信息通过sendHeartbeat(...)
的返回值DatanodeCommand[]
返回。
It alse gives the NameNode a chance to return an array of
DatanodeCommand
objects.
A DatanodeCommand tells the DataNode to invalidata local block(s) or to copy them to other DataNodes, etc.
总结: DataNode通过sendHeartbeat(...)
函数的形参将自身信息传给NameNode;NameNode对DataNode发送的命令是通过函数的返回值的方式。因此该方法的形参和返回值实际上完成了DataNode和NameNode通信的双向交互。
NameNode和DataNode源码中的namenode
属性:
NameNode中,namenode属性是ClientProtocol
类型;
public ClientProtocol namenode = null;
DataNode中,namenode属性是DatanodeProtocol
类型;
public DatanodeProtocol namenode = null;
DataNode中心跳源码跟踪分析:
DataNode中的offerService()
方法是DataNode的主循环。offerService()
方法是在DataNode中的run()
方法中被调用,而run()
又是在DataNode的构造函数中被调用的。也就是说,run()
是在NameNode**一启动就执行了** 。
MR只是个分布式计算框架,除了它之外,还有很多分布式计算框架比如Storm、Spark等。
MR是离线计算框架,更适合做离线计算;Storm是流式计算框架,更适合做纯实时、毫秒级实时计算;Spark是流式的、内存计算框架,更适合做准实时、秒级计算。
所以每个计算框架都有各自的特点和适用场景。
MR设计理念:移动计算,而不是移动数据。
对于每一个split,都会有一个Java线程去执行map任务。
在Shuffling中,会发生合并和排序(会把相同key的进行合并)。
Reducing这一步可能只有一个Reduce任务,也可能有多个;怎么决定是一个还是多个呢,是由程序去决定的,程序可以随意地去定义有几个Reduce。
Shuffler阶段:MR最复杂的一个阶段。
左半边图,经过
merge on disk
后,放到磁盘上的数据已经分区并且排好序了。
一个map任务可能会产生多个文件,这是因为一个map任务的输入或输出可能很大,大到内存装不下;于是就需要在内存中执行一部分,然后spill(溢写)到磁盘,然后接着再执行,再spill,等等;这样一来一个map任务就可能会产生多个文件。
partition这一步在fetch的时候才会起作用;具体按什么规则分区,要看partition这部分代码怎么写的;程序猿不写也可以,因为有默认的分区规则(哈希模运算规则分区)。
分区是为了把map的输出数据进行负载均衡或者说解决数据倾斜问题的,换句话说就是为了给reducer做负载均衡。
Q:上面提到了,reduce会产生数据倾斜,为什么map不会产生数据倾斜?
A:
sort步骤:上面讨论了partition,现在来看sort
默认的排序按照字典序。
方便后面步骤中,map上的数据拷贝到对应的reduce任务所在的机器上去执行。
merge on disk:
这一步叫combiner
。
默认合并规则是按照哈希值合并;可以自定义合并规则。
如果程序猿设置过combiner,那么在这一步中会将有相同key的key/value对的value加起来,以减少溢写到磁盘的数据量。但是,就算程序猿写了combiner代码,也不是一定有机会执行,例子见下图,
如图,对于map输出的三个结果,第一个和第三个无法使用执行combiner,但是第二个可以。第二个中的两条数据
Car, 1
和Car, 1
可以执行combiner、从而变成一条Car, 2
。
Combiner问答:
Q:为什么使用Combiner?
A:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量少,传输时间变短,作业的整体时间变短。
Q:为什么Combiner不作为MR运行的标配,而是可选步骤呢?
A:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
Q:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作呢?
A:Combiner操作发生在map端,处理一个任务锁接收的文件中的数据,不能跨map任务执行;只有reduce可以接受多个马屁任务处理的结果。
reduce:
reduce这一步是在执行reduce任务的机器上进行的(跟map任务不是同一组机器)。
map任务的执行结果需要拷贝到执行reduce任务的机器上。
Q:怎么拷贝?
A:根据partition的结果,只拷贝分给该reduce的数据。例如reduce1只拷贝分给reduce1的数据,而不会去拷贝分给reduce2、reduce3等的数据。
右图中merge
这一步的不由程序猿控制。
注意左图中的
combiner
和右图中的merge
不同。
shuffle过程详解:
reduce过程详解:
走的网络,所以这一步可以将数据压缩后再传送。
同理,reduce输出也可以压缩后再输出。
job
FileInputFormat.setInputPaths(job, $INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
和JobTracker通信
submit(...)
方法,该方法能完成两件事情:一个是通过connect()
方法和JobTracker连接;另一个是通过submitJobInternal(...)
方法提交作业。connect()
方法有个内部类,内部类的run()
方法中new了一个JobClient类实例。init()
方法,该方法中调用了createRPCProxy(...)
方法。createRPCProxy(...)
方法返回值类型为JobSubmissionProtocol,这个类即为TaskTracker和JobTracker通信的协议,也是提交作业的接口。createRPCProxy(...)
方法中通过RPC.getProxy(...)
拿到代理对象。小结:JobClient可以看成一个指向JobTracker的链接,拿到它就相当于拿到一个服务端的代理对象。
提交作业:下面追踪submitJobInternal(...)
(提交作业给系统的内部方法)
jobSubmitClient.submitJob(...)
submitJob(...)
方法,发现是JobSubmissionProtocol接口的方法,而JobTracker实现了这个接口及submitJob(...)
方法小结:更准确的说,是调用了JobTracker的submitJobInternal(...)
方法,因此作业就是这样被提交到JobTracker。
总结: 我们在程序中写的代码如何提交作业到JobTracker中?
job.waitForCompletion(true)
会依次调用两句话,因此依次发生两件事情: connect();
info = jobClient.submitJobInternal(conf);
connect()
方法中,实际上创建了一个JobClient对象。在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol。JobSubmissionProtocol的实现类是JobTracker。jobClient.submitJobInternal(conf)
方法中,调用了JobSubmissionProtocol.submitJob(...)
,即执行的是JobTracker.submitJob(...)
。不同java进程间的对象方法的调用。
一方称作服务端(server),一方称作客户端(client)。可见RPC是C/S结构。
server端提供对象,供client调用,被调用对象的方法的执行发生在server端。
RPC是hadoop框架运行的基础。
注意:
服务端提供的对象必须是一个接口,接口要extends VersionedProtacal
为什么要是接口呢?因为返回的代理对象,而JDK中要求反射的代理对象必须实现接口
客户端能够调用的对象中的方法必须位于对象的接口中。
hadoop的数据类型要求必须实现Writable接口。
Long LongWritable
Integer IntWritable
Boolean BooleanWritable
String Text
Q:java类型如何转化为hadoop基本类型?
A:直接调用hadoop类型的构造方法,或者调用set(…)方法。
Q:hadoop基本类型如何转化为java类型?
A:对于Text,需要调用toString()方法,其他类型调用get()方法。
标签:理解 分布 other 文件夹 删掉 理论 procedure 远程 随机
原文地址:http://blog.csdn.net/chenyyhh92/article/details/54838298