码迷,mamicode.com
首页 > 编程语言 > 详细

java连接kafka总结

时间:2021-04-30 12:13:39      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:解决   else   override   trie   reset   block   rop   thread   family   

1、连接demo是采用的这篇博客中的内容:

https://blog.csdn.net/weixin_39098944/article/details/108067005

主要代码如下:

(1)添加依赖

 1 <dependency>
 2                 <groupId>org.apache.kafka</groupId>
 3                 <artifactId>kafka_2.12</artifactId>
 4                 <version>1.0.0</version>
 5             <scope>provided</scope> 
 6         </dependency>
 7         
 8         <dependency>
 9                 <groupId>org.apache.kafka</groupId>
10                 <artifactId>kafka-clients</artifactId>
11                 <version>1.0.0</version>
12         </dependency>
13         
14         <dependency>
15             <groupId>org.apache.kafka</groupId>
16             <artifactId>kafka-streams</artifactId>
17             <version>1.0.0</version>
18         </dependency>

(2)生产者

 1 import java.util.Properties;
 2 import org.apache.kafka.clients.producer.KafkaProducer;
 3 import org.apache.kafka.clients.producer.ProducerRecord;
 4 import org.apache.kafka.common.serialization.StringSerializer;
 5 
 6 /**
 7  * 
 8  * Title: KafkaProducerTest
 9  * Description: 
10  * kafka 生产者demo
11  * Version:1.0.0  
12  * @author dengcs
13  */
14 public class KafkaProducerTest implements Runnable {
15 
16     private final KafkaProducer<String, String> producer;
17     private final String topic;
18     public KafkaProducerTest(String topicName) {
19         Properties props = new Properties();
20         props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
21         props.put("acks", "all");
22         props.put("retries", 0);
23         props.put("batch.size", 16384);
24         props.put("key.serializer", StringSerializer.class.getName());
25         props.put("value.serializer", StringSerializer.class.getName());
26         this.producer = new KafkaProducer<String, String>(props);
27         this.topic = topicName;
28     }
29 
30     @Override
31     public void run() {
32         int messageNo = 1;
33         try {
34             for(;;) {
35                 String messageStr="你好,这是第"+messageNo+"条数据";
36                 producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
37                 //生产了100条就打印
38                 if(messageNo%100==0){
39                     System.out.println("发送的信息:" + messageStr);
40                 }
41                 //生产1000条就退出
42                 if(messageNo%1000==0){
43                     System.out.println("成功发送了"+messageNo+"条");
44                     break;
45                 }
46                 messageNo++;
47             }
48         } catch (Exception e) {
49             e.printStackTrace();
50         } finally {
51             producer.close();
52         }
53     }
54     
55     public static void main(String args[]) {
56         KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
57         Thread thread = new Thread(test);
58         thread.start();
59     }
60 }

(3)消费者

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;


/**
 * Minimal Kafka consumer demo.
 *
 * <p>Subscribes to a single topic as a member of consumer group {@code groupA},
 * prints every 100th record it receives and stops after 1000 records.
 *
 * @author dengcs
 */
public class KafkaConsumerTest implements Runnable {

    private static final String GROUPID = "groupA";

    /** Maximum time, in milliseconds, one poll waits when no records are available. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /** A progress line is printed once per this many records. */
    private static final int PRINT_EVERY = 100;

    /** Number of records after which {@link #run()} stops consuming. */
    private static final int MAX_MESSAGES = 1000;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    /**
     * Creates a consumer with String keys and values and subscribes it to the topic.
     *
     * @param topicName topic to subscribe to
     */
    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls until {@link #MAX_MESSAGES} records have been received, printing every
     * {@link #PRINT_EVERY}-th one, then closes the consumer.
     *
     * <p>Because offsets are auto-committed, records that were fetched in the same
     * poll as the last counted record but come after it are not redelivered to this
     * group on the next start.
     */
    @Override
    public void run() {
        int consumed = 0;
        System.out.println("---------开始消费---------");
        try {
            // The count is the loop condition: a break inside the record loop alone
            // would only leave that inner loop and polling would continue forever.
            while (consumed < MAX_MESSAGES) {
                // poll() itself waits up to the timeout when nothing is available,
                // so no extra sleep is needed between polls.
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : batch) {
                    consumed++;
                    if (consumed % PRINT_EVERY == 0) {
                        System.out.println(consumed + "=======receive: key = " + record.key()
                                + ", value = " + record.value() + " offset===" + record.offset());
                    }
                    if (consumed >= MAX_MESSAGES) {
                        break;
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Runs the demo against the {@code KAFKA_TEST} topic on a separate thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

2、遇到的一个问题:org.apache.kafka.common.errors.TimeoutException

解决方法:https://blog.csdn.net/maoyuanming0806/article/details/80553632/

问题原因:上述代码中的 master:9092,slave1:9092,slave2:9092,是别人的例子,我自己的代码里是IP1:9092,IP2:9092,IP3:9092.

连接kafka的时候,报错TimeoutException,之后将log日志级别改为debug,发现java.io.IOException: Can't resolve address: izwz9c79fdwp9sb65vpyk3z:9092,也就是说我的客户端在连接izwz9c79fdwp9sb65vpyk3z:9092,而不是我的ip:port.这是因为客户端先通过bootstrap.servers获取集群元数据,之后改用broker在元数据中返回的地址(此处是服务器的主机名)建立连接,而本机无法解析该主机名.

解决方法:在客户端机器上添加一条hosts映射,把该主机名指向broker的IP.Windows下的hosts文件路径为:

C:\Windows\System32\drivers\etc\hosts

39.108.61.252 izwz9c79fdwp9sb65vpyk3z
127.0.0.1 localhost

问题解决。




java连接kafka总结

标签:解决   else   override   trie   reset   block   rop   thread   family   

原文地址:https://www.cnblogs.com/GSONG/p/14719035.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!