标签:parallel 其他 artifact 安装 target match dfs 加载 hdfs
使用本地模式时不需要安装 Spark,在项目中引入相关 JAR 包(Maven 依赖)即可:
<!-- Spark core engine (artifact built for Scala 2.11). -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- Spark SQL / DataFrame API; keep its version in step with spark-core. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
创建 SparkSession:
// Master URL "local": Spark runs inside this JVM, so no cluster installation is needed.
val sparkUrl = "local"

// Base configuration shared by the session below.
// NOTE(review): with a local master the executor lives in the driver JVM, so
// "spark.executor.memory" probably has no effect here — confirm before relying on it.
val conf = new SparkConf()
  // Only needed when submitting to a real cluster:
  //.setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
  .set("fs.hdfs.impl.disable.cache", "true")
  .set("spark.executor.memory", "8g")

// Build (or reuse) the SparkSession used by all following snippets.
// The builder chain must stay on separate lines: when everything is written on a
// single line, the `//.setJars` comment above swallows the rest of the statement.
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config(conf)
  .config("spark.some.config.option", "some-value")
  .master(sparkUrl)
  .getOrCreate()
加载本地文件:
// Location of the Parquet file on the local file system.
val localParquetPath = "/home/panteng/下载/000001_0"

// Load the file into a DataFrame.
// HDFS alternative: spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")
val parquetFileDF = spark.read
  .parquet(localParquetPath)
对文件数据执行 SQL 操作:
// Make the loaded DataFrame queryable from Spark SQL under the name "parquetFile".
parquetFileDF.createOrReplaceTempView("parquetFile")

// Select a short prefix of each description (column "pre") next to the full text.
// NOTE(review): SQL substring positions are normally 1-based; a start of 0 appears
// to behave like 1 in Spark — confirm the prefix really is the first three characters.
val prefixQuery =
  "SELECT substring(description,0,3) as pre ,description FROM parquetFile LIMIT 100000"
val descDF = spark.sql(prefixQuery)

// Drop duplicate rows and order by description so equal prefixes end up adjacent.
val diffDesc = descDF
  .distinct()
  .sort("description")
diffDesc.createOrReplaceTempView("pre_desc")

// Read the de-duplicated, sorted rows back through the "pre_desc" view.
val zhaoshang = spark.sql("select * from pre_desc")
zhaoshang.printSchema()
遍历处理:
try {
  // `clustering` updates state declared outside this snippet (it assigns textPre,
  // textList and regexList) and depends on seeing the rows in sorted order.
  // Dataset.foreach would run it inside a Spark task, where updates to driver-side
  // variables are not guaranteed to be visible afterwards, so the rows are first
  // collected and then processed sequentially on the driver.
  zhaoshang.collect().foreach(row => clustering(row))

  // NOTE(review): clustering only emits a regex when the prefix changes, so the
  // group that is still being accumulated after the last row is never flushed.

  // A single slice yields one output part file without the shuffle that
  // repartition(1) would trigger.
  val regexRdd = spark.sparkContext.parallelize(regexList, 1)
  regexRdd.saveAsTextFile("/home/panteng/下载/temp6")
} finally {
  // Release Spark resources even when processing or saving fails.
  spark.stop()
}
附其他函数:
/**
 * Feeds one row into the running "same prefix" group and, when the prefix changes,
 * turns the finished group into a regex.
 *
 * Relies on three variables declared outside this function: `textPre` (prefix of
 * the current group), `textList` (descriptions collected for that prefix) and
 * `regexList` (regexes generated so far). Rows are expected to arrive ordered so
 * that equal prefixes are adjacent.
 *
 * @param row a row with string columns "pre" and "description"
 * @return "continue" when the row was added to the current group,
 *         "ok - <regex>" when a new group was started ("ok - null" if the previous
 *         group was too small to produce a regex),
 *         "error" when the row could not be processed
 */
def clustering(row: Row): String = {
  // Digits are collapsed to "0" so that texts differing only in numbers look alike.
  def normalize(text: String): String = text.replaceAll("\\d", "0")

  try {
    val pre = row.getAs[String]("pre")
    val description = row.getAs[String]("description")

    if (pre != null && pre == textPre) {
      // Same prefix as the current group: just collect the description.
      Option(description) match {
        case Some(text) =>
          textList = normalize(text) :: textList
          "continue"
        case None =>
          // Previously this surfaced as a NullPointerException caught below.
          "error"
      }
    } else {
      // Prefix changed: groups with more than two samples yield a regex.
      val generated =
        if (textList.size > 2) Some(ScalaClient.getRegex(textList)) else None
      generated.foreach(regex => regexList = regex :: regexList)

      if (pre != null && description != null) {
        textPre = pre
        // The first sample of a group is normalised like all later ones; the
        // original stored it raw, which kept its number tokens from matching.
        textList = List(normalize(description))
      }
      "ok - " + generated.fold("null")(_.toString)
    }
  } catch {
    case scala.util.control.NonFatal(e) =>
      println("kkkkkkk" + e)
      "error"
  }
}
package scala.learn

