标签:容量 class 副本 软件 设置 private github read logs
在大数据的浪潮下,时时刻刻都会产生大量的数据。比如社交媒体、博客、电子商务等等,这些数据会以不同的类型存储在不同的平台里面。为了执行ETL(提取、转换、加载)操作,需要一个消息中间件系统,该系统应该是异步和低耦合的,即来自各种存储系统(如HDFS、Cassandra、RDBMS等)的数据可以同时转存在一个地方,而所有这些数据源都是彼此独立的。解决这个问题的方法之一是Kafka,它是一个开源的分布式消息处理平台。
Zookeeper是一个分布式集群管理系统,它是一个为分布式应用提供一致性服务的软件,提供的功能包含:配置维护、域名服务、分布式服务等。它目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的提供提供给用户。而在Kafka中,它提供了以下功能:
Kafka的一些关键特性,使得它更加受到喜爱,针对传统消息系统的不同:
生产者首先获取Topic中的元数据,以便知道需要使用消息更新哪个Broker。元数据也存储在Broker中,并与Zookeeper保持连续同步。因此,若有多个生产者都希望连接到Zookeeper来访问元数据,会导致性能降低。当生产者获取了Topic和元数据信息,它就会在Leader所在的Broker节点的日志中写入消息,而之后Follower(ISR)会将其复制进行同步。
在写入操作可以是同步的,即仅当Follower还在其日志中同步消息时,或者异步,即只有Leader更新信息消息,状态发生给生产者。磁盘上的消息可以保留特定的持续时间,在此期限后,将自动清除旧消息,并且不再可供使用。默认情况下,设置为7天。可以通过三种策略将消息写到Topic。
Java生产者示例代码如下:
public class JProducer extends Thread { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); long counter = 1L; while (true) { String json = "{\"id\":" + (counter++) + ",\"date\":\"" + new Date().toString() + "\"}"; String k = "key" + counter; producer.send(new ProducerRecord<String, String>("test01", k, json), (recordMetadata, e) -> { if (e == null) { System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset()); } else { e.printStackTrace(); } }); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // producer.close(); } }
由于Kafka的速度非常快并且可以获取实时消息,因此单个消费者肯定会在Topic中读取很大一部分消息时出现延迟。为了克服这类问题,可以创建一个消费者组,该消费者组由多个具有相同GroupID的消费者组成。每个使用者都连接有一个唯一的分区,该分区所在所有使用者之间平均分配。将分区分配给特定使用者是消费者组协调器的责任,协调器由Broker被提名担任该角色。为了管理活跃的消费者,消费者组中的所有成员会将它们的心跳发送到组协调器。
关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。
在0.10.x以后的版本中,Kafka底层架构发生了变化,将消费者的信息由Zookeeper存储迁移到Topic(__consumer_offsets)中进行存储。消费的偏移量Key(groupid, topic, partition)以及Value(Offset, ...)
Java消费者示例代码如下:
public class JConsumer extends Thread { private String groupId; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1; i++) { JConsumerSsl jc = new JConsumerSsl("jgroup" + i); jc.start(); } } public JConsumerSsl(String groupId) { this.groupId = groupId; } @Override public void run() { consumer(this.groupId); } public static void consumer(String groupId) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("group.id", groupId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test01")); boolean flag = true; while (flag) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); try { // sleep(60 * 1000L); } catch (Exception e) { e.printStackTrace(); } } consumer.close(); } }
由于Kafka遵循了一定的策略,这也是它设计的一部分,以使得它性能更好、更快。
在打开Kafka服务器之前,Broker中的所有消息都存储在配置文件中的配置的日志目录中,在该目录内,可以找到包含特定Topic分区的文件夹,其格式topic_name-partition_number,例如topic1-0。另外,__consumer_offsets这个Topic也存储在同一日志目录中。
在特定Topic的分区目录中,可以找到Kafka的Segment文件xxx.log,索引文件xxx.index和时间索引xxx.timeindex。当达到旧的Segment大小或者时间限制时,会在创建新的Segment文件时将属于该分区的所有数据写入活跃的Segment中。索引将每个偏移量映射到其消息在日志中的位置,由于偏移量时顺序的,因此将二进制搜索应用于在特定偏移量的日志文件中查找数据索引。
详情分析可阅读《Kafka日志压缩剖析》。
以上就是笔者给大家简要的汇总了Kafka的各个知识点,包含常见的术语、Consumer & Producer的使用方式、存储流程等
另外,笔者开源的一款Kafka监控关系系统Kafka-Eagle,喜欢的同学可以Star一下,进行关注。
Kafka Eagle源代码地址:https://github.com/smartloli/kafka-eagle
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
标签:容量 class 副本 软件 设置 private github read logs
原文地址:https://www.cnblogs.com/smartloli/p/14459504.html