码迷,mamicode.com
首页 > 其他好文 > 详细

spark HelloWorld程序(scala版)

时间:2017-09-09 11:58:14      阅读:378      评论:0      收藏:0      [点我收藏+]

标签:parallel   其他   artifact   安装   target   match   dfs   加载   hdfs   

使用本地模式,不需要安装spark,引入相关JAR包即可:

        <!-- Spark core library, version 2.2.0. The "_2.11" artifact suffix is presumably the
             Scala binary version the artifact was built for; confirm it matches the project's Scala version. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- Spark SQL module, kept on the same version and artifact suffix as spark-core above. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency> 

创建SparkSession:

        // Master URL "local" runs Spark inside this JVM, so no Spark installation is needed.
        val sparkUrl = "local"
        val conf = new SparkConf()
                //.setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
                // Hadoop options only reach the Hadoop Configuration when they carry the
                // "spark.hadoop." prefix; a bare "fs.hdfs.impl.disable.cache" key is not
                // forwarded, so the FileSystem cache would silently stay enabled.
                .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")
                // NOTE(review): with a local master the executor runs inside the driver JVM,
                // so this setting presumably only matters once a cluster master is used.
                .set("spark.executor.memory", "8g")

        // Build (or reuse) the session from the configuration above.
        val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config(conf)
                .config("spark.some.config.option", "some-value")
                .master(sparkUrl)
                .getOrCreate()

加载本地文件:

// Load the Parquet file from the local filesystem into a DataFrame.
val localParquetPath = "/home/panteng/下载/000001_0"
val parquetFileDF = spark.read.parquet(localParquetPath)
// Alternative source on HDFS:
//spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")

文件操作:

// Make the loaded data queryable from Spark SQL under the name "parquetFile".
parquetFileDF.createOrReplaceTempView("parquetFile")

// Select the description together with `pre` (computed by substring(description,0,3)),
// reading at most 100000 rows.
val prefixQuery = "SELECT substring(description,0,3) as pre ,description FROM parquetFile LIMIT 100000"
val descDF = spark.sql(prefixQuery)

// Drop duplicate rows and order them by description before exposing them as "pre_desc".
val diffDesc = descDF.distinct().sort("description")
diffDesc.createOrReplaceTempView("pre_desc")

// Read everything back from the "pre_desc" view and show its schema.
val allRowsQuery = "select * from pre_desc"
val zhaoshang = spark.sql(allRowsQuery)
zhaoshang.printSchema()

遍历处理:

// clustering() mutates driver-side state and relies on rows arriving in sorted order.
// Dataset.foreach runs its function inside Spark tasks, where updates to captured variables
// are not guaranteed to be visible to the driver, so the rows are collected and walked on
// the driver instead. The query above is capped at 100000 rows, which bounds the collected data.
zhaoshang.collect().foreach(row => clustering(row))

// Write the accumulated regexes out as a single text part file.
val regexRdd = spark.sparkContext.parallelize(regexList)
regexRdd.repartition(1).saveAsTextFile("/home/panteng/下载/temp6")

spark.stop()

附其他函数:

/**
 * Feeds one row into the running prefix group and, when the prefix changes, turns the
 * finished group into a regex.
 *
 * Reads and updates `textPre`, `textList` and `regexList`, which are declared outside this
 * function. Rows are expected to arrive ordered so that equal `pre` values are adjacent.
 *
 * @param row a row exposing the string columns "pre" and "description"
 * @return "continue" when the row joined the current group; "ok - <regex>" when a new group
 *         was started (the regex is the one generated for the finished group, or the
 *         placeholder "null" when none was generated); "error" when processing failed
 */
def clustering(row: Row): String = {
    try {
        val pre = row.getAs[String]("pre")
        // `==` is null-safe, so an unset textPre no longer raises a NullPointerException
        // that would otherwise drop the row through the catch block.
        if (pre != null && pre == textPre) {
            textList = row.getAs[String]("description").replaceAll("\\d", "0") :: textList
            "continue"
        } else {
            // The prefix changed: a group with more than two members yields a regex.
            val finishedRegex =
                if (textList.size > 2) {
                    val generated = ScalaClient.getRegex(textList)
                    regexList = generated :: regexList
                    generated
                } else {
                    new Regex("null")
                }
            val description = row.getAs[String]("description")
            if (pre != null && description != null) {
                textPre = pre
                // Normalise digits for the first member too, exactly like later members;
                // otherwise its raw digits never line up with the "0" runs of the others
                // and getRegex cannot generalise them to \d+.
                textList = List(description.replaceAll("\\d", "0"))
            }
            "ok - " + finishedRegex.toString()
        }
    } catch {
        // NonFatal lets fatal JVM errors and thread interruption propagate.
        case scala.util.control.NonFatal(e) =>
            println("kkkkkkk" + e)
            "error"
    }
}

 

