1. 概述 在2.x中,spark有两个用来与kafka整合的代码,版本代号为0.8和0.10,由于在0.8,kafka有两套消费者api,根据高级api得到了Receiver-based Approach,根据低级api得到了Direct Approach,而在0.10由于kafka只有一套消费者 ...
分类:
其他好文 时间:
2020-12-16 11:50:30
阅读次数:
4
各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可在消费时按分区进行顺序消费,保证每个分区的消息具备局部顺序性。由于需要确保分区消息的顺序性,并不能并 ...
分类:
编程语言 时间:
2020-12-15 12:04:52
阅读次数:
6
消息丢失的场景如果KafkaProducer使用“发后即忘”的方式发送消息,即调用producer.send(msg)方法来发送消息,方法会立即返回,但此时并不能说明消息已经发送成功。消息发送方式详见初次邂逅Kafka生产者。如果在消息过程中发生了网络抖动,那么消息就会丢失;或发送的消息本身不符合要求,如大小超过Broker端的承受能力等(消息太大的情况在生产中实际遇到过,最后通过在发送前将消息分
分类:
其他好文 时间:
2020-12-08 12:53:00
阅读次数:
5
微信公众号:[中间件兴趣圈]作者简介:《RocketMQ技术内幕》作者方案背景背景:基于Dubbo服务的治理,是否可以支持业务级别的灰度发布、是否基于业务参数的路由转发。例如以GIS为例,当发布一个新版本时,是否可以以按照解析地址或合作伙伴来区分,版本发布之初,只希望地址为:广东省的解析请求发送到新版本,而其他的地址请求还是使用旧版;或者根据合作伙伴例如UCP(优享寄)的请求转发到新版本服务器,其
分类:
其他好文 时间:
2020-12-08 12:47:20
阅读次数:
5
温馨提示:本文基于Kafka2.2.1版本。本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构图。从上文初识KafkaProducer生产者,可以通过KafkaProducer的send方法发送消息,send方法的声明如下:Future<RecordMetadata>send(ProducerRecord&l
分类:
其他好文 时间:
2020-12-08 11:59:45
阅读次数:
3
温馨提示:整个KafkaClient专栏基于kafka-2.3.0版本。1、KafkaProducer概述根据KafkaProducer类上的注释上来看KafkaProducer具有如下特征:KafkaProducer是线程安全的,可以被多个线程交叉使用。KafkaProducer内部包含一个缓存池,存放待发送消息,即ProducerRecord队列,与此同时会开启一个IO线程将ProducerR
分类:
其他好文 时间:
2020-12-07 12:49:28
阅读次数:
11
消息组接到某项目组反馈,topic在扩容后出现部分队列无法被消费者,导致消息积压,影响线上业务?考虑到该问题是发送在真实的线上环境,为了避免泄密,本文先在的虚拟机中来重现问题。1、案情回顾1.1集群现状集群信息如下:例如业务主体名topic_dw_test_by_order_01的路由信息如图所示:当前的消费者信息:broker的配置信息如下:brokerClusterName=DefaultCl
分类:
其他好文 时间:
2020-12-07 12:44:57
阅读次数:
13
主从模式环境可以保障消息的即时性与可靠性投递一条消息后,关闭主节点从节点继续可以提供消费者数据进行消费,但是不能接收消息主节点上线后进行消费进度offset同步准备两台机器,一主一从:机器IPhostname角色192.168.243.169rocketmq01master192.168.243.170rocketmq02slave我这里事先在两台机器上安装好了RocketMQ,关于RocketM
分类:
其他好文 时间:
2020-12-05 10:54:50
阅读次数:
10
Apache Kafka由Scala和Java编写,基于生产者和消费者模型作为开源的分布式发布订阅消息系统。它提供了类似于JMS的特性,但设计上又有很大区别,它不是JMS规范的实现,如Kafka允许多个消费者主动拉取数据,而在JMS中只有点对点模式消费者才会主动拉取数据 ...
分类:
其他好文 时间:
2020-12-04 11:04:22
阅读次数:
7
将 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者 异步提交 commitAsync() 即: ConsumerRecords<> records = consumer.poll(); for (ConsumerRecord<> reco ...
分类:
其他好文 时间:
2020-12-03 12:16:37
阅读次数:
6