标签:
一、配置消费者:每个Java的消费者都需要一个ConsumerConfig
的配置实例。
二、消费者分组
在MetaQ里,消费者被认为是一个集群,也就是说认为是有一组的机器在共同分担消费一个topic。因此消费者配置ConsumerConfig
中最重要的配置是group,每个消费者都必须告诉MetaQ它属于哪个group,然后MetaQ会找出这个group下所有注册上来的消费者,在他们之间做负载均衡,共同消费一个或多个topic。注意,不同group之间可以认为是不同的消费者,他们消费同一个topic下的消息的进度是不同。
举例来说,假设你有一个topic为business-logs
,是所有业务系统的日志。然后现在你对这些日志要做两个事情:一个是存储到HDFS这样的分布式文件系统,以便后续做分析处理;以个是Twitter Storm这样的实时分析系统,做实时的数据分析、告警和展现。显然,这里你就需要两个group,比如我们有一个group叫hdfs-writer
,它有三台机器同时消费business-logs
,将日志存储到HDFS集群。同时,你也有另一个group叫storm-spouts
,有5台机器用来给storm集群喂数据。这两个group是隔离,虽然是消费同一个topic,但是两者是消费进度(消费了多少个消息,等待消费多少个消息等信息)是不同的。但是同一个group内,例如hdfs-writer
的三台机器,这三台机器是共同消费business-logs
下的消息,同一条消息只会被这hdfs-writer
三台机器中的一台处理,但是这条消息还会被twitter-spouts
等其他分组内的某一台机器消费。
三、创建ConsumerConfig
创建ConsumerConfig并传入分组名称:
group ; consumerConfig (group);
ConsumerConfig
的其他重要选项还包括:
fetchRunnerCount, 因为MetaQ的消费者是以pull模型来从服务端拉取数据并消费,这个参数设置并行拉取的线程数,默认是CPUs个。关于消费的并发模型请看下面的并发处理小节。
fetchTimeoutInMills,同步抓取的请求超时,默认10秒,通常不需要修改此参数。
maxDelayFetchTimeInMills,当上一次没有抓取到的消息,抓取线程sleep的最大时间,默认5秒,单位毫秒。当某一次没有抓取到消息的时候,抓取线程会开始休眠maxDelayFetchTimeInMills的10分之1时间,如果下次还是没有抓到,则休眠maxDelayFetchTimeInMills的10分之2时间,以此类推直到最多休眠maxDelayFetchTimeInMills时间。中途如果任何一次抓取开始获取数据,则计数清零从10分之1重新开始计算。当你对消息的实时性特别敏感的时候应该调小此参数,并同时调小服务端的unflushInterval
参数。
consumerId, 单个消费者的id,必须全局唯一,通常用于标识分组内的单个消费者,可不设置,系统会根据IP和时间戳自动生成。
offset, 第一次消费开始位置的offset,默认都是从服务端的最早数据开始消费。
commitOffsetPeriodInMills, 保存消费者已经消费的数据的offset的间隔时间,默认5秒,单位毫秒。更大的间隔,在故障和重启时间可能重复消费的消息更多,更小的间隔,可能给存储造成压力。
maxFetchRetries,同一条消息在处理失败情况下最大重试消费次数,默认5次,超过就跳过这条消息并调用RejectConsumptionHandler
处理。关于RejectConsumptionHandler
请看下面的拒绝处理小节。
这些参数都有相应的getter/setter方法来设置。
四、创建消费者
final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig);
五、Offset存储
MetaQ的消费模型是一种拉取的模型,消费者根据上次消费数据的绝对偏移量(offset)从服务端的数据文件中拉取后面的数据继续消费,因此这个offset信息就非常关键,需要可靠地保存。默认情况下,MetaQ是将offset信息保存在你使用的zookeeper集群上,也就是ZkOffsetStorage
所做的事情,它实现了OffsetStorage
接口。通常这样的保存是可靠并且安全的,但是有时候可能你也需要其他选项,目前还提供两个不同的OffsetStorage
实现:
LocalOffsetStorage
,使用consumer的本地文件作为offset存储,默认存储在${HOME}/.meta_offsets
的文件里。适合消费者分组只有一个消费者的情况,无需共享offset信息。例如广播类型的消费者就特别合适。
MysqlOffsetStorage
,使用Mysql作为offset存储,使用前需要创建表结构:
`` ( () AUTO_INCREMENT, () , () , () , () , () , (), KEY (,,) ) ENGINEInnoDB DEFAULT CHARSETutf8;
你也可以实现自己的OffsetStorage
存储。如果你想使用除了zookeeper之外的offset存储,可以在创建消费者的时候传入:
consumer sessionFactorycreateConsumer(consumerConfig, (dataSource));
mysql存储需要传入JDBC数据源。
第一次消费的offset初始值。
前面提到ConsumerConfig
有个offset
参数可以设置第一次消费的时候开始的绝对偏移量,默认这个参数是0,也就是从服务端现有消息的最小偏移量开始,从头开始消费所有消息。
但是,通常情况下,新的消费分组都是希望从最新的消息开始消费,ComsumerConfig
提供了一个setConsumeFromMaxOffset(boolean always)
方法来设置从最新位置开始消费。其中always
参数表示是否每次消费者启动都从最新位置开始消费,这样就忽略了在消费者停止期间的消息。通常仅在测试的时候将always
参数设置为true,以便每次测试最新的消息。除非你真的不需要消费者停止期间(比如重启间隔)的消息,否则不要将always设置为真。
六、https://github.com/killme2008/Metamorphosis/wiki/Java-MessageConsumer
标签:
原文地址:http://my.oschina.net/MrMichael/blog/523254