实现:在输入目录下的所有文件中,检索给定的字符串所出现的行,并将这些行的内容输出到本地文件系统的输出文件夹中。
shell:hadoop jar resultFilter.jar resultFilter <dfs path> <local path> <match string> <file lines num>
package hdfs.fs.nefu; import java.io.File; import java.io.IOException; import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class resultFilter { public static void main(String[] args) throws IOException { // TODO Auto-generated method stub Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); FileSystem local = FileSystem.getLocal(conf);//本地文件系统实例 Path inputdir,localdir; //hdfs上的路径 ,本地路径 FileStatus inputFiles[]; //记录文件信息 FSDataOutputStream output = null; //创建文件的时候 需要输出流 FSDataInputStream input = null; //打开文件的时候 需要输入流 /* * @param scanner:一个可以使用正则表达式来分析基本类型和字符串的 简单文本扫描器。 * Scanner 使用分隔符模式将其输入分解为标记,默认情况下该分隔符模式与空白匹配。 * 然后可以使用不同的 next 方法将得到的标记转换为不同类型的值。 * */ Scanner scan; String str;//待匹配的字符串 byte[] buffer; int singleFileLines; int numLines,i,numFiles; if(args.length!=4){ System.err.println("usage resultFilter<dst path><local path>"+ "<match str><single file lines>"); return; } inputdir = new Path(args[0]); //从输入处 获得每个文件的限制行数 singleFileLines = Integer.parseInt(args[3]); try{ inputFiles = fs.listStatus(inputdir);//获取目录信息 numLines = 0; numFiles = 1; localdir = new Path(args[1]); //若目标路径存在 就删除文件 if(local.exists(localdir)){ local.delete(localdir, true); } for(i=0;i<inputFiles.length;i++){ if(inputFiles[i].isDir() == true){ continue; } //输出文件名 System.out.println(inputFiles[i].getPath().getName()); //打开文件 投入输入流中 input = fs.open(inputFiles[i].getPath()); //从输入流 扫描 scan = new Scanner(input); while(scan.hasNext()){ str = scan.nextLine(); if(str.indexOf(args[2])==-1){ continue; } numLines++; //如果是1 需要创建文件 if(numLines == 1){ /*@param File.separator:系统默认的分隔符 windows:\\ unix,linux:/*/ localdir = new Path(args[1]+File.separator+numLines); output = local.create(localdir); numFiles++; } buffer = (str+"\n").getBytes();//字符串转化为字符数组 output.write(buffer,0,buffer.length); //达到设定的文件行数的上限 if(numLines == singleFileLines){ output.close(); numLines = 0; } } //关闭流 scan.close(); input.close(); } if(output != null){ output.close(); } }catch(Exception e){ e.printStackTrace(); } } }
原文地址:http://blog.csdn.net/xd_122/article/details/39432783