Spark Streaming Development Example
Spark's native language is Scala. I use spark-1.4.1-bin-hadoop2.6, which according to the official documentation is built against Scala 2.10; I use scala-2.10.1 here.
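Download the scala-2.10.1 package and extract it; no further installation is needed.
Add %SCALA_HOME%\bin to PATH.
My IDE is IntelliJ IDEA. To get Scala support, install the Scala plugin first.
After the plugin is installed, create a new project and choose Scala.
Select the path of the installed Scala SDK.
Add Maven support.
Then create a new Maven module.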
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rihai.spark</groupId>
    <artifactId>spark-streaming</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>hdfs-streaming</module>
    </modules>

    <properties>
        <scala.version>2.10.1</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark-streaming</artifactId>
        <groupId>com.rihai.spark</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hdfs-streaming</artifactId>
</project>
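Notes on the pom:
Compiling Scala with Maven requires a dedicated plugin, maven-scala-plugin, which can compile both Java and Scala.
Next, create a scala folder under main, and in File -> Project Structure mark that folder as Sources.
The Spark project is now set up.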
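Put simply, Spark Streaming processes the data from a source in batches and writes the results to an external store. Its main advantage is near-real-time processing, with latency on the order of seconds.
Streaming sources include file systems (HDFS), Kafka, Flume, and others. Output targets include file systems (HDFS), databases, and others.
This example uses a file system as the source and processes user access logs in real time.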
U86942038,P68658056,1,2016-09-27 15:17:01:137
U25452627,P27395813,5,2016-09-27 15:19:43:901
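To make the record layout concrete, here is a minimal plain-Java sketch that splits one such line into a (page, user) pair, the same shape the streaming job works with later. The field meanings (user ID, page ID, score, timestamp) are inferred from the log generator in this post, and the class name `LogLineParser` is hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not part of the original project.
class LogLineParser {

    /** Parses "user,page,score,timestamp" into a (page, user) pair. */
    static Map.Entry<String, String> toPageUser(String line) {
        String[] fields = line.split(",");
        if (fields.length < 4) {
            throw new IllegalArgumentException("Malformed log line: " + line);
        }
        // fields[0] = user ID, fields[1] = page ID (assumed layout)
        return new SimpleImmutableEntry<>(fields[1], fields[0]);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> pair =
                toPageUser("U86942038,P68658056,1,2016-09-27 15:17:01:137");
        System.out.println(pair.getKey() + " <- " + pair.getValue());
    }
}
```

The timestamp contains no comma, so a plain split on "," is enough for this format.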
Log files are continuously added to the source directory, and the streaming program periodically processes the newly added log data.
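The idea of "process only what is new in the directory on each tick" can be sketched in plain Java as below. This is a conceptual illustration on the local file system, not how Spark implements its file stream; the class name `NewFilePoller` and the choice to track files by name are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of polling a directory for new files.
class NewFilePoller {
    private final Path dir;
    private final Set<String> seen = new HashSet<>();

    NewFilePoller(Path dir) {
        this.dir = dir;
    }

    /** Returns the names of regular files that appeared since the previous call, sorted. */
    List<String> poll() throws IOException {
        List<String> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                // Subdirectories (such as a tmp folder) are skipped.
                if (Files.isRegularFile(p) && seen.add(name)) {
                    fresh.add(name);
                }
            }
        }
        Collections.sort(fresh);
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("logdata");
        NewFilePoller poller = new NewFilePoller(dir);
        Files.write(dir.resolve("log_1.txt"), "a".getBytes(StandardCharsets.UTF_8));
        System.out.println(poller.poll());
        Files.write(dir.resolve("log_2.txt"), "b".getBytes(StandardCharsets.UTF_8));
        System.out.println(poller.poll());
    }
}
```

Each call reports only files it has not returned before, which is roughly what one batch interval of a file-based stream has to do.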
Create a Scala class:
package com.rihai.spark.hdfs

import org.apache.spark._
import org.apache.spark.streaming._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
  * Created by rihaizhang on 2016/9/26.
  */
object PageLoggingStreaming {

  def createContext(appName: String, timeUnit: Duration, checkpointPath: String, dataPath: String): StreamingContext = {

    println("Creating new context !")
    val conf = new SparkConf().setAppName(appName)
    val ssc = new StreamingContext(conf, timeUnit)
    ssc.checkpoint(checkpointPath)

    // e.g.
    // U86942038,P68658056,1,2016-09-27 15:17:01:137
    val lines = ssc.textFileStream(dataPath)

    // map each log line to a (page, user) pair
    val page_user = lines.map(line => {
      val strs = line.split(",")
      (strs(1), strs(0))
    })

    // distinct page-user pairs within a single batch
    val p_u = page_user.groupByKey().flatMap(s => {
      val set = new mutable.HashSet[String]()
      for (user <- s._2) {
        set += user
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })
    //p_u.persist(StorageLevel.MEMORY_ONLY)

    // distinct page-user pairs within a window (length and slide are both two batches)
    val wp_u = p_u.window(timeUnit * 2, timeUnit * 2).groupByKey().flatMap(s => {
      val set = new mutable.HashSet[String]()
      for (user <- s._2) {
        set += user
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })

    // merge the users of the new batch into the users accumulated so far for a page
    val updateFun = (newValues: Seq[Iterable[String]], prevValues: Option[Iterable[String]]) => {
      val set = new mutable.HashSet[String]()
      for (user <- prevValues.getOrElse(Iterable[String]())) {
        set += user
      }
      for (value <- newValues) {
        for (user <- value) {
          set += user
        }
      }
      Some(set.toIterable)
    }

    // distinct page-user pairs accumulated across all batches (updateStateByKey)
    val sp_u = p_u.groupByKey().updateStateByKey[Iterable[String]](updateFun).flatMap(s => {
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- s._2) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })

    sp_u.checkpoint(timeUnit * 8)

    // print
    p_u.print()
    wp_u.print()
    sp_u.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println("Your arguments error !")
      System.exit(1)
    }

    val time_unit = Seconds(20)
    val checkpointPath = args(0)
    val dataPath = args(1)

    // recover the context from the checkpoint if one exists, otherwise create a new one
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => createContext("page.logging.streaming", time_unit, checkpointPath, dataPath))
    // val ssc = createContext("page.logging.streaming", time_unit, checkpointPath, dataPath)
    ssc.start()

    // let the job run for 10 batch intervals, then stop it gracefully
    for (i <- 1 to 10) {
      println("loop-" + i)
      Thread.sleep(1000 * 20)
    }

    ssc.stop(true, true)
    //ssc.awaitTermination()

    System.exit(0)
  }

}
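Set up the run configuration, passing the checkpoint directory and the data source directory as program arguments.
Spark Streaming automatically detects newly added files and reads them.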
Creating new context !
loop-1
16/10/11 18:35:40 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

loop-2
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

loop-3
-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)

loop-4
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)
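The three kinds of blocks in this output appear to correspond, in print order, to `p_u` (one batch), `wp_u` (a window of two batches that also slides by two batches, so it prints only every other interval) and `sp_u` (state accumulated over all batches). The plain-Java sketch below replays the four batches from the run above to show that reading of the semantics. It is a simulation under that assumption, not Spark code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation of the three streams; pairs are "page,user" strings.
class PageUserStreams {

    /** The four 20-second batches from the run above (first and last are empty). */
    static List<List<String>> sampleBatches() {
        List<List<String>> batches = new ArrayList<>();
        batches.add(Collections.<String>emptyList());
        batches.add(Arrays.asList("P68658056,U86942038", "P27395813,U21453697",
                "P27395813,U12142025", "P27395813,U26712632"));
        batches.add(Arrays.asList("P68658056,U86142038", "P27395813,U21453697",
                "P27395813,U26941232", "P27395814,U12142025"));
        batches.add(Collections.<String>emptyList());
        return batches;
    }

    /** Distinct pairs of a single batch, like p_u. */
    static Set<String> perBatch(List<String> batch) {
        return new TreeSet<>(batch);
    }

    /** Distinct pairs over the `length` batches ending at index `last`, like wp_u. */
    static Set<String> window(List<List<String>> batches, int last, int length) {
        Set<String> result = new TreeSet<>();
        for (int i = Math.max(0, last - length + 1); i <= last; i++) {
            result.addAll(batches.get(i));
        }
        return result;
    }

    /** Previous state plus the new batch, like the updateStateByKey stream sp_u. */
    static Set<String> updateState(Set<String> previous, List<String> batch) {
        Set<String> next = new TreeSet<>(previous);
        next.addAll(batch);
        return next;
    }

    public static void main(String[] args) {
        List<List<String>> batches = sampleBatches();
        Set<String> state = new TreeSet<>();
        for (int i = 0; i < batches.size(); i++) {
            state = updateState(state, batches.get(i));
            System.out.println("batch " + i + " p_u:  " + perBatch(batches.get(i)));
            if ((i + 1) % 2 == 0) { // window length and slide are both 2 batches
                System.out.println("batch " + i + " wp_u: " + window(batches, i, 2));
            }
            System.out.println("batch " + i + " sp_u: " + state);
        }
    }
}
```

With these inputs the window holds 4 pairs at the second and fourth batch, and the accumulated state grows from 4 to 7 pairs and stays there, which matches the block sizes in the log.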
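First, write a log generator that periodically writes a log file to a temporary HDFS directory and then moves it to the final directory. Writing to a temporary directory first makes each file appear in the final directory atomically, so the streaming job never reads a partially written file. For Hadoop development, see the post "hadoop 开发&调试" (Hadoop Development & Debugging).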
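The write-then-move pattern can be tried without a cluster. The sketch below is a local-file-system analogue using `java.nio.file`, not the HDFS code used in this post; the class name `AtomicPublish`, the directory layout, and the file name are made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local analogue of "write to tmp, then rename into the watched directory".
class AtomicPublish {

    /** Writes content under tempDir, then moves the finished file into finalDir. */
    static Path publish(Path tempDir, Path finalDir, String fileName, String content)
            throws IOException {
        Files.createDirectories(tempDir);
        Files.createDirectories(finalDir);
        Path temp = tempDir.resolve(fileName);
        Files.write(temp, content.getBytes(StandardCharsets.UTF_8));
        // A reader of finalDir sees either no file or the complete file.
        return Files.move(temp, finalDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("logdata");
        Path published = publish(base.resolve("tmp"), base, "log_example.txt",
                "U86942038,P68658056,1,2016-09-27 15:17:01:137");
        System.out.println(published + " exists: " + Files.exists(published));
    }
}
```

Keeping the temporary directory on the same file system as the final one is what allows the move to be a rename rather than a copy.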
package com.rihai.hadoop.hdfs;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by rihaizhang on 9/7/2016.
 */
public class CreateLogging {

    private static List<String> usrIdList;
    private static List<String> pageList;
    private static int usrCount = 100;
    private static int pageCount = 1000;
    private static int scoreMax = 5;
    private static String tempPath = "/user/rihai/logdata/tmp";
    private static String mainPath = "/user/rihai/logdata/";
    //private static String tempPath = "hdfs://master:9000/user/rihai/logdata/tmp";
    //private static String mainPath = "hdfs://master:9000/user/rihai/logdata/";

    public static void main(String[] args) throws InterruptedException, IOException {
        buildUsrIds();
        buildPageList();
        buildLog();
        System.out.println("Running");
        // wait for Enter before exiting
        Scanner sc = new Scanner(System.in);
        sc.nextLine();
        System.out.println("Finished");
    }

    /**
     * Generates the log files: 100 files of 100 lines each, one file every 2 minutes.
     *
     * @throws InterruptedException
     * @throws IOException
     */
    private static void buildLog() throws InterruptedException, IOException {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMddHHmmss");
        for (int i = 0; i < 100; i++) {
            StringBuilder sb = new StringBuilder();
            // build log
            for (int j = 0; j < 100; j++) {
                if (j > 0) {
                    sb.append("\n");
                }
                int pIndex = GetRandom(pageCount);
                int uIndex = GetRandom(usrCount);
                int score = GetRandom(scoreMax) + 1;
                String datestr = format.format(Calendar.getInstance().getTime());
                sb.append(String.format("%s,%s,%s,%s", usrIdList.get(uIndex), pageList.get(pIndex), score, datestr));
            }
            String fileName = String.format("log_%s.txt", format2.format(Calendar.getInstance().getTime()));
            // send to hdfs
            System.out.println("Preparing to write");
            SendToHdfs(fileName, sb.toString());
            // sleep
            Thread.sleep(1000 * 60 * 2);
        }
    }

    /**
     * Writes the content to HDFS: first into the temp directory, then moved to the main directory.
     *
     * @param fileName
     * @param content
     * @throws IOException
     */
    private static void SendToHdfs(String fileName, String content) throws IOException {

        HdfsUtil hdfsUtil = new HdfsUtil();

        if (!hdfsUtil.exists(tempPath)) {
            // create the temp directory itself; mkdirs also creates the parent (main) directory
            boolean result = hdfsUtil.createDirectory(tempPath);
            if (result) {
                System.out.println(tempPath + " created");
            } else {
                System.out.println(tempPath + " could not be created");
                return;
            }
        }

        String tempFileName = tempPath + "/" + fileName;
        String newFileName = mainPath + "/" + fileName;

        hdfsUtil.createFile(tempFileName, content);
        System.out.println(String.format("Wrote %s", tempFileName));

        boolean result = hdfsUtil.renameFile(tempFileName, newFileName);

        System.out.println(String.format("Move to %s %s", newFileName, result ? "succeeded" : "failed"));
    }

    /**
     * Generates random page IDs, e.g. P93002432
     */
    private static void buildPageList() {
        Random random = new Random();
        pageList = new ArrayList<String>();
        for (int i = 0; i < pageCount; i++) {
            int temp = random.nextInt(100000000);
            pageList.add(String.format("P%08d", temp));
        }
    }

    /**
     * Generates random user IDs, e.g. U00234999
     */
    private static void buildUsrIds() {
        Random random = new Random();
        usrIdList = new ArrayList<String>();
        for (int i = 0; i < usrCount; i++) {
            int temp = random.nextInt(100000000);
            usrIdList.add(String.format("U%08d", temp));
        }
    }

    /**
     * Returns a random integer in [0, max).
     *
     * @param max
     * @return
     */
    private static int GetRandom(int max) {
        Random random = new Random();
        int temp = random.nextInt(max);
        return temp;
    }

}
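The ID and line formats used by the generator can be checked in isolation. The following plain-Java sketch reuses the same format strings (`U%08d`, `P%08d`, and the `yyyy-MM-dd HH:mm:ss:SSS` timestamp pattern); the class name `LogFormat`, its method names, and the fixed random seed are assumptions added for the example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Hypothetical helper that isolates the formatting used by the generator.
class LogFormat {

    /** Zero-padded 8-digit user ID, e.g. U00234999. */
    static String userId(int n) {
        return String.format("U%08d", n);
    }

    /** Zero-padded 8-digit page ID. */
    static String pageId(int n) {
        return String.format("P%08d", n);
    }

    /** One log line: user,page,score,timestamp. */
    static String line(String user, String page, int score, Date when) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return String.format("%s,%s,%d,%s", user, page, score, format.format(when));
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed only to make the demo repeatable
        String user = userId(random.nextInt(100000000));
        String page = pageId(random.nextInt(100000000));
        int score = random.nextInt(5) + 1; // 1..5, like scoreMax in the generator
        System.out.println(line(user, page, score, new Date()));
        System.out.println(userId(234999)); // prints U00234999
    }
}
```

Because the IDs are padded to a fixed width and the timestamp contains no comma, every generated line splits into exactly four fields.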
package com.rihai.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * HDFS utility class
 * Created by rihaizhang on 9/6/2016.
 */
public class HdfsUtil {
    private Configuration conf = new Configuration();

    public HdfsUtil() {
    }

    public HdfsUtil(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Checks whether a file or directory exists.
     *
     * @param path
     * @return
     * @throws IOException
     */
    public boolean exists(String path) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(path));
        }
    }

    /**
     * Creates a directory (including missing parents).
     *
     * @param dirPath
     * @return
     * @throws IOException
     */
    public boolean createDirectory(String dirPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.mkdirs(new Path(dirPath));
            return result;
        }
    }

    /**
     * Creates a file.
     *
     * @param filePath
     * @param bytes
     * @throws IOException
     */
    public void createFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.create(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Creates a file.
     *
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public void createFile(String filePath, String contents) throws IOException {
        createFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Appends to a file.
     *
     * @param filePath
     * @param bytes
     * @throws IOException
     */
    public void appendFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.append(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Appends to a file.
     *
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public void appendFile(String filePath, String contents) throws IOException {
        appendFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Deletes a file or directory.
     *
     * @param filePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public boolean deleteFile(String filePath, boolean recursive) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.delete(new Path(filePath), recursive);
            return result;
        }
    }

    /**
     * Renames (moves) a file.
     *
     * @param sourcePath
     * @param targetPath
     * @return
     * @throws IOException
     */
    public boolean renameFile(String sourcePath, String targetPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.rename(new Path(sourcePath), new Path(targetPath));
            return result;
        }
    }

    /**
     * Reads a file as a UTF-8 string.
     *
     * @param filePath
     * @return
     * @throws IOException
     */
    public String readFile(String filePath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            FSDataInputStream input = fs.open(new Path(filePath));
            //byte[] buffer = new byte[input.available()];
            //input.readFully(0, buffer);
            ByteArrayOutputStream output = new ByteArrayOutputStream(input.available());

            // this copyBytes overload closes both streams when it finishes
            IOUtils.copyBytes(input, output, conf);
            String fileContent = output.toString("UTF-8");
            return fileContent;
        }
    }

}
Run the CreateLogging program:
Then update the run configuration of the PageLoggingStreaming program, setting the checkpoint directory and the data source directory.