标签:end imp lin span 令行 matrix 路径 pip ack
概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码
负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。
Apache Beam 于2017年1月10日成为Apache新的顶级项目。
主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。
基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。
关键步骤:
/**
* MIT.
* Author: wangxiaolei(王小雷).
* Date:17-2-20.
* Project:ApacheBeamWordCount.
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
public class WordCount {
/**
*1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
}
// 将文本行划分为单词
String[] words = c.element().split("[^a-zA-Z‘]+");
// 输出PCollection中的单词
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/**
*2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
*/
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
/**
*3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// 将文本行转换成单个单词
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// 计算每个单词次数
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
/**
*4.可以自定义一些选项(Options),比如文件输入输出路径
*/
public interface WordCountOptions extends PipelineOptions {
/**
* 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
*/
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
/**
* 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
*/
@Description("Path of the file to write to")
@Required
String getOutput();
void setOutput(String value);
}
/**
* 5.运行程序
*/
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run().waitUntilFinish();
}
}
pom.xml
模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)Spark运行
设置VM options
-DPapex-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
Apex运行
设置VM options
-DPapex-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
Flink运行等等
设置VM options
-DPflink-runner
设置Programe arguments
--inputFile=pom.xml --output=counts
mvn archetype:generate -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots -DarchetypeGroupId=org.apache.beam -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples -DarchetypeVersion=LATEST -DgroupId=org.example -DartifactId=word-count-beam -Dversion="0.1" -Dpackage=org.apache.beam.examples -DinteractiveMode=false
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
Apache Beam WordCount编程实战及源码解读
标签:end imp lin span 令行 matrix 路径 pip ack
原文地址:http://blog.csdn.net/dream_an/article/details/56277784