标签:为我 generate 任务 偏移量 队列 apach 分布 三种方式 IV
Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点:
随着Kafka开源后被业界成功且广泛的使用,LinkedIn开发Kafka的核心技术人员Jay Kreps离开LinkedIn成立了一个新公司Confluent,打造了一个基于Kafka且拥有更为丰富的产品线,意图构建一个基于Kafka的生态系统,与Kafka相比,Confluent包含了更多的组件:
从物理结构上看,整个Kafka系统由消息生产者(Producer)、消息消费者(Consumer)、消费存储服务器(Broker Server)外加Zookeeper构成。整个Kafka的架构方案非常简单,典型的无状态水平扩展架构,通过水平增加Broker实例实现系统的高吞吐率,而有状态的数据则存储到Zookeeper中。
Kafka采用Push-Pull模式,生产者发送消息时,可根据策略将消息存储在Kafka集群的任意一台Broker上,消费者通过定时轮询(非固定周期)的方式从Broker上取得消息。消息发送到哪一台服务器上,又从哪台服务器上获取消息,则是由逻辑结构解决的,或者说逻辑结构建立在物理结构基础上,对于生产者、消费者而言,只要了解逻辑结构就可以了。
从逻辑上讲,一个Kafka集群中包含若干个消息队列,每个消息队列都有自己的名称,在Kafka中消息队列的名称被称为Topic,为了实现系统的高吞吐率,每个消息队列被拆分成不同部分,即我们所说的分区(Partition),分区存储在不同的Broker中。生产者发送消息时可根据一定策略发送到不同的分区中,这类似于数据库的分库分表操作,同样消费者拉取消息时,也可以根据一定策略从某个分区中读取消息。就物理结构而言,每个分区就是Broker上的一组文件,试想一下并发的对多个分布在不同Broker上的文件进行读写,性能当然显著优于对单台Broker上的文件进行读写,我们所说的Kafka具有高吞吐率就是这个道理。
Kafak每个Topic的消息都存储在日志文件中,Kafka消息日志文件由一个索引文件和若干个具体的消息文件构成。每个消息文件都由起始消息编号构成,通过索引可以快速定位消息文件进行读写,由于消息是顺序写入文件中,所以读写效率非常高。在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。现代的操作系统都对读写做了大量的优化,比如使用read-ahead和write-behind技巧,读取时候成块的预读取数据,写入时将各种微小琐碎的逻辑组织合并成一次较大的物理写入,很多时候线性读写磁盘比随机读取内存都快。
与其他常见的消息队列不同,Kafka有一个叫做消费组的概念,多个消费者被逻辑上合并在一起叫做消费组。一个消息队列理论上可拥有无限个消费组,消费组是Kafka有别于其他消息队列的一个重要概念,同一个分区的消息只能被一个消费组内的某个消费者读取,但其他消费组内的消费者仍然可读取这个分区的消费。如下图所示整个Kafka消息队列由两个broker server构成,server1上包含两个分区p0、p3,server2上包含两个分区p1、p2。现在有两个消费组A、B,消费组A中包含两个消费者C1、C2,消费组B中包含4个消费者C3、C4、C5、C6。那么假定P0分区上有一条消息。Consumer Group A中的C1、C2其中之一会消费这条消息,Consumer Group B中的C3、C4、C5、C6其中之一也会消费这条消息,也就是说两个消费组A、B中的消费者都会同时消费这条消息,而组内只能有一个消费者消费这条消息。
我们所说的C1、C2只是一个逻辑上的划分就具体实现而言,C1、C2可以是一个进程内部的两个线程,也可以是两个独立的进程,对于C3、C4、C5、C6也是同样的道理。我们知道Kafka每个分区中的消息都是以顺序结构保存到文件中的,那么消费者每次从什么位置读取消息呢,奥秘就是每个消费者都保存Offset到Zookeeper中。
如前所述,Kafka是一个Push-Pull模式的消息队列,并且可以有多个生产者、多个消费者,那么这些生产者和消费者是如何协同工作的呢?首先我们来看生产者怎么确定把消费发送到哪个分区上。默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?我们来看下面的代码:
if(key == null) { // 如果没有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id
id match {
case Some(partitionId) =>
partitionId // 如果有的话直接使用这个分区Id就好了
case None => // 如果没有的话,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
partitionId
}
}
可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求Topic元数据时清空缓存)。
接下来我们来看消费者如何获取消息。对于消费者Kafka提供的两种分配策略: range和roundrobin,由参数 partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
为了保证高可靠,Kafka每个分区都有一定数量的副本,当故障发生时通过Zookeeper选择其一作为领导者,Kafka采用同步复制机制,写Leader完成后在写副本。如果某个副本写失败,则将这个副本从当前分区一致集合中摘除,后期根据一定策略在进行异步补偿,将不一致状态变为一致状态。极端情况下如果所有副本写入均失败,变为不一致状态,如果在变成一致状态前Leader崩溃,那么消息才可能真正丢失,但极端情况很难出现,一旦出现这种极端情况,任何系统都无能为力了,所以我们说Kafka还是非常可靠的。
我们现在使用192.168.104.101、192.168.104.102两台Centos 6服务器安装Kafka。安装Kafka之前首先需要安装Zookeeper,为了简便起见我们采用单机伪分布式集群安装Zookeeper,将Zookeeper安装在192.168.104.101这台服务器上,并启动三个实例,组成高可靠的Zookeeper集群。
接下来我们安装Kafka_2.11-0.10.0.1,因为我们有192.168.104.101、192.168.104.102两台服务器,因此我们可以构建一个完整的Kafka集群。在192.168.104.101服务器上修改Kafka安装目录下的config/server.properties文件,设置如下参数:
broker.id=0
listeners=PLAINTEXT://192.168.104.101:9092
advertised.listeners=PLAINTEXT://192.168.104.101:9092
zookeeper.connect=192.168.104.101:2181,192.168.104.101:2182,192.168.104.101:2183
对于192.168.104.102这台机器,我们将listeners、advertised.listeners中的ip地址改为192.168.104.102。经过上述设置我们可以在两个服务器上分别使用bin/kafka-server-start.sh config/server.properties命令启动Kafka集群了。
如前所述Kafka是一个消息队列,生产者发送消息到Kafka,消费者从Kafka中拉取消息,因此Kafka提供生产者、消费者两类API供程序开发使用。我们先来看一个生产者、消费者的简单例子,了解一下Kafka Client API的基本用法,而后在深入了解Kafka Client API的细节。
package com.Kafka.sample.newapi;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class Producer {
public void run() throws InterruptedException {
KafkaProducer<String, String> producer = getProducer();
int i = 0;
while (true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(ClientConfig.TOPICS, String.valueOf(i), "This is message: " + i);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
}
}
});
i++;
Thread.sleep(1000);
}
}
private KafkaProducer<String, String> getProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);
return kp;
}
public static void main(String[] args) {
Producer producer = new Producer();
try {
producer.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Kafka 0.82版之后,提供新的API,对于生产者的API来讲,使用逻辑比较简单,推荐使用新API向Kafka发送消息。向Kafka发送消息时首先需要构建一个KafkaProducer对象,并设置发送消息的一些参数。Producer端的常用配置有:
相比起Producers API的便宜使用,Consumer API的使用要复杂很多,核心问题就是如何高可靠的处理消息,保证消息不丢失。Kafka为了保证消息不丢失能被消费者成功的处理,在消费者处理消息成功后需要向Kafka发送确认确认消息被成功的消费。
package com.Kafka.sample.newapi;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public void run() {
KafkaConsumer<String, String> consumer = getConsumer();
consumer.subscribe(Arrays.asList(ClientConfig.TOPICS));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
}
}
}
private KafkaConsumer<String, String> getConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
props.put("group.id", "1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props);
return kc;
}
public static void main(String[] args) throws Exception{
Consumer consumer = new Consumer();
consumer.run();
}
}
上面的代码很容易看懂,但props.put("auto.commit.interval.ms", "1000")下文中会特殊说明一下,这涉及到Kafka消息的高可靠处理。
事实上,Kafka不仅仅能够发送String类型的消息,也可以发送其他类型的消息,但这需要将消息转换为二进制格式。
网上各种文章经常谈到Kafka丢消息问题,那么Kakfa真的不可靠,只能用在允许有一定错误的系统中吗?这个问题还得从Kaka的设计初衷来看。
Kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说Kafka并不能完全保证数据不丢失。尽管Kafka官网声称能够保证at-least-once,但如果consumer进程数小于partition_num,这个结论不一定成立。考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。如果auto.commit.enable=false,假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里需要着重说明的是,当手动执行commit的时候,实际上是对这个consumer进程所占有的所有partition进行commit,Kafka暂时还没有提供更细粒度的commit方式,也就是说,即使t2没有处理完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在处理的这条数据就丢失了。如果希望能够严格的不丢数据,解决办法有两个:
public class ManualOffsetConsumer {
private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
public ManualOffsetConsumer() {
// TODO Auto-generated constructor stub
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
//props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id","manual_g1");
props.put("enable.auto.commit", "false");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test"));
final int minBatchSize = 5; //批量提交数量
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
LOG.info("now commit offset");
consumer.commitSync();
buffer.clear();
}
}
}
}
上面例子中我们将自动提交改为手动提交,如果取得消息后,因为某种原因没有进行提交,那么消息仍然保持在Kafka中,可以重复拉取之前没有确认的消息,保证消息不会丢失,但有可能重复处理相同的消息,消费者接收到重复消息后应该通过业务逻辑保证重复消息不会带来额外影响,也就是所谓的幂等设计, 这种方法就是Kafka所说的At-Least-Once。上面的这种读取消息的方法是单线程的,除此之外还可以用多线程方法读取消息,每个线程从指定的分区中读取消息。
public static void main(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
//props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id","manual_g2");
props.put("enable.auto.commit", "false");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
LOG.info("now consumer the message it‘s offset is :"+record.offset() + " and the value is :" + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
LOG.info("now commit the partition[ "+partition.partition()+"] offset");
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
我们还可以进一步让消费者消费某个分区的消息。
public static void main(String[] args) {
Properties props = new Properties();
//设置brokerServer(Kafka)ip地址
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name
props.put("group.id", "manual_g4");
//设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
props.put("enable.auto.commit", "true");
//偏移量(offset)提交频率
props.put("auto.commit.interval.ms", "1000");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
TopicPartition partition0 = new TopicPartition("producer_test", 0);
TopicPartition partition1 = new TopicPartition("producer_test", 1);
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.assign(Arrays.asList(partition0, partition1));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我们在192.168.104.101、192.168.104.102两台服务器上启动Kafka组成一个集群,现在我们观察一下topic t1的情况。
bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
我们看到t1由两个分区组成,分布于Leader 0、1两个服务器上。下面我们运行消费者程序,同时运行生产者程序,我们向topic t1发送111、222、333、444、555、666的数据。
bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1
111
222
333
444
555
666
接下来我们观察一下消费者接收消息的情况。
fetched from partition 0, offset: 3, message:
fetched from partition 1, offset: 4, message: 111
fetched from partition 0, offset: 4, message: 222
fetched from partition 1, offset: 5, message: 333
fetched from partition 0, offset: 5, message: 444
fetched from partition 1, offset: 6, message: 555
fetched from partition 0, offset: 6, message: 666
可以看到消息被非常均匀的发送到两个分区,消费者从两个分区中拉取了消息。为了模拟故障,我们手工kill 101上的Kafka进程。这时我们在观察Kafka的分区情况,对照之前的结果,我们发现两个分区的Leader都变为1了,说明Kafka启用了副本机制进行故障切换。
./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: t1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
我们继续向分区发送888, 999,消费者仍然能够接收到发送的消息,而不受故障进度的影响。逻辑上看消费者只是读取分区上的消息,与具体的服务器没关系。
fetched from partition 1, offset: 7, message: 999
fetched from partition 0, offset: 7, message: 888
我们为已经创建的包含两个分区的Topic在添加一个分区。
Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3
我们观察一下增加分区后的结果:
bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: t1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
接下来,我们使用生产者程序发送数据,过了一段时间后发现生产者程序已经可以向新增分区写入数据了。说明分区的增减对正在运行的应用程序(生产者、消费者)没有影响, 生产者、消费者都不需要重新启动。
待补充
待补充
目前Kafka有三个常用的监控系统: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,这三个系统或多或少都有些问题,不是特别完善,推荐使用KafkaOffsetMonitor。
Kafka 0.9+增加了一个新的特性 Kafka Connect ,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、 Elastic Search 、 Apache Ignite 等。
Kafka Connect特性包括:
当前Kafka Connect支持两种分发担保:at least once (至少一次) 和 at most once(至多一次),exactly once将在未来支持,当前已有的Connectors包括:
Connector Name | Owner | Status |
---|---|---|
HDFS | confluent-platform@googlegroups.com | Confluentsupported |
JDBC | confluent-platform@googlegroups.com | Confluentsupported |
Debezium - CDC Sources | debezium@gmail.com | Community project |
MongoDB Source | a.patelli@reply.de a.topchyan@reply.de | In progress |
MQTT Source | tomasz.pietrzak@evok.ly | Community project |
MySQL Binlog Source | wushujames@gmail.com | In progress |
Twitter Source | rollulus@xs4all.nl | In progress |
Cassandra Sink | Cassandra Sink | Community project |
Elastic Search Sink | ksenji@gmail.com | Community project |
Elastic Search Sink | hannes.stockner@gmail.com | In progress |
Elastic Search Sink | a.patelli@reply.de a.topchyan@reply.de | In progress |
Kafka Connect目前正在开发中,最新的组件请查看官网https://www.confluent.io/product/connectors/
我们来看一个使用Kafka Connect从一个文件读取数据在传输到另一个文件的例子。
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
修改connect-file-source.properties文件:
file=/root/data.txt
topic=t1
修改connect-file-sink.properties文件:
file=/root/output.txt
topics=t1
Kafka Streams是一套类库,嵌入到java应用程序中,它使得Apache Kafka可以拥有流处理的能力,通过使用Kafka Stream API进行业务逻辑处理最后写回Kakfa或者其他系统中。Kafka Stream中有几个重要的流处理概念:严格区分Event time和Process Time、支持窗口函数、应用状态管理。开发者使用Kafka Stream的门槛非常低,比如单机进行一些小数据量的功能验证而不需要在其他机器上启动一些服务(比如在Storm运行Topology需要启动Nimbus和Supervisor,当然也支持Local Mode),Kafka Stream的并发模型可以对单应用多实例进行负载均衡。有了Kafka Stream可以在很多场景下代替Storm、Spark Streaming减少技术复杂度。目前Kafka Stream仍然处于开发阶段,不建议生产环境使用,所以期待正式版发布吧。
Camus是Linkedin开源的一个从Kafka到HDFS的数据管道,本质上上Camus是一个运行在Hadoop中的MapReduce程序,调用一些Camus提供的API从Kafka中读取数据然后写入HDFS。Camus2015年已经停止维护了,gobblin是后续产品,camus功能是是Gobblin的一个子集,通过执行MapReduce任务实现从Kafka读取数据到HDFS,而gobblin是一个通用的数据提取框架,可以将各种来源的数据同步到HDFS上,包括数据库、FTP、Kafka等。
Kafka作为一个消息中间件,最长应用的场景是将数据进行加工后从源系统移动到目的系统,也就是所谓的ETL过程,ETL是一个数据从源头到目的地的移动过程,当然其中也伴随数据清洗。通常数据源头是应用程序所输出的消息、日志、生产数据库数据。应用程序输出消息通常由应用程序主动控制写入Kafka的行为,而从日志、生产数据库到Kafka通常由第三方独立应用处理。从日志到Kafka典型的技术方案如ELK,从生产数据库到Kafka通常可采用如下三种方式:
数据通过Kafka移动到Hadoop通常有如下方案:
从目前看这些方法都是常用的成熟方案,很多技术也在被一线互联网公司所使用,比如京东内部在使用Gobblin将数据从Kafka同步到Hdfs中,但从长远看Kafka Connect则是最佳方案,毕竟是官方标准出品而且Kafka Connect还在快速的发展。
每天处理几十亿条消息:Yelp的实时数据管道
标签:为我 generate 任务 偏移量 队列 apach 分布 三种方式 IV
原文地址:https://www.cnblogs.com/elixir/p/9102395.html