码迷,mamicode.com
首页 > 其他好文 > 详细

如何系统的了解Kafka

时间:2021-03-01 13:50:00      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:容量   class   副本   软件   设置   private   github   read   logs   

1.概述

在大数据的浪潮下,时时刻刻都会产生大量的数据。比如社交媒体、博客、电子商务等等,这些数据会以不同的类型存储在不同的平台里面。为了执行ETL(提取、转换、加载)操作,需要一个消息中间件系统,该系统应该是异步和低耦合的,即来自各种存储系统(如HDFS、Cassandra、RDBMS等)的数据可以同时转存在一个地方,而所有这些数据源都是彼此独立的。解决这个问题的方法之一是Kafka,它是一个开源的分布式消息处理平台。

2.内容

2.1 专业术语

  • Message:它基本上是一个键值对,在值部分包含有用的数据和记录;
  • Topic:对于多租户,可以创建多个主题,这只是发布和订阅消息的名称;
  • Partition:对于多线程任务,可以在一个Topic中,创建多个分区,提升生产者和消费者的性能;
  • Offset:消息以类似于提交日志的顺序形式存储,并且从0开始为每个消息提供顺序ID(每个分区偏移量从0开始);
  • Broker:Kafka集群由多个服务节点组成,这些服务节点只是集群中托管Zookeeper维护的无状态服务器的节点,英文这里没有主从概念,所以所有的Broker都是同级别的(每个Partition上会有Leader和Follower);
  • Consumer:用于消费Topic的应用程序;
  • ConsumerGroup:消费不同Topic所使用的相同GroupID;
  • Producer:用于生产Topic的应用程序。

2.2 Kafka为何需要Zookeeper

Zookeeper是一个分布式集群管理系统,它是一个为分布式应用提供一致性服务的软件,提供的功能包含:配置维护、域名服务、分布式服务等。它目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的提供提供给用户。而在Kafka中,它提供了以下功能:

  • 控制器选举:对于特定主题,分区中的所有读写操作都是通过复制副本的数据来完成的,每当Leader宕机,Zookeeper就会选举出新的Leader来提供服务;
  • 配置Topic:与某个Topic相关的元数据,即某个特定Topic是否位于Broker中,有多少个Partition等存储在Zookeeper中,并在生产消息时持续同步;
  • ACL:Topic的权限控制均在Zookeeper中进行维护。

2.3 Kafka有哪些特性?

Kafka的一些关键特性,使得它更加受到喜爱,针对传统消息系统的不同:

  • 高吞吐量:吞吐量表示每秒可以处理的消息数(消息速率)。由于我们可以将Topic分布到不同的Broker上,因此我们可以实现每条数以千次的读写操作;
  • 分布式:分布式系统是一个被分割成多台运行的机器的系统,所有这些机器在一个集群中协同工作,在最终用户看来是一个单一的节点。Kafka是分布式的,因为它存储、读取和写入多个节点上的数据,这些节点被称为Broker,它与Zookeeper一起共同创建了一个称为Kafka集群的生态系统;
  • 持久性:消息队列完全保存在磁盘上,而不是保存在内存中,同一数据的多个副本(ISR)可以跨不同的节点存储。因此,不存在由于故障转移场景而导致数据丢失的可能性,并使其具有持久性;
  • 可伸缩性:任何系统都可以水平或垂直伸缩,纵向可伸缩性意味着向相同的节点添加更多的资源,如CPU、内存,并且会产生很高的操作成本。水平可伸缩性可以通过简单的在集群中添加几个节点来实现,这增加了容量需求。Kafka水平扩展意味着当我们的容量用完时,我们可以在集群中添加一个新的节点。

2.4 Producer如何写数据?

生产者首先获取Topic中的元数据,以便知道需要使用消息更新哪个Broker。元数据也存储在Broker中,并与Zookeeper保持连续同步。因此,若有多个生产者都希望连接到Zookeeper来访问元数据,会导致性能降低。当生产者获取了Topic和元数据信息,它就会在Leader所在的Broker节点的日志中写入消息,而之后Follower(ISR)会将其复制进行同步。

技术图片

 

 

 在写入操作可以是同步的,即仅当Follower还在其日志中同步消息时,或者异步,即只有Leader更新信息消息,状态发生给生产者。磁盘上的消息可以保留特定的持续时间,在此期限后,将自动清除旧消息,并且不再可供使用。默认情况下,设置为7天。可以通过三种策略将消息写到Topic。

  1. send(key,value,topic,partition):专门提供需要进行写操作的分区。不建议使用该方式,因为它可能会导致分区大小不均衡;
  2. send(key,value,topic):在这里,默认的HashPartitioner用于确定要写入消息的分区,方式查找key的Hash并进行取模,该Topic的分区,也可以编写我们自己定义的分区程序;
  3. send(null,value,topic):在这种情况下,消息以循环方式存储在所有分区中。

