码迷,mamicode.com
首页 > 其他好文 > 详细

Flink-transformation(三)

时间:2021-06-11 18:25:57      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:分组   atm   pack   nta   tran   cal   lis   first   err   

 

ke01开启: nc -lk 8888

Map

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
/**
 * Demonstrates the one-to-one `map` operator on a socket text stream.
 *
 * Reads lines from `ke01:8888`, maps each line, prints the mapped stream
 * and runs the job.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = environment.socketTextStream("ke01", 8888)
    // map always emits exactly one element per input, so it cannot drop records:
    // a line containing "a" is mapped to the Unit value, which prints as "()".
    val mapped = lines.map { line =>
      if (line.contains("a")) () else line
    }
    mapped.print()
    environment.execute()
  }
}

[root@ke01 bigdata]# nc -lk 8888
b
c
b
a
a


结果:
11> b
12> c
1> b
2> ()
3> ()

 

flatMap

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
/**
 * Demonstrates `flatMap`: each comma-separated input line is expanded into
 * one output element per field.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val fields = environment
      .socketTextStream("ke01", 8888)
      .flatMap(_.split(","))
    fields.print()
    environment.execute()
  }
}

a,c
a,d,e

结果:
3> a
3> c
4> a
4> d
4> e

 

使用flatMap代替filter

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer
/**
 * Demonstrates using `flatMap` as a filter: a line is emitted unchanged
 * unless it contains "a", in which case nothing is emitted for it.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = environment.socketTextStream("ke01", 8888)
    // Zero-or-one output per input: an empty iterator drops the record.
    val kept = lines.flatMap { line =>
      if (line.contains("a")) Iterator.empty else Iterator.single(line)
    }
    kept.print()
    environment.execute()
  }
}

abc
qwe
结果:
4> qwe

 

keyBy 分流算子,根据用户指定的字段进行分组
package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates `keyBy` with a tuple field position.
 *
 * Splits each socket line on spaces, pairs every word with the count 1,
 * partitions the stream by the word and prints the keyed stream.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("ke01", 8888)
    // keyBy takes a tuple field position: 0 is the first field, 1 is the second.
    // NOTE(review): position-based keyBy is reported as deprecated in newer Flink
    // releases in favour of a key selector - confirm against the Flink version in use.
    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).print()
    env.execute()
  }
}

结果:
8> (a,1)
8> (a,1)
3> (b,1)
3> (b,1)
6> (c,1)
8> (a,1)

 

 

keyBy(使用 KeySelector 指定分组的 key)
package com.text.transformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates `keyBy` with an explicit `KeySelector`: words read from the
 * socket are paired with the count 1 and the stream is keyed by the word.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = environment.socketTextStream("ke01", 8888)

    // The key of a (word, count) pair is the word itself.
    val wordKey: KeySelector[(String, Int), String] =
      new KeySelector[(String, Int), String] {
        override def getKey(pair: (String, Int)): String = pair._1
      }

    val wordOnes = lines.flatMap(_.split(" ")).map((_, 1))
    wordOnes.keyBy(wordKey).print()
    environment.execute()
  }
}


结果:
8> (a,1)
3> (b,1)
8> (a,1)
3> (b,1)

 

 

reduce,一般结合keyBy使用

package com.text.transformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates `reduce` on a keyed stream: a running word count where each
 * incoming (word, 1) pair is folded into the previous total for that word.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = environment.socketTextStream("ke01", 8888)

    // The key of a (word, count) pair is the word itself.
    val wordKey: KeySelector[(String, Int), String] =
      new KeySelector[(String, Int), String] {
        override def getKey(pair: (String, Int)): String = pair._1
      }

    val keyedWords = lines.flatMap(_.split(" ")).map((_, 1)).keyBy(wordKey)
    // Keep the word from the accumulated pair and add the two counts.
    val runningCounts = keyedWords.reduce((total, next) => (total._1, total._2 + next._2))
    runningCounts.print()
    environment.execute()
  }
}

结果:
8> (a,1)
8> (a,2)
8> (a,3)

 

union

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates `union`: two streams of the same element type are merged
 * into a single stream, which is then printed.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val left = environment.fromCollection(List(("a", 1), ("b", 2)))
    val right = environment.fromCollection(List(("a", 3), ("d", 4)))
    left.union(right).print()
    environment.execute()
  }
}

结果:
11> (b,2)
8> (a,3)
10> (a,1)
9> (d,4)

 

split

package com.text.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates `split` / `select`: the numbers 1 to 100 are tagged by
 * parity and only the stream tagged "first" is printed.
 *
 * NOTE(review): split/select is reported as deprecated in later Flink
 * releases in favour of side outputs - confirm against the Flink version in use.
 */
object MapOperator {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers = environment.generateSequence(1, 100)
    // Even numbers are tagged "first", odd numbers are tagged "second".
    val tagged = numbers.split { number =>
      val tag = number % 2 match {
        case 0 => "first"
        case 1 => "second"
      }
      List(tag)
    }
    // Pick the tagged sub-stream to print.
    val evens = tagged.select("first")
    evens.print()
    environment.execute()
  }
}

结果:

10> 10
6> 6
12> 12
6> 18
8> 8
6> 30
4> 4
2> 2
4> 16

 

Flink-transformation(三)

标签:分组   atm   pack   nta   tran   cal   lis   first   err   

原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14873512.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!