<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
* Skeleton for a Flink Streaming Job.
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
* <p>To package your appliation into a JAR file for execution, run
* ‘mvn clean package‘ on the command line.
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for ‘mainClass‘).
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* Here, you can start creating your execution plan for Flink.
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
* then, transform the resulting DataStream<String> using operations
* like
* .filter()
* .flatMap()
* .join()
* .coGroup()
* and many more.
* Have a look at the programming guide for the Java API:
* http://flink.apache.org/docs/latest/apis/streaming/index.html
DataStream<String> text = env.socketTextStream("", 9000);
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
// execute program
env.execute("Java WordCount from SocketTextStream Example");
