The MRAppMaster is started by the YARN framework (it is launched dynamically, on a node chosen at run time rather than a fixed one).
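Before diving into the source, it helps to see where the client-side call chain begins. Below is a minimal WordCount-style driver sketch; the WcMapper/WcReducer classes and the output path are hypothetical, only the input path matches the example used later in this post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);

        wcjob.setJarByClass(WcDriver.class);
        wcjob.setMapperClass(WcMapper.class);        // hypothetical Mapper class
        wcjob.setReducerClass(WcReducer.class);      // hypothetical Reducer class
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(IntWritable.class);

        // Input path from this post; the output path is made up for the sketch
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://hadoopH1:9000/wc/input"));
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hadoopH1:9000/wc/output"));

        // Everything analysed below is triggered by this single call
        boolean ok = wcjob.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}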
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  // ... (abridged: the method then waits, optionally printing progress, until the job finishes)
  return isSuccessful();
}
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Abridged: the real method first connects to the cluster and obtains a JobSubmitter;
  // submit() itself is void, so the JobStatus returned below is stored in the job's status field
  status = submitter.submitJobInternal(Job.this, cluster);
  state = JobState.RUNNING;
}
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Abridged excerpt: validation, security tokens and error handling are omitted.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // staging directory for job resources on HDFS
  JobID jobId = submitClient.getNewJobID();                              // new job id, also used to name the submit directory
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());        // full submit path: <staging-area>/<job-id>
  JobStatus status = null;
  try {
    copyAndConfigureFiles(job, submitJobDir);   // upload the job jar and configured resources to the staging path

    int maps = writeSplits(job, submitJobDir);  // compute and write the split info; returns the number of map tasks to launch

    // Write job file to submit dir
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    writeConf(conf, submitJobFile);             // write the job description file (job.xml) to the staging path

    // finally hand the job to the cluster
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    return status;
  } finally {
    // ... cleanup of the staging directory if the submission failed (omitted)
  }
}
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf) job.getConfiguration();
  int maps;
  maps = writeNewSplits(job, jobSubmitDir);   // new-API path; the old-API branch is omitted in this excerpt
  return maps;
}
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the InputFormat via reflection -- TextInputFormat by default
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // Ask the InputFormat for the list of splits (there may be many)
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
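job.getInputFormatClass() falls back to TextInputFormat when the driver sets nothing, which is why the getSplits() analysed next is FileInputFormat's. As a hedged sketch (reusing the wcjob variable from the driver sketch above; the 4 MB cap is an arbitrary example value), a driver could swap in CombineTextInputFormat to pack many small files into fewer splits:

// Inside the driver sketch above, before calling waitForCompletion():
wcjob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.class);
// Cap each combined split at roughly 4 MB (arbitrary example value)
org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.setMaxInputSplitSize(wcjob, 4L * 1024 * 1024);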
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();   // the split descriptions collected for this job
  List<FileStatus> files = listStatus(job);                // list the files under the job's input directory
  for (FileStatus file: files) {                           // iterate over every input file
    Path path = file.getPath();                            // full path, e.g. hdfs://hadoopH1:9000/wc/input/wcdata.txt
    long length = file.getLen();                           // file size (160 bytes in this example)
    if (length != 0) {                                     // only non-empty files are processed here
      BlockLocation[] blkLocations;                        // block info: offsets, lengths and host names
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {                        // some files (e.g. certain compressed formats) cannot be split
        long blockSize = file.getBlockSize();              // HDFS block size, 128 MB by default
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                                                           // split size; with the defaults this equals the block size, 128 MB (see computeSplitSize below)

        long bytesRemaining = length;                      // bytes not yet assigned to a split (160 here)
        // keep cutting full-sized splits while remaining/splitSize > SPLIT_SLOP, where SPLIT_SLOP = 1.1
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          // the final tail becomes its own split; for the 160-byte file we land here immediately
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  return splits;
}
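getSplits() can also be exercised on its own, outside a full job submission. A rough sketch (the local input directory is a placeholder; it assumes the Hadoop client libraries are on the classpath):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitInspector {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Placeholder input directory; point it at any local or HDFS path you have
        FileInputFormat.setInputPaths(job, new Path("file:///tmp/wc/input"));

        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            // For a FileSplit, toString() prints path:start+length, the same format shown later in this post
            System.out.println(split);
        }
    }
}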
protected long getFormatMinSplitSize() {
  return 1;
}

public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
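SPLIT_MINSIZE and SPLIT_MAXSIZE are the configuration keys mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize, so the split size can be steered from the driver through FileInputFormat's static helpers. A small sketch, reusing the wcjob variable from the driver sketch above (the 256 MB / 32 MB values are arbitrary; normally only one of the two knobs is touched):

// Raise minSize above the block size -> splits become LARGER than one block
FileInputFormat.setMinInputSplitSize(wcjob, 256L * 1024 * 1024);   // 256 MB
// ...or lower maxSize below the block size -> splits become SMALLER than one block
FileInputFormat.setMaxInputSplitSize(wcjob, 32L * 1024 * 1024);    // 32 MB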
// Specify the directory holding the input data to process (set in the driver)
FileInputFormat.setInputPaths(wcjob, new Path("hdfs://hadoopH1:9000/wc/input"));

For this input directory, listStatus(job) returns the following file list:
[ LocatedFileStatus{
path=hdfs://hadoopH1:9000/wc/input/wcdata.txt;
isDirectory=false;
length=160;
replication=1;
blocksize=134217728;
modification_time=1582019683334;
access_time=1582336696735;
owner=hadoop;
group=supergroup;
permission=rw-r--r--;
isSymlink=false}
]
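The same listing can be reproduced outside of a job with the plain FileSystem API. A rough sketch, reusing the NameNode address and the hadoop user shown above:

import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class InputInspector {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoopH1:9000"), new Configuration(), "hadoop");
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/wc/input"), false);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();   // carries length, block size, permissions, ...
            System.out.println(status);
            for (BlockLocation block : status.getBlockLocations()) {
                // offset, length and the hosts holding each replica of the block
                System.out.println("  " + block + " hosts=" + Arrays.toString(block.getHosts()));
            }
        }
        fs.close();
    }
}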
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
  // the listing above already carried the block locations, so no extra call is needed
  blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
  // otherwise ask the FileSystem for the block locations explicitly
  FileSystem fs = path.getFileSystem(job.getConfiguration());
  blkLocations = fs.getFileBlockLocations(file, 0, length);
}
long blockSize = file.getBlockSize();              // HDFS block size, 128 MB by default
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                                                   // computed split size, 128 MB with the defaults (see computeSplitSize below)
long bytesRemaining = length;                      // bytes not yet assigned to a split: 160 here

// Keep cutting full-sized splits while remaining/splitSize > SPLIT_SLOP, where SPLIT_SLOP = 1.1
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(makeSplit(path, length - bytesRemaining, splitSize,
             blkLocations[blkIndex].getHosts(),
             blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

// The remaining tail becomes one last split. For the 160-byte file the while-loop never runs
// (160 / 134217728 <= 1.1), so we arrive here directly with bytesRemaining = 160.
if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);  // map the split offset to the block that contains it
  splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
             blkLocations[blkIndex].getHosts(),
             blkLocations[blkIndex].getCachedHosts()));
}
The resulting splits list:

[hdfs://hadoopH1:9000/wc/input/wcdata.txt:0+160]   // one element per split: the file path followed by the split's start offset and length
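For the 160-byte file the while-loop never fires, so the whole file becomes the single split shown above. To see how the 10% slack (SPLIT_SLOP = 1.1) behaves for bigger files, here is a small self-contained simulation of the loop (plain Java, no Hadoop needed; the 300 MB and 129 MB file sizes are arbitrary examples): a 300 MB file with a 128 MB split size yields 128 MB + 128 MB + 44 MB, while a 129 MB file stays a single split because 129/128 ≈ 1.008 ≤ 1.1.

import java.util.ArrayList;
import java.util.List;

public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;   // same 10% slack as FileInputFormat

    // Returns the sizes of the splits the loop in getSplits() would produce
    static List<Long> splitSizes(long fileLength, long splitSize) {
        List<Long> sizes = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            sizes.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            sizes.add(bytesRemaining);               // the tail split
        }
        return sizes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(splitSizes(160, 128 * mb));        // one 160-byte split -> 1 map task
        System.out.println(splitSizes(300 * mb, 128 * mb));   // three splits: 128 MB + 128 MB + 44 MB -> 3 map tasks
        System.out.println(splitSizes(129 * mb, 128 * mb));   // one 129 MB split (within the 10% slack) -> 1 map task
    }
}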
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
  // Find the block that contains the given file offset -- i.e. map the logical
  // split offset back to a physical block (and therefore to its hosts)
  for (int i = 0; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
      return i;
    }
  }
  BlockLocation last = blkLocations[blkLocations.length - 1];
  long fileLength = last.getOffset() + last.getLength() - 1;
  throw new IllegalArgumentException("Offset " + offset +
                                     " is outside of file (0.." + fileLength + ")");
}
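A quick worked example of that offset-to-block lookup, as a standalone sketch over a hand-made block table (the layout mimics a 300 MB file with 128 MB blocks; all numbers are illustrative):

public class BlockIndexDemo {
    static final long MB = 1024L * 1024;
    // Fake block table: each entry is {offset, length} of one block
    static final long[][] BLOCKS = { {0, 128 * MB}, {128 * MB, 128 * MB}, {256 * MB, 44 * MB} };

    // Same linear search as FileInputFormat#getBlockIndex, just over the fake table
    static int getBlockIndex(long offset) {
        for (int i = 0; i < BLOCKS.length; i++) {
            if (BLOCKS[i][0] <= offset && offset < BLOCKS[i][0] + BLOCKS[i][1]) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of the file");
    }

    public static void main(String[] args) {
        System.out.println(getBlockIndex(0));          // 0 -> the split starting at offset 0 reads from block 0's hosts
        System.out.println(getBlockIndex(128 * MB));   // 1 -> the second 128 MB split starts exactly at block 1
        System.out.println(getBlockIndex(256 * MB));   // 2 -> the 44 MB tail split lives in block 2
    }
}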
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
Arguments passed in:

file path:                         hdfs://hadoopH1:9000/wc/input/wcdata.txt
split start offset:                0
remaining bytes (= split length):  160
hosts holding the block:           [hadoopH1]
hosts with cached replicas:        []   (empty here: this pseudo-distributed setup runs with replication set to 1 and no cached replicas)
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  // blockSize defaults to 128 MB, maxSize to Long.MAX_VALUE and minSize to 1,
  // so with the defaults this simply returns the block size
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
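A worked run of that formula under a few settings (plain Java; the 32 MB and 256 MB values are arbitrary) makes the earlier point concrete: lowering maxSize shrinks the splits, raising minSize grows them, and with the defaults the split size is simply the block size.

public class ComputeSplitSizeDemo {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));   // same formula as FileInputFormat
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long blockSize = 128 * mb;
        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE  ->  splitSize = blockSize = 128 MB
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb);       // 128
        // maxSize lowered to 32 MB  ->  splitSize = 32 MB (more, smaller splits)
        System.out.println(computeSplitSize(blockSize, 1L, 32 * mb) / mb);              // 32
        // minSize raised to 256 MB  ->  splitSize = 256 MB (fewer, larger splits)
        System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb); // 256
    }
}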
Hadoop Basics --- The Shuffle Mechanism (Understanding the Hadoop Machinery Further)
Original post: https://www.cnblogs.com/ssyfj/p/12355820.html