Java生产者示例代码如下:

public class JProducer extends Thread {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        long counter = 1L;
        while (true) {
            String json = "{\"id\":" + (counter++) + ",\"date\":\"" + new Date().toString() + "\"}";
            String k = "key" + counter;
            producer.send(new ProducerRecord<String, String>("test01", k, json), (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset());
                } else {
                    e.printStackTrace();
                }
            });
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // producer.close();
    }

}

2.5 消费者如何订阅消息?

由于Kafka的速度非常快并且可以获取实时消息,因此单个消费者肯定会在Topic中读取很大一部分消息时出现延迟。为了克服这类问题,可以创建一个消费者组,该消费者组由多个具有相同GroupID的消费者组成。每个使用者都连接有一个唯一的分区,该分区所在所有使用者之间平均分配。将分区分配给特定使用者是消费者组协调器的责任,协调器由Broker被提名担任该角色。为了管理活跃的消费者,消费者组中的所有成员会将它们的心跳发送到组协调器。

关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。

在0.10.x以后的版本中,Kafka底层架构发生了变化,将消费者的信息由Zookeeper存储迁移到Topic(__consumer_offsets)中进行存储。消费的偏移量Key(groupid, topic, partition)以及Value(Offset, ...)

技术图片

 

 

 Java消费者示例代码如下:

public class JConsumer extends Thread {

    private String groupId;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            JConsumerSsl jc = new JConsumerSsl("jgroup" + i);
            jc.start();
        }
    }

    public JConsumerSsl(String groupId) {
        this.groupId = groupId;
    }

    @Override
    public void run() {
        consumer(this.groupId);
    }

    public static void consumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test01"));
        boolean flag = true;
        while (flag) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            try {
                // sleep(60 * 1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.close();
    }

}

2.6 Kafka为何如此之快?

由于Kafka遵循了一定的策略,这也是它设计的一部分,以使得它性能更好、更快。

  • 没有随机磁盘访问:它使用称为不可变队列的顺序数据结构,其中读写操作始终为恒定时间O(1)。它在末尾附加消息,并从头开始或者从特定偏移量读取;
  • 顺序IO:现代操作系统将其大不部分可用的内存分配给磁盘缓存,并且更快的用于存储和检索顺序数据;
  • 零拷贝:由于根本没有修改数据,因此将磁盘中的数据不必要的加载到应用程序内存中,因此,它没有将其加载到应用程序中,而是通过Socket字节,缓冲区以及网络从context缓存区发送了相同的数据;
  • 消息批处理:为了避免多次网络交互,将多个消息分组在一起;
  • 消息压缩:在通过网络传输消息之前,使用gzip、snappy等压缩算法对消息进行压缩,然后在Consumer中使用API将其解压。

2.7 数据如何存储在Broker上?

在打开Kafka服务器之前,Broker中的所有消息都存储在配置文件中的配置的日志目录中,在该目录内,可以找到包含特定Topic分区的文件夹,其格式topic_name-partition_number,例如topic1-0。另外,__consumer_offsets这个Topic也存储在同一日志目录中。

技术图片

 

 

在特定Topic的分区目录中,可以找到Kafka的Segment文件xxx.log,索引文件xxx.index和时间索引xxx.timeindex。当达到旧的Segment大小或者时间限制时,会在创建新的Segment文件时将属于该分区的所有数据写入活跃的Segment中。索引将每个偏移量映射到其消息在日志中的位置,由于偏移量时顺序的,因此将二进制搜索应用于在特定偏移量的日志文件中查找数据索引。

2.8 日志压缩

  • 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的max.compaction.lag.ms属性来保证从编写消息到消息符合压缩条件之间的最大延时
  • 消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
  • 消息的偏移量永远不会改变,它是日志中位置的永久标识符
  • 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时。

详情分析可阅读《Kafka日志压缩剖析》。

3.总结

以上就是笔者给大家简要的汇总了Kafka的各个知识点,包含常见的术语、Consumer & Producer的使用方式、存储流程等

另外,笔者开源的一款Kafka监控关系系统Kafka-Eagle,喜欢的同学可以Star一下,进行关注。

Kafka Eagle源代码地址:https://github.com/smartloli/kafka-eagle

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。 

 

如何系统的了解Kafka

标签:容量   class   副本   软件   设置   private   github   read   logs   

原文地址:https://www.cnblogs.com/smartloli/p/14459504.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!