Flink Stream Computing Notes (6)

Date: 2018-11-13 20:39:24

Tags: director   print   global   connect   doc   run   row   mod   generated

Generate and compile the template project

MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   4499      0  0:00:02  0:00:02 --:--:--  4508
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): SocketWindowWordCount
Organization (org.example): myhaspl.wordcount
Version (0.1-SNAPSHOT): 
Scala version (2.11.12): 
Flink version (1.6.0): 

-----------------------------------------------
Project Name: SocketWindowWordCount
Organization: myhaspl.wordcount
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under socketwindowwordcount
MacBook-Air:SocketWindowWordCount myhaspl$ ls   
socketwindowwordcount
$ cd socketwindowwordcount
$ sbt clean assembly
MacBook-Air:socketwindowwordcount myhaspl$ sbt run
[info] Loading settings for project socketwindowwordcount-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/socketwindowwordcount/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaaa/Documents/scala/learn_2/socketwindowwordcount/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] myhaspl.wordcount.Job
 [2] myhaspl.wordcount.SocketTextStreamWordCount
 [3] myhaspl.wordcount.WordCount

Enter number: 3

[info] (and,1)
[info] (arrows,1)
[info] (be,2)
[info] (is,1)
[info] (nobler,1)
[info] (of,2)
[info] (a,1)
[info] (in,1)
[info] (mind,1)
[info] (or,2)
[info] (slings,1)
[info] (suffer,1)
[info] (against,1)
[info] (arms,1)
[info] (not,1)
[info] (outrageous,1)
[info] (sea,1)
[info] (the,3)
[info] (tis,1)
[info] (troubles,1)
[info] (whether,1)
[info] (fortune,1)
[info] (question,1)
[info] (take,1)
[info] (that,1)
[info] (to,4)
[success] Total time: 8 s, completed Oct 11, 2018 8:56:09 AM
MacBook-Air:learn2 myhaspl$ sbt run
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaa/Documents/scala/learn_2/learn2/)
[info] Running (fork) learn 
[info] 16
[info] 2 add 5 =7
[info] 2 add 0 =2
[info] 15
[success] Total time: 2 s, completed Oct 11, 2018 11:18:48 AM
MacBook-Air:learn_2 myhaspl$ pwd
/Users/A/Documents/scala/learn_2
MacBook-Air:learn_2 myhaspl$ vim learn_2.scala
object learn {

  def main(args: Array[String]): Unit = {
      println(myPower(2,4))
      println(myAdd(2,5))
      println(myAdd(2))  
      println(mySum(1,2,3,4,5))
  }

  @annotation.tailrec
  def myPower(x:Int,n:Int,t:Int=1):Int={
    if (n<1) t
    else myPower(x,n-1,x*t) 
  }
  def myAdd(x:Int,y:Int=0)={
     val result:Int=x+y
     s"$x add $y =$result"
  }
  def mySum(nums:Int*)={ // variable-length arguments (varargs)
     var sumNum=0
     for (num<-nums){
       sumNum+=num
     }
     sumNum
  }

}
MacBook-Air:learn_2 myhaspl$ ls
learn_2.scala
MacBook-Air:learn_2 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   3185      0  0:00:03  0:00:03 --:--:--  3189
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): learn2
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT): 
Scala version (2.11.12): 
Flink version (1.6.0): 

-----------------------------------------------
Project Name: learn2
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn2
MacBook-Air:learn_2 myhaspl$ ls
learn2      learn_2.scala
MacBook-Air:learn_2 myhaspl$ cd learn2
MacBook-Air:learn2 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn2 myhaspl$ cd sr
-bash: cd: sr: No such file or directory
MacBook-Air:learn2 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm *.scala
MacBook-Air:myhaspl myhaspl$ ls
MacBook-Air:myhaspl myhaspl$ cp /Users/aaaaa/Documents/scala/learn_1/src/learn.scala learn.scala
MacBook-Air:myhaspl myhaspl$ ls
learn.scala
MacBook-Air:myhaspl myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ sbt clean assembly

MacBook-Air:learn2 myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2
MacBook-Air:learn2 myhaspl$  sbt clean assembly
[info] Updated file /Users/bbbb/Documents/scala/learn_2/learn2/project/build.properties: set sbt.version to 1.2.4
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaa/Documents/scala/learn_2/learn2/project
[info] Updating ProjectRef(uri("file:/Users/aassfdfsdaxg/Documents/scala/learn_2/learn2/project/"), "learn2-build")...
[info] Done updating.
[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/cccccc/Documents/scala/learn_2/learn2/)
[success] Total time: 0 s, completed Oct 11, 2018 11:03:29 AM
[info] Updating ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/AAA/Documents/scala/learn_2/learn2/target/scala-2.11/classes ...
[info] Done compiling.
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] SHA-1: eaaa2f651ba4387defc6282c2de36e6dd4402f32
[info] Packaging /Users/aaaaaasdf/Documents/scala/learn_2/learn2/target/scala-2.11/Flink Project-assembly-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 12 s, completed Oct 11, 2018 11:03:41 AM
MacBook-Air:learn2 myhaspl$ sbt
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaa/Documents/scala/learn_2/learn2/)
[info] sbt server started at local:///Users/aaaaaa/.sbt/1.0/server/e759170bdd67731a9bda/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 11:27:46 AM
sbt:Flink Project> package
[info] Packaging /Users/aaaaaa/Documents/scala/learn_2/learn2/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 11:27:53 AM

MacBook-Air:target myhaspl$ cd scala-2.11
MacBook-Air:scala-2.11 myhaspl$ ls
Flink Project-assembly-0.1-SNAPSHOT.jar flink-project_2.11-0.1-SNAPSHOT.jar
classes                 resolution-cache

Run the official example: receive text strings from a socket port, then count the words

MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.

MacBook-Air:Documents myhaspl$ nc -l 9800
aa ss dd ff gg
bye
^C

MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 1625d3a29cfdf8fa8a77c3e5c8e9d30e has finished.
Job Runtime: 9181 ms
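Words are counted in time windows of 5 seconds (processing time, tumbling windows) and printed to standard output. Monitor the TaskManager's output file and write some text in nc (each line is sent to Flink after pressing Enter):
After starting nc -l 9800, type the input quickly, within 5 seconds.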
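
To make the "5-second tumbling window" idea concrete, here is a minimal sketch in plain Scala with no Flink dependency. It assumes windows are aligned to time 0 and that each word carries an arrival time in milliseconds; the names Event, windowStart and countPerWindow are hypothetical and are not part of the Flink API. Words that arrive in different 5-second buckets are counted separately, which is why slow typing spreads the counts over several windows.

```scala
object TumblingWindowSketch {
  // Hypothetical event type: a word plus the time (ms) at which it arrived.
  final case class Event(word: String, timeMs: Long)

  // Start of the tumbling window (aligned to time 0) that contains timeMs.
  def windowStart(timeMs: Long, sizeMs: Long): Long = timeMs - (timeMs % sizeMs)

  // Count each word separately inside each window.
  def countPerWindow(events: Seq[Event], sizeMs: Long): Map[(Long, String), Int] =
    events
      .groupBy(e => (windowStart(e.timeMs, sizeMs), e.word))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("ss", 1000L), Event("ss", 2500L), Event("ff", 4000L), // window [0, 5000)
      Event("ss", 6000L)                                          // window [5000, 10000)
    )
    countPerWindow(events, 5000L).toSeq.sorted.foreach {
      case ((start, word), n) => println(s"window@$start $word : $n")
    }
  }
}
```

In a real job the timestamps come from the processing-time clock of the TaskManager rather than from the data, so which window a word lands in depends on when it was typed.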

MacBook-Air:flink myhaspl$ ls -la libexec/log/*
-rw-r--r--  1 myhaspl  admin  19733 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin      0 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.out
-rw-r--r--  1 myhaspl  admin  32693 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin     43 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
aa : 1
gg : 1
ff : 1
dd : 1
ss : 1
bye : 1
MacBook-Air:flink myhaspl$ 

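Clear the files under log and restart the cluster, then enter a few strings. Be sure to type fast and finish within 5 seconds; otherwise the results will not show up in the .out file, because the program is written that way: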


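import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {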

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}
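
A note on the listing above: the two-argument form timeWindow(size, slide) describes a sliding window (5 seconds long, advancing every 1 second), while the description earlier speaks of a tumbling window, so the bundled jar may not match this listing exactly. The plain-Scala sketch below, which does not use Flink and assumes windows aligned to time 0, shows how many windows one element falls into in each case. The name windowStarts is hypothetical.

```scala
object SlidingWindowSketch {
  // Start times (ms) of every window [start, start + sizeMs) that contains timeMs,
  // assuming non-negative times and windows aligned to time 0.
  def windowStarts(timeMs: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = timeMs - (timeMs % slideMs)
    // Walk backwards in steps of slideMs while the window still covers timeMs.
    (lastStart until (timeMs - sizeMs) by -slideMs).toList
  }

  def main(args: Array[String]): Unit = {
    // Sliding: size 5 s, slide 1 s. The element belongs to several windows.
    println(windowStarts(7300L, 5000L, 1000L))
    // Tumbling is the special case slide == size. The element belongs to one window.
    println(windowStarts(7300L, 5000L, 5000L))
  }
}
```

With a slide shorter than the window size, every word contributes to several overlapping windows, so the same word can be reported more than once.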
MacBook-Air:flink myhaspl$ ./libexec/bin/stop-cluster.sh
MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
MacBook-Air:Documents myhaspl$ nc -l 9800
ss
ss
ff 
gg 
ss
^C
MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 3cda8e474ed0050ef05828fc91cd8302 has finished.
Job Runtime: 3509 ms

MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
ss : 2
gg : 1
ff : 1
MacBook-Air:flink myhaspl$ 

Next, let's make the time longer than 5 seconds.
Create a new Flink template project:

MacBook-Air:scala myhaspl$ mkdir learn_3
MacBook-Air:scala myhaspl$ cd learn_3
MacBook-Air:learn_3 myhaspl$ ls
MacBook-Air:learn_3 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   5130      0  0:00:02  0:00:02 --:--:--  5274
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): learn3
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT): 
Scala version (2.11.12): 
Flink version (1.6.0): 

-----------------------------------------------
Project Name: learn3
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn3
MacBook-Air:learn_3 myhaspl$ 
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm W*.scala
MacBook-Air:myhaspl myhaspl$ rm J*.scala
MacBook-Air:myhaspl myhaspl$ ls
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ 
MacBook-Air:myhaspl myhaspl$ pwd
/Users/xxxxx/Documents/scala/learn_3/learn3/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ cd ../../..
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd ../..
MacBook-Air:learn_3 myhaspl$ ls
learn3
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ sbt clean assembly
MacBook-Air:learn3 myhaspl$ sbt
[info] Loading settings for project learn3-build from assembly.sbt ...
[info] Loading project definition from /Users/zzzzzz/Documents/scala/learn_3/learn3/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/zzzzzz/Documents/scala/learn_3/learn3/)
[info] sbt server started at local:///Users/lzz/.sbt/1.0/server/1d8f10f02b1fbf814396/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 3:30:19 PM
sbt:Flink Project> package
[info] Packaging /Users/zzz/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 3:30:29 PM
sbt:Flink Project> exit
[info] shutting down server
MacBook-Air:learn3 myhaspl$ 

Stop the cluster and restart it. This time the time window has been removed, so the input can be typed slowly.

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 29505) on host MacBook-Air.local.
Stopping standalonesession daemon (pid: 29093) on host MacBook-Air.local.
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn3 myhaspl$ rm ~/Documents/flink/libexec/log/*
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
MacBook-Air:learn3 myhaspl$ 
MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.
MacBook-Air:learn3 myhaspl$ 

MacBook-Air:learn_3 myhaspl$ nc -l 9800
Unbounded streams have a start but no defined end. They do not terminate and provide data as it is generated. Unbounded streams must be continuously processed, i.e., events must be promptly handled after they have been ingested. It is not possible to wait for all input data to arrive because the input is unbounded and will not be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order in which events occurred, to be able to reason about result completeness.
Bounded streams have a defined start and end. Bounded streams can be processed by ingesting all data before performing any computations. Ordered ingestion is not required to process bounded streams because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing.
^C

MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
Starting execution of program
Program execution finished
Job with JobID 7f66cd617236d64520a08a44b0544234 has finished.
Job Runtime: 63502 ms
MacBook-Air:learn3 myhaspl$ 

MacBook-Air:learn_3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out
flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn_3 myhaspl$ cat ~/Documents/flink/libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
...
(bounded,4)
(data,5)
(set,1)
(can,2)
(always,1)
(be,6)
(sorted,1)
(processing,2)
(of,1)
(bounded,5)
(streams,6)
(is,5)
(also,1)
(known,1)
(as,3)
(batch,1)
(processing,3)
(bye,1)
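
In the output above the same word shows up repeatedly with a growing count, for example (bounded,4) followed later by (bounded,5). That pattern is what a running per-word sum without a window would produce: every incoming word emits its updated total. The template's SocketTextStreamWordCount source is not shown in this post, so the following is only a plain-Scala sketch of that behaviour under this assumption, with no Flink dependency. The name runningCounts is hypothetical.

```scala
object RunningCountSketch {
  // Emit (word, countSoFar) after every word, as a keyed running sum with no window would.
  def runningCounts(words: Seq[String]): List[(String, Int)] = {
    val (_, emitted) =
      words.foldLeft((Map.empty[String, Int], List.empty[(String, Int)])) {
        case ((counts, out), word) =>
          val n = counts.getOrElse(word, 0) + 1
          (counts.updated(word, n), (word, n) :: out)
      }
    emitted.reverse // restore arrival order
  }

  def main(args: Array[String]): Unit = {
    val line = "Bounded streams have a defined start and end. Bounded streams can be processed"
    val words = line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq
    runningCounts(words).foreach(println)
  }
}
```

Because nothing is ever closed or flushed by a timer, the typing speed no longer matters; each line is counted as soon as it arrives.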

Start the Flink Scala REPL

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-scala-shell.sh local
Starting Flink Shell:
log4j:WARN No appenders could be found for logger (org.apache.flink.configuration.GlobalConfiguration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Starting local Flink cluster (host: localhost, port: 8081).

Connecting to Flink cluster (host: localhost, port: 8081).

              [Flink squirrel logo, ASCII art]

              F L I N K - S C A L A - S H E L L

NOTE: Use the prebound Execution Environments to implement batch or streaming programs.

  Batch - Use the 'benv' variable

    * val dataSet = benv.readTextFile("/path/to/data")
    * dataSet.writeAsText("/path/to/output")
    * benv.execute("My batch program")

    HINT: You can use print() on a DataSet to print the contents to the shell.

  Streaming - Use the 'senv' variable

    * val dataStream = senv.fromElements(1, 2, 3, 4)
    * dataStream.countWindowAll(2).sum(0).print()
    * senv.execute("My streaming program")

    HINT: You can only print a DataStream to the shell in local mode.

scala> 
scala> val text = benv.fromElements(
     |   "To be, or not to be,--that is the question:--",
     |   "Whether 'tis nobler in the mind to suffer",
     |   "The slings and arrows of outrageous fortune",
     |   "Or to take arms against a sea of troubles,")
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@254dfd34

scala> counts.print()
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
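
The counts printed above can be cross-checked without a cluster. The sketch below mirrors the shell pipeline (lower-case, split on non-word characters, group, sum) using plain Scala collections instead of benv. The filter for empty tokens is an addition that the shell session does not have, and the name wordCount is hypothetical.

```scala
object LocalWordCount {
  // Lower-case each line, split on runs of non-word characters, then count per word.
  // The nonEmpty filter is an extra safeguard that is not in the shell session.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    val text = Seq(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,"
    )
    wordCount(text).toSeq.sorted.foreach(println)
  }
}
```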

The print() command automatically submits the specified job to the JobManager for execution and shows the result in the terminal.

The result can also be written to a file. In that case, however, you need to call execute to run your program:

 benv.execute("MyProgram")
scala> val text = benv.fromElements("Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6c551798

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@2be71a78
scala> counts.print()
(1,1)
(3,1)
(a,4)
(amount,1)
(but,1)
(certain,1)
(compete,1)
(cpu,1)
(currently,1)
(dedicate,1)
(each,2)
(example,1)
(fixed,1)
(for,2)
(from,1)
(happens,1)
(has,1)
(here,1)
(instead,1)
(isolation,1)
(its,1)
(jobs,1)
(managed,4)
(means,1)
(memory,4)
(no,1)
(not,1)
(note,1)
(of,5)
(only,1)
(other,1)
(represents,1)
(reserved,1)
(resources,2)
(separate,1)
(slot,2)
(slots,2)
(slotting,1)
(subset,1)
(subtask,1)
(subtasks,1)
(task,1)
(taskmanager,2)
(tasks,1)
(that,2)
(the,3)
(three,1)
(to,1)
(will,2)
(with,2)
scala> :q
 good bye ..
MacBook-Air:learn3 myhaspl$ 

Original article: http://blog.51cto.com/13959448/2316212
