7 月 6 日,Apache Flink 1.11 正式发布。从 3 月初进行功能规划到 7 月初正式发版,1.11 用将近 4 个月的时间重点优化了 Flink 的易用性问题,提升用户的生产使用体验。
SQL 作为 Flink 中公认的核心模块之一,对推动 Flink 流批一体功能的完善至关重要。在 1.11 中,Flink SQL 也进行了大量的增强与完善,开发大功能 10 余项,不仅扩大了应用场景,还简化了流程,上手操作更简单。
其中,值得注意的改动包括:
随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。
SQL
是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL
支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL
后,可能只需要几行 SQL 就可以轻松搞定。
Flink SQL 的发展大概经历了以下阶段:
经过了多个版本的迭代支持,SQL 模块在 Flink 中变得越来越重要,Flink 的 SQL 用户也逐渐扩大。基于 SQL 模块的 Python 接口和机器学习接口也在快速发展。毫无疑问, SQL 模块作为最常用的 API 之一和生态的集成变得越来越重要。
Flink SQL 在原有的基础上扩展了新场景的支持:
除此之外,Flink SQL 也从多个方面提高 SQL 的易用性,系统性的解决了之前的 Bug、完善了用户 API。
CDC 支持
CDC
格式是数据库中一种常用的模式,业务上典型的应用是通过工具(比如 Debezium 或 Canal)将 CDC
数据通过特定的格式从数据库中导出到 Kafka 中。在以前,业务上需要定义特殊的逻辑来解析 CDC 数据,并把它转换成一般的
Insert-only 数据,后续的处理逻辑需要考虑到这种特殊性,这种 work-around 的方式无疑给业务上带来了不必要的复杂性。
如果 Flink SQL 引擎能原生支持 CDC 数据的输入,将 CDC 对接到 Flink SQL 的 Changelog Stream 概念上,将会大大降低用户业务的复杂度。
流计算的本质是就是不断更新、不断改变结果的计算。考虑一个简单的聚合
SQL,流计算中,每次计算产生的聚合值其实都是一个局部值,所以会产生 Changelog Stream。在以前想要把聚合的数据输出到
Kafka 中,如上图所示,几乎是不可能的,因为 Kafka 只能接收 Insert-only 的数据。
Flink 之前主要是因为
Source&Sink 接口的限制,导致不能支持 CDC 数据的输入。Flink SQL 1.11 经过了大量的接口重构,在新的
Source&Sink 接口上,支持了 CDC 数据的输入和输出,并且支持了 Debezium 与 Canal
格式(FLIP-105)。这一改动使动态 Table Source 不再只支持 append-only
的操作,而且可以导入外部的修改日志(插入事件)将它们翻译为对应的修改操作(插入、修改和删除)并将这些操作与操作的类型发送到后续的流中。
如上图所示,理论上,CDC 同步到 Kafka 的数据就是 Append 的一个流,只是在格式中含有 Changelog 的标识:
为了消费 CDC 数据,用户需要在使用 SQL DDL 创建表时指指定“format=debezium-json”或者“format=canal-json”:
CREATE TABLE my_table ( ... ) WITH ( 'connector'='...', -- e.g. 'kafka' 'format'='debezium-json' );
Flink 1.11 的接口都已 Ready,但是在实现上:
Source & Sink 重构
Source & Sink 重构的一个重要目的是支持上节所说的 Changelog,但是除了 Changelog 以外,它也解决了诸多之前的遗留问题。
新 Source & Sink 使用标准姿势 (详见官方文档):
CREATE TABLE kafka_table (
... ) WITH ( 'connector' = 'kafka-0.10', 'topic' =
'test-topic', 'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092', 'format' =
'json', 'json.fail-on-missing-field' = 'false' );
Flink 1.11 为了向前兼容性,依然保留了老 Source & Sink,使用 “connector.type” 的 Key,即可 Fallback 到老 Source & Sink 上。
■ Factory 发现机制
Flink 1.11 前,用户可能经常遇到一个异常,叫做 NoMatchingFactory 异常:
指的是,定义了一个 DDL,在用的时候,DDL 属性找不到对应的 TableFactory 实现,可能的原因是:
但是报的异常让人非常疑惑,根据异常的提示消息,很难找到到底哪里的代码错了,更难明确知道哪个 Key 写错了。
public interface Factory { String factoryIdentifier(); …… }
所以在
Flink 1.11 中,社区重构了 TableFactory 接口,提出了一个新的 Factory 接口,它有一个方法,叫做
FactoryIdentifier。以后所有的 Factory 的 look up 都通过
identifier。这样的话就非常清晰明了,找不到是因为 Classpath 下没 Factory 的类,找得到那就可以定位到 Factory
的实现中,进行确定性的校验。
■ 类型与数据结构
之前的 Source&Sink 接口支持用户自定义数据结构,即框架知道如何把自定义的数据结构转换为 Flink SQL 认识的内部数据结构,如:
public interface TableSource<T> { TypeInformation<T> getReturnType(); ... }
用户可以自定义泛型 T,通过 getReturnType 来告诉框架怎么转换。
不过问题来了,当 getReturnType 和 DDL 中声明的类型不一致时怎么办?特别是两套类型系统的情况下,如:Runtime 的 TypeInformation,SQL 层的 DataType。由于精度等问题,可能导致经常出现类型不匹配的异常。
Flink 1.11 系统性地解决了这个问题。现在 Connector 开发者不能自定义数据结构,只能使用 Flink SQL 内部的数据结构:RowData。所以保证了默认 Type 与 DDL 的对应,不用再返回类型让框架去确认了。
RowData 数据结构在 SQL 内部设计出来为了:
■ Upsert 与 Primary Key
流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比如 JDBC、HBase,当遇到复杂的 SQL 时,时常会出现:
UpsertStreamTableSink
需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的
UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。
如何发现 Primary Key?一个方法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。
这种情况下在简单
Query 当中很好,也满足语义,也非常自然。但是如果是一个复杂的 Query,比如聚合又 Join
再聚合,那就只有报错了。不能期待优化器有多智能,很多情况它都不能推断出 PK,而且,可能业务的 SQL 本身就不能推断出
PK,所以导致了这样的异常。
怎么解决问题?Flink 1.11 彻底的抛弃了这个机制,不再从 Query 来推断 PK 了,而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个Jdbc Sink 会按照对应的 Key 来进行更新。如此,就跟 Query 完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义来。
CREATE TABLE jdbc_table (
id BIGINT,
...
PRIMARY KEY (id) NOT ENFORCED
)
首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 使用调度工具来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时需要叠加起来。
问题是慢,延迟大,并且 Ad-hoc 分析延迟也比较大,因为前面的数据入库,或者前面的调度的 ETL 会有很大的延迟。Ad-hoc 分析再快返回,看到的也是历史数据。
所以现在流行构建实时数仓,从 Kafka 读计算写入 Kafka,最后再输出到 BI DB,BI DB 提供实时的数据服务,可以实时查询。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。但是:
能否结合离线数仓和实时数仓两者的优势,然后构建一个 Lambda 的架构?
核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完 Hive 数仓、SQL,然后实时数仓也要写完相应 SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受,如果是大量的作业,维护成本其实是非常大的。
如何既享受 Ad-hoc 的好处,又能实现实时化的优势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些探索和尝试,如下图所示。它能实时地按 Streaming 的方式来导出数据,写到 BI DB 中,并且这套系统也可以用分析计算框架来进行 Ad-hoc 的分析。这个图当中,最重要的就是 Flink Streaming 的导入。
早期 Flink 版本在 DataStreaming 层,已经有一个强大的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的系统,能实现一条数据不多,一条数据不少的 Sink。
具体原理是基于两阶段提交:
这样一个 File system 的 Writer 看似比较完美了。但是在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来通知 Hive Metastore 呢?
SQL 层在 StreamingFileSink,扩展了 Partition 的 Committer。
相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而后面再接了一个节点 PartitionCommitter。支持的 Commit Policy 有:
Committer 挂在 Writer 后, 由 Commit Trigger 决定什么时机来 commit :
Hive 数仓中存在大量的 ETL 任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:
针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,支持:
甚至可以使用 10 分钟级别的分区策略,使用 Flink 的 Hive streaming source 和 Hive streaming sink ,可以大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也支持针对 Table 全量的 Ad-hoc 查询,提高灵活性。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;
另外除了 Scan 的读取方式,Flink 1.11 也支持了 Temporal Join 的方式,也就是以前常说的 Streaming Dim Join。
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
目前支持的方式是 Cache All,并且是不感知分区的,比较适合小表的情况。
Flink SQL 遵循的是 ANSI-SQL 的标准,而 Hive SQL 有它自己的 HQL 语法,它们之间的语法、语义都有些许不同。
如何让 Hive 用户迁移到 Flink 生态中,同时避免用户太大的学习成本?为此, Flink SQL 1.11 提供了 Hive Dialect,可以使得用户在 Flink 生态中使用 HQL 语言来计算。目前只支持 DDL,后续版本会逐步攻坚 Qeuries。
Hive Integration 提供了一个重量级的集成,功能丰富,但是环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?
Flink table 在长久以来只支持一个 CSV 的 Filesystem Table,并且还不支持 Partition,行为上在某些方面也有些不符合大数据计算的直觉。
Flink 1.11 重构了整个 Filesystem connector 的实现:
用几句简单的 SQL,不用搭建 Hive 集成环境即可:
在 Flink 1.10 以后,Hive MetaStore 逐渐成为 Flink streaming SQL 中 Table 相关的 Meta 信息的存储。比如,可以通过 Hive Catalog 保存 Kafka Tables。这样可以在启动的时候直接使用 Tables。
通过 DDL 这种方式,把 SQL 提交到 Cluster,就可以写入 Kafka,或者写入 MySQL、 DFS。使用 Hive Catalog 后,是不是说只用写一次 DDL,之后的流计算作业都是直接使用 Kafka 的 Table 呢?
不完全是,因为还是有一些缺陷。比如,一个典型的 Kafka Table 有一些 Execution 相关的参数。因为 kafka 一般来说都是存 15 天以内的数据,需要指定每次消费的时间偏移,时间偏移是在不断变化的。每次提交作业,使用 Kafka Table 的参数是不一样的。而这些参数又存储在 Catalog 里面,这种情况下只能创建另外一张表,所以字段和参数要重写一遍,非常繁琐。
Table Hints:
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。