标签:
旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:
新版的Producer API提供了以下功能:
producer.type=async做到。
缓存的大小可以通过一些参数指定:queue.time
和batch.size
。一个后台线程((kafka.producer.async.ProducerSendThread
)从队列中取出数据并让kafka.producer.EventHandler
将消息发送到broker,也可以通过参数event.handler定制
handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler
接口,并在callback.handler
中配置。kafka.serializer.DefaultEncoder
。
zk.connect
实现。如果不使用Zookeeper,也可以使用broker.list
参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。kafka.producer.Partitioner类对消息分区
。
hash(key)%numPartitions
.如果key是null,就随机的选择一个。可以通过参数partitioner.class
定制分区函数。新的api完整实例如下:
下面这个是用到的分区函数:
Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。
高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。
低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。
这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。
每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。
标签:
原文地址:http://www.cnblogs.com/tonychai/p/4437023.html