从 Flink 的官方文档,我们知道 Flink 的编程模型分为四层,sql 是最高层的 api, Table api 是中间层,DataSteam/DataSet Api 是核心,stateful Streaming process 层是底层实现。
刚开始我们直接使用 Flink Table 做为数据关联的方式,直接将接入进来的 DataStream 注册为 Dynamic Table 后进行两表关联查询,如下图:
但尝试后发现在做那些日志数据量大的关联查询时往往只能在较小的时间窗口内做查询,否则会超过 datanode 节点单台内存限制,产生异常。但为了满足不同业务日志延迟到达的情况,这种实现方式并不通用。
之后,我们直接在 DataStream 上进行处理,在 CountWindow 窗口内进行关联操作,将被关联的数据 Hash 打散后存储在各个 datanode 节点的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 这一特性进行算子内数据的备份与恢复。这种方式是可行的,但受制于 Rocksdb 集群物理磁盘为非 SSD 的因素,这种方式在我们的实际线上场景中关联耗时较高。
如 Redis 类的 KV 存储的确在查询速度上提升不少,但类似广告日志数据这样单条日志大小较大的情况,会占用不少宝贵的机器内存资源。经过调研后,我们选取了 Hbase 作为我们日志关联组件的关联数据存储方案。
为了快速构建关联任务,我们开发了基于 Flink 的配置化组件平台,提交配置文件即可生成数据关联任务并自动提交到集群。下图是任务执行的处理流程。
示意图如下:
下图是关联组件内的执行流程图:
随着日志量的增加,某些需要进行关联的日志数量可能达到日均十几亿甚至几十亿的量级。前期关联组件的配置化生成任务的方式的确解决了大部分线上业务需求,但随着进一步的关联需求增加,Hbase 面临着巨大的查询压力。在我们对 Hbase 表包括 rowkey 等一系列完成优化之后,我们开始了对关联组件的迭代与优化。
第一步,减少 Hbase 的查询。我们使用 Flink Interval Join 的方式,先将大部分关联需求在程序内部完成,只有少部分仍需查询的日志会去查询外部存储(Hbase). 经验证,以请求日志与实验日志关联为例,对于设置 Interval Join 窗口在 10s 左右即可减少 80% 的 hbase 查询请求。
① Interval Join 的语义示意图
数据 JOIN 的区间 - 比如时间为 3 的 EXP 会在 IMP 时间为[2, 4]区间进行JOIN;
WaterMark - 比如图示 EXP 一条数据时间是 3,IMP 一条数据时间是 5,那么WaterMark是根据实际最小值减去 UpperBound 生成,即:Min(3,5)-1 = 2;
过期数据 - 出于性能和存储的考虑,要将过期数据清除,如图当 WaterMark 是 2 的时候时间为 2 以前的数据过期了,可以被清除。
② Interval Join 内部实现逻辑
③ Interval Join 改造
因 Flink 原生的 Intervak Join 实现的是 Inner Join,而我们业务中所需要的是 Left Join,具体改造如下:
取消右侧数据流的 join 标志位;
左侧数据流有 join 数据时不存 state。
2)关联率动态监控
在任务执行中,往往会出现意想不到的情况,比如被关联的数据日志出现缺失,或者日志格式错误引发的异常,造成关联任务的关联率下降严重。那么此时关联任务虽然继续在运行,但对于整体数据质量的意义不大,甚至是反向作用。在任务进行恢复的时,还需要清除异常区间内的数据,将 Kafka Offset 设置到异常前的位置再进行处理。
故我们在关联组件的优化中,加入了动态监控,下面示意图:
关联任务中定时探测指定时间范围 Hbase 是否有最新数据写入,如果没有,说明写 Hbase 任务出现问题,则终止关联任务;
当写 Hbase 任务出现堆积时,相应的会导致关联率下降,当关联率低于指定阈值时终止关联任务;
当关联任务终止时会发出告警,修复上游任务后可重新恢复关联任务,保证关联数据不丢失。
为了快速进行日志数据的指标抽取,我们开发了基于 Flink 计算平台的指标抽取组件Logwash。封装了基于 Freemaker 的模板引擎做为日志格式的解析模块,对日志进行提取,算术运算,条件判断,替换,循环遍历等操作。
下图是 Logwash 组件的处理流程:
组件支持文本与 Json 两种类型日志进行解析提取,目前该清洗组件已支持微博广告近百个实时清洗需求,提供给运维组等第三方非实时计算方向人员快速进行提取日志的能力。
配置文件部分示例:
Flink 中 DataStream 的开发,对于通用的逻辑及相同的代码进行了抽取,生成了我们的通用组件库 FlinkStream。FlinkStream 包括了对 Topology 的抽象及默认实现、对 Stream 的抽象及默认实现、对 Source 的抽象和某些实现、对 Operator 的抽象及某些实现、Sink 的抽象及某些实现。任务提交统一使用可执行 Jar 和配置文件,Jar 会读取配置文件构建对应的拓扑图。
对于 Source 进行抽象,创建抽象类及对应接口,对于 Flink Connector 中已有的实现,例如 kafka,Elasticsearch 等,直接创建新 class 并继承接口,实现对应的方法即可。对于需要自己去实现的 connector,直接继承抽象类及对应接口,实现方法即可。目前只实现了 KafkaSource。
与 Source 抽象类似,我们实现了基于 Stream 到 Stream 级别的 Operator 抽象。创建抽象 Operate 类,抽象 Transform 方法。对于要实现的 Transform 操作,直接继承抽象类,实现其抽象方法即可。目前实现的 Operator,直接按照文档使用。如下:
针对 Sink,我们同样创建了抽象类及接口。对 Flink Connector 中已有的 Sink 进行封装。目前可通过配置进行数据输出的 Sink。目前以实现和封装的 Sink 组件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。
创建 Stream 抽象类及抽象方法 buildStream,用于构建 StreamGraph。我们实现了默认的 Stream,buildStream 方法读取 Source 配置生成 DataStream,通过 Operator 配置列表按顺序生成拓扑图,通过 Sink 配置生成数据写出组件。
对于单 Stream,要处理的逻辑可能比较简单,主要读取一个 Source 进行数据的各种操作并输出。对于复杂的多 Stream 业务需求,比如多流 Join,多流 Union、Split 流等,因此我们多流业务进行了抽象,产生了 Topology。在 Topology 这一层可以对多流进行配置化操作。对于通用的操作,我们实现了默认 Topology,直接通过配置文件就可以实现业务需求。对于比较复杂的业务场景,用户可以自己实现 Topology。
我们对抽象的组件都是可配置化的,直接通过编写配置文件,构造任务的运行拓扑结构,启动任务时指定配置文件。
正文文本框 Flink Environment 配置化,包括时间处理类型、重启策略,checkpoint 等;
Topology 配置化,可配置不同 Stream 之间的处理逻辑与 Sink;
Stream 配置化,可配置 Source,Operator 列表,Sink。
配置示例如下:
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。