技术分享
package scala.learn

import top.letsgogo.rpc.ThriftProxy

import scala.util.matching.Regex

/**
 * Derives a regular expression from a batch of similar sentences.
 *
 * Each sentence is split into words through the Thrift client. Words found in every sentence
 * are kept literally in the pattern; every run of other words in the template sentence is
 * replaced by a single "(.*)" capture group.
 */
object ScalaClient {
    /** Capture group that stands in for a run of words not shared by all sentences. */
    private val Wildcard = "(.*)"

    /** Characters with a special meaning in java.util.regex patterns. */
    private val RegexMetaChars = "\\^$.|?*+()[]{}"

    /**
     * Demo entry point: builds a pattern from a few sample messages, prints it, and prints for
     * each sample whether the pattern found no match in it.
     */
    def main(args: Array[String]): Unit = {
        val seqList = List("您尾号9081的招行账户入账人民币689.00元",
            "您尾号1234的招行一卡通支出人民币11.00元",
            "您尾号2345的招行一卡通支出人民币110.00元",
            "您尾号5432的招行一卡通支出人民币200.00元",
            "您尾号5436的招行一卡通入账人民币142.00元")
        val split = sentenceSplitter()
        val shared = commonWords(seqList.map(split))

        val pattern = buildPattern(split("您尾号1234的招行一卡通支出人民币200.00元"), shared)
        println(pattern)
        val regex = new Regex(pattern)
        for (seq <- seqList) {
            println(regex.findAllIn(seq).isEmpty)
        }
    }

    /**
     * Builds a regex for a group of sentences, using the first sentence as the template.
     * Runs of the digit 0 in the resulting pattern are generalised to `\d+`.
     *
     * @param seqList the sentences of one group; must not be empty
     * @return the generated regex
     * @throws IllegalArgumentException if `seqList` is empty
     */
    def getRegex(seqList: List[String]): Regex = {
        require(seqList.nonEmpty, "getRegex needs at least one sentence")
        val split = sentenceSplitter()
        val tokenized = seqList.map(split)

        // The first sentence is the template; its split result is reused instead of
        // asking the client to split it a second time.
        val pattern = buildPattern(tokenized.head, commonWords(tokenized))
        println(pattern + "  " + seqList.size)
        new Regex(pattern.replaceAll("0+", "\\\\d+"))
    }

    /** Obtains the Thrift client once and returns a function that splits a sentence into words. */
    private def sentenceSplitter(): String => List[String] = {
        val client = ThriftProxy.client
        sentence => {
            val words = client.splitSentence(sentence)
            (0 until words.size()).map(i => words.get(i)).toList
        }
    }

    /**
     * Words contained in every sentence. Each sentence contributes a word at most once, so a
     * word repeated inside some sentences but missing from others is not treated as common.
     */
    private def commonWords(sentences: List[List[String]]): Set[String] =
        sentences
            .flatMap(_.distinct)
            .groupBy(identity)
            .collect { case (word, hits) if hits.size == sentences.size => word }
            .toSet

    /** Escapes every regex metacharacter in `word` so it matches literally. */
    private def escapeLiteral(word: String): String =
        word.map(ch => if (RegexMetaChars.indexOf(ch) >= 0) "\\" + ch else ch.toString).mkString

    /**
     * Joins the template words into a pattern: common words are kept (escaped), everything
     * else becomes a wildcard, and adjacent wildcards are merged into one.
     */
    private def buildPattern(template: List[String], common: Set[String]): String = {
        val parts = template.map { word =>
            // "*" is never kept as a literal, matching the original behaviour.
            if (common.contains(word) && word != "*") escapeLiteral(word) else Wildcard
        }
        // An escaped literal can never equal Wildcard (its parentheses would be escaped), so
        // comparing strings is enough to detect a preceding wildcard, including at the start.
        parts.foldLeft(List.empty[String]) { (acc, part) =>
            if (part == Wildcard && acc.headOption.contains(Wildcard)) acc else part :: acc
        }.reverse.mkString
    }
}
批量数据提取正则

 

spark HelloWorld程序(scala版)

标签:parallel   其他   artifact   安装   target   match   dfs   加载   hdfs   

原文地址:http://www.cnblogs.com/tengpan-cn/p/7497488.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!