码迷,mamicode.com
首页 > 其他好文 > 详细

获取kafka中的偏移

时间:2017-10-31 14:27:52      阅读:485      评论:0      收藏:0      [点我收藏+]

标签:this   pre   new t   log   seek   entry   assign   sig   --   

kafka0.10.0.0的版本:
for (i <- 0 to (offsetList.length - 1)) {
var topicPartition = new TopicPartition(topic, i)
topicArray.add(topicPartition)

kafkaConsumer.assign(util.Arrays.asList(topicPartition))
kafkaConsumer.seekToEnd(util.Arrays.asList(topicPartition))
val latestOffset = kafkaConsumer.position(topicPartition)
logInfo("partition"+i+"latestOffset"+latestOffset)
endOffsetMap.put(topicPartition, latestOffset)
}

kafka:0.11.0.0版本:
def test() = {
var endOffsetMap1 = kafkaConsumer.endOffsets(topicArray)
import scala.collection.JavaConversions._
for (entry <- endOffsetMap) {
var key: TopicPartition = entry._1
logInfo("test -----topic:"+this.topic +" partition:" + key.partition() +" endOffset:" + entry._2)
}

}

获取kafka中的偏移

标签:this   pre   new t   log   seek   entry   assign   sig   --   

原文地址:http://www.cnblogs.com/loveItLoveFaimly/p/7760856.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!