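I have been reading the source code behind StreamGraph generation lately and had a rough outline ready. When I sat down to write, though, I realized that without first covering Transformation, the StreamGraph discussion that follows would lose much of its meaning.
So this post lays some groundwork first.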
Transformation is the base class of Flink's transformation operators. Its subclasses are:
AbstractMultipleInputTransformation
CoFeedbackTransformation
FeedbackTransformation
KeyedMultipleInputTransformation
LegacySourceTransformation
MultipleInputTransformation
OneInputTransformation
PartitionTransformation
PhysicalTransformation
SelectTransformation
SideOutputTransformation
SinkTransformation
SourceTransformation
SplitTransformation
TwoInputTransformation
UnionTransformation
[Figure: class diagram of the Transformation hierarchy]
These Transformation classes also show which kinds of transformation Flink supports: Source, Sink, one input, two inputs, multiple inputs, Union, side output, Select, partitioning, and so on.
env.addSource(new SimpleStringSource)
This calls the addSource method in StreamExecutionEnvironment.scala:
    def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
      require(function != null, "Function must not be null.")

      val cleanFun = scalaClean(function)
      val typeInfo = implicitly[TypeInformation[T]]
      asScalaStream(javaEnv.addSource(cleanFun, typeInfo))
    }
That in turn calls javaEnv.addSource:
StreamExecutionEnvironment.java
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
        return addSource(function, "Custom Source", typeInfo);
    }

    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);

        // create the StreamSource operator
        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        // build a DataStreamSource from the StreamSource; this also creates
        // the source's Transformation ("this" is the env)
        return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
    }
DataStreamSource.java creates a LegacySourceTransformation from the given sourceName, operator, outTypeInfo, and parallelism:
    public DataStreamSource(
            StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo,
            StreamSource<T, ?> operator,
            boolean isParallel,
            String sourceName) {
        super(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }
Finally the DataStream constructor in DataStream.java runs and stores the newly created LegacySourceTransformation in the DataStream:
    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }
addSource returns a DataStreamSource whose transformation is a LegacySourceTransformation. It also carries the StreamExecutionEnvironment object, so the operators that follow can be chained onto it.
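The source path above can be condensed into a small, dependency-free model. This is only a sketch: the class names echo Flink's, but Environment, the reduced constructors, and the two-argument addSource helper are hypothetical simplifications, and generics, type information, and the operator itself are left out. It shows the two things the walkthrough establishes: the source stream wraps a LegacySourceTransformation, and a non-parallel source ends up with parallelism 1.

```java
import java.util.Objects;

// Hypothetical, stripped-down model: class names echo Flink's, bodies do not.
final class SourceSketch {

    /** Stand-in for Transformation: just a name and a parallelism. */
    static class Transformation {
        final String name;
        int parallelism;

        Transformation(String name, int parallelism) {
            this.name = Objects.requireNonNull(name, "name");
            this.parallelism = parallelism;
        }
    }

    /** Stand-in for LegacySourceTransformation: a transformation with no input. */
    static class LegacySourceTransformation extends Transformation {
        LegacySourceTransformation(String name, int parallelism) {
            super(name, parallelism);
        }
    }

    /** Stand-in for StreamExecutionEnvironment: only the default parallelism. */
    static class Environment {
        final int parallelism;

        Environment(int parallelism) {
            this.parallelism = parallelism;
        }
    }

    /** Stand-in for DataStream: an environment plus the transformation behind the stream. */
    static class DataStream {
        final Environment environment;
        final Transformation transformation;

        DataStream(Environment environment, Transformation transformation) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.transformation = Objects.requireNonNull(transformation, "transformation");
        }
    }

    /** Stand-in for DataStreamSource: creates the source transformation in its constructor. */
    static class DataStreamSource extends DataStream {
        final boolean isParallel;

        DataStreamSource(Environment environment, boolean isParallel, String sourceName) {
            super(environment, new LegacySourceTransformation(sourceName, environment.parallelism));
            this.isParallel = isParallel;
            if (!isParallel) {
                // A non-parallel source is pinned to parallelism 1.
                transformation.parallelism = 1;
            }
        }
    }

    /** Same shape as the one-name addSource overload: the default name is "Custom Source". */
    static DataStreamSource addSource(Environment env, boolean isParallel) {
        return new DataStreamSource(env, isParallel, "Custom Source");
    }

    public static void main(String[] args) {
        DataStreamSource source = addSource(new Environment(4), false);
        // prints "Custom Source parallelism=1"
        System.out.println(source.transformation.name + " parallelism=" + source.transformation.parallelism);
    }
}
```

With isParallel set to true, the same call would keep the environment's parallelism (4 in this example) instead.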
stream
  .map(str => str)
When execution reaches the map line, the map method in DataStream.scala is called:
    def map[R: TypeInformation](fun: T => R): DataStream[R] = {
      if (fun == null) {
        throw new NullPointerException("Map function must not be null.")
      }
      val cleanFun = clean(fun)
      val mapper = new MapFunction[T, R] {
        def map(in: T): R = cleanFun(in)
      }
      // delegate to the MapFunction overload below
      map(mapper)
    }

    def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] = {
      if (mapper == null) {
        throw new NullPointerException("Map function must not be null.")
      }

      val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
      // stream.map hands over to DataStream.java
      asScalaStream(stream.map(mapper, outType).asInstanceOf[JavaStream[R]])
    }
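Note: Flink's core functionality lives in the Java code. The Scala API is essentially a thin shell around it that makes it convenient to call from Scala; the calls still end up in the Java code.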
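The first map overload above is a plain adapter: it wraps a function value in an object implementing the single-method interface the Java side expects. A minimal Java sketch of that idea follows. The MapFunction interface here is a local, hypothetical stand-in (it omits the checked exception and everything else the real interface carries), and wrap is an illustrative helper, not a Flink API.

```java
import java.util.function.Function;

// Hypothetical sketch of the adapter step; not Flink code.
final class MapAdapterSketch {

    /** Local stand-in for the single-method mapper interface used on the Java side. */
    interface MapFunction<T, R> {
        R map(T value);
    }

    /** Adapts a plain function to MapFunction, like the anonymous class in the Scala map method. */
    static <T, R> MapFunction<T, R> wrap(Function<T, R> fun) {
        if (fun == null) {
            throw new NullPointerException("Map function must not be null.");
        }
        return new MapFunction<T, R>() {
            @Override
            public R map(T in) {
                // Forward every element to the wrapped function.
                return fun.apply(in);
            }
        };
    }

    public static void main(String[] args) {
        MapFunction<String, String> identity = wrap((String str) -> str);
        System.out.println(identity.map("flink")); // prints "flink"
    }
}
```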
The map method in DataStream.java
This is where transform is called and the Transformation is about to be built. For this test program, outputType is "String" and the Transformation name is "Map".
SimpleOperatorFactory.of(operator) returns a SimpleUdfStreamOperatorFactory; str => str is the UDF.
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        // calls transform; outputType is String here
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }
The doTransform method in DataStream.java creates the OneInputTransformation for the map operator, together with a new DataStream: a SingleOutputStreamOperator.
    // this is where the Transformation is actually created
    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // validate and set the transformation's output type
        transformation.getOutputType();

        // create a one-input Transformation; this.transformation (the previous
        // operator's Transformation) becomes the input of the current operator
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        // create a SingleOutputStreamOperator, which is also a DataStream
        // and also carries the environment
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        // add the new Transformation to the environment's transformations list
        getExecutionEnvironment().addOperator(resultTransform);

        // return the SingleOutputStreamOperator
        return returnStream;
    }
So map, too, returns a new DataStream. This differs from fluent APIs in which objectA.methodA().methodB() returns the original objectA from every call.
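The linking done by doTransform can be modeled in a few lines. The sketch below is a simplified, hypothetical model rather than Flink code: Transformation is reduced to a name and an input reference, Environment keeps only the transformations list, and map takes just an operator name. It shows that each call produces a new stream object, that the new transformation's input is the previous stream's transformation, and that only transformations passed to addOperator land in the environment's list (in the code quoted in this post, the DataStreamSource constructor makes no such call).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the linking done in doTransform; not Flink code.
final class MapChainSketch {

    /** Stand-in for Transformation: a name plus a reference to the upstream transformation. */
    static class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Stand-in for the environment: only the list of registered transformations. */
    static class Environment {
        final List<Transformation> transformations = new ArrayList<>();

        void addOperator(Transformation transformation) {
            transformations.add(transformation);
        }
    }

    /** Stand-in for DataStream: every operator call returns a new instance. */
    static class DataStream {
        final Environment environment;
        final Transformation transformation;

        DataStream(Environment environment, Transformation transformation) {
            this.environment = environment;
            this.transformation = transformation;
        }

        DataStream map(String operatorName) {
            // The previous stream's transformation becomes the input of the new one.
            Transformation resultTransform = new Transformation(operatorName, this.transformation);
            DataStream returnStream = new DataStream(environment, resultTransform);
            // Register the new transformation with the environment.
            environment.addOperator(resultTransform);
            return returnStream;
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        DataStream source = new DataStream(env, new Transformation("Custom Source", null));
        DataStream mapped = source.map("Map");
        System.out.println(mapped != source);                          // prints "true"
        System.out.println(mapped.transformation.input.name);          // prints "Custom Source"
        System.out.println(env.transformations.size());                // prints "1"
    }
}
```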
Finally, addSink in DataStream.java:
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // validate and set the output type
        transformation.getOutputType();

        // configure the type if needed
        // let the sink function know its input type
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }
        // create the sink operator
        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        // build a DataStreamSink from the sink operator; this also creates the SinkTransformation
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
        // add the SinkTransformation to the transformations list
        getExecutionEnvironment().addOperator(sink.getTransformation());
        // return the DataStreamSink
        return sink;
    }
When DataStreamSink.java creates a DataStreamSink, it takes the current DataStream and the StreamSink as arguments. The current DataStream's Transformation becomes the input Transformation of the SinkTransformation:
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(
                inputStream.getTransformation(),
                "Unnamed",
                operator,
                inputStream.getExecutionEnvironment().getParallelism());
    }
In other words, when the sink's DataStream is created, the Transformation of the DataStream produced by the preceding operator is passed in as the sink's input Transformation.
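Because every transformation keeps a reference to its input, the whole source, map, sink pipeline can be recovered by starting at the sink and following the inputs backwards. The sketch below illustrates that with a simplified, hypothetical model: the Transformation class, the sinkOf helper, and the lineage method are illustrative names, not Flink APIs, and only the "Unnamed" default name is taken from the constructor shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of how a sink links back to its upstream; not Flink code.
final class SinkLineageSketch {

    /** Stand-in for Transformation: a name plus a reference to the upstream transformation. */
    static class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Stand-in for DataStream: exposes the transformation behind the stream. */
    static class DataStream {
        private final Transformation transformation;

        DataStream(Transformation transformation) {
            this.transformation = transformation;
        }

        Transformation getTransformation() {
            return transformation;
        }
    }

    /** Same shape as the DataStreamSink constructor: the input stream's transformation becomes the sink's input. */
    static Transformation sinkOf(DataStream inputStream) {
        return new Transformation("Unnamed", inputStream.getTransformation());
    }

    /** Walks from a transformation back to the source and collects the names along the way. */
    static List<String> lineage(Transformation last) {
        List<String> names = new ArrayList<>();
        for (Transformation current = last; current != null; current = current.input) {
            names.add(current.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("Custom Source", null);
        Transformation map = new Transformation("Map", source);
        Transformation sink = sinkOf(new DataStream(map));
        System.out.println(lineage(sink)); // prints "[Unnamed, Map, Custom Source]"
    }
}
```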
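The simplest Flink program, env.addSource.map.addSink, is enough to show how Transformations are generated before Flink builds the StreamGraph. Other operators such as flatMap, filter, union, and process work in essentially the same way, and join, window, and forward do not differ much either.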
For example:
    val join = process.join(map)
      .where(str => str)
      .equalTo(str => str)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .apply(new JoinFunction[String, String, String] {
        override def join(first: String, second: String): String = {
          first + ";" + second
        }
      })
Following apply down, you can see that the apply method in WindowedStream.java calls input.transform(opName, resultType, operator), which creates a OneInputTransformation:
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
A Transformation, then, is what user code becomes once it has been converted into a Flink operator.
[Source code] Flink's three-layer graph structure: Transformation, the groundwork before StreamGraph generation
Original article: https://www.cnblogs.com/Springmoon-venn/p/13928033.html