大数据处理之日均百亿级日志处理:微博基于 Flink 的实时计算平台建设(一)

随着微博业务线的快速扩张,微博广告各类业务日志的数量也随之急剧增长。传统基于 Hadoop 生态的离线数据存储计算方案已在业界形成统一的默契,但受制于离线计算的时效性制约,越来越多的数据应用场景已从离线转为实时。微博广告实时数据平台以此为背景进行设计与构建,目前该系统已支持日均处理日志数量超过百亿,接入产品线、业务日志类型若干。

一.技术选型

相比于 Spark,目前 Spark 的生态总体更为完善一些,且在机器学习的集成和应用性暂时领先。但作为下一代大数据引擎的有力竞争者-Flink 在流式计算上有明显优势,Flink 在流式计算里属于真正意义上的单条处理,每一条数据都触发计算,而不是像 Spark 一样的 Mini Batch 作为流式处理的妥协。Flink 的容错机制较为轻量,对吞吐量影响较小,而且拥有图和调度上的一些优化,使得 Flink 可以达到很高的吞吐量。而 Strom 的容错机制需要对每条数据进行 ack,因此其吞吐量瓶颈也是备受诟病。

这里引用一张图来对常用的实时计算框架做个对比。

1

Flink 特点

Flink 是一个开源的分布式实时计算框架。Flink 是有状态的和容错的,可以在维护一次应用程序状态的同时无缝地从故障中恢复;它支持大规模计算能力,能够在数千个节点上并发运行;它具有很好的吞吐量和延迟特性。同时,Flink 提供了多种灵活的窗口函数。

1)状态管理机制

Flink 检查点机制能保持 exactly-once 语义的计算。状态保持意味着应用能够保存已经处理的数据集结果和状态。

2

2)事件机制

Flink 支持流处理和窗口事件时间语义。事件时间可以很容易地通过事件到达的顺序和事件可能的到达延迟流中计算出准确的结果。

3

3)窗口机制

Flink 支持基于时间、数目以及会话的非常灵活的窗口机制(window)。可以定制 window 的触发条件来支持更加复杂的流模式。

4

4)容错机制

Flink 高效的容错机制允许系统在高吞吐量的情况下支持 exactly-once 语义的计算。Flink 可以准确、快速地做到从故障中以零数据丢失的效果进行恢复。

5

5)高吞吐、低延迟

Flink 具有高吞吐量和低延迟(能快速处理大量数据)特性。下图展示了 Apache Flink 和 Apache Storm 完成分布式项目计数任务的性能对比。

6

二.架构演变

初期架构

初期架构仅为计算与存储两层,新来的计算需求接入后需要新开发一个实时计算任务进行上线。重复模块的代码复用率低,重复率高,计算任务间的区别主要是集中在任务的计算指标口径上。

在存储层,各个需求方所需求的存储路径都不相同,计算指标可能在不通的存储引擎上有重复,有计算资源以及存储资源上的浪费情况。并且对于指标的计算口径也是仅局限于单个任务需求里的,不通需求任务对于相同的指标的计算口径没有进行统一的限制于保障。各个业务方也是在不同的存储引擎上开发数据获取服务,对于那些专注于数据应用本身的团队来说,无疑当前模式存在一些弊端。

7

后期架构

随着数据体量的增加以及业务线的扩展,前期架构模式的弊端逐步开始显现。从当初单需求单任务的模式逐步转变为通用的数据架构模式。为此,我们开发了一些基于 Flink 框架的通用组件来支持数据的快速接入,并保证代码模式的统一性和维护性。在数据层,我们基于 Clickhouse 来作为我们数据仓库的计算和存储引擎,利用其支持多维 OLAP 计算的特性,来处理在多维多指标大数据量下的快速查询需求。在数据分层上,我们参考与借鉴离线数仓的经验与方法,构建多层实时数仓服务,并开发多种微服务来为数仓的数据聚合,指标提取,数据出口,数据质量,报警监控等提供支持。

8

整体架构分为五层:

1)接入层:接入原始数据进行处理,如 Kafka、RabbitMQ、File 等。

2)计算层:选用 Flink 作为实时计算框架,对实时数据进行清洗,关联等操作。

3)存储层:对清洗完成的数据进行数据存储,我们对此进行了实时数仓的模型分层与构建,将不同应用场景的数据分别存储在如 Clickhouse,Hbase,Redis,Mysql 等存储。服务中,并抽象公共数据层与维度层数据,分层处理压缩数据并统一数据口径。

4)服务层:对外提供统一的数据查询服务,支持从底层明细数据到聚合层数据 5min/10min/1hour 的多维计算服务。同时最上层特征指标类数据,如计算层输入到Redis、Mysql 等也从此数据接口进行获取。

