package com.sjw.flink
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
object Score {
  def main(args: Array[String]): Unit = {
    // Batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the input data
    val dataDS: DataSet[String] = env.readTextFile("src/main/resources/score.txt")

    // Map each comma-separated line into the case class
    val linesDS: DataSet[ScoreTest] = dataDS.map(data => {
      val arr: Array[String] = data.split(",")
      ScoreTest(arr(0).toInt, arr(1).toInt, arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toInt)
    })

    // Keep only students who scored at least 60 in every subject
    val filterDS: DataSet[ScoreTest] = linesDS.filter(data =>
      data.A >= 60 && data.B >= 60 && data.C >= 60 && data.D >= 60 && data.E >= 60)

    // Compute each remaining student's total score, keyed by id
    val sumDS: DataSet[(Int, Int)] = filterDS.map(x => {
      val sum = x.A + x.B + x.C + x.D + x.E
      (x.id, sum)
    })

    // Keep the ids of students whose total is at least 600
    val result: DataSet[Int] = sumDS.filter(_._2 >= 600).map(_._1)
    // result.print()

    // writeAsText is a lazy sink in the DataSet API, so env.execute() is needed to trigger the job
    result.writeAsText("D:\\workspace\\fink_demo\\src\\main\\resources\\id1.txt", WriteMode.NO_OVERWRITE)
    env.execute()
  }
}
// Case class modelling one line of score.txt: a student id and five subject scores
case class ScoreTest(id: Int, A: Int, B: Int, C: Int, D: Int, E: Int)
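
The original post does not show the input file. Judging from the split(",") call and the six fields of ScoreTest, score.txt presumably holds one student per line as six comma-separated integers: the id followed by the five subject scores A through E. A purely hypothetical example:

    1,130,125,118,96,140
    2,59,135,128,120,140
    3,88,90,75,80,95

With this made-up input, id 2 is removed by the first filter (one score below 60), id 3 passes the first filter but its total of 428 is below 600, and only id 1 (total 609) is written to id1.txt.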
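
As a purely illustrative alternative (an assumption, not code from the original post), the per-subject filter, the sum, and the total-score filter can be collapsed into one chain by collecting the five scores into a Seq; this would replace the filterDS, sumDS and result steps inside main:

    // Sketch only: equivalent pipeline, assuming the same linesDS and ScoreTest as above
    val result: DataSet[Int] = linesDS
      .map(s => (s.id, Seq(s.A, s.B, s.C, s.D, s.E)))
      .filter { case (_, scores) => scores.forall(_ >= 60) && scores.sum >= 600 }
      .map(_._1)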
Original post: https://www.cnblogs.com/whyuan/p/13276905.html