标签:保存 间隔 llb server begin -- HERE 本地 fse
* consumerGroup:消费者组名
* MessageModel:消息模型,定义了消息传递到消费者的方式,默认是 MessageModel.CLUSTERING
* MessageModel.BROADCASTING:广播
* MessageModel.CLUSTERING:集群
* consumeFromWhere: 消费者开始消费的位置,默认值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列最后的位置开始消费
* ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:从队列前面最开始消费
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP: 从指定时间开始消费,之前的消息将会被忽略
* consumeTimestamp:
* allocateMessageQueueStrategy:消息分配策略
* subscription:订阅关系
* offsetStore:存储消息偏移量
* consumeThreadMin:线程池最小值,默认值是20
* consumeThreadMax:线程池最大值,默认值是20
* consumeConcurrentlyMaxSpan:单个队列并行消费最大的跨度,默认2000
* pullThresholdForQueue:一个队列最大的消费个数,默认1000
* pullInterval:消息拉取的时间间隔
* pullBatchSize:消息拉取的个数,默认32啊
* consumeMessageBatchMaxSize:批量消费量,默认1
* messageListener:消息监听器,用来处理消息,它有两个实现类
* MessageListenerOrderly:按顺序一个个消费
* MessageListenerConcurrently:并行消费
* 同一个 consumerGroup 里,并且订阅的 tag 也必须是一样的,这样的 consumer 实例才能组成 consumer 集群;
* 当 consumer 使用集群消费时,每条消息只会被 consumer 集群内的任意一个 consumer 实例消费一次;
* 默认的消费模式就是集群模式;
* 集群模式天然实现负载均衡机制
* 同一个 consumerGroup 里的 Consumer 会消费订阅 Topic 的全部消息
* 通过 consumer.setMessageModel(MessageModel.BROADCASTING) 方法设置
* 在 RocketMQ 中,相同类型的消息会放到一个 Topic 里,为了可以并行操作,一个 Topic 会有多个 MessageQueue。
* Offset 是指某个 Topic 下的一条消息在某个 MessageQueue 里的位置;
* 通过 Offset 的值可以定位到这条消息
从类结构可以看出 Offset 分为本地文件类型和远程文件类型。
* 集群模式下因为每个 Consumer 消费所订阅主题的一部分,所以采用远程文件存储 Offset;
* 广播模式下,由于每个 Consumer 需要消费所有的消息,所以采用本地文件存储 Offset。
OffseStore 使用 Json 格式存储,例如:
{
"OffsetTable":{
1:{
"brokeName":"localhost",
"QueueId":1,
"Topic":"broker1"
},
2:{
"brokeName":"localhost",
"QueueId":2,
"Topic":"broker2"
}
}
}
根据对读取操作的控制情况,可以消费者分为两种类型。一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是 DefaultMQPullConsumer ,读取操作中的大部分功能由使用者自主控制。
DefaultMQPushConsumer 只需要设置好各种参数和设置传入处理消息的回调函数即可,系统收到消息后会自动调用处理函数来处理消息,而且加入新的 DefaultMQPushConsumer 后会自动做负载均衡。
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupName");
// 设置服务器地址
consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 订阅指定主题
consumer.subscribe("topicTest","*");
// 注册消息监听事件
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msg:"+msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
1) 读取 topic 的消息队列 message queue 的信息;
2) 按队列去拉取一定数目的消息;
3) 持久化message queue的消费进度 offset;
4) 根据不同的消息状态做不同的处理
public enum PullStatus {
// 拉取成功
FOUND,
// 没有消息可以拉取
NO_NEW_MSG,
// 过滤结果不匹配
NO_MATCHED_MSG,
// 偏移量非法,太大或太小
OFFSET_ILLEGAL
}
public class PullConsumer {
// 本地 offset 存储容器,生产环境可以放到数据库或 Redis 中
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("DefaultMQPullConsumer");
// 设置服务器地址
pullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 启动消费者
pullConsumer.start();
// 从指定 topic 获取所有的队列
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues("topicTest");
// 遍历队列,拉取消息
for (MessageQueue mq : messageQueues) {
System.out.printf("从队列中消费: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
// 获取 offset
Long offset = getMessageQueueOffset(mq);
// 拉取32个消息
PullResult pullResult =
pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
System.out.printf("%s%n", pullResult);
// 保存 offset
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
pullConsumer.shutdown();
}
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0l;
}
}
**DefaultMQPullConsumer ** 已经被标识为废弃,替代的是 DefaultLitePullConsumer,下面我们就直接使用 DefaultLitePullConsumer 来操作。
public class LitePullConsumer {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
// 设置服务器地址
litePullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 关闭自动提交偏移量
litePullConsumer.setAutoCommit(false);
// 启动消费者
litePullConsumer.start();
// 获取队列
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("topicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
标签:保存 间隔 llb server begin -- HERE 本地 fse
原文地址:https://www.cnblogs.com/markLogZhu/p/12545597.html