标签:size mem 超过 sum ext tac xtend save 收集
生产者
// Producer usage example: a single static send call with three string arguments.
// NOTE(review): the argument order is presumably (topic, key, value), judging by the
// placeholder literals; confirm against KafkaExtendProducer, which is not shown here.
KafkaExtendProducer.send("topic","key","value");
消费者
@Slf4j
@RequiredArgsConstructor
@Component
public class ConsumerThread implements Runnable {
private final KafkaConsumerBuilder kafkaConsumerBuilder;
@PostConstruct
public void init() {
for (int i = 0; i < MqConstant.MESSAGE_THREAD_COUNT; i++) {
new Thread(this, "consume-proxy-thread-" + i).start();
}
}
@Override
public void run() {
KafkaConsumer<?, ?> consumer = kafkaConsumerBuilder.addTopic("11").build();
log.info("ConsumerThread {} run...", Thread.currentThread().getName());
List<UserAction> list = new ArrayList<>();
long lsatInsertTime = 0;
while (Boolean.TRUE) {
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(1));
try {
if (!records.isEmpty()) {
for (ConsumerRecord record : records) {
UserAction userAction = JSONUtil.toBean((String) record.value(), UserAction.class);
list.add(userAction);
if (list.size() >= MqConstant.DATA_SIZE) {
log.info("批量插入{}条数据", list.size());
lsatInsertTime = System.currentTimeMillis();
consumer.commitSync();
list.clear();
}
}
}
else {
if (lsatInsertTime != 0 && (System.currentTimeMillis() - lsatInsertTime) > MqConstant.INVALID_TIME
&& list.size() > 0) {
log.info("批量插入已经保存超过30s的行为上报数据");
lsatInsertTime = System.currentTimeMillis();
consumer.commitSync();
list.clear();
}
}
}
catch (Exception e) {
e.printStackTrace();
log.error("行为上报信息异常{}", e.getMessage());
// todo 收集err信息并保存
// String errStr = Convert.toStr(list);
// ConsumeErrorMsg consumeErrorMsg = new ConsumeErrorMsg();
// consumeErrorMsg.setErrFrom(MqConstant.ACTION_TOPIC);
// consumeErrorMsg.setErrInfo(e.getMessage());
// consumeErrorMsg.setErrData(errStr);
// consumeErrorMsg.setState(0);
// consumeErrorMsg.setCreateAt(LocalDateTime.now());
// consumeErrorMsgService.saveMsg(consumeErrorMsg);
list.clear();
}
}
}
}
标签:size mem 超过 sum ext tac xtend save 收集
原文地址:https://www.cnblogs.com/lyj98/p/14276782.html