Kafka 0.10.0.0:
// Assign one partition at a time, seek to its end, then read the position.
for (i <- 0 until offsetList.length) {
  val topicPartition = new TopicPartition(topic, i)
  topicArray.add(topicPartition)
  kafkaConsumer.assign(util.Arrays.asList(topicPartition))
  kafkaConsumer.seekToEnd(util.Arrays.asList(topicPartition))
  // seekToEnd is evaluated lazily; position() forces the seek and returns the end offset.
  val latestOffset = kafkaConsumer.position(topicPartition)
  logInfo(s"partition $i latestOffset $latestOffset")
  endOffsetMap.put(topicPartition, latestOffset)
}
Kafka 0.11.0.0:
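def test(): Unit = {
  import scala.collection.JavaConverters._
  // endOffsets returns the end offset of every requested partition in one call,
  // with no need to assign or seek.
  val endOffsetMap1 = kafkaConsumer.endOffsets(topicArray)
  // Iterate over the map just returned (the original looped over endOffsetMap instead).
  for ((key, endOffset) <- endOffsetMap1.asScala) {
    logInfo(s"test ----- topic: ${this.topic} partition: ${key.partition()} endOffset: $endOffset")
  }
}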
Original article: http://www.cnblogs.com/loveItLoveFaimly/p/7760856.html