
Kafka 0.10.0.1 consumer: getting the earliest offset of each partition from a Kafka broker cluster (Scala code)

Date: 2019-04-09 18:55:52


Returns: Map[TopicPartition, Long], mapping each partition of the topic to its earliest available offset.

Code:

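import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// kafkaPara (a map of Kafka settings) and topic (a String) are expected to be defined by the caller.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString)
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val kc = new KafkaConsumer[String, String](props)

// For each partition: assign it, seek to the beginning, then read the position.
// position() resolves the pending seek and returns the earliest offset.
val earliestOffsets: Map[TopicPartition, Long] =
  try {
    kc.partitionsFor(topic).asScala.map { partitionInfo =>
      val topicPartition = new TopicPartition(topic, partitionInfo.partition())
      kc.assign(Seq(topicPartition).asJava)
      kc.seekToBeginning(Seq(topicPartition).asJava)
      topicPartition -> kc.position(topicPartition)
    }.toMap
  } finally {
    kc.close() // release the consumer's network resources
  }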

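Key point: the Scala code calls the Java Kafka client library, so Scala collections are converted to Java collections with asJava, and Java results are converted back with asScala.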
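The conversion pattern can be tried without a Kafka broker. The following is a minimal sketch, not part of the original post: javaStyleEcho is a hypothetical stand-in for a Java API that takes and returns java.util collections (as KafkaConsumer.assign and partitionsFor do), and the object and method names are made up for illustration. It assumes scala.collection.JavaConverters, which matches Scala versions from the Kafka 0.10 era; newer Scala versions offer scala.jdk.CollectionConverters instead.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._

object JavaInteropSketch {
  // Hypothetical stand-in for a Java method that accepts and returns
  // java.util collections; it simply copies its input into a new list.
  def javaStyleEcho(items: JList[String]): JList[String] =
    new JArrayList[String](items)

  // Scala Seq -> java.util.List on the way in (asJava),
  // java.util.List -> Scala Buffer on the way out (asScala),
  // then the pairs are collected into an immutable Map with toMap.
  def lengths(names: Seq[String]): Map[String, Int] =
    javaStyleEcho(names.asJava).asScala.map(name => name -> name.length).toMap

  def main(args: Array[String]): Unit =
    println(lengths(Seq("topic-a", "topic-bb")))
}
```

The shape is the same as in the consumer code: asJava when passing a collection to the Java client, asScala on what comes back, and toMap to collect the resulting pairs.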


Original article: https://www.cnblogs.com/yjyyjy/p/10678438.html
