Tags: message, broker, connection, bin, col, keytab, bindings, runtime, latest
When connecting to a Kerberos-secured Kafka cluster through the Spring Cloud Stream Kafka binder, the configuration should look like this:
# input
spring.cloud.stream.bindings.process-in-0.destination=input-topic
spring.cloud.stream.bindings.process-in-0.binder=kafka1
spring.cloud.stream.bindings.process-in-0.group=groupId1
# startOffset is a Kafka binder consumer property, so it is set under spring.cloud.stream.kafka.bindings
spring.cloud.stream.kafka.bindings.process-in-0.consumer.startOffset=latest
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/input.keytab
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
spring.cloud.stream.binders.kafka1.environment.spring.kafka.consumer.value-deserializer=com.kafka.message.serializer.EntityDeserialize

# output
spring.cloud.stream.bindings.process-out-0.destination=output-topic
spring.cloud.stream.bindings.process-out-0.binder=kafka2
spring.cloud.stream.bindings.process-out-0.group=groupId2
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=host1:port,host2:port
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.serviceName=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.storeKey=true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.keyTab=/file/output.keytab
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.jaas.options.principal=user@domain.com
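The following Java system properties must also be set, before the application context starts (for example, at the top of main):
System.setProperty("java.security.krb5.conf", "/file/krb5.conf");
System.setProperty("sun.security.krb5.debug", "true");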
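As a minimal sketch, the two properties can be set through a small helper that fails fast when krb5.conf cannot be read, which tends to be easier to diagnose than a later Kerberos login failure. The class name KerberosSystemProperties and its apply method are hypothetical and not part of the original setup; the sketch assumes it runs before the Spring application is started.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not from the original post): sets the Kerberos-related
// JVM system properties and fails fast if krb5.conf cannot be read.
final class KerberosSystemProperties {

    private KerberosSystemProperties() {
    }

    static void apply(String krb5ConfPath, boolean debug) {
        Path conf = Paths.get(krb5ConfPath).toAbsolutePath();
        if (!Files.isReadable(conf)) {
            throw new IllegalStateException("krb5.conf not readable: " + conf);
        }
        // Same two properties as in the article, set programmatically.
        System.setProperty("java.security.krb5.conf", conf.toString());
        System.setProperty("sun.security.krb5.debug", Boolean.toString(debug));
    }

    public static void main(String[] args) {
        // Default path taken from the article; pass another path as the first argument.
        String path = args.length > 0 ? args[0] : "/file/krb5.conf";
        apply(path, true);
        // In a real application, SpringApplication.run(...) would follow here.
        System.out.println("java.security.krb5.conf=" + System.getProperty("java.security.krb5.conf"));
    }
}
```

The same values can also be passed on the command line as -Djava.security.krb5.conf=/file/krb5.conf and -Dsun.security.krb5.debug=true, which keeps environment-specific paths out of the code.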
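In addition, to deserialize the Kafka message payload we are not limited to the serializers and deserializers provided by default for basic types. We can also write our own, such as the com.kafka.message.serializer.EntityDeserialize configured above: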
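import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.kafka.common.serialization.Deserializer;

// Deserializes the raw Kafka record value (JSON bytes) into an EntityTest
public class EntityDeserialize implements Deserializer<EntityTest> {

    private final ObjectReader objectReader = new ObjectMapper().readerFor(EntityTest.class);

    @Override
    public EntityTest deserialize(String topic, byte[] data) {
        try {
            return objectReader.readValue(data);
        } catch (Exception e) {
            throw new RuntimeException("message deserialize error", e);
        }
    }
}

// The corresponding functional bean, declared in a @Configuration class
// (needs java.util.function.Function, reactor.core.publisher.Flux, org.springframework.messaging.Message)
@Bean
public Function<Flux<Message<EntityTest>>, Flux<Message<String>>> process(MessageProcessService messageProcessService) {
    // Call the injected instance, not the class
    return messageFlux -> messageProcessService.process(messageFlux);
}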
Secured Kafka (Kerberos) configuration with the Spring functional model
Original article: https://www.cnblogs.com/lgtrajectory/p/14017469.html