标签:日志采集 start string from 字符 xtend user nts agent
今天主要进行了spark实验六的学习,Spark Streaming 编程初级实践
#flume-to-spark.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 33333 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port =44444 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 1000000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
package org.apache.spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam object FlumeEventCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println( "Usage: FlumeEventCount <host> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local [2]") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_S ER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() } }
package org.apache.spark.examples.streaming import org.apache.log4j.{Level, Logger} import org.apache.spark.internal.Logging object StreamingExamples extends Logging { /** Set reasonable logging levels for streaming if the user has not configured log4 j. */ def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark‘s default logging, then we overri de the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }
name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"
进行打包:
标签:日志采集 start string from 字符 xtend user nts agent
原文地址:https://www.cnblogs.com/ljm-zsy/p/12268395.html