码迷,mamicode.com
首页 > 编程语言 > 详细

Spring集成Kafka-注解,xml配置2种方式实现

时间:2018-06-21 23:40:57      阅读:485      评论:0      收藏:0      [点我收藏+]

标签:bubuko   sys   request   over   comm   system   配置信息   nta   方式   

准备工作:

1.安装kafka+zookeeper环境

2.利用命令创建好topic

参考官网 http://kafka.apache.org/documentation/ 

一、XML配置文件方式实现

1. Pom文件,引入spring-kafka jar包。这里需要注意2个地方:

  1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)

  2)引入的spring-kafka版本为2.0或2.x时,Spring版本需要在5.0及以上才能支持

...
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
    <exclusions>
         <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
          </exclusion>
    </exclusions>
</dependency>
...

 2. 配置文件结构如下图

技术分享图片

 

3. kafka.properties文件内容如下  

 1 # brokers集群
 2 kafka.producer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
 3 
 4 kafka.producer.acks = all
 5 
 6 #发送失败重试次数
 7 kafka.producer.retries = 3
 8 
 9 kafka.producer.linger.ms =  10
10 
11 # 生产者用于缓冲待发送记录的内存总字节数(Kafka默认值为33554432,即32MB;此处示例设置为40960字节)
12 kafka.producer.buffer.memory = 40960
13 
14 #批处理大小(单位:字节,不是记录条数):当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能
15 kafka.producer.batch.size = 4096
16 
17 kafka.producer.defaultTopic = nwbs-eval-task
18 
19 kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
20 
21 kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
22 
23 
24 ################# kafka consumer ################## ,
25 kafka.consumer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
26 
27 # 如果为true,消费者的偏移量将在后台定期提交
28 kafka.consumer.enable.auto.commit = true
29 
30 #如果设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
31 kafka.consumer.auto.commit.interval.ms=1000 
32 
33 #order-beta 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
34 kafka.consumer.group.id = sccl-nwbs
35 
36 #在使用Kafka的组管理时,用于检测消费者故障的超时
37 kafka.consumer.session.timeout.ms = 30000
38 
39 kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
40 kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

4.consumer-kafka.xml 配置如下

  需要注意cn.**.kafka.KafkaConsumerSerivceImpl 此类 需要实现 MessageListener 接口

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5        http://www.springframework.org/schema/beans/spring-beans.xsd">
 6         <!-- 1.定义consumer的参数 -->
 7         <!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
 8         <bean id="consumerProperties" class="java.util.HashMap">
 9             <constructor-arg>
10                 <map>
11                     <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
12                     <entry key="group.id" value="${kafka.consumer.group.id}" />
13                     <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
14                     <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
15                     <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
16                     <entry key="retry.backoff.ms" value="100" />
17                     <entry key="key.deserializer"
18                            value="${kafka.consumer.key.deserializer}" />
19                     <entry key="value.deserializer"
20                            value="${kafka.consumer.value.deserializer}" />
21                 </map>
22             </constructor-arg>
23         </bean>
24 
25         <!-- 2.创建consumerFactory bean -->
26         <bean id="consumerFactory"
27               class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
28             <constructor-arg>
29                 <ref bean="consumerProperties" />
30             </constructor-arg>
31         </bean>
32 
33         <!-- 3.定义消费实现类 -->
34         <bean id="kafkaConsumerService" class="cn.**.kafka.KafkaConsumerSerivceImpl" />
35 
36         <!-- 4.消费者容器配置信息 -->
37         <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
38             <!-- topic -->
39             <constructor-arg name="topics">
40                 <list>
41                     <value>${kafka.task.eval.topic}</value>
42                     <value>${kafka.task.optimizeNetwork.topic}</value>
43                     <value>${kafka.task.business.topic}</value>
44                 </list>
45             </constructor-arg>
46             <property name="messageListener" ref="kafkaConsumerService" />
47         </bean>
48         <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
49         <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
50             <constructor-arg ref="consumerFactory" />
51             <constructor-arg ref="containerProperties" />
52             <property name="concurrency" value="${kafka.consumer.concurrency}" />
53         </bean>
54 </beans>

 

