标签:
> tar -xzf kafka_2.10-0.8.2.0.tgz> cd kafka_2.10-0.8.2.0
Kafka 使用 ZooKeeper 因此需要首先启动 ZooKeeper 服务。如果你没有ZooKeeper 服务,可以使用Kafka自带脚本启动一个应急的单点的 ZooKeeper 实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)...
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
运行如下命令查看此topic信息:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
我们也可以通过配置
,让brokers
自动创建该topic,
当向一个不存在的topic发布消息时 。
运行producer 然后输入几条消息到控制台,输入回车发送到服务端。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
不加参数运行命令会显示详细使用文档.
首先我们为每个broker 准备一个配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Now 编辑这些新文件,内容如下:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
之前已经开启过Zookeeper 和 broker了,现在只需要再启动两就好:
> bin/kafka-server-start.sh config/server-1.properties &...
> bin/kafka-server-start.sh config/server-2.properties &...
创建一个新的topic 并为其设置副本因子:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
OK,现在我们已经搭建好一个集群了,但是我们还不知道他是怎么工作的?使用"describe topics"命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0输出的第一行所有分区的概述, 附加的每行是关于每个分区的说明。因为我们只有一个分区所以,这里只有一行。
我们再查看下原来的 topic:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这没什么惊讶的—原来的 topic 没有设置副本,所以
Replicas是
0, 我们创建它时集群中只有一个节点。
让我们发布几条消息到我们的新 topic:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let‘s consume these messages:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在让我们测试下容错性。这里 Broker 1 是 leader 所以我们 kill 掉它:
> ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java
...
> kill -9 7564
然后leader 切换为其他节点并且节点1 也不再 in-sync 设置中了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是消息仍然可以消费, 即使是之前的leader 写下的日志:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
标签:
原文地址:http://my.oschina.net/u/1421929/blog/470216