kafka配置双监听
由于公司需要在其他城市开通业务所以另外开了一个数据库但是kafka消费想直接使用旧的项目,以免多部署一个项目占用服务器内存空间。
首先先在配置文件中配置2个kafka的ip端口等信息
spring.kafka.xx.bootstrap-servers=xxx.xxx.x.x:xxxx kafkaip和端口
spring.kafka.xx.group-id=camphor
spring.kafka.xx.enable-auto-commit=false 自动提交设置关闭
spring.kafka.zz.bootstrap-servers=xxx.xxx.x.x:xxxx kafkaip和端口
spring.kafka.zz.group-id=camphor
spring.kafka.zz.enable-auto-commit=false 自动提交设置关闭
然后添加2个kafka的对应的配置类
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaxxConfig {
@Value("${spring.kafka.xx.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.xx.group-id}")
private String groupId;
@Value("${spring.kafka.xx.enable-auto-commit}")
private boolean enableAutoCommit;
// 配置kafka模板
@Bean(name = "kafkaxxTemplate")
public KafkaTemplate<String, String> kafkaxxTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// 配置kafka工厂类
@Bean(name = "kafkaxxContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaxxContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
//Listener配置
factory.getContainerProperties()
.setPollTimeout(3000);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties()
.setPollTimeout(15000);
return factory;
}
// 生产工厂
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
// 消费工厂
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
// 生产者配置
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 用于建立到Kafka集群的初始连接的主机/端口对列表。放入ip端口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 生产者发送失败后的重试次数,默认0
props.put(ProducerConfig.RETRIES_CONFIG, 10);
/**
* 在考虑请求完成之前,生产者要求leader收到的确认数量,这将控制发送的记录的持久性。
acks=0如果设置为零,则生产者不会等待来自服务器的任何确认。该记录将被立即添加到套接字缓冲区并被视为已发送。在这种情况下,retries不能保证服务器已经收到记录,并且配置不会生效(因为客户端通常不会知道任何故障)。为每个记录返回的偏移量将始终设置为-1。
acks=1这意味着领导者会将记录写入其本地日志中,但会在未等待所有追随者完全确认的情况下作出响应。在这种情况下,如果领导者在承认记录后但在追随者复制之前立即失败,那么记录将会丢失。
acks=all这意味着领导者将等待全套的同步副本确认记录。这保证只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最强有力的保证。这相当于acks = -1设置。
*/
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。不会尝试批量大于此大小的记录。发送给brokers的请求将包含多个批次,每个分区有一个可用于发送数据的分区。小批量大小将使批次不太常见,并可能降低吞吐量(批量大小为零将完全禁用批次)。一个非常大的批量大小可能会更浪费一点使用内存,因为我们将始终为预期的额外记录分配指定批量大小的缓冲区。
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
// 生产者可用于缓冲等待发送到服务器的记录的总内存字节数。如果记录的发送速度比发送到服务器的速度快,那么生产者将会阻止max.block.ms它,然后它会抛出异常。
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 用于实现Serializer接口的密钥的串行器类。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
// 消费者配置
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// 用于建立到Kafka集群的初始连接的主机/端口对列表。放入ip端口
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 唯一的指明了consumer的group的名字,group名一样的进程属于同一个consumer group。
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 如果设为true,consumer会定时向ZooKeeper发送已经获取到的消息的offset。当consumer进程挂掉时,已经提交的offset可以继续使用,让新的consumer继续工作。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
// consumer向ZooKeeper发送offset的时间间隔。
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// 用于实现Serializer接口的密钥的串行器类。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
第二个配置类
@Configuration
public class KafkazzConfig {
@Value("${spring.kafka.zz.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.zz.group-id}")
private String groupId;
@Value("${spring.kafka.zz.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean(name = "kafkazzTemplate")
public KafkaTemplate<String, String> kafkazzTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean(name = "kafkazzContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkazzContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
//Listener配置
factory.getContainerProperties()
.setPollTimeout(3000);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties()
.setPollTimeout(15000);
return factory;
}
// 下面的配置跟上一个一样
接下来就是发送消息的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Component
public class KafkaSendUtil {
private static KafkaTemplate kafkaxxTemplate;
private static KafkaTemplate kafkazzTemplate;
@Autowired
public KafkaSendUtil(@Qualifier("kafkaxxTemplate") KafkaTemplate kafkaxxTemplate,
@Qualifier("kafkazzTemplate") KafkaTemplate kafkazzTemplate) {
KafkaSendUtil.kafkaxxTemplate = kafkaxxTemplate;
KafkaSendUtil.kafkazzTemplate = kafkazzTemplate;
}
public static ListenableFuture send(String topic, String type, String msg) {
String str = "通过配置或其他方式获取发送的对象";
ListenableFuture listenableFuture = null;
switch (str) {
case "xx":
listenableFuture = kafkaxxTemplate.send(topic, type, msg);
break;
case "zz":
listenableFuture = kafkazzTemplate.send(topic, type, msg);
break;
default:
break;
}
return listenableFuture;
}
}
最后是消费者的监听写法
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
public static final String topic = "test-topic";
// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
@KafkaListener(topics = {topic}, containerFactory = "kafkaxxContainerFactory")
public void listenerOne(ConsumerRecord<?, ?> record, Acknowledgment ack) {
LOGGER.info(" kafka xx 消费者 接收到消息:{}", record.toString());
ack.acknowledge(); // 消费提交
}
@KafkaListener(topics = {topic}, containerFactory = "kafkazzContainerFactory")
public void listenerTwo(ConsumerRecord<?, ?> record, Acknowledgment ack) {
LOGGER.info(" kafka zz消费者 接收到消息:{}", record.toString());
ack.acknowledge();
}
}