码迷,mamicode.com
首页 > 其他好文 > 详细

Kafka (一)

时间:2015-11-29 00:54:18      阅读:164      评论:0      收藏:0      [点我收藏+]

标签:

使用Kafka最新版本0.9

 

Kafka 配置


 

1. 安装

首先需要安装Java,推荐安装Java8,不然会出现一些莫名其妙的错误

kafka_2.11-0.9.0.0.tgz

 

tar -xzf kafka_2.11-0.9.0.0.tgz

为了方便,更改一下目录名

mv kafka_2.11-0.9.0.0.tgz kafka

 

2.配置Kafka服务端属性

安装的是单节点,集群的配置非常简单,可以看看其他的资料

cd config

vim server.properties

 

有2个关键属性需要修改

advertised.host.name

advertised.port

 

这2个属性需要修改成IP地址或者机器名,如果Kafka是部署在云端中,可能云端的虚拟机中有别的虚拟IP,需要在这2个地址配置。

这个测试环境中,是部署在虚拟机中,通过路由相连,仍然需要配置这2个属性,将advertised.host.name设置为虚拟机的IP地址。

如果不配置advertised.host.name,只配置host.name,在另外一台机器上使用Java的Client连接时,会被解析成localhost。

 

3. 启动Kafka自带的Zookeeper

bin/kafka-server-start.sh config/server.properties

 

4. 启动Kafka

bin/kafka-server-start.sh config/server.properties

 

Kafka 简单测试


1. 创建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 

2. 查看Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

3. 生产消息

新建一个Console

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

输入字符串回车即可

 

4. 消费消息

新建一个Console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在生产消息的Console,继续输入字符并回车,这里会即时消费

 

Kafka Java客户端


 

1. Maven文件

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
 </dependency>

 

2. Produce代码

        Properties props = new Properties();        
        props.put("bootstrap.servers", "192.168.1.160:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("my-topic2", Integer.toString(i), Integer.toString(i)));


        producer.close();    

bootstrap.servers即Kafka broker的地址

 

3. Consumer代码

      Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.160:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        //consumer.subscribe(Arrays.asList("foo", "bar"));
        consumer.subscribe(Arrays.asList("my-topic2"));


        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        }

 

Consumer Group


 

Kafka的Consumer可以加入一个Group,如果一个Group中的Consumer数大于该Topic的Partition数,则某些Consumer不会Fetch到数据,如果Consumer数小于Partition数,则某些Consumer会比别的Consumer消费更多的消息。

如果一个Consumer没有加入任何Group,则这个Consumer会消费该Topic下所有Partition的消息

建议最好的方式是一个Consumer对应一个Partition

 

在Kafka中会储存每个Consumer消费消息的Offset,Consumer可以选择Auto Commit Offset或者Manual Commit

在进行设计时,如果对Consumer有一定一致性的要求,可以选择Manual Commit,例如Consumer宕机,重启或者新Consumer加入时,可以从该Partition的Offset记录位置开始消费,还有一种情况则是Consumer宕机,不重启并且没有新的Consumer加入,在0.9.0。0版本的Kafka中,会有一个Consumer的协调者来处理Consumer的Reblance,启动Reblance,那么在同一个Group中的其他Consumer也可以消费宕机的Consumer订阅的Partition的消息。但这样设计的前提是Consumer必须被设计成无状态的。

 

Kafka 传递的消息


 

Kafka传递的消息是<key,value>,让我想起了Map-Reduce,利用Kafka可以快速构建一个轻量级的MR并行分析

Kafka对Hadoop的支持也非常的好。

 

Kafka (一)

标签:

原文地址:http://www.cnblogs.com/dopeter/p/5003689.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!