标签:
转自: http://blog.csdn.net/kevin_hx001/article/details/9413565
http://kafka.apache.org/design.html
我们为什么要构建这个系统
Kafka是一个分布式、分区的、多副本的、多订阅者的“提交”日志系统。
我们构建这个系统是因为我们认为,一个实现完好的操作日志系统是一个最基本的基础设施,它可以替代一些系统来作诸如:消息处理,ETL(Extraction-Transformation-Loading),日志收集,流式处理等工作。我们的目标就是能有一个拥有足够吞吐量和能力的系统来将上面这些事情统一在一个平台上。
活动流数据是任何网站的一部分,这部分数据用来汇报站点的应用情况。这些数据包括:PV,哪些信息被展示给用户,搜索词等。这些信息通常是这样处理的:将它们以日志的形式存储到一些文件中,然后定期地对这些文件进行分析。系统运行数据包括服务器的运行情况(CPU,IO,请求时间,服务日志等等),收集这些数据也有许多不同的方法。
近年来,活动与运行数据已经成为站点的关键部分,稍微复杂一点的基础设施也就有了产生的需求。
活动流数据和运行数据的应用场景
1)News feed” features,将活动广播给你的朋友
2)通过评分,投票,点击来确定哪些项目的集合是有关联的。
3)安全方面:站点需要阻止无良的爬虫,限制性api,检测恶意访问以及其他一些检测和预防系统。
4)运行监测:大多数站点需要一些实时的、可靠的监控来跟踪运行情况,以便发生故障的时候触发报警器。
5)报表与批处理:将数据导入到数据仓库或者hadoop系统中,从而进行离线分析和报表生成以便商业决策。
活动流数据的特点
传统的日志文件收集对于离线的应用场景比如报表生成和批处理都有很好的支持,但是对于实时的处理有很高的延时和很高的计算复杂度。另外一方面,现存的消息和队列系统对实时与近实时的应用场景都是ok的,但是不能很好地处理大量的未消费队列,持久化常常是事后才想到。
当向离线系统比如hadoop这样的系统发数据时就会产生问题,这些系统只会每隔一小时或一天才会去一些数据源拉数据。Kafka的目的就是构建一个队列平台能够支持离线与在线的应用场景。
Kafka支持比较通用的消息语义。没有什么被绑定到活动处理上,尽管那是我们的motivating 应用场景。
部署
下图简单地展示了在LinkedIn内部的部署拓扑。
需要注意的是一个kafka集群处理来自不同数据源的活动数据。这就为离线和在线消费者(consumer)提供了一个单一的数据流水线。这一层为在线活动和异步处理提供了一层缓存。我们还用kafka来将数据复制到不同的数据仓库,以便离线处理。
我们不想让一个kafka集群跨越所有的数据中心,但是kafka是支持多数据中心的数据流拓扑结构。这可以通过在集群之间“镜像”或者“同步”来实现。这个特性非常简单,只要将镜像集群作为源集群的消费者。这就意味着可以将多个数据中心的数据集中到一个集群中来。下面是一个例子。
注意到在这两个集群里,各个节点之间是没有对应关系的。两个集群的大小有可能不一样,包含的节点数也不一样。一个节点可以镜像任意数目的源集群。
主要设计元素
有一系列的设计决策使得kafka与其他的消息系统不一样:
1)将消息持久化作为一种常见case
2)吞吐量是首要设计约束
3)消费状态被保存在消费者上而不是服务器上
4)分布式。生产者,broker,消费者都可以分布在不同的机器上。
这里的每一个特性在下面会详细地讲到。
基本要素
首先是一些基本的术语和概念。
消息是通讯的基本单元。消息被生产者发布到一个主题,也就是说被物理上发布到一个叫broker的服务器上。一定数量的消费者注册到一个主题,每个发布到这个主题的消息会递送给这些消费者。
kafka是分布式的——生产者,消费者,brokers都可以跑在一个集群上作为一个逻辑上的组而协作着。这对broker和生产者来说是相当自然的,但对消费者来说还需要额外的一些支持。每个消费者进程属于一个消费者组,每条消息只会传递给组内的一个进程。因此一个消费者组允许多个进程或者机器作为逻辑上的一个消费者运行。消费者组的概念是相当牛逼的,它可以用来支持队列语义或者JMS中的主题语义。如果是队列语义,我们可以将所有的消费者放到一个消费者组中,这种情况下,每条消息只会到达一个消费者。在主题语义中,每个消费者自成一组,这样,所有的消费者会接收到每一条消息。在我们的应用中,更一般的情况是,我们有多个逻辑上的组,每个组由多台机器组成,它们逻辑上作为一个整体运行。在大数据情况下,Kafka还有一个更好的特性:不管一个主题有多少个消费者,一条消息只会被存一次。
消息持久化和缓存
不要惧怕文件系统
Kafka高度依赖文件系统来存储和缓存消息。一般的人都认为“磁盘是缓慢的”,这使得人们对“持久化结构提供具有竞争性的性能”这样的结论持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。
一个有关磁盘性能的关键事实是:磁盘驱动器的吞吐量跟寻道延迟是相背离的。结果就是:在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是300M/秒,但是随机写的速度只有50K/秒,两者相差将近10000倍。线性读写在大多数应用场景下是可以预测的,因此,操作系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操作组合成一个大写物理写操作中。更多的讨论可以在ACM Queue Artical中找到,他们发现,对磁盘的线性读在有些情况下可以比内存的随机访问要快一些。
为了补偿这个性能上的分歧,现代操作系统在内存和磁盘缓存的利用上变得非常aggressive。现在操作系统会非常开心地将所有空闲的内存作为磁盘缓存,尽管在内存回收的时候会有一点性能上的代价。所有的磁盘读写操作会在这个统一的缓存上进行。这个特性不太空易被关掉,除非用直接IO的方法,所以尽管一个进程维护着一个进程内的数据缓。存,这些数据还是会在OS的页缓存中被复制,实际上就是所有的数据都保存了两次。
此外,我们是在JVM的基础上构建的,熟悉java内存应用管理的人应该清楚以下两件事情:
1)一个对象的内存消耗是非常高的,经常是所存数据的两倍或者更多。
2)随着堆内数据的增多,Java的垃圾回收会变得非常昂贵。
基于这些事实,利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好——我们至少要使得可用的缓存加倍,通过自动访问可用内存,并且通过存储更紧凑的字节结构而不是一个对象,这将有可能再次加倍。这么做的结果就是在一台32GB的机器上,如果不考虑GC惩罚,将最多有28-30GB的缓存。此外,这些缓存将会一直存在即使服务重启,然而进程内缓存需要在内存中重构(10GB缓存需要花费10分钟)或者它需要一个完全冷缓存启动(非常差的初始化性能)。它同时也简化了代码,因为现在所有的维护缓存和文件系统之间内聚的逻辑都在操作系统内部了,这使得这样做比one-off in-process attempts更加高效与准确。如果你的磁盘应用更加倾向于顺序读取,那么read-ahead在每次磁盘读取中实际上获取到这人缓存中的有用数据。
以上这些建议了一个简单的设计:不同于维护尽可能多的内存缓存并且在需要的时候刷新到文件系统中,我们换一种思路。所有的数据不需要调用刷新程序,而是立刻将它写到一个持久化的日志中。事实上,这仅仅意味着,数据将被传输到内核页缓存中并稍后被刷新。我们可以增加一个配置项以让系统的用户来控制数据在什么时候被刷新到物理硬盘上。
常数时间就满足要求
消息系统元数据的持久化数据结果经常是一个B树。B树是一个很好的结构,可以用在事务型与非事务型的语义中。但是它需要一个很高的花费。B树的操作需要O(logN)。通常情况下,这被认为与常数时间等价,但这对磁盘操作来说是不对的。磁盘寻道一次需要10ms,并且一次只能寻一个,因此并行化是受限的。
直觉上来讲,一个持久化的队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。尽管和B树相比,这种结构不能支持丰富的语义,但是它有一个优点,所有的操作都是常数时间,读数据不会阻塞写数据。
事实上几乎无限制的磁盘访问意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。
效率最大化
我们的假设是,消息的数量是相当大的,事实上是这个站点的一些page views。此外,我们假设,每一条被发布的消息至少被读一次(经常是多次),因此我们只去优化消费而不是生产。
一般情况下有两种情况会导致低效:大多的网络请求,过多的字节拷贝。
为了提高效率,API的构建是围绕消息集合的。一次网络请求发一个消息集合,而不是每一次只发一条消息。
MessageSet的实现本身是一个非常简单的API,它将一个字节数组或者文件进行打包。所以对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段可以按需反序列化(如果没有需要,可以不用反序列化)。
由broker保存的消息日志本身只是一个消息集合的目录,这些消息已经被写入磁盘。这种抽象允许单一一个字节可以被broker和消费者所分享(某种程度上生产者也可以,尽管生产者那头的消息只有再被计算过校验和之后才会加入到日志中去)。
维护这样的通用格式对可以对大多数重要的操作进行优化:持久日志数据块的网络传输。现在的Unix操作系统提供一种高优化的代码路径将数据从页缓存传到一个套接字(socket);在Linux中,这可以通过调用sendfile系统调用来完成。Java提供了访问这个系统调用的方法:FileChannel.transferTo api。
为了理解sendfile的影响,需要理解一般的将数据从文件传到套接字的路径:
1)操作系统将数据从磁盘读到内核空间的页缓存中
2)应用将数据从内核空间读到用户空间的缓存中
3)应用将数据写回内存空间的套接字缓存中
4)操作系统将数据从套接字缓存写到网卡缓存中,以便将数据经网络发出
这样做明显是低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,再次拷贝可以被避免:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
我们期望一个主题上有多个消费者是一种常见的应用场景。利用上述的零拷贝,数据只被拷贝到页缓存一次,然后就可以在每次消费时被重得利用,而不需要将数据存在内存中,然后在每次读的时候拷贝到内核空间中。这使得消息消费速度可以达到网络连接的速度。
端到端的批量压缩
在许多场景下,瓶颈实际上不是CPU而是网络。这在需要在多个数据中心之间发送消息的数据流水线的情况下更是如此。当然,用户可以不需要Kafka的支持而发送压缩后的消息,但是这会导致非常差的压缩率。高效的压缩需要将多个消息一块儿压缩而不是对每一个消息进行压缩。理想情况下,这可以在端到端的情况下实现,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩。
Kafka通过递归消息集合来支持这一点。一批消息可以放在一起被压缩,然后以这种形式发给服务器。这批消息会被递送到相同的消费者那里,并且保持压缩的形式,直到它到达目的地。
Kafka支持GZIP和Snappy压缩协议,更多的细节可以在这里找到:https://cwiki.apache.org/confluence/display/KAFKA/Compression
消费者状态
在Kafka中,消费者负责记录状态信息(偏移量),也就是已经消费到哪个位置了。准确地说,消费者库将他们的状态信息写到zookeeper中。但是,将状态数据写到另一个地方——处理结果所存放的数据中心——可能会更好。打个比方,消费者可能只需要简单地将一些合计值写到中心化的事务型OLTP数据库中。在这种情况下,消费者可以将状态信息写到同一个事务中。这解决了分布式一致性问题——通过去除分布式部分。类似的技巧可以用在一些非事务型的系统中。一个搜索系统可以将消费者状态存放在索引块中。尽管这不提供持久性保证,但这意味着索引可以和消费者状态保持同步:如果一个没有刷新的索引块在一次故障中丢失了,那么这些索引可以从最近的检查点偏移处开始重新消费。同样的,在并行加载数据到Hadoop时,可以利用类似的技巧。每个mapper在map 任务的最后将偏移量写到HDFS中。这样的话,如果一个加载任务失败了,每个mapper可以简单地从存储在HDFS中的偏移量处重启消费。
这个决定有另外一个好处。消费者可以重新消费已经消费过的数据。这违反了队列的性质,但是这样可以使多个消费者一起来消费。打个比方,如果一段消费者代码出bug了,在发现bug之间这个消费者又消费了一堆数据,那个在bug修复之后,消费者可以从指定的位置重新消费。
拉还是推?
Kafka采用的策略是:生产者把数据推到borker上,而消费者主动去broker上拉数据。最近的一些系统包括flume和scribe,都是broker将数据推给消费者,这有可能会存在一个问题,如果推的速度过快,消费者会被淹没。而在Kafka中不会出现这样的问题,因为消费者是主动去borker上拉数据的。
分布式
没有一个中心节点,broker之间是对等的,broker可以随时添加与删除。类似的,生产者与消费者可以在任何时间动态启动。每个borker在zookeeper上注册一些元数据。生产者与消费者可以利用zookeeper来发现主题,并且在生产与消费之间协调。关于这一点的细节会在下面讲到。
生产者
自动的生产者负载均衡
Kafka支持消息生产者在客户端的负载均衡,或者利用专有的负载均衡器来均衡TCP连接。一个专用的四层均衡器通过将TCP连接均衡到Kafka的broker上来工作。在这种配置下,所有的来自同一个生产者的消息被发送到一个borker上,这种做法的优点是,一个生产者只需要一个TCP连接,而不需要与zookeeper的连接。缺点是负载均衡只能在TCP连接的层面上来做,因此,它有可能不是均衡得非常好(如果一些生产者比其他生产者生产更多的消息,给每个broker分配相同的TCP连接不一定会使每个broker得到相同的消息)。
基于zookeeper的客户端的负载均衡可以解决这个问题。它允许生产者动态地发现新的broker,并且在每个请求上进行负载均衡。同样的,它允许生产者根据一些键将数据分开,而不是随机分,这可以增加与消费者的粘性(比如,根据用用户id来化分数据的消费)。这个特性被称为“语义化分”,下文会详述。
这种基于zookeeper的负载均衡如下所述。zookeeper watchers注册以下一些事件:
1)一个新的broker启动
2)一个broker关闭
3)一个新的主题注册进来
4)一个borker注册一个已经存在的主题
在内部,生产者维护一个与borker的弹性连接池。这个连接池通过zookeeper watchers的回调函数来保持更新以便与所有存活的broker建立或保持连接。当一个生产者对某一个主题的请求上来时,一个主题的分区被分区器提取到。连接池中的一个连接被用来将数据发送到前面所选的那个broker分区中。
异步发送
异步的非阻塞发送对于扩展消息系统是基本的。在Kafka中,生产者提供一个选项用来使用生产请求的异步分派(producer.type=async)。这允许将生产请求缓存在一个内存队列中,然后在被一个时间间隔或者预先设定的batch大小触发时发送出去。由于数据是从异构的机器上以不同的速率发布的,这种异步的缓存机制可以生成统一的通往broker的traffic, 从而使得网络资源得到充分利用,同时也提高吞吐量。
标签:
原文地址:http://www.cnblogs.com/vincent2010/p/4783100.html