标签:sip nts var partition 使用 enc char park conf
//统计access.log文件里面IP地址对应的省份,并把结果存入到mysql
package access1 import java.sql.DriverManager import org.apache.spark.broadcast.Broadcast import org.apache.spark.{SparkConf, SparkContext} object AccessIp { def main(args: Array[String]): Unit = { //new sc val conf = new SparkConf () .setAppName ( this.getClass.getSimpleName ) .setMaster ( "local[*]" ) val sc = new SparkContext ( conf ) //读取数据 val accesslines = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\access.log" ) val iplines = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\ip.txt" ) //处理数据 val ip1 = iplines.map ( tp => { val splits = tp.split ( "[|]" ) val start = splits ( 2 ).toLong val end = splits ( 3 ).toLong val province = splits ( 6 ) (start, end, province) } ).collect () //广播变量(这里使用是不对,当数据使用三次的时候,在使用广播变量,否则会占内存) val broads: Broadcast[Array[(Long, Long, String)]] = sc.broadcast ( ip1 ) //处理数据 val result2 = accesslines.map ( tp => { val splits = tp.split ( "[|]" ) val ip = splits ( 1 ) val ips = MyUtils.ip2Long ( ip ) val valiues: Array[(Long, Long, String)] = broads.value val index = MyUtils.binarSearch ( valiues, ips ) var province = "" if (index != -1) { province = valiues ( index )._3 } (province, 1) } ).reduceByKey ( _ + _ ).sortBy ( -_._2 ) //写入mysql result2.foreachPartition ( filter => { //获取mysql的链接 val connection = DriverManager.getConnection ( "jdbc:mysql://localhost:3306/test1?characterEncoding=UTF-8&serverTimezone=GMT%2B8", "root", "123456" ) filter.foreach ( tp => { val ps = connection.prepareStatement ( "insert into suibian values(?,?)" ) //设置参数 ps.setString ( 1, tp._1 ) ps.setInt ( 2, tp._2 ) //提交 ps.executeLargeUpdate () ps.close () } ) connection.close () } ) sc.stop () broads.unpersist ( true ) } }
package access1 object MyUtils { //ip地址转换为lang类型 def ip2Long(ip: String): Long = { val fragments = ip.split ( "[.]" ) var ipNum = 0L for (i <- 0 until fragments.length) { ipNum = fragments ( i ).toLong | ipNum << 8L } ipNum } //二分查找法 def binarSearch(array: Array[(Long, Long, String)], target: Long): Int = { var low = 0 var high = array.length - 1 while (low <= high) { var mid = low + ( high - low ) / 2 if (array ( mid )._1 <= target && array ( mid )._2 >= target) { return mid } else if (array ( mid )._1 > target) { high = mid - 1 } else { low = mid + 1 } } return -1 } }
标签:sip nts var partition 使用 enc char park conf
原文地址:https://www.cnblogs.com/wangshuang123/p/11082113.html