The input location on HDFS contains two directories:
/20170503/shoplast/
/20170503/shop/
I wanted to filter out shop and use only shoplast as input,
so I implemented the following filter:
public static class FileNameFilter implements PathFilter {
  @Override
  public boolean accept(Path path) {
    // Accept any path whose last component ends with "last"
    return path.getName().endsWith("last");
  }
}
Then I set the MapReduce input path to /20170503/* and started the job...
The result: "Total input paths to process : 0". Zero input files! What on earth?
-----------------------------
After reading the source code I felt rather foolish. Here is the source (FileInputFormat.listStatus):
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                      job.getConfiguration());

  // Whether we need to recursive look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  List<IOException> errors = new ArrayList<IOException>();

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  for (int i=0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration());
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat: matches) {
        if (globStat.isDirectory()) {
          RemoteIterator<LocatedFileStatus> iter =
              fs.listLocatedStatus(globStat.getPath());
          while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(),
                    inputFilter);
              } else {
                result.add(stat);
              }
            }
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }

  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
Look closely at this part:
for (FileStatus globStat: matches) {
  if (globStat.isDirectory()) {
    RemoteIterator<LocatedFileStatus> iter =
        fs.listLocatedStatus(globStat.getPath());
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (recursive && stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(),
              inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  } else {
    result.add(globStat);
  }
}
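The cause is that the filter is also applied to the final input file names. If a matched input path is a directory, listStatus descends into it and calls inputFilter.accept on every file inside. The shoplast directory itself passed my filter, but none of the files inside it has a name ending in "last", so every one of them was rejected.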
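The two filtering stages can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Hadoop code: paths are modeled as strings, and the class name FilterStagesDemo, the helpers name, listInputs and sampleTree, and the file name part-r-00000 are all assumptions made up for illustration. It only mimics the order in which listStatus applies the filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterStagesDemo {
    // Last path component, e.g. "/20170503/shoplast" -> "shoplast".
    static String name(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Simplified model of listStatus: the same filter is applied twice,
    // first to each directory matched by the glob, then to each file in it.
    static List<String> listInputs(Map<String, List<String>> dirs,
                                   Predicate<String> filter) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> dir : dirs.entrySet()) {
            if (!filter.test(dir.getKey())) {
                continue; // stage 1: corresponds to globStatus(p, inputFilter)
            }
            for (String file : dir.getValue()) {
                if (filter.test(file)) { // stage 2: inputFilter.accept(stat.getPath())
                    result.add(file);
                }
            }
        }
        return result;
    }

    // Hypothetical directory contents; the real file names are not given in the post.
    static Map<String, List<String>> sampleTree() {
        Map<String, List<String>> dirs = new LinkedHashMap<>();
        dirs.put("/20170503/shoplast", Arrays.asList("/20170503/shoplast/part-r-00000"));
        dirs.put("/20170503/shop", Arrays.asList("/20170503/shop/part-r-00000"));
        return dirs;
    }

    public static void main(String[] args) {
        // Same rule as the first FileNameFilter: the name must end with "last".
        Predicate<String> firstFilter = p -> name(p).endsWith("last");
        System.out.println(listInputs(sampleTree(), firstFilter).size()); // prints 0
    }
}
```

The shoplast directory survives stage 1, but its file is dropped in stage 2 because "part-r-00000" does not end with "last", which matches the "Total input paths to process : 0" message.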
All right, let's modify the filter so that it checks the parent directory's name instead:
public static class FileNameFilter implements PathFilter {
  @Override
  public boolean accept(Path path) {
    // Accept any path whose parent directory name ends with "last"
    return path.getParent().getName().endsWith("last");
  }
}
Run it again, and ta-da:
cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://20170503/* matches 0 files
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://20170503/* matches 0 files
It still cannot read any input files. Why? Back to the source, in the same method shown above:
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
  errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
  errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
}
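I won't step into globStatus here; read the source yourself if you are curious. In short, the filter is also applied here to the directories matched by the glob, that is, to the parent directories of the final files. With the new filter, /20170503/shoplast is rejected at this point because the name of its own parent, 20170503, does not end with "last", so the pattern matches 0 files.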
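Conclusion: the filter is applied not only to the final files but also to their parent directories in the input path, so it must accept both.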
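Following the conclusion above, one possible fix is a rule that accepts a path if either its own name or its parent's name ends with "last". The sketch below expresses that rule in plain Java on path strings so that it can be run without Hadoop; the class name LastDirFilter, the helpers name and parent, and the file name part-r-00000 are assumptions for illustration. In a real job the same condition would presumably go inside PathFilter.accept, using path.getName() and path.getParent(); this has not been tested against a cluster here.

```java
public class LastDirFilter {
    // Last path component, e.g. "/20170503/shoplast" -> "shoplast".
    static String name(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Parent path, or null for a top-level path such as "/20170503".
    static String parent(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? null : path.substring(0, i);
    }

    // Accepts the "...last" directory itself (needed when the glob matches are
    // filtered) and any file directly inside it (needed when the directory
    // contents are filtered).
    static boolean accept(String path) {
        if (name(path).endsWith("last")) {
            return true;
        }
        String parent = parent(path);
        return parent != null && name(parent).endsWith("last");
    }

    public static void main(String[] args) {
        System.out.println(accept("/20170503/shoplast"));              // true
        System.out.println(accept("/20170503/shoplast/part-r-00000")); // true
        System.out.println(accept("/20170503/shop"));                  // false
        System.out.println(accept("/20170503/shop/part-r-00000"));     // false
    }
}
```

The null check matters because a top-level path has no parent to inspect. A simpler alternative that may be worth trying is to drop the filter and narrow the glob itself, for example /20170503/*last, so that only the wanted directory is matched in the first place.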
Original post: http://1992zhong.blog.51cto.com/3963309/1921938