Tags: window, usage, batch processing, stage, ipc, get, --, support, program
// Create a streaming filter for printing lines containing "error" in Scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a StreamingContext with a 1-second batch size (conf is an existing SparkConf)
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream using data received after connecting to port 7777 on the local machine
val lines = ssc.socketTextStream("localhost", 7777)
// Filter our DStream for lines with "error"
val errorLines = lines.filter(_.contains("error"))
// Print out the lines with errors
errorLines.print()
// Start our streaming context and wait for it to "finish"
ssc.start()
// Wait for the job to finish
ssc.awaitTermination()

// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)

// Add the number of new events for a key to its previous running total
def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
  Some(state.getOrElse(0L) + values.size)
}
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
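
To see what updateStateByKey does with updateRunningSum from one batch to the next, the sketch below replays the same update function on plain Scala collections, without Spark. The applyBatch helper and the sample batches are hypothetical and exist only for illustration; in a real job Spark keeps the state for you.

```scala
// Same update function as in the snippet above.
def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + values.size)

// Hypothetical helper: mimics one batch of updateStateByKey on plain collections.
def applyBatch(state: Map[Int, Long], batch: Seq[(Int, Long)]): Map[Int, Long] = {
  // Group the new values of this batch by key (the response code).
  val grouped: Map[Int, Seq[Long]] =
    batch.groupBy(_._1).map { case (code, pairs) => code -> pairs.map(_._2) }
  // Every key seen so far is updated, even if it has no new values in this batch.
  (state.keySet ++ grouped.keySet).flatMap { code =>
    updateRunningSum(grouped.getOrElse(code, Seq.empty), state.get(code)).map(code -> _)
  }.toMap
}

// Two made-up batches of (responseCode, 1L) pairs.
val batch1 = Seq((200, 1L), (200, 1L), (404, 1L))
val batch2 = Seq((200, 1L), (500, 1L))

val afterBatch1 = applyBatch(Map.empty, batch1)
val afterBatch2 = applyBatch(afterBatch1, batch2)

// afterBatch1 == Map(200 -> 2L, 404 -> 1L)
// afterBatch2 == Map(200 -> 3L, 404 -> 1L, 500 -> 1L)
```

Code 404 keeps its count of 1 after the second batch because the update function is also called for keys with no new values, with an empty values sequence.
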
ipAddressRequestCount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open connection to storage system (e.g. a database connection)
    partition.foreach { item =>
      // Use connection to push item to system
    }
    // Close connection
  }
}
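
The placeholder comments above describe a one-connection-per-partition pattern. Here is a minimal sketch of it in plain Scala, with each partition modeled as an Iterator. FakeConnection and writePartition are hypothetical names rather than part of any Spark or database API; a real job would open an actual client inside foreachPartition so that it is created on the executor.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a real client (e.g. a database connection).
class FakeConnection {
  val sent = ArrayBuffer.empty[String]
  var closed = false
  def send(item: String): Unit = { sent += item }
  def close(): Unit = { closed = true }
}

// Records every connection that gets opened, so the pattern can be inspected.
val opened = ArrayBuffer.empty[FakeConnection]

// Mirrors the body passed to rdd.foreachPartition: one connection per partition.
def writePartition(partition: Iterator[String]): Unit = {
  val connection = new FakeConnection   // open the connection once per partition
  opened += connection
  try partition.foreach(connection.send) // push each item through that connection
  finally connection.close()             // always close, even if a send fails
}

// Two "partitions" of a single batch, modeled as plain iterators.
val partitions = Seq(Iterator("10.0.0.1", "10.0.0.2"), Iterator("10.0.0.3"))
partitions.foreach(writePartition)

// opened.size == 2, and every connection in it is closed
```

Opening the connection per partition rather than per item avoids paying the setup cost for every record.
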

ssc.checkpoint("hdfs://...")
Original source: http://www.cnblogs.com/wttttt/p/6852102.html