标签:group list 生产 ica boot test keep prope err
一、Kafka 0.11
参考文档
(1)https://kafka.apache.org/0110/documentation.html
二、kafka 0.8
1、命令行操作
(1)新建topic
> bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0
(2)发送消息
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic msg_format_v0
2、API使用
(1)pom依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
(2)生产者api使用
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class TestProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//topic: 目标topic; key: message的序号; value: 写入的message信息;
producer.send(new ProducerRecord<>("msg_format_v0", "key", "value"));
//当不需要指定key值时,采用下面的方法
//Producer<Object, String> producer2 = new KafkaProducer<>(props);
//producer2.send(new ProducerRecord<>("msg_format_v1", "value"));
producer.close();
}
}
参考文档
(1)https://kafka.apache.org/082/documentation.html#producerapi
标签:group list 生产 ica boot test keep prope err
原文地址:https://www.cnblogs.com/hxuhongming/p/12812832.html