5. producer-kafka.xml 配置如下

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5        http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
 8     <!-- 定义producer的参数 -->
 9     <bean id="producerProperties" class="java.util.HashMap">
10         <constructor-arg>
11             <map>
12                 <entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
13                 <!--<entry key="group.id" value="${group.id}" />-->
14                 <entry key="retries" value="${kafka.producer.retries}" />
15                 <entry key="batch.size" value="${kafka.producer.batch.size}" />
16                 <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
17                 <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
18                 <entry key="acks" value="${kafka.producer.acks}" />
19                 <entry key="key.serializer"
20                        value="${kafka.producer.key.serializer}" />
21                 <entry key="value.serializer"
22                        value="${kafka.producer.value.serializer}"/>
23             </map>
24         </constructor-arg>
25     </bean>
26 
27     <!-- 创建kafkatemplate需要使用的producerfactory bean -->
28     <bean id="producerFactory"
29           class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
30         <constructor-arg>
31             <ref bean="producerProperties" />
32         </constructor-arg>
33     </bean>
34 
35     <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
36     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
37         <constructor-arg ref="producerFactory" />
38         <constructor-arg name="autoFlush" value="true" />
39         <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
40     </bean>
41 </beans>

 

6. 调用Controller -这里 向kafka 中的 3个topic 发送了消息

/**
 * REST controller that forwards a posted JSON document to Kafka.
 *
 * <p>Each request is published three times: once to {@code optimizeTopic}, once to
 * {@code businessTopic} (each with the topic field name appended to the payload), and
 * once, unmodified, to the template's default topic. Only the default-topic send has
 * success/failure callbacks attached.
 *
 * <p>NOTE(review): {@code kafkaTemplate} is deliberately kept as a raw type. The XML
 * configuration in this article defines an untyped bean, while the annotation
 * configuration declares {@code KafkaTemplate<Integer, String>}; adding type arguments
 * here could stop autowiring from matching one of them. Confirm the intended key type
 * before parameterizing.
 *
 * @author hsc
 * @date 2018/6/19 14:44
 */

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    @Autowired
    KafkaTemplate kafkaTemplate;

    /** Topic name injected as a literal string (no placeholder resolution involved). */
    @Value("nwbs-optimizeNetwork-task")
    private  String optimizeTopic ;

    /** Topic name injected as a literal string (no placeholder resolution involved). */
    @Value("nwbs-business-task")
    private String businessTopic;

    /**
     * Publishes the request body to the two named topics and to the default topic.
     *
     * @param params JSON request body; its serialized form is the message value
     */
    @RequestMapping(value = "/producer" , method = RequestMethod.POST)
    public void producer(@RequestBody JSONObject params){
        final String payload = params.toJSONString();

        // Results of these two sends are not observed.
        kafkaTemplate.send(optimizeTopic, payload + "optimizeTopic");
        kafkaTemplate.send(businessTopic, payload + "businessTopic");

        // Only the default-topic send reports its outcome through callbacks.
        ListenableFuture<SendResult<String, String>> defaultTopicResult = kafkaTemplate.sendDefault(payload);
        defaultTopicResult.addCallback(newSuccessCallback(), newFailureCallback());
    }

    /** Builds the callback run when the default-topic send succeeds. */
    private static SuccessCallback<SendResult<String, String>> newSuccessCallback() {
        return new SuccessCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Business logic for a successful send goes here.
                System.out.println("onSuccess");
            }
        };
    }

    /** Builds the callback run when the default-topic send fails. */
    private static FailureCallback newFailureCallback() {
        return new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                // Business logic for a failed send goes here.
                System.out.println("onFailure");
            }
        };
    }
}

二、注解方式实现

参考spring-kafka官方文档 https://docs.spring.io/spring-kafka/reference/htmlsingle/

1. 文件整体结构如图

技术分享图片

