码迷,mamicode.com
首页 > 其他好文 > 详细

kafka 客户端封装

时间:2016-07-08 23:23:08      阅读:509      评论:0      收藏:0      [点我收藏+]

标签:

kafka客户端封装源码

1.为什么进行封装?

  kafka官方自带的客户端,需要对参数进行设置,如下代码,很多参数的key都是字符串,这样对于编程人员来说非常不友好。参数很多的时候,有两种处理方式:(1)传一个config类进去解析;(2)使用建造者模式,笔者在此就使用建造者模式,对官方客户端进行简单封装,使之易用。

官方的例子如下:

 1  Properties props = new Properties();
 2  props.put("bootstrap.servers", "localhost:4242");
 3  props.put("acks", "all");
 4  props.put("retries", 0);
 5  props.put("batch.size", 16384);
 6  props.put("linger.ms", 1);
 7  props.put("buffer.memory", 33554432);
 8  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 9  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
10 
11  Producer<String, String> producer = new KafkaProducer<>(props);
12  for(int i = 0; i < 100; i++)
13      producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
14 
15  producer.close();

封装后:

CloudKafkaConsumer<String, String> kafkaConsumer =
                KafkaConsumerBuilder.builder()
                        .withBootstrapServers("172.31.1.161:9092")
                        .withGroupId("connector")
                        .withAutoCommit("true")
                        .withCommitIntervalMs("1000")
                        .withSessionimeoutMs("30000")
                        .build();
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(5000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf(record.value());
        }

2.这样其实还不是很好,对于value的类型没有限定,所以我们可以对value的类型限定。如果是基础类型就限定为:boolean、int、String。如果不是基础类型就限定为Enum,这样客户端使用起来会优雅很多。

 

kafka 客户端封装

标签:

原文地址:http://www.cnblogs.com/acceml/p/5654715.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!