码迷,mamicode.com
首页 > 编程语言 > 详细

kafka_scram02 Java_Consumer

时间:2019-07-19 10:44:46      阅读:118      评论:0      收藏:0      [点我收藏+]

标签:rgs   host   int   time   print   prope   exception   alc   pack   

 

 

xshell 设置消费权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Brent --consumer --allow-host 192.168.239.146 --topic AUTHTEST --group Brent_group

 

package com.chinalife.kafka2demo.java;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Demo Kafka consumer that connects with {@code SASL_PLAINTEXT} / {@code SCRAM-SHA-256},
 * subscribes to a single topic and commits offsets manually after each poll.
 *
 * <p>The connection settings (brokers, group id, JAAS file location) are hard-coded in the
 * static initializer below.
 */
public class ConsumerScramJavaSubscribe {
  /** Topic the consumer subscribes to. */
  private static final String TOPIC = "AUTHTEST";

  /** Maximum time a single {@code poll} call waits for records. */
  private static final Duration POLL_TIMEOUT = Duration.ofMillis(3000);

  /** Base client configuration shared by every consumer created in this class. */
  private static final Properties kfkProperties = new Properties();

  static {
    kfkProperties.put("bootstrap.servers", "192.168.239.146:9094,192.168.239.147:9094");
    // Points the JVM at an external JAAS login file (presumably holding the SCRAM
    // credentials for this client -- confirm against the deployed file).
    System.setProperty("java.security.auth.login.config", "/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/etc/kafka/conf.dist/kafka_client_jaas.conf");
    kfkProperties.put("group.id", "Brent_group");
    kfkProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kfkProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kfkProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    kfkProperties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
    // Alternative to the JAAS file: set the "sasl.jaas.config" client property
    // (SaslConfigs.SASL_JAAS_CONFIG) to a ScramLoginModule entry. Load the credentials
    // from a secure source rather than embedding them in source code.
  }

  /**
   * Consumer variant 2: manual offset commit.
   *
   * <p>Polls {@link #TOPIC} in an endless loop, prints every record to standard output and
   * synchronously commits the consumed offsets after each batch. The method only returns by
   * throwing; the consumer is closed on the way out so its network resources are released.
   */
  private static void generalConsumerMessageManualCommitSync() {
    // Work on a copy so the per-consumer setting does not leak into the shared base config.
    Properties consumerProperties = new Properties();
    consumerProperties.putAll(kfkProperties);
    consumerProperties.put("enable.auto.commit", "false");

    // try-with-resources guarantees close() when any exception terminates the loop.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
      consumer.subscribe(Collections.singletonList(TOPIC));

      while (true) {
        // poll(Duration) bounds the total wait; the deprecated poll(long) overload
        // could block indefinitely while fetching metadata.
        ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key()
              + "," + record.value());
        }
        try {
          consumer.commitSync();
        } catch (CommitFailedException e) {
          // Best-effort demo behaviour: report the failed commit and keep polling.
          System.out.println("commit failed msg" + e.getMessage());
        }
      }
    }
  }

  /**
   * Entry point: starts the manual-commit consumer loop.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    ConsumerScramJavaSubscribe.generalConsumerMessageManualCommitSync();
  }
}

kafka_scram02 Java_Consumer

标签:rgs   host   int   time   print   prope   exception   alc   pack   

原文地址:https://www.cnblogs.com/BrentBoys/p/11211678.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!