OLAP系统广泛应用于BI、Reporting、Ad-hoc、ETL数仓分析等场景,本文主要从体系化的角度来分析OLAP系统的核心技术点,从业界已有的OLAP中萃取其共性,分为谈存储,谈计算,谈优化器,谈趋势4个章节。
列存的数据组织形式
行存,可以看做NSM (N-ary Storage Model)组织形式,一直伴随着关系型数据库,对于OLTP场景友好,例如innodb[1]的B+树聚簇索引,每个Page中包含若干排序好的行,可以很好的支持tuple-at-a-time式的点查以及更新等;而列存(Column-oriented Storage),经历了早期的DSM(Decomposition Storage Model)[2],以及后来提出的 PAX (Partition Attributes Cross) 尝试混合NSM和DSM,在C-Store论文[3]后逐渐被人熟知,用于OLAP,分析型不同于交易场景,存储IO往往是瓶颈,而列存可以只读取需要的列,跳过无用数据,避免IO放大,同质数据存储更紧凑,编码压缩友好,这些优势可以减少IO,进而提高性能。
编码与压缩
对于基本类型,例如数值、string等,列存可以使用合适的编码,减少数据体积,在C-Store论文中对于是否排序、NDV(Number of Distince Values)区分度,这4种排列组合,给出了一些方案,例如数值类型,无序且NDV小的,转成bitmap,然后bit-packing编码。其他场景的编码还有varint、delta、RLE(Run Length Encoding)、字典编码(Dictionary Encoding)等,这些轻量级的编码技术仅需要多付出一些CPU,就可以节省不小的IO。对于复杂类型,嵌套类型的可以使用Google Dremel论文[4]提出Striping/Assembly算法(开源Parquet),使用Definition Level+Repetition Level做编解码。在编码基础上,还可以进行传统的压缩,例如LZ4、Snappy、zstd。
一些其他的选项,包括HBase,实际存储的是纯二进制,仅支持Column Family,实际不是columnar format,一些序列化框架和Hadoop融合比较好的,例如Avro,也不是列式存储。
存储格式
现代的OLAP往往采用行列混存的方案,采用Data Block + Header/Footer的文件结构,例如Parquet、ORC,Data Block使用Row Group(Parquet的叫法,ORC叫做Stripe)-> Column Chunk -> Page 三层级,每一层又有metadata,Row Group meta包含row count,解决暴力count(*),Column Chunk meta包含max、min、sum、count、distinct count、average length等,还有字典编码,解决列剪枝,并且提供基础信息给优化器,Page meta同样可以包含max、min等,跳页用于加速计算。
存储索引
在Parquet、ORC中,除了列meta信息外,不提供其他索引,在其他存储上,支持了更丰富的索引,索引可以做单独的块(Index Block),或者形成独立的文件。例如阿里云ADB[5],对于cardinality较小的,可以做bitmap索引,多个条件下推使用and/or。倒排索引也是可选的,需要在空间和性能上有所折中,还可以支持全文检索。Bloom Filter可以加速等值查询过滤。另外,可以引入OLTP中主键的概念,文件内可按照主键排序,也就Data Block内部是有序的,主键的索引可以使用B+ Tree或者Masstree[6](例如KUDU[7]),或者借鉴LevelDB的思想,在Index Block内对主键Key做稀疏索引,方便二分查找,Index Block常驻内存,这样有利于对于点查(point query)和顺序扫描的范围查询(range query),另外其他列也可以做稀疏有序索引。
分布式存储
DAC(Divide And Conquer)在分布式领域也是屡试不爽,要突破单机存储大小和IO限制,就需要把一个文件划分为若干小分片(sharding),以某个字段做round-robin、constant、random、range、hash,分布在不同的文件或者机器,形成分布式存储。存在单机存储(SATA、SSD、NVM),例如Greenpulm基于PostgreSQL做OLAP,是share nothing架构。文件也可以存在分布式存储(GFS、HDFS)或者对象存储(S3、OSS),是share everthing(share storage)架构,好处在于扩展性和可用性的提高,由于存储网络延迟,所以一般都做批量、追加写,而非随机写,这把双刃剑也加大了OLAP在实时更新上难度,所以很多都放弃了实时写和ACID能力。下面重点说下分布式存储,文件存储在HDFS上,每个分片是一个HDFS block(例如128MB大小),便于高吞吐大块IO顺序读,串行强一致写,一个Row Group大小等于block size,便于上层计算引擎例如Spark SQL作业并行计算。一些新型的分布式存储上,与HDFS不同,更专用的去管理文件和分片,采用Centralized Master + 多个Tablet架构,例如KUDU以及OLTP新兴的Tikv,分片的多副本依赖于一致性协议Paxos或者Raft,多个分片组成Raft-Group,这样可以打散一个表(文件)到多分片多副本的架构上。Centralized Master管理分片存放的位置,元数据(例如范围),便于负载均衡、分裂合并等。
数据分区
再往宏观放大,一张表可以存在若干分区(partitioning),例如Hive按照某个时间字段做分区,利用metastore保存,方便做查询分区剪枝(partition prune),直接跳过不需要扫描的文件。分区往往采用range或者hash策略,按照range分区,可以优化OLAP的范围查询和快速点查,按照hash分区,可以有效解决hotspot热点读,还可以将二者结合,做二级分区(two-level),例如阿里云ADB、ClickHouse,一般DISTRIBUTED BY HASH再PARTITION BY RANGE,而百度Palo一般先按时间一级分区,更好做冷热数据区分,二级分区采用hash。
实时写入和ACID
随着实时数仓和HTAP,HSAP[8]等概念的兴起,对于传统数据处理的Lambda架构弊端就凸显出来,链路长,数据冗余,数据一致性不好保证等。融合OLTP的能力,第一点就是在之前所述的immutable table file上做实时增删改,要保证低延迟,高吞吐,可以借鉴LSM-Tree架构,优化写吞吐,将流式的低延迟随机写,最终变成聚批mini-batch的group commit顺序写,依赖write-ahead log保证持久性,最终形成base+delta的文件结构,读流程需要进行delta或者base的跳读,扫描需要做多路归并,另外后台通过major、minor compaction可以不断优化读性能,在阿里云ADB,KUDU,Google MESA[9]里面都采用了类似的方案。在读写一致性层面,需要提供ACID和事务隔离特性,比较好保证单行和mini-batch的原子性,持久性不言而喻,对于一致性和事务隔离,可以采用MVCC机制,每个写都带有version,很简单的实现带版本查一致性,快照一致性(snapshot isolation)。
查询步骤
SQL语言是OLAP的标配,一个完整的SQL查询步骤包括1)SQL词法解析,语法解析,2)形成抽象语法树(AST),3)校验检查,4)AST转成关系代数表达式(relational algebra),5)根据关系代数表达式生成执行计划,先生成逻辑执行计划(logical plan),6)经过优化器生成最优的执行计划,7)根据执行计划生成物理执行计划(physical plan),8)最终交由执行器执行并返回结果。
由SQL到AST的过程,类库和工具较多,C++可用Lex/Yacc,Java可用JavaCC/ANTLR,也可以自己手写实现。由AST到关系代数表达式,可以使用visitor模式遍历。下一章节谈优化器。本节聚焦在物理执行计划后的执行阶段。
OLAP数据建模分类
ROLAP和MOLAP。Relational OLAP(ROLAP)对SQL支持好,查询灵活,使用组合模型,雪花或者星型模型组织多张表。ROLAP计算的数据规模往往小于离线大数据计算(Hive/Spark),ROLAP产品很多,包括传统的Greenpulm、Vertica、Teradata,Sql-on-Hadoop系的Presto、Impala、Spark SQL、HAWQ,云计算厂商的阿里云ADB、Google BigQuery,AWS RedShift,有学术界出品的MonetDB[10],还有新兴的ClickHouse。
如果把查询阶段分为
cache
/\
|
pre-computing -> computing -> post computing
上面的提到的存储技术更多是为了ROLAP在computing阶段优化考虑的,如果把计算中的熵前置到pre-computing阶段做预计算,也可以大幅优化computing阶段。Multidimensional OLAP(MOLPA)可以把数据预计算,有些场景下不一定需要细粒度的fact,可以严格区分维度列和指标列,使用Kylin、Druid等,利用上卷(roll-up)做数据立方体(data cube),这样可以大大减少OLAP场景下聚合查询的IO,另外百度Palo、Google MESA,基于上卷操作做物化视图,也减少了IO消耗,所以他们对于高并发查询支持普遍较好,但是缺点就在于查询不够灵活,数据有冗余。下文主要针对ROLAP谈计算。
计算引擎分类
物理执行计划往往是一个DAG,叶子节点都是TableScan,这个DAG的分布式执行器就是计算引擎(Query Engine),分为两个流派。
第一类是基于离线计算引擎,例如Hive on MR,Spark SQL,阿里云MaxCompute,支持超大规模的数据,进行了容错保证,多个stage落盘(spill to disk),使用resource manager调度和queueing,作业可能持续非常长的时间,占用大量资源,并发低。
第二类是MPP,例如Greenpulm、Presto、Impala、阿里云ADB,RedShift支持大规模数据,不需要reource manager耗时的分配资源和调度任务,long-running的task manager,只需要轻量级的调度,查询一般不容错,算子并行执行,并行度有限制避免straggler node影响TP99,相比基于离线的计算引擎往往是短任务,查询耗时不会太长。
Presto、Impala属于Sql-on-Hadoop MPP,利用Hive metastore,直接读取Parquet、ORC等文件格式,Greenpulm、RedShift基于PostgreSQL,阿里云ADB采用私有的数据存储技术,计算存储分离的架构,存储表到分布式存储盘古上。
MPP架构
基本上MPP都由coordinator,worker,metastore,scheduler组成,各个产品名称不同而已,coordinator负责从SQL到物理执行计划的生成,期间使用metastore获取待查询数据源、分区、分片信息、SQL校验等,调用scheduler分发task到不同的worker节点执行,收集结果返回。一个物理执行计划等同于一个DAG,每个节点抽象为一个逻辑的算子(operator),最简单的算子实现就是解释执行(interpreted)的模式,由worker跑起来就可以看做一个task了,可以包含一个或者多个逻辑上的算子(有时候task也叫做stage)。算子包括HashJoin、TableScan、Aggregation等,叶子节点一般是TableScan,拉取存储中数据。MPP架构就是充分利用分布式的特性,让算子并行的计算,加速查询。
计算执行
数据流。DAG在进行数据流动时,采用pipeline方式,也就是下游stage不用等上游stage完全执行结束就可以拉取数据并执行计算。数据不落盘,算子之间通过内存直接拷贝到socket buffer发送,需要保证内存足够大,否则容易OOM。
火山模型(Volcano-style),是一种Row-Based Streaming Iterator Model算子的实现,只需要open、next、close三个函数,就可以实现数据从底向上的“拉”取,驱动计算进行。
向量化执行(Vectorized query)。MonetDB论文提出了火山模型的改进方案——向量化执行,火山模型tuple-at-a-time的实现,每个算子执行完传递一行给上游算子继续执行,函数调用过多,且大量的虚函数调用,条件分支预测失败,直接现象就是CPU利用率低(low IPC)。而现代的CPU有多级流水线可以实现指令级并行,超标量(super scalar)实现乱序执行,对于forloop可以有效优化,超线程还能实现线程级并行,而CPU多级的Cache,以及cache line的有效利用避免cache miss,再配合编译器的优化,都会大大加速计算过程。向量化执行的思想就是算子之间的输入输出是一批(Batch,例如上千行)数据,这样可以让计算更多的停留在函数内,而不是频繁的交互切换,提高了CPU的流水线并行度,而且还可以使用SIMD指令,例如AVX指令集来实现数据并行处理。实际实现中,例如Impala各个算子的input虽然是RowBatch,但除了TableScan算子,其他的也是火山模型执行式的row by row处理,TableScan读存储,列式内存布局加速pushdown的filter执行,aggregation下推后还可以使用SIMD指令加速聚合。但是向量化也会带来额外的开销,就是物化中间结果(materlization),以牺牲物化的开销换取更高的计算性能。
动态代码生成(codegen)。解释执行(interpreted)的算子,因为面向通用化设计,大数据集下往往效率不高,可以使用codegen动态生成算子逻辑,例如Java使用ASM或者Janino,C++使用LLVM IR,这样生成的算子更贴近计算,减少了冗余和虚函数调用,还可以多个算子糅合成一个函数。另外表达式计算的codegen还可以做的更极致,一些简单的计算可以做成汇编指令,进一步加速。
关于向量化或者codegen,孰优孰劣,论文Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask [11]进行了深入的对比。二者也可以融合,通过codegen生成向量化执行代码,另外也不一定做wholestage codegen,和解释执行也可以一起配合。
计算的耗时有一部分会损耗在IO、CPU的闲置上。内存的布局和管理,行式布局还是列式布局,对于CPU Cache是否友好,内存池还是按需分配,都会影响着系统的吞吐,C++可自行维护Arena或者使用jemalloc等框架,而Java的heap memory比较低效还影响GC,因此使用Unsafe API操作堆外内存。另外Arrow的兴起,也对于跨进程通信后,不必进行数据反序列化、内存分配再拷贝,就可以读取列式的数据,也进一步加速了计算。
常见算子实现
TableScan算子直读底层数据源,例如Presto,抽象了很好了connector,可对接多种数据源(Hadoop,对象存储等),一般都支持projection、filter,因此可以做filter pushdown和projection pushdown到TableScan。
Join算子的实现,如果两个表都很小,最简单的利用in-memory hash join、simple nested loop join;一大一小,扫描小表,根据大表做index lookup join,还可以广播小表做broadcast join,例如一般维度表都比较小;两个大表,如果两个表的join key的一级分区策略相同,则可以很好的对齐,避免大表shuffle,直接在大表的sharding做local join,如果不能对齐,则两个表按照join key shuffle到其他节点,重分布式后再做hash join,另外如果两个表的join key有序,还可以使用sort-merge join。
资源管理与调度
MPP架构下coordinator需要scheduler调度task到worker节点,对于长计算任务或者ETL任务,会占用很多资源,导致OLAP的并发度受限,其他请求需要排队,因此很难服务对外在线请求,为了迎合混合负载,传统scheduler简单粗暴的调度和资源管理已经无法满足要求,因此可以进行任务的fine grained schedule避免空闲资源,请求间对资源的使用尽量的隔离,避免bad query吃满资源,同时可以label化集群,让更多的小请求也可以快速得到响应。当OLAP系统足够高性能后,更好的资源管理和调度,将会提升OLAP为一个支持高并发、低延迟的,可对外提供在线服务的系统,而不仅仅是一个in-house的分析系统。
查询优化器不光是传统数据库DB2、Oracle、MySQL的核心,在OLAP里也至关重要。AST转为SQL形式化表达语言——关系代数表达式(relational algebra),代码实现就是一颗关系运算符组成的树,查询优化主要是围绕着“等价交换”的原则做相应的转换,优化关系代数表达式。关系代数的基本运算包括投影(project)、选择(select)、并(union)、差(set difference)、连接(join)等。优化器分为Rule-Based Optimizer (RBO) 和Cost-Based Optimizer (CBO) 两类。
RBO
会将原有表达式裁剪掉,遍历一系列规则(Rule),只要满足条件就转换,生成最终的执行计划。一些常见的规则包括分区裁剪(Partition Prune)、列裁剪、谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折叠(Constant Folding)、子查询内联转join等。
CBO
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。