标签:html .com serialize tween 服务器ip org sys arrays 集群
kafka的操作相对来说简单很多
下载kafka http://kafka.apache.org/downloads
tar -zxvf kafka_2.12-2.1.0.tgz
rm kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 kafka
sudo vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
准备 worker1 worker2 worker3 这四台机器
首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群
具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html
server.properties
sudo vim server.properties
添加如下属性
broker.id=0 # 3台机器分别设置成0 1 2
log.dirs=/usr/local/kafka/logs
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
运行
运行
bin/kafka-server-start.sh config/server.properties
创建topic
bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test
查看topic
bin/kafka-topics.sh --list --zookeeper worker1:2181
订阅topic,利用worker2来订阅
bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning
topic发送消息
bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test
键入任何消息,worker2都能接收到
查看topic详情
bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.0</version>
</dependency>
生产者
public class Producer
{
public static void main( String[] args ){
Properties props = new Properties();
// 服务器ip
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
// 属性键值对都序列化成字符串
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建一个生产者,向test主题发送数据
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("test", "生产者传递的消息"));
producer.close();
}
}
消费者
public class Consumer
{
public static void main( String[] args ){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.between(
LocalDateTime.parse("2019-01-09T11:30:30"), LocalDateTime.now()));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
标签:html .com serialize tween 服务器ip org sys arrays 集群
原文地址:https://www.cnblogs.com/ye-hcj/p/10260954.html