Create a new Maven project
Configure pom.xml according to your ZooKeeper and Hadoop versions. You can check the ZooKeeper version with echo stat|nc localhost 2181.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aidata</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop-version>3.0.0</hadoop-version>
        <zookeeper-version>3.4.5</zookeeper-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <classifier>dist</classifier>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptorRefs>
                        <descriptor>jar-with-dependencies</descriptor>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Upload three files of words, tab-separated, to HDFS:
hdfs dfs -put wc_tes* /input/wc
Write the MapReduce program
package com.aidata.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMRJob {

    // Map phase
    /**
     * Input key/value types:
     *   LongWritable: byte offset of the input line
     *   Text: the input line itself
     *
     * Output key/value types:
     *   Text: output key type
     *   IntWritable: output value type
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("\t");
            for (String word : words) {
                // emit (word, 1)
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    // Reduce phase
    /**
     * Input key/value types:
     *   Text: input key type
     *   IntWritable: input value type
     *
     * Output key/value types:
     *   Text: output key type
     *   IntWritable: output value type
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // word -> {1,1,1,...}
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        // 1. Configure the job
        Configuration conf = new Configuration();
        Job job = null;

        // 2. Create the job
        try {
            job = Job.getInstance(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        job.setJarByClass(WordCountMRJob.class);

        // 3. Build the job's execution flow
        // 3.1 Input path on HDFS
        Path path = new Path(args[0]);
        try {
            // add the input path to the job
            FileInputFormat.addInputPath(job, path);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 3.2 Map phase
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);          // map output key type
        job.setMapOutputValueClass(IntWritable.class); // map output value type

        // 3.3 Reduce phase
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);             // reduce output key type
        job.setOutputValueClass(IntWritable.class);    // reduce output value type

        // 3.4 Output path for the job's result
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);

        // 4. Submit the job and wait for completion
        try {
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
Click Maven's package goal to build; the jar will be placed in the target directory. If IDEA does not show the target directory, refresh the project.
Upload the jar to the cluster and run it:
hadoop jar bigdata-1.0-SNAPSHOT.jar com.aidata.mapreduce.WordCountMRJob /input/wc/ /output/wc
View the output results:
hdfs dfs -ls /output/wc
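For reference, the output can also be read programmatically with the HDFS Java API. This is only a minimal sketch; the class name PrintWcOutput is hypothetical, the path /output/wc matches the job above, and it assumes it runs on a node where the cluster configuration is on the classpath (otherwise set fs.defaultFS explicitly):

package com.aidata.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Minimal sketch: list the job output under /output/wc and print each part file.
public class PrintWcOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path("/output/wc"))) {
            // skip the empty _SUCCESS marker, print each part-r-* file
            if (status.isFile() && status.getPath().getName().startsWith("part-")) {
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    IOUtils.copyBytes(in, System.out, 4096, false);
                }
            }
        }
        fs.close();
    }
}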
For example, suppose LZO is installed in CDH and you want to use it.
Install lzop:
yum install lzop
Copy the jar locally. I am using CDH 6.3.1, and the hadoop-lzo jar is at:
/opt/cloudera/parcels/GPLEXTRAS-6.3.1-1.gplextras6.3.1.p0.1470567/lib/hadoop/lib/hadoop-lzo-0.4.15-cdh6.3.1.jar
Take three files of tab-separated words.
Compress the files:
lzop -v wc*.txt
Upload them to HDFS:
hdfs dfs -put wc*.txt.lzo /input
Build the index
The splittability of LZO-compressed files depends on their index, so we have to create an index for each LZO file manually. Without an index, an LZO file gets only a single split.
hadoop jar /opt/cloudera/parcels/GPLEXTRAS-6.3.1-1.gplextras6.3.1.p0.1470567/lib/hadoop/lib/hadoop-lzo-0.4.15-cdh6.3.1.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/
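Indexing can also be driven from Java instead of the command line. The following is only a sketch under the assumption that the hadoop-lzo build shipped with the CDH parcel exposes com.hadoop.compression.lzo.LzoIndexer with a Configuration constructor and an index(Path) method, as in the Twitter hadoop-lzo distribution; verify against your jar before relying on it:

package com.aidata.mapreduce;

import com.hadoop.compression.lzo.LzoIndexer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Sketch only: create .index files for the .lzo files under /input.
public class IndexLzoInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        LzoIndexer indexer = new LzoIndexer(conf);
        // writes a .index file next to each .lzo file under /input (assumed API)
        indexer.index(new Path("/input"));
    }
}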
Put the LZO jar into the project's resources directory in IDEA and click Add as Library.
As a third-party jar, it also has to be configured in Maven, otherwise Maven will not pick it up.
Maven packaging compiles with the maven-compiler-plugin, but because the project uses a third-party jar whose location maven-compiler-plugin does not know, compilation fails with "package xxx does not exist". The fix:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.6.2</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <compilerArguments>
            <extdirs>${project.basedir}/src/main/resources</extdirs>
        </compilerArguments>
        <encoding>UTF-8</encoding>
    </configuration>
</plugin>
Modify the MapReduce program accordingly:
package com.aidata.mapreduce;

import com.hadoop.compression.lzo.LzopCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMRJob {

    // Map phase
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("\t");
            for (String word : words) {
                // emit (word, 1)
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    // Reduce phase
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // word -> {1,1,1,...}
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        // 1. Configure the job
        Configuration conf = new Configuration();
        Job job = null;

        // 2. Create the job
        try {
            job = Job.getInstance(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        job.setJarByClass(WordCountMRJob.class);

        // read LZO-compressed input
        job.setInputFormatClass(LzoTextInputFormat.class);

        // compress the reduce output and set the compression codec
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // 3. Build the job's execution flow
        // 3.1 Input path on HDFS
        Path path = new Path(args[0]);
        try {
            // add the input path to the job
            FileInputFormat.addInputPath(job, path);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 3.2 Map phase
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);          // map output key type
        job.setMapOutputValueClass(IntWritable.class); // map output value type

        // 3.3 Reduce phase
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);             // reduce output key type
        job.setOutputValueClass(IntWritable.class);    // reduce output value type

        // 3.4 Output path for the job's result
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);

        // 4. Submit the job and wait for completion
        try {
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
Run the program:
hadoop jar bigdata-1.0-SNAPSHOT.jar com.aidata.mapreduce.WordCountMRJob /input/ /output
If the program does not call

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

then the output format has to be specified explicitly.
If the input and output formats are not configured as LZO in the program, they can be set on the command line with -D options:
hadoop jar myjar.jar -D mapred.reduce.tasks=2 -D mapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat -D mapred.output.compress=true -D mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec /input /output
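Note that -D generic options like these are only parsed and applied automatically when the driver goes through ToolRunner/GenericOptionsParser; the main() shown earlier builds its own Configuration and treats everything on the command line as plain arguments. A minimal sketch of a ToolRunner-style driver (the class name WordCountTool is hypothetical; it reuses the mapper and reducer defined above):

package com.aidata.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// ToolRunner strips the -D options from the command line, puts them into the
// Configuration, and passes only the remaining arguments (input/output paths) to run().
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMRJob.WordCountMapper.class);
        job.setReducerClass(WordCountMRJob.WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}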
Setting the number of reduce tasks in CDH
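Besides the cluster-wide default configured in CDH, the reducer count can be fixed per job in the driver code (the value 2 below is just an example), or passed on the command line with a ToolRunner driver:

// in the driver, before job.waitForCompletion(...):
job.setNumReduceTasks(2);   // fix the number of reduce tasks for this job
// or, with a ToolRunner driver: hadoop jar myjar.jar ... -D mapreduce.job.reduces=2 ...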
Original article: https://www.cnblogs.com/aidata/p/12455716.html