码迷,mamicode.com
首页 > 其他好文 > 详细

kafka producer程序编写调试

时间:2019-05-02 14:30:26      阅读:119      评论:0      收藏:0      [点我收藏+]

标签:read   rop   put   接收   enc   depend   producer   for   comm   

1、Maven里面配置jar  

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>

2、Idea里面编写producer程序

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.util.Random
import java.util
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    //kafka-console-producer.sh --broker-list master:9092,master:9093 -topic mykafka2
    val brokers="master:9092,master:9093"
    val topic = "mykafka2"

    val props = new util.HashMap[String,Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")

    val msgPerSec = 2
    val wordgPerMsg = 3
    val producer = new KafkaProducer[String,String](props)
    while(true){
      for(i<- 1 to msgPerSec){
        val str = (1 to wordgPerMsg).map(x=>Random.nextInt(100)).toString.mkString(" ")
        println(str)
        val msg = new ProducerRecord[String,String](topic,null,str)
        producer.send(msg)
    }
    Thread.sleep(1000)
  }


 }
}

3、Idea里面查看结果

技术图片

4、通过kafka 消费着接收的数据

  kafka-console-consumer.sh --zookeeper master:12181/kafka0.11 --topic mykafka2

  技术图片

 

kafka producer程序编写调试

标签:read   rop   put   接收   enc   depend   producer   for   comm   

原文地址:https://www.cnblogs.com/BrentBoys/p/10802225.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!