标签:zookeeper boot cep listeners color sms format 信息 sse
kafka 版本信息:kafka_2.12-2.3.0
使用kafka自带的zookeeper启动
bin/zookeeper-server-start.sh config/zookeeper.properties
kafka启动:
bin/kafka-server-start.sh config/server.properties
nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &
这是后台启动,不加nohup,日志会输出到控制台上面去。其中,server-start.log是自己写的一个log文件,在原有的文件logs下面是没有的。
配置SASL步骤:
1.修改bin/server.properties文件
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://ip:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://ip:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
其他值未做任何改变
2.新建config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
新建config/kafka_client_jaas.conf --此步可不建,因我要使用控制台去消费数据
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
3.修改bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
fi
修改bin/kafka-console-consumer.sh kafka-console-producer.sh
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"
fi
修改config/consumer.properties producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
此红色部分实际可不用修改,因我要使用控制台消费数据
4.控制台启动消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.8:9092 --topic test --from-beginning --consumer.config config/consumer.properties
5.java 应用充当生产者:
public class TestDemo {
public static void main(String[] args) throws Exception {
System.setProperty("java.security.auth.login.config", "/home/zhufei/software/kafka_2.12-2.3.0/config/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
Properties props = new Properties();
props.put("bootstrap.servers", "ip:9092");
// props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
// props.put("linger.ms", 1);
// props.put("buffer.memory", 33554432);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// props.put("partitioner.class", HashPartitioner.class.getName());
// props.put("interceptor.classes", EvenProducerInterceptor.class.getName());
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("test", null, "hello world 20190909 fox"));
producer.close();
}
}
7.结果:能正常发送,正常消费
zhufei@SilverRiver:~/software/kafka_2.12-2.3.0$ bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config config/consumer.properties
hello world 20190909 fox
标签:zookeeper boot cep listeners color sms format 信息 sse
原文地址:https://www.cnblogs.com/zf201149/p/11495134.html