import top.letsgogo.rpc.ThriftProxy

import scala.util.matching.Regex

/**
 * Derives a regular expression from a group of similar sentences.
 *
 * Every sentence is split into words through the Thrift client. Words whose total
 * number of occurrences reaches the number of sentences are kept as literals; every
 * other position of the template sentence becomes the wildcard group "(.*)".
 */
object ScalaClient {

  /** Placeholder emitted for each template word that is not shared by the samples. */
  private val Wildcard = "(.*)"

  /** Characters with a special meaning in java.util.regex patterns. */
  private val RegexMetaChars = "\\.[]{}()*+?^$|"

  /** Demo entry point: builds a pattern from five sample messages and prints the result. */
  def main(args: Array[String]): Unit = {
    val split = newSplitter()
    val seqList = List(
      "您尾号9081的招行账户入账人民币689.00元",
      "您尾号1234的招行一卡通支出人民币11.00元",
      "您尾号2345的招行一卡通支出人民币110.00元",
      "您尾号5432的招行一卡通支出人民币200.00元",
      "您尾号5436的招行一卡通入账人民币142.00元")

    val common = commonWords(seqList, split)
    val templateWords = split("您尾号1234的招行一卡通支出人民币200.00元")
    // The demo keeps consecutive wildcards, as it always did.
    val pattern = buildPattern(templateWords, common, collapseWildcards = false)
    println(pattern)

    val regex = new Regex(pattern)
    for (seq <- seqList) {
      println(regex.findAllIn(seq).isEmpty)
    }
  }

  /**
   * Builds a regex describing all sentences in `seqList`, using the first sentence
   * as the template.
   *
   * @param seqList non-empty list of sample sentences; callers in this project pass
   *                texts whose digits were already replaced by "0"
   * @return the generated regex, with every run of "0" replaced by `\d+`
   * @throws IllegalArgumentException if `seqList` is empty
   */
  def getRegex(seqList: List[String]): Regex = {
    require(seqList.nonEmpty, "getRegex needs at least one sample sentence")

    val split = newSplitter()
    val common = commonWords(seqList, split)
    val pattern = buildPattern(split(seqList.head), common, collapseWildcards = true)
    println(pattern + " " + seqList.size)

    // Runs of the digit placeholder "0" stand for "any number".
    new Regex(pattern.replaceAll("0+", "\\\\d+"))
  }

  /** Obtains the Thrift client once and returns a function splitting a sentence into words. */
  private def newSplitter(): String => List[String] = {
    val client = ThriftProxy.client
    sentence => {
      val tokens = client.splitSentence(sentence)
      (0 until tokens.size()).map(i => tokens.get(i)).toList
    }
  }

  /**
   * Words whose total occurrence count over all samples is at least the number of samples.
   *
   * NOTE(review): a word repeated inside one sentence is counted every time, so it can
   * qualify without being present in every sample. Kept as in the original — confirm intent.
   */
  private def commonWords(samples: List[String], split: String => List[String]): Set[String] = {
    val counts = samples.flatMap(split).groupBy(identity).map {
      case (word, occurrences) => (word, occurrences.size)
    }
    counts.collect { case (word, count) if count >= samples.size => word }.toSet
  }

  /** Backslash-escapes every regex metacharacter so the word is matched literally. */
  private def escapeLiteral(word: String): String =
    word.map(c => if (RegexMetaChars.indexOf(c) >= 0) "\\" + c else c.toString).mkString

  /**
   * Turns the template words into pattern text: shared words become escaped literals,
   * everything else becomes a wildcard group.
   *
   * @param collapseWildcards when true, consecutive wildcards are merged into one. The
   *                          original length check (`size > 4`) missed the case where
   *                          the pattern so far was exactly one wildcard.
   */
  private def buildPattern(templateWords: List[String],
                           common: Set[String],
                           collapseWildcards: Boolean): String = {
    val pieces = templateWords.map { word =>
      // A bare "*" has always been treated as a wildcard position rather than a literal.
      if (common.contains(word) && word != "*") escapeLiteral(word) else Wildcard
    }
    val merged =
      if (collapseWildcards) {
        pieces.foldLeft(Vector.empty[String]) { (acc, piece) =>
          if (piece == Wildcard && acc.lastOption.exists(_ == Wildcard)) acc else acc :+ piece
        }
      } else {
        pieces
      }
    merged.mkString
  }
}
标签:parallel 其他 artifact 安装 target match dfs 加载 hdfs
原文地址:http://www.cnblogs.com/tengpan-cn/p/7497488.html