标签:pid cto stream pack AC 文档 contain 回调 存储
主要参考spring开源项目: Spring for Apache Kafka
此处主要介绍consumer和peoducer的使用,其他如connector、stream等,可查阅官网文档,开发时需注意个主键版本兼容性,此处我spring版本为
<spring.version>4.3.13.RELEASE</spring.version>
首先pom文件添加kafka相关jar:
<!--spring for kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.6.RELEASE</version> </dependency>
接下来演示如何消费kafka中的消息:
首先编写消费者监听器类KafkaConsumerListen.java(回调方法无consumer实例):
package com.yinz.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; public class KafkaConsumerListen implements MessageListener<String, String>{ @Override public void onMessage(ConsumerRecord<String, String> data) { System.out.println("--------------" + data.value()); } }
KafkaConcurConsumerListen.java(回调方法有consumer实例):
package com.yinz.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.ConsumerAwareMessageListener; public class KafkaConcurConsumerListen implements ConsumerAwareMessageListener<String, String>{ @Override public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) { System.out.println(consumer.hashCode() +">>>>>>" + data.partition() +">>>>>>>>>>" + data.value()); } }
接下来配置相关bean,此处我kafka相关配置均在kafka.xml中:
kafka.xml文件内容如下(包含了单个消费者和并发消费者两种情况,实际使用时配置一种即可):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> <!-- 定义生产者类 --> <!-- 定义producer的参数 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="47.98.131.199:9092" /> <entry key="retries" value="1" /> <!-- 消息发送失败,重试次数 --> <entry key="batch.size" value="16384" /> <!-- 消息打包发送,大小不能超过该值 --> <entry key="linger.ms" value="1" /> <!-- 消息发送前,延迟该毫秒,当负载较重是,可减少发送请求的次数,即打包一次发送多条消息 --> <entry key="buffer.memory" value="33554432" /> <!-- 生产者缓冲区大小,用于存储还未发送到broker中的消息,超过该值会等待,再后面会抛一次 --> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 创建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg ref="producerProperties"></constructor-arg> </bean> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <constructor-arg name="autoFlush" value="false" /> <!-- true 会显著降低性能 --> <property name="defaultTopic" value="test" /> <!-- 默认topic --> </bean> <!-- 定义consumer的参数 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="47.98.131.199:9092" /> <entry key="group.id" value="0" /> <!-- consumer group id --> <entry key="enable.auto.commit" value="false" /> <!-- 不自动提交offset --> <entry key="auto.commit.interval.ms" value="1000" /> <!-- 定期提交offset时间间隔 --> <entry key="session.timeout.ms" value="15000" /> <!-- 15秒没发生心跳意味着改consumer失效 --> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!-- 创建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <!-- 实际执行消息消费的类,回调方法中无consumer实例 --> <!--
<bean id="messageListernerConsumerService" class="com.yinz.kafka.KafkaConsumerListen" /> <!-- 消费者容器配置信息 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test"/> <property name="messageListener" ref="messageListernerConsumerService"/> </bean> <!-- 同一group中配置单个消费者,消费者只有一个,不能达到负载均摊和容灾 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean>
--> <!-- 实际执行消息消费的类, 回调方法中有consumer实例 --> <bean id="messageConcurListernerConsumerService" class="com.yinz.kafka.KafkaConcurConsumerListen" /> <bean id="containerConcurProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test-par"/> <property name="messageListener" ref="messageConcurListernerConsumerService"/> </bean> <!-- 同一group中配置concurrency个消费者, 可达到负载均摊和容灾, 若concurrency 大于top分区数,会自动降级 --> <bean id="messageConcurListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerConcurProperties"/> <property name="concurrency" value="3"></property> </bean> </beans>
生产者比较简单,只需注入kafkaTemplate bean,调用他的send方法发送消息即可。
标签:pid cto stream pack AC 文档 contain 回调 存储
原文地址:https://www.cnblogs.com/yinz/p/9151675.html