标签:
kafka是用scala编写,用zookeeper做协调。scala的代码没学习过,这边主要看别人写的文档和自己的工作实践来的。笔记可能会写多篇,因为这东西要经常看啊,不看过了多久就忘了。
东西看完了就要问问自己3W1H
What:kafka是一个分布式(多broker,zookeeper)的,分区的(partition),消息复制的(replicate)的消息中间件(log service)
When:1.做消息分发 2.实时和离线的数据分析 3.日志收集
Why:kafka的分布式可以保证系统的稳定,kafka的动态伸缩可以让系统达到最大的性能,分区可以保证负载均衡,复制可以保证数据完整
How:用kafka我们需要实现producer和customer,下面就具体讲讲
kafka的组成是producers+brokers+zookeeper+customers
producer是消息生产者,负责发送消息。需要配置metadata.broker.list来连接broker,以前是需要配置zk.connect的属性的,是通过producer来通过zookeeper做负载均衡的(partition),现在直接通过metadata.broker.list来做,不再需要zk.connect属性(https://issues.apache.org/jira/browse/KAFKA-369)
其中有个属性producer.type,sync和async,默认是sync,即一条一条消息发送,但大部分时候我们为了提高producer的效率,会选择使用async,来批量发送,这样就会有一些相关的属性要设置
//tprops.put("queue.buffering.max.ms", 5000); //tprops.put("queue.buffering.max.messages", 10000); //the producer will block indefinitely and never willingly drop a send. //tprops.put("queue.enqueue.timeout.ms", -1); //tprops.put("batch.num.messages", 200);
broker通过partition分区来将topic负载分割。producer发送消息的时候通过partitioner.class配置的算法来将消息放到哪个partition中。默认的算法使用key的hash。这个key应该是该topic对应的值。producer发送消息支持压缩,compression.codec。默认不压缩。如果压缩的话中间过程一直是压缩状态,直到customer解压。
一个topic的一个partition为一个目录,目录中会有个seglist记录所有的segfile,每个segfile是一定行数的内容,每一行是一条消息,包括消息id,消息长度,消息内容。
一次删除会删除一个seglist的一行里面的记录,kafka一般是删除7天前的数据。
broker启动时会向zookeeper注册。/brokers/ids/[0....N]---->host:port
收到topic后会将将自己注册到topic下。/brokers/topics/[topic]/[0...N](partition)表示topic在哪里存在,供customer使用
customer启动后,会根据之前同group的customer的情况,进行读取topic的负载均衡读取。即如果之前已经c1已经在读取topic和p1和p2了,那么c2启动后可能帮它读取p2了。一个partition只会被一个customer读取。
group.id标志自己的组,zookeeper.connect连接zookeeper来注册自己和获取其他customer,topic的情况
auto.commit.enable默认自动提交
customer有两种API,一个是简单,一个是封装好的,一般用后者,只有想处理hdfs这种特殊性的需求用简单的来做。
标签:
原文地址:http://blog.csdn.net/lockedstar/article/details/43528661