码迷,mamicode.com
首页 > Web开发 > 详细

MetaQ 实例之三

时间:2015-10-28 19:44:05      阅读:2736      评论:0      收藏:0      [点我收藏+]

标签:

六、Message Consumer

        一、配置消费者:每个Java的消费者都需要一个ConsumerConfig的配置实例。

        二、消费者分组      

                在MetaQ里,消费者被认为是一个集群,也就是说认为是有一组的机器在共同分担消费一个topic。因此消费者配置ConsumerConfig中最重要的配置是group,每个消费者都必须告诉MetaQ它属于哪个group,然后MetaQ会找出这个group下所有注册上来的消费者,在他们之间做负载均衡,共同消费一个或多个topic。注意,不同group之间可以认为是不同的消费者,他们消费同一个topic下的消息的进度是不同。

                举例来说,假设你有一个topic为business-logs,是所有业务系统的日志。然后现在你对这些日志要做两个事情:一个是存储到HDFS这样的分布式文件系统,以便后续做分析处理;以个是Twitter Storm这样的实时分析系统,做实时的数据分析、告警和展现。显然,这里你就需要两个group,比如我们有一个group叫hdfs-writer,它有三台机器同时消费business-logs,将日志存储到HDFS集群。同时,你也有另一个group叫storm-spouts,有5台机器用来给storm集群喂数据。这两个group是隔离,虽然是消费同一个topic,但是两者是消费进度(消费了多少个消息,等待消费多少个消息等信息)是不同的。但是同一个group内,例如hdfs-writer的三台机器,这三台机器是共同消费business-logs下的消息,同一条消息只会被这hdfs-writer三台机器中的一台处理,但是这条消息还会被twitter-spouts等其他分组内的某一台机器消费。

          三、创建ConsumerConfig

                创建ConsumerConfig并传入分组名称:

  group  ;  consumerConfig   (group);

        ConsumerConfig的其他重要选项还包括:

  • fetchRunnerCount, 因为MetaQ的消费者是以pull模型来从服务端拉取数据并消费,这个参数设置并行拉取的线程数,默认是CPUs个。关于消费的并发模型请看下面的并发处理小节。

  • fetchTimeoutInMills,同步抓取的请求超时,默认10秒,通常不需要修改此参数。

  • maxDelayFetchTimeInMills,当上一次没有抓取到的消息,抓取线程sleep的最大时间,默认5秒,单位毫秒。当某一次没有抓取到消息的时候,抓取线程会开始休眠maxDelayFetchTimeInMills的10分之1时间,如果下次还是没有抓到,则休眠maxDelayFetchTimeInMills的10分之2时间,以此类推直到最多休眠maxDelayFetchTimeInMills时间。中途如果任何一次抓取开始获取数据,则计数清零从10分之1重新开始计算。当你对消息的实时性特别敏感的时候应该调小此参数,并同时调小服务端的unflushInterval参数。

  • consumerId, 单个消费者的id,必须全局唯一,通常用于标识分组内的单个消费者,可不设置,系统会根据IP和时间戳自动生成。

  • offset, 第一次消费开始位置的offset,默认都是从服务端的最早数据开始消费。

  • commitOffsetPeriodInMills, 保存消费者已经消费的数据的offset的间隔时间,默认5秒,单位毫秒。更大的间隔,在故障和重启时间可能重复消费的消息更多,更小的间隔,可能给存储造成压力。

  • maxFetchRetries,同一条消息在处理失败情况下最大重试消费次数,默认5次,超过就跳过这条消息并调用RejectConsumptionHandler处理。关于RejectConsumptionHandler请看下面的拒绝处理小节。

        这些参数都有相应的getter/setter方法来设置。

        四、创建消费者

             final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig);   

        五、Offset存储

                MetaQ的消费模型是一种拉取的模型,消费者根据上次消费数据的绝对偏移量(offset)从服务端的数据文件中拉取后面的数据继续消费,因此这个offset信息就非常关键,需要可靠地保存。默认情况下,MetaQ是将offset信息保存在你使用的zookeeper集群上,也就是ZkOffsetStorage所做的事情,它实现了OffsetStorage接口。通常这样的保存是可靠并且安全的,但是有时候可能你也需要其他选项,目前还提供两个不同的OffsetStorage实现:

  • LocalOffsetStorage,使用consumer的本地文件作为offset存储,默认存储在${HOME}/.meta_offsets的文件里。适合消费者分组只有一个消费者的情况,无需共享offset信息。例如广播类型的消费者就特别合适。

  • MysqlOffsetStorage,使用Mysql作为offset存储,使用前需要创建表结构:

  `` (   ()  AUTO_INCREMENT,   () ,   () ,   () ,   () ,   () ,   (),
  KEY  (,,)
) ENGINEInnoDB DEFAULT CHARSETutf8;

你也可以实现自己的OffsetStorage存储。如果你想使用除了zookeeper之外的offset存储,可以在创建消费者的时候传入:

  consumer  sessionFactorycreateConsumer(consumerConfig, (dataSource));

mysql存储需要传入JDBC数据源。

        第一次消费的offset初始值。

        前面提到ConsumerConfig有个offset参数可以设置第一次消费的时候开始的绝对偏移量,默认这个参数是0,也就是从服务端现有消息的最小偏移量开始,从头开始消费所有消息。

但是,通常情况下,新的消费分组都是希望从最新的消息开始消费,ComsumerConfig提供了一个setConsumeFromMaxOffset(boolean always)方法来设置从最新位置开始消费。其中always参数表示是否每次消费者启动都从最新位置开始消费,这样就忽略了在消费者停止期间的消息。通常仅在测试的时候将always参数设置为true,以便每次测试最新的消息。除非你真的不需要消费者停止期间(比如重启间隔)的消息,否则不要将always设置为真。

        六、https://github.com/killme2008/Metamorphosis/wiki/Java-MessageConsumer

MetaQ 实例之三

标签:

原文地址:http://my.oschina.net/MrMichael/blog/523254

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!