5)应用层:以统一查询服务为支撑对各个业务线数据场景进行支撑。

  • 监控报警:对 Flink 任务的存活状态进行监控,对异常的任务进行邮件报警并根据设定的参数对任务进行自动拉起与恢复。根据如 Kafka 消费的 offset 指标对消费处理延迟的实时任务进行报警提醒。

  • 数据质量:监控实时数据指标,对历史的实时数据与离线 hive 计算的数据定时做对比,提供实时数据的数据质量指标,对超过阈值的指标数据进行报警。

三.数据处理流程

1.整体流程

整体数据从原始数据接入后经过 ETL 处理, 进入实时数仓底层数据表,经过配置化聚合微服务组件向上进行分层数据的聚合。根据不同业务的指标需求也可通过特征抽取微服务直接配置化从数仓中抽取到如 Redis、ES、Mysql 中进行获取。大部分的数据需求可通过统一数据服务接口进行获取。

9

2.问题与挑战

原始日志数据因为各业务日志的不同,所拥有的维度或指标数据并不完整。所以需要进行实时的日志的关联才能获取不同维度条件下的指标数据查询结果。并且关联日志的回传周期不同,有在 10min 之内完成 95% 以上回传的业务日志,也有类似于激活日志等依赖第三方回传的有任务日志,延迟窗口可能大于1天。

并且最大日志关联任务的日均数据量在 10 亿级别以上,如何快速处理与构建实时关联任务的问题首先摆在我们面前。对此我们基于 Flink 框架开发了配置化关联组件。对于不同关联日志的指标抽取,我们也开发了配置化指标抽取组件用于快速提取复杂的日志格式。以上两个自研组件会在后面的内容里再做详细介绍。

1)回传周期超过关联窗口的日志如何处理?

对于回传晚的日志,我们在关联窗口内未取得关联结果。我们采用实时+离线的方式进行数据回刷补全。实时处理的日志我们会将未关联的原始日志输出到另外一个暂存地(Kafka),同时不断消费处理这个未关联的日志集合,设定超时重关联次数与超时重关联时间,超过所设定任意阈值后,便再进行重关联。离线部分,我们采用 Hive 计算昨日全天日志与 N 天内的全量被关联日志表进行关联,将最终的结果回写进去,替换实时所计算的昨日关联数据。

2)如何提高 Flink 任务性能?

① Operator Chain

为了更高效地分布式执行,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。

Flink 会在生成 JobGraph 阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个 task(一个线程)中执行,以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟。下面以官网中的例子进行说明。

10

图中,source、map、[keyBy|window|apply]、sink 算子的并行度分别是 2、2、2、2、1,经过 Flink 优化后,source 和 map 算子组成一个算子链,作为一个 task 运行在一个线程上,其简图如图中 condensed view 所示,并行图如 parallelized view 所示。算子之间是否可以组成一个 Operator Chains,看是否满足以下条件:

  • 上下游算子的并行度一致;

  • 下游节点的入度为 1;

  • 上下游节点都在同一个 slot group 中;

  • 下游节点的 chain 策略为 ALWAYS;

  • 上游节点的 chain 策略为 ALWAYS 或 HEAD;

  • 两个节点间数据分区方式是 forward;

  • 用户没有禁用 chain。

② Flink 异步 IO

流式计算中,常常需要与外部系统进行交互。而往往一次连接中你那个获取连接等待通信的耗时会占比较高。下图是两种方式对比示例:

11

图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户 a、b、c 等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。

③ Checkpoint 优化

Flink 实现了一套强大的 checkpoint 机制,使它在获取高吞吐量性能的同时,也能保证 Exactly Once 级别的快速恢复。

首先提升各节点 checkpoint 的性能考虑的就是存储引擎的执行效率。Flink 官方支持的三种 checkpoint state 存储方案中,Memory 仅用于调试级别,无法做故障后的数据恢复。其次还有 Hdfs 与 Rocksdb,当所做 Checkpoint 的数据大小较大时,可以考虑采用 Rocksdb 来作为 checkpoint 的存储以提升效率。

其次的思路是资源设置,我们都知道 checkpoint 机制是在每个 task 上都会进行,那么当总的状态数据大小不变的情况下,如何分配减少单个 task 所分的 checkpoint 数据变成了提升 checkpoint 执行效率的关键。

最后,增量快照. 非增量快照下,每次 checkpoint 都包含了作业所有状态数据。而大部分场景下,前后 checkpoint 里,数据发生变更的部分相对很少,所以设置增量 checkpoint,仅会对上次 checkpoint 和本次 checkpoint 之间状态的差异进行存储计算,减少了 checkpoint 的耗时。

3)如何保障任务的稳定性?

在任务执行过程中,会遇到各种各样的问题,导致任务异常甚至失败。所以如何做好异常情况下的恢复工作显得异常重要。

① 设定重启策略

Flink 支持不同的重启策略,以在故障发生时控制作业如何重启。集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略。

默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。

常用的重启策略:

  • 固定间隔(Fixed delay);

  • 失败率(Failure rate);

  • 无重启(No restart)。

② 设置 HA

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java