标签:文件 zook esc offset rop 配置文件 sleep replicat trace
目录
由于kafka集成了kerberos 所以需要通过kerberos的认证
认证方式有两种
我们这里采用第一种
首先先在目录/usr/local/kafka_client下创建两个文件一个是client.properties,一个是jaas.conf
在client.properties文件里面写入
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup
在jaas.conf写入
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
之后在shell命令行执行一下命令来配置环境变量(这样只针对当前进程有效)
[root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS
在执行kinit命令登陆kerberos用户
1.创建kafka topic
[root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic
2.查看Topic列表
[root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list
3.删除Topic
[root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic
4.向Topic生产数据(需要权限)
[root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties
5.消费Topic数据(需要权限)
[root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties
此时会报以下错误 表示没有权限向testTopicTopic 写入数据此时我们需要给我们kinit登陆的用户赋予权限
ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]
我们以fayson用户为例 它属于user组(id+用户名 查看组)
[root@cdh-datanode03 kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka@GREE.IO
Valid starting Expires Service principal
09/11/2019 20:47:25 09/12/2019 20:47:25 krbtgt/GREE.IO@GREE.IO
renew until 09/18/2019 20:47:25
[root@cdh-datanode03 kafka_client]# kafka-sentry -cr -r kafka_role
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -arg -r kafka_role -g user
[root@cdh-datanode03 kafka_client]# kinit fayson
之后以此用户写入testTopic 就不会报权限问题了
此时我们还需要给 fayson 用户赋予读取testTopic的权限,所以需要给kafka_role赋予读取testtopic的权限
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=zhcTestTopic->action=read"
需要创建consumer.properties,producer.properties,jaas.conf文件 还要引入krb5.conf文件
producer.properties文件内容
bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
#实现了Serializer接口的序列化类。用于告诉kafka如何序列化key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#告诉kafka如何序列化value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
#访问kerberos的kafka client 配置
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
consumer.properties文件内容
bootstrap.servers=cdh-datanode04:9092
group.id=testgroup1
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
jaas.conf文件内容
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
storeKey=true
renewTicket=true
keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
principal="gree1@GREE.IO";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
principal="gree1@GREE.IO";
};
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1-cdh6.3.0</version>
</dependency>
package producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.util.Properties;
class MyProducer {
private static final MyProducer Instance = new MyProducer();
private MyProducer() {
}
public static MyProducer getInstance() {
return Instance;
}
public int messageNo = 1;
/**
* 获得一个Kafka生产者实例
*
* @return
*/
public KafkaProducer Produce() {
System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
Properties props = new Properties();
try {
props.load(this.getClass().getResourceAsStream("/producer.properties"));
} catch (IOException e) {
e.printStackTrace();
}
KafkaProducer producer = new KafkaProducer(props);
return producer;
}
}
public class ProducerStarter implements Runnable {
private int threadIndex;
public ProducerStarter(int threadIndex) {
this.threadIndex = threadIndex;
}
/**
* 生产数据
*/
public void run() {
MyProducer pro = MyProducer.getInstance();
KafkaProducer prod = pro.Produce();
String topic = "testTopic";
int i = 0;
while (1 == 1) {
final int index = i++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
prod.send(new ProducerRecord<String, String>(topic,String.valueOf(index), String.valueOf(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello word " + index);
}
});
prod.flush();
//sleep 1min
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 启动200个线程,生产
*
* @param args
*/
public static void main(String args[]) {
for (int i = 0; i < 1; i++) {
System.out.println("启动线程:" + i);
Thread thread = new Thread(new ProducerStarter(i));
thread.start();
}
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
public class ConsumerStarter {
public static void main(String[] args) throws InterruptedException {
KafkaConsumer consumer = Consumer.getInstance().Consume();
consumer.subscribe(Arrays.asList("testTopic"));
//消费并打印消费结果
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord record: records) {
System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
}
Thread.sleep(1000);
}
}
}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Properties;
/**
* Created by 260212 on 2018/4/12.
* Author:JackLee -赵化臣
* 描述:
*/
class Consumer {
private static final Consumer Instance=new Consumer();
private Consumer(){}
public static Consumer getInstance(){
return Instance;
}
/**
* 获得一个Kafka消费者
* kafka-clients版本要高于0.9.0.1,否则会取出为null
* @return
*/
public KafkaConsumer Consume (){
System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
Properties props=new Properties();
try {
props.load(this.getClass().getResourceAsStream("/consumer.properties"));
} catch (IOException e) {
e.printStackTrace();
}
KafkaConsumer consumerSelf=new KafkaConsumer<String,String>(props);
return consumerSelf;
}
}
kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用
标签:文件 zook esc offset rop 配置文件 sleep replicat trace
原文地址:https://www.cnblogs.com/HarSenZhao/p/11508687.html