In the code below, the AffairClient class contains three inner classes, corresponding to the Mapper class, the Reducer class, and the driver class that a Hadoop MapReduce program needs.
The remaining methods of AffairClient are used to configure and run the EMR job flow.
You can tweak the program by changing the relevant parameters: for example, modify the map and reduce functions, add a combiner class, or change the cluster size.
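For instance, registering a combiner is a single call in the driver. Since the reducer in this sample only re-emits each distinct key, it can safely double as the combiner; a minimal sketch (one extra line in the job setup shown later):

// Assumption: the reducer is idempotent (it only re-emits keys), so reusing it as a combiner is safe.
job.setCombinerClass(AffairReducer.class);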
This sample is a deduplication MapReduce program; see the map and reduce functions for the details.
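To make the dedup behavior concrete, here is a made-up input file and the output the job would produce (duplicate lines collapse, and keys come out sorted; TextOutputFormat also appends a tab and the empty value to each line):

input (hypothetical):
2015-10-01 a
2015-10-02 b
2015-10-01 a
2015-10-03 c

output:
2015-10-01 a
2015-10-02 b
2015-10-03 c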
The project is a Maven project. Because the Hadoop program runs on AWS EMR, it needs both the AWS SDK and hadoop-client dependencies:
<!-- aws -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.10.26</version>
</dependency>
<!-- hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
Another situation you may run into is that the environment where the Java program runs (possibly some remote server) does not have the aws-java-sdk or hadoop-client jars on its CLASSPATH, in which case the program can fail with a ClassNotFoundException. To have the dependency jars bundled into the jar built by mvn package, add the following under build -> plugins in pom.xml:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Before running the program, put the input files under the directory named by the global variable INPUT_DIR, and put the jar under JAR_DIR with the file name JAR_NAME.
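For example, assuming the AWS CLI is configured and using the bucketname paths from the code below (input.txt is a made-up file name), the uploads might look like:

$ aws s3 cp input.txt s3://bucketname/affair/input/input.txt
$ aws s3 cp affair.jar s3://bucketname/affair/affair.jar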
You can build the Maven project's jar on the server with:
$ mvn clean install
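With the assembly plugin above, the build also writes a fat jar named <artifactId>-<version>-jar-with-dependencies.jar under target/; assuming a made-up artifactId and version of affair and 1.0, the rename could be:

$ cp target/affair-1.0-jar-with-dependencies.jar affair.jar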
Assume the jar has finally been renamed to affair.jar and uploaded to the corresponding location on S3. Then, from the directory containing affair.jar, the following command starts the AWS EMR MapReduce job:
$ java -cp affair.jar AffairClient
The output shown in the terminal looks something like this:
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L STARTING this job flow runs a mapreduce affair.
PENDING AffairClient$AffairJob Affair
j-61EJHJXPSR2L RUNNING this job flow runs a mapreduce affair.
RUNNING AffairClient$AffairJob Affair
j-61EJHJXPSR2L RUNNING this job flow runs a mapreduce affair.
RUNNING AffairClient$AffairJob Affair
j-61EJHJXPSR2L RUNNING this job flow runs a mapreduce affair.
RUNNING AffairClient$AffairJob Affair
j-61EJHJXPSR2L TERMINATING this job flow runs a mapreduce affair.
COMPLETED AffairClient$AffairJob Affair
j-61EJHJXPSR2L TERMINATING this job flow runs a mapreduce affair.
COMPLETED AffairClient$AffairJob Affair
j-61EJHJXPSR2L TERMINATING this job flow runs a mapreduce affair.
COMPLETED AffairClient$AffairJob Affair
j-61EJHJXPSR2L TERMINATING this job flow runs a mapreduce affair.
COMPLETED AffairClient$AffairJob Affair
j-61EJHJXPSR2L TERMINATED this job flow runs a mapreduce affair.
COMPLETED AffairClient$AffairJob Affair
If an error occurs (for example, the job sometimes fails because the output directory already exists), you can check the corresponding syslog under EMR in the AWS web console.
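For that particular error, one common fix is to delete the old output directory before resubmitting; with the AWS CLI this could be (the path matches OUTPUT_DIR in the code below):

$ aws s3 rm --recursive s3://bucketname/affair/output/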
Code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Cluster;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.ListStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.ListStepsResult;
import com.amazonaws.services.elasticmapreduce.model.PlacementType;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepSummary;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class AffairClient {

    private static AmazonElasticMapReduceClient emr;

    // Poll the cluster state every 30 seconds.
    private static final long SLEEP_TIME = 1000 * 30;

    private static final String JAR_DIR = "s3://bucketname/affair/";
    private static final String JAR_NAME = "affair.jar";
    private static final String INPUT_DIR = "s3://bucketname/affair/input/";
    private static final String OUTPUT_DIR = "s3://bucketname/affair/output/";
    private static final String LOG_DIR = "s3://bucketname/affair/log/";
    private static final String JOB_FLOW_NAME = "this job flow runs a mapreduce affair.";
    private static final String AWS_ACCESS_KEY = "YOUR_AWS_ACCESS_KEY";
    private static final String AWS_SECRET_KEY = "YOUR_AWS_SECRET_KEY";

    // Mapper: emit every input line as a key so duplicates collapse in the shuffle.
    public static class AffairMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(value), new Text(""));
        }
    }

    // Reducer: write each distinct key exactly once, which deduplicates the input.
    public static class AffairReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    // Driver class that EMR invokes on the cluster; configures and runs the Hadoop job.
    public static class AffairJob extends Configured implements Tool {
        public int run(String[] arg0) throws Exception {
            Configuration conf = getConf();
            conf.set("mapred.reduce.tasks", "" + 1);

            Job job = Job.getInstance(conf, "Affair MR job");
            job.setJarByClass(AffairJob.class);
            job.setMapperClass(AffairMapper.class);
            job.setReducerClass(AffairReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path(INPUT_DIR));
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
            job.waitForCompletion(true);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new AffairJob(), args);
            System.exit(exitCode);
        }
    }

    // Entry point of the client: submits the job flow to EMR and waits for it.
    public static void main(String[] args) {
        try {
            String mainClass = AffairJob.class.getName();
            String stepName = mainClass + " Affair";
            runStep(mainClass, JAR_NAME, stepName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void runStep(String mainClass, String jarName, String stepName)
            throws InterruptedException {
        // The step runs JAR_DIR/jarName on the cluster with AffairJob as its main class.
        String jarPath = JAR_DIR + jarName;
        HadoopJarStepConfig hadoopJarStep = new HadoopJarStepConfig(jarPath);
        hadoopJarStep.setMainClass(mainClass);
        hadoopJarStep.setArgs(null);

        StepConfig step = new StepConfig().withName(stepName)
                .withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
                .withHadoopJarStep(hadoopJarStep);

        String logUri = LOG_DIR;
        JobFlowInstancesConfig instances = createInstances();
        List<StepConfig> steps = new ArrayList<StepConfig>();
        steps.add(step);
        String jobFlowId = createJobFlow(JOB_FLOW_NAME, logUri, instances, steps);
        terminateJobFlow(jobFlowId);
    }

    private static void terminateJobFlow(String jobFlowId) {
        TerminateJobFlowsRequest request = new TerminateJobFlowsRequest().withJobFlowIds(jobFlowId);
        emr.terminateJobFlows(request);
    }

    private static String createJobFlow(String jobFlowName, String logUri,
            JobFlowInstancesConfig instances, List<StepConfig> steps) throws InterruptedException {
        AWSCredentials credentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
        emr = new AmazonElasticMapReduceClient(credentials);

        // Submit the job flow.
        RunJobFlowRequest request = new RunJobFlowRequest().withName(jobFlowName)
                .withLogUri(logUri)
                .withSteps(steps)
                .withInstances(instances);
        RunJobFlowResult result = emr.runJobFlow(request);

        // Poll the cluster until it reaches a terminal (or idle) state.
        String jobFlowId = result.getJobFlowId();
        boolean running = true;
        while (running) {
            Thread.sleep(SLEEP_TIME);
            List<String> jobFlowIdList = new ArrayList<String>();
            jobFlowIdList.add(jobFlowId);
            System.out.println(getJobFlowStatus(jobFlowIdList));
            for (String clusterId : jobFlowIdList) {
                DescribeClusterRequest describeClusterRequest =
                        new DescribeClusterRequest().withClusterId(clusterId);
                DescribeClusterResult describeClusterResult = emr.describeCluster(describeClusterRequest);
                Cluster cluster = describeClusterResult.getCluster();
                String state = cluster.getStatus().getState();
                if (state.contains("FAILED") || state.contains("COMPLETED")
                        || state.contains("TERMINATED") || state.contains("SHUTTING_DOWN")
                        || state.contains("WAITING")) {
                    running = false;
                }
            }
        }
        return jobFlowId;
    }

    // Builds one status line per cluster, plus one indented line per step.
    private static String getJobFlowStatus(List<String> jobFlowIdList) {
        String info = new String();
        for (String clusterId : jobFlowIdList) {
            DescribeClusterRequest describeClusterRequest =
                    new DescribeClusterRequest().withClusterId(clusterId);
            DescribeClusterResult describeClusterResult = emr.describeCluster(describeClusterRequest);
            Cluster cluster = describeClusterResult.getCluster();
            info += cluster.getId() + "\t" + cluster.getStatus().getState() + "\t" + cluster.getName() + "\n";

            ListStepsRequest listStepsRequest = new ListStepsRequest().withClusterId(clusterId);
            ListStepsResult listStepsResult = emr.listSteps(listStepsRequest);
            for (StepSummary step : listStepsResult.getSteps()) {
                info += "\t" + step.getStatus().getState() + "\t" + step.getName() + "\n";
            }
        }
        return info;
    }

    // A 5-node cluster of m1.large instances in us-east-1a that shuts down when its steps finish.
    private static JobFlowInstancesConfig createInstances() {
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                .withHadoopVersion("1.0.3")
                .withInstanceCount(5)
                .withKeepJobFlowAliveWhenNoSteps(false)
                .withMasterInstanceType(InstanceType.M1Large.toString())
                .withSlaveInstanceType(InstanceType.M1Large.toString())
                .withPlacement(new PlacementType("us-east-1a"));
        return instances;
    }
}
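One caveat: the client above is created with only credentials, so it talks to the SDK's default (us-east-1) endpoint, which happens to match the us-east-1a placement used here. If you ran this in another region, the region would need to be set explicitly on the client; a minimal sketch, assuming us-west-2 (SDK 1.10.x style):

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

// Assumption: the job flow should run in us-west-2 instead of the default us-east-1.
emr.setRegion(Region.getRegion(Regions.US_WEST_2));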
Running a MapReduce Java Example Program on AWS EMR, plus Operation Notes
Original article: http://www.cnblogs.com/wuyouwulv/p/emr_mapreduce.html