2. KafkaConsumerConfig.java代码

 1 /**
 2  * @author: hsc
 3  * @date: 2018/6/21 15:58
 4  * @description kafka 消费者配置
 5  */
 6 @Configuration
 7 @EnableKafka
 8 public class KafkaConsumerConfig {
 9     public KafkaConsumerConfig(){
10         System.out.println("kafka消费者配置加载...");
11     }
12     @Bean
13     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
14     kafkaListenerContainerFactory() {
15         ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
16                 new ConcurrentKafkaListenerContainerFactory();
17         factory.setConsumerFactory(consumerFactory());
18         factory.setConcurrency(3);
19         factory.getContainerProperties().setPollTimeout(3000);
20         return factory;
21     }
22 
23     @Bean
24     public ConsumerFactory<Integer, String> consumerFactory() {
25         return new DefaultKafkaConsumerFactory(consumerProperties());
26     }
27 
28     @Bean
29     public Map<String, Object> consumerProperties() {
30         Map<String, Object> props= new HashMap<String, Object>();
31         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.bootstrap.servers"));
32         props.put(ConsumerConfig.GROUP_ID_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.group.id"));
33         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.enable.auto.commit"));
34         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.auto.commit.interval.ms"));
35         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.session.timeout.ms"));
36         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.key.deserializer"));
37         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.value.deserializer"));
38         return props;
39     }
40 
41     @Bean
42     public KafkaConsumerListener kafkaConsumerListener(){
43         return new KafkaConsumerListener();
44     }
45 
46 }

3. KafkaProducerConfig.java

 1 /**
 2  * @author: hsc
 3  * @date: 2018/6/21 21:30
 4  * @description kafka 生产者配置
 5  */
 6 @Configuration
 7 @EnableKafka
 8 public class KafkaProducerConfig {
 9     public KafkaProducerConfig(){
10         System.out.println("kafka生产者配置");
11     }
12     @Bean
13     public ProducerFactory<Integer, String> producerFactory() {
14         return new DefaultKafkaProducerFactory(producerProperties());
15     }
16 
17     @Bean
18     public Map<String, Object> producerProperties() {
19         Map<String, Object> props = new HashMap<String, Object>();
20         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.bootstrap.servers"));
21         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.key.serializer"));
22         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.value.serializer"));
23         props.put(ProducerConfig.RETRIES_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.retries"));
24         props.put(ProducerConfig.BATCH_SIZE_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.batch.size",1048576));
25         props.put(ProducerConfig.LINGER_MS_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.linger.ms"));
26         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,PropertiesUtil.getInstance().getLong("kafka.producer.buffer.memory",33554432L));
27         props.put(ProducerConfig.ACKS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.acks","all"));
28         return props;
29     }
30 
31     @Bean
32     public KafkaTemplate<Integer, String> kafkaTemplate() {
33         KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
34         kafkaTemplate.setDefaultTopic(PropertiesUtil.getInstance().getString("kafka.producer.defaultTopic","default"));
35         return kafkaTemplate;
36     }
37 
38 }

4. KafkaConsumerListener.java

/**
 * Kafka message listener that prints every received record to standard output.
 *
 * @author hsc
 * @date 2018/6/21 16:33
 */
public class KafkaConsumerListener {
    /**
     * Handles one record: prints a header line with the current thread name and the
     * record's topic, partition and timestamp, then prints the record value.
     *
     * @param data the consumed record
     */
    @KafkaListener(groupId="xxx" ,topics = "xxx")
    void listener(ConsumerRecord<String, String> data){
        StringBuilder header = new StringBuilder();
        header.append("消费者线程:").append(Thread.currentThread().getName());
        header.append("[ 消息 来自kafkatopic:").append(data.topic());
        header.append(",分区:").append(data.partition());
        header.append(" ,委托时间:").append(data.timestamp());
        header.append("]消息内容如下:");
        System.out.println(header.toString());

        System.out.println(data.value());
    }
}

 

参考文章(结合官方文档一起看)

http://www.cnblogs.com/dennyzhangdd/p/7759875.html

 

Spring集成Kafka-注解,xml配置2种方式实现

标签:bubuko   sys   request   over   comm   system   配置信息   nta   方式   

原文地址:https://www.cnblogs.com/hsc13-lxy14/p/9211224.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!