标签:query zip 利用 tor 命中率 生成 shuff style 读取数据
SQL On Hadoop 设计的一个基本原则是:将计算任务移动到数据所在的节点而不是反过来。这主要出于网络优化的目的,因为数据分布在不同的节点,如果移动数据那么将会产生大量的低效的网络数据传输。数据本地化一般分为三种:节点局部性 (Node Locality), 机架局部性 (Rack Locality) 和全局局部性 (Global Locality)。节点局部性是指将计算任务分配到数据所在的节点上,此时无需任何数据传输,效率最佳。机架局部性是指将计算任务移动到数据所在的机架,虽然计算任务和数据分属不同的计算节点,但是因为机架内部网络传输速度明显高于机架间网络传输,所以机架局部性也是一种不错的方式。其他的情况属于全局局部性,此时需要跨机架进行网络传输,会产生非常大的网络传输开销。
调度系统在进行任务调度时,应该尽可能的保证节点局部性,然后是机架局部性,如果以上两者都不能满足,调度系统也会通过网络传输将数据移动到计算任务所在的节点,虽然性能相对低效,但也比资源空置比较好。
为了实现数据本地化调度,调度系统会结合延迟调度算法来进行任务调度。核心思想是优先将计算任务调度到数据所在的节点 i,如果节点 i 没有足够的计算资源,那么等待几秒钟后如果节点 i 依然没有计算资源可用,那么就放弃数据本地化将该计算任务调度到其他计算节点。
在一个追求低延迟的 SQL On Hadoop 系统中,尽可能的减少中间结果的磁盘物化可以极大的提高查询性能。 如下图,Hive 执行引擎采用 pull 获取数据,其优点是可以进行细粒度的容错,缺点是下游的 MapReduce 必须等待上游 MapReduce 完全将数据写入到磁盘后才能开始 pull 数据。Presto 采用 push 方式获取数据,数据完全以流的方式在不同 stage 之间进行传输,中间结果不需要物化到磁盘,从而使得 presto 具有非常高效的执行速度,缺点是不能支持细粒度的容错。
(点击放大图像)
图 3push 和 pull
传统的关系存储模型将一个元组的列连续存储,即使只查询一个列,也需要将整个元组读取出来,可以发现,当查询只有少量列时,性能非常低。
列存储的思想是将元组垂直划分为列族集合,每一个列族独立存储,列族可以退化为只仅包含一个列的平凡列族。当查询少量列时,列存储模型可以极大的减少磁盘 IO 操作,提高查询性能。当查询的列跨越多个列族时,需要将存储在不同列族中列数据拼接成原始数据,由于不同列族存储在不同的 HDFS 节点上,导致大量的数据跨越网络传输,从而降低查询性能。因此在实际使用列族时,通常根据业务查询特点,将频繁访问的列放在一个列族中。
在传统的数据库领域中,人们已经对列存储进行了非常深刻的研究,并且很多研究成果已经被应用到工业领域,其中包括轻量级压缩算法,直接操作压缩数据,延迟物化,向量化执行引擎。可是纵观目前 SQL On Hadoop 系统,这些技术的应用仍然远远的落后于传统数据库,在最近的一些 SQL On Hadoop 中已经添加了向量化执行引擎,轻量级压缩算法,但是诸如直接操作压缩数据,延迟解压等技术还没有被应用到 SQL on Hadop 系统。关于列存储的更多内容可以参见 [20]。
列存储压缩
列存储压缩算法具有如下特点:
压缩比 列存储模型具有非常高的压缩比,通常可以达到 10:1,而行存储压缩比通常只有 4:1。如图 4:
(点击放大图像)
图 4 重量级压缩算法
轻量级压缩算法 (Leight-Weight Compression) 轻量级压缩算法是 CPU 友好的。行存储模型只能使用 zip,lzo,snappy 等重量级压缩算法,这些算法最大的缺点是压缩和解压缩速度比较慢,通常每秒只能解压至多几百兆数据。相反,列存储模型不仅可以使用重量级压缩算法,还可以使用一些非常轻量级的压缩算法,比如 Run-length encode,Bit Vector。轻量级压缩算法不仅具有较好的压缩比,而且还具有非常高的压缩和解压速度。目前在 ORC File 和 Parquet 存储中,已经支持 Bit packing,Run-length enode,Dictionary encode 等轻量级压缩算法。
直接操作压缩数据 (Operating Directly on Compressed Data) 当使用轻量级压缩算法时,可能无需解压即可直接获取计算结果。例如:Run Length Encode 算法将连续重复的字符压缩为字符个数和字符,比如 aaaaaabbccccaaaa 将被压缩为 6a2b4c4a,其中 6a 表示有连续 6 个字符 a。现在假设一个某列包含上述压缩的字符串,当执行 select count(*) from table where columnA=’a’时,不需要解压 6a2b4c4a,就能够知道 a 的个数是 10。
需要注意的是,由于行存储只能使用重量级压缩算法,所以直接操作压缩数据不能被应用到行存储。
延迟解压 parquet 中的数据按块存储,每个块存储了最小值,最大值等轻量级索引,比如某个块的最小值最大值分别是 100 和 120,这表明该块中的任意一条数据都介于 100 到 120 之间,因此当我们执行 select column a from table where v>120 时,执行引擎可以跳过这个数据块,而不必将其解压再进行数据过滤。相反,在行存储中,必须将数据块完整的读取到内存中,解压,然后再进行数据过滤,导致不必要的磁盘读取操作。
一般情况下,压缩 HDFS 中的文件可以极大的提高查询性能。压缩能够减少数据所占用的存储空间,减少磁盘 IO 的读写,提高数据处理速度,此外,压缩还能够减少网络传输量,提高网络传输速度。在 SQL On Hadoop 中,压缩主要应用在 HDFS 中的数据源,shuffle 数据,最终计算结果。
如果应用程序是 io-bound 的,那么压缩数据可以提高数据处理速度,因为压缩后的数据变小了,所以可以增加数据读写速度。需要主要的是,压缩算法并不是压缩比越高越好,压缩率越高的算法压缩和解压缩速度就越慢,用户需要在 cpu 和 io 之间取得一个良好的平衡。例如 gzip2 拥有非常高的压缩比,但是其压缩和解压缩速度却非常慢,甚至可能超过数据未压缩时的读写时间,因此没有 SQL On Hadooop 系统使用 gzip2 算法,目前在 SQL On Hadoop 系统中比较流行的压缩算法主要有:Snappy,Lzo,Glib。
如果应用程序是 cpu-bound 的,那么选择一个可以 splittable 的压缩算法是很重要的,如果一个文件是 splittabe 的,那么这个文件可以被切分为多个可以并行读取的数据块,这样 MR 或者 Spark 在读取文件时,会为每一个数据块分配一个 task 来读取数据,从而提高数据查询速度。
查询执行引擎 (query execution engine) 是数据库中的一个核心组件,用于将查询计划转换为物理计划,并对其求值返回结果。查询执行引擎对数据库系统性能影响很大,目前主要的执行引擎有如下四类:Volcano-style,Block-oriented processing,Column-at-a-time,Vectored iterator model。下面分别介绍这四种执行引擎。
Volcano-style, 最早的查询执行引擎是 Volcano-style execution engine(火山执行引擎,火山模型),也叫做迭代模型 (iterator model),或者 one-tuple-at-a-time。在这种模型中,查询计划是一个由 operator 组成的 tree 或者 DAG,其中每一个 operator 包含三个函数:open,next,close。Open 用于申请资源,比如分配内存,打开文件,close 用于释放资源,next 方法递归的调用子 operator 的 next 方法生成一个元组。图 1 描述了 select id,name,age from people where age >30 的火山模型的查询计划,该查询计划包含 User,Project,Select,Scan 四个 operator,每个 operator 的 next 方法递归调用子节点的 next,一直递归调用到叶子节点 Scan operato,Scan Operator 的 next 从文件中返回一个元组。
(点击放大图像)
图 3-4 火山模型 摘自文献 [2,page 39]
火山模型的主要缺点是昂贵的解释开销 (interpretation overhead) 和低下的 CPU Cache 命中率。首先,火山模型的 next 方法通常实现为一个虚函数,在编译器中,虚函数调用需要查找虚函数表, 并且虚函数调用是一个非直接跳转 (indirect jump), 会导致一次错误的 CPU 分支预测 (brance misprediction), 一次错误的分支预测需要十几个周期的开销。火山模型为了返回一个元组,需要调用多次 next 方法,导致昂贵的函数调用开销。[] 研究表明,在采用火山执行模型的 MySQL 中执行 TPC-H Q1 查询,仅有 10% 的时间用于真正的查询计算,其余的 90% 时间都浪费在解释开销 (interpretation overhead)。其次,next 方法一次只返回一个元组,元组通常采用行存储,如图 3-5 Row Format,如果顺序访问第一列 1,2,3,那么每次访问都将导致 CPU Cache 命中失败 (假设该行不能完全放入 CPU Cache 中)。如果采用 Column Format,那么只有在访问第一个值时才出现缓存命中失败,后续访问 2 和 3 时都将缓存命中成功, 从而极大的提高查询性能。
(点击放大图像)
图 3-6 行存储和列存储
Block-oriented processing,Block-oriented processing 模型是对火山模型的一个改进,该模型一次 next 调用返回一批元组, 元组个数在 100-1000 不等,next 内部使用一个循环来处理这批元组。在图 1 的火山模型中,Select operator next 方法可以如下实现:
def next():Array[Tuple]={ // 调用子节点的 next 方法,返回一个元组向量,该向量包含 1024 个元组 val tuples=child.next() val result=new ArrayBuffer[Tuple] for(i=0;i<tuples.length;i++){ 30="" age="" val="">30) result.append(tuples(i)) } result// 返回结果 }
Block-oriented processing 模型的优点是一次 next 返回多个元组,减少了解释开销,同时也被证明增加了 CPU Cache 的命中率,当 CPU 访问元组中的某个列时会将该元组加载到 CPU Cache(如果该元组大小小于 CPU Cache 缓存行的大小), 访问后继的列将直接从 CPU Cache 中获取,从而具有较高的 CPU Cache 命中率,然而如果之访问一个列或者少数几个列时 CPU 命中率仍然不理想。该模型最大的一个缺点是不能充分利用现代编译器技术,比如在上面的循环中,很难使用 SIMD 指令处理数据。
Column-at-a-time 模型,向量化执行的最早历史可以追朔到 MonetDB[], 在 MonetDB 提出了一个叫做 Column-at-a-time 的查询执行模型,该模型中每一次 next 调用返回一个或者多个列,每个列以数组形式返回。该模型优点是具有非常高的查询效率,缺点是一个列数据需要被物化到内存甚至磁盘,导致很高的内存占用和 io 开销,同时数据不能放到 CPU Cache 中,导致较低的 CPU Cache 命中率。
Vectored iterator model,VectorWise 提出了 Vectored iterator model 模型,该模型是对 Column-at-a-time 的改进,next 调用不是返回完整的一个列,而是返回一个可以放到 CPU Cache 的向量。该模型避免了 Column-at-a-tim CPU Cache 命中率低的缺点。Vectored iterator model 最大的优点是可以使用运行时编译器 (JIT) 动态的生成更适合现代处理器的指令,比如 JIT 可以生成 SIMD 指令来处理向量。考虑 TPC-H Q1 查询:SELECT l_extprice*(1-l_discount)*(1+l_tax) FROM lineitem。该 SQL 查询的执行计划如下:
(点击放大图像)
其中 Project operator 的 next 方法可以如下实现 (scala 伪代码):
def next():Array[Tuple]={ val tuples=child.next() var result=new ArrayBuffer[Int] for(i=0;i<tuples.length;i++){ r="tuples.l_extprice*(1-tuple.l_discount)*(1+tuple.l_tax)" retult="" tuple="tuples(i)" val="">
近几年,一些 SQL On Hadoop 系统引入了向量化执行引擎,比如 Hive,Impala,Presto,Spark 等,尽管其实现细节不同,但核心思想是一致的:尽可能的在一次 next 方法调用返回多条数据,然后使用动态代码生成技术来优化循环,表达式计算从而减少解释开销,提高 CPU Cache 命中率,减少分支预测。
Impala 中的向量化执行引擎本质上属于 Block-oriented processing,imapla 的每次 next 调用返回一批元组,这种模型仍然具有较低的 CPU Cache 命中率,同时也很难使用 SIMD 等指令进行优化,为了缓解这个问题,Impala 使用动态代码生成技术,对于大循环,表达式计算等进行使用动态代码生成来进行优化。
在 Spark2.0 中,实现了基于 Parquet 的向量化执行引擎 [12],该执行引擎属于 Vectored iterator model,引擎在调用 next 方法时以列存储格式返回一批元组,可以使用循环来处理该批元组。此外为了更充分的利用现代 CPU 特性,Spark 还支持整阶段代码生成技术,核心思想是将多个 operator 编译到一个方法中,从而减少解释开销。
动态代码生成一般和向量化执行引擎结合使用,因为向量执行引擎的 next 方法内部可以使用 for 循环来处理元组向量或者列向量,使用动态代码生成技术可以在运行时对 next 方法生成更高效的执行代码。研究证明向量化执行引擎和动态代码生成可以减少解释开销 (interpretation overhead), 见文献 [18],主要影响以下三个方面:
Select, 当 select 语句中包含复杂的表达式计算时,比如 avg,sum,count,select 的计算性能主要受 CPU Cache 和 SIMD 指令影响。当数据不能放到 CPU Cache 时,CPU 大部分时间都在等待数据从内存加载到 CPU Cache,因此当 CPU 执行计算所需的数据在 CPU Cache 中时可以极大的提高计算性能。一条 SIMD 指令可以同时计算多个数据,因此使用 SIMD 指令执行表达式计算可以提高计算性能。
where,与 Select 语句不同的是 Where 语句一般不需要复杂的计算,影响 where 性能更多的是分支预测。如果 CPU 分支预测错误,那么之前的 CPU 流水线将全被清洗,一次 CPU 分支预测错误可能至少浪费十几个指令周期的开销。通过使用动态代码生成技术,JIT 编译器能够自动的生成分支预测友好的指令。
Hash,hash 算法影响 equal-join,group 的查询性能,hash 算法的 CPU Cache 命中率很低。[18] 描述了一种缓存友好的 hash 算法,可以显著的提高 hash 计算性能。
动态代码生成有两种:C++ 系和 java 系。其中 C++ 系可以直接生成本机可执行二进制代码,并且能够生成高效的 SIMD 指令,例如 Impala 使用 C++ 实现查询执行引擎,同时使用 LLVM 编译器动态的生成本机可执行二进制代码,LLVM 可以生成 SIMD 指令对表达式执行计算。Java 系利用反射机制动态的生成 java 字节码,一般而言,不能充分利用 SIMD 指令进行优化,Spark 使用反射机制动态的生成 java 字节码,通常很难直接利用 SIMD 进行表达式优化。此外在 Spark2.0 中所提供的整阶段代码生成 (Whole-Stage Code Generation) 技术也是动态代码生成技术将多个 Operator 编译成一个方法进行优化。
需要注意的是,动态代码生成技术并不总是万能药,在下图中,impala 的动态代码生成技术并没有提高 TPC-DS Q42,Q52,Q55 的查询速度,主要原因这些 SQL 语句的 SELECT 语句中并没有什么复杂的计算。
SQL On Hadoop 设计的一个基本原则是:将计算任务移动到数据所在的节点而不是反过来
标签:query zip 利用 tor 命中率 生成 shuff style 读取数据
原文地址:http://www.cnblogs.com/akirajay/p/7803655.html