百万QPS,秒级延迟,携程基于实时流的大数据基础层建设。

一、背景

2017年9月携程金融成立,本着践行金融助力旅行的使命,开始全面开展集团风控和金融业务,需要在携程DC构建统一的金融数据中心,实现多地多机房间的数据融合,满足离线和在线需求;涉及数千张mysql表到离线数仓 、实时数仓、在线缓存的同步工作。由于跨地域、实时性、准确性、完整性要求高,集团内二次开发的DataX(业界常用的离线同步方案)无法支持。以mysql-hive同步为例,DataX通过直连MySQL批量拉取数据,存在以下问题:

1)性能瓶颈:随着业务规模的增长,离线批量拉取的数据规模越来越大,影响mysql-hive镜像表的产出时间,进而影响数仓下游任务。对于一些需要mysql-hive小时级镜像的场景更加捉襟见肘。

2)影响线上业务:离线批量拉取数据,可能引起慢查询,影响业务库的线上服务。

3)无法保证幂等:由于线上库在实时更新,在批量拉取SQL不变的情况下,每次执行可能产生不一样的结果。比如指定了create_time 范围,但一批记录的部分字段(比如支付状态)时刻在变化。也即无法产出一个明确的mysql-hive镜像 , 对于一些对时点要求非常高的场景(比如离线对账) 无法接受。

4)缺乏对DELETE的支持:业务库做了DELETE操作后,只有整表全量拉取,才能在Hive镜像里体现。

二、方案概述

基于上述背景,我们设计了一套基于binlog实时流的数据基础层构建方案,并取得了预期效果。架构如图,各模块简介:

1)webUI做binlog采集的配置,以及mysql->hive,mysql→实时数仓,mysql→在线缓存的镜像配置工作。

2)canal负责binlog采集 ,写入kafka ;其中kafka在多地部署,并通过专线实现topic的实时同步。

3)spark-streaming 负责将binlog写入HDFS。

4)merge 离线调度的ETL作业,负责将HDFS增量和 snap 合并成新的 snap。

5)mirror 负责将binlog事件更新到实时数仓、在线缓存。

6)基础服务:包括历史数据的重放,数据校验,全链路监控,明文检测等功能。

三、详细介绍

本章将以mysql-hive镜像为例,对技术方案做详细介绍。

3.1.binlog采集

canal是阿里巴巴开源的Mysql binlog增量订阅和消费组件,在业界有非常广泛的应用,通过实时增量采集binlog ,可以降低对mysql 的压力,细粒度的还原数据的变更过程,我们选型canal 作为binlog采集的基础组件,根据应用场景做了二次开发,其中raw binlog → simple binlog 的消息格式转换是重点。

下面是binlog采集的架构图:

canal 在1.1.4版本引入了canal-admin工程,支持面向WebUI的管理能力;我们采用原生的canal-admin 对binlog采集进行管理 ,采集粒度是 mysql instance级别。

Canal Server会向canalAdmin 拉取所属集群下的所有mysql instance 列表,针对每个mysql instance采集任务,canal server通过在zookeeper创建临时节点的方式实现HA,并通过zookeeper实现binlog position的共享。

canal 1.1.1版本引入MQProducer 原生支持kafka消息投递 , 图中instance active 从mysql 获取实时的增量raw binlog数据,在MQProducer 环节进行raw binlog → simple binlog的消息转换,发送至kafka。我们按照instance 创建了对应的kafka topic,而非每个database 一个topic , 主要考虑到同一个mysql instance 下有多个database,过多的topic (partition) 导致kafka随机IO增加,影响吞吐。发送Kafka时以schemaName+tableName作为partitionKey,结合producer的参数控制,保证同一个表的binlog消息按顺序写入kafka。

参考producer参数控制:

max.in.flight.requests.per.connection=1
retries=0
acks=all

topic level 的配置:

topic partition 3副本, 且
min.insync.replicas=2

从保证数据的顺序性、容灾等方面考虑,我们设计了一个轻量级的SimpleBinlog消息格式:

  • binlogOffset:全局序列ID,由${timestamp}${seq} 组成,该字段用于全局排序,方便Hive做row_number 取出最新镜像,其中seq是同一个时间戳下自增的数字,长度为6。
  • executeTime:binlog 的执行时间。
  • eventType:事件类型:INSERT,UPDATE,DELETE。
  • schemaName:库名,在后续的spark-streaming,mirror 处理时,可以根据分库的规则,只提取出前缀,比如(ordercenter_001 → ordercenter) 以屏蔽分库问题。
  • tableName:表名,在后续的spark-streaming,mirror 处理时,可以根据分表规则,只提取出前缀,比如(orderinfo_001 → orderinfo ) 以屏蔽分表问题。
  • source:用于区分simple binlog的来源,实时采集的binlog 为 BINLOG, 重放的历史数据为 MOCK 。
  • version:版本
  • content:本次变更的内容,INSERT,UPDATE 取afterColumnList,DELETE 取beforeColumnList。

金融当前部署了4组canal 集群,每组2个物理机节点,跨机房DR部署,承担了数百个mysql instance binlog采集工作。Canal server 自带的性能监控基于Prometheus实现,我们通过实现 PrometheusScraper 主动拉取核心指标,推送到集团内部的Watcher监控系统上,配置相关报警,其中各mysql instance 的binlog采集延迟是全链路监控的重要指标。

系统上线初期遇到过canal-server instance脑裂的问题,具体场景是active instance 所在的canal-server ,因网络问题与zookeeper的链接超时,这时候standby instance 会抢占创建临时节点,成为新的active;也就出现了2个active 同时采集并推送binlog的情况。解决办法是active instance 与zookeeper链接超时后,立即自kill,再次发起下一轮抢占。

3.2 历史数据重放

有两个场景需要我们采集历史数据:

1)首次做 mysql-hive镜像 ,需要从mysql加载历史数据;

2)系统故障(丢数等极端情况),需要从mysql恢复数据。

有两种方案:

1)从mysql 批量拉取历史数据,上传到HDFS 。需要考虑批量拉取的数据与 binlog 采集产出的mysql-hive镜像的格式差异,比如去重主键的选择,排序字段的选择等问题。

2)流式方式, 批量从mysql 拉取历史数据,转换为simple binlog消息流写入kafka,同实时采集的simple binlog流复用后续的处理流程。在合并产生mysql-hive镜像表时,需要确保这部分数据不会覆盖实时采集的simple binlog数据。

我们选用了更简单易维护的方案2,并开发了一个binlog-mock 服务,可以根据用户给出的库、表(前缀)以及条件,按批次(比如每次select 10000行)从mysql查询数据,组装成simple_binlog消息发送kafka。

对于mock的历史数据,需要注意:

1)保证不覆盖后续实时采集的binlog:simple binlog消息里binlogOffset字段用于全局排序,它由${timestamp}+${seq}组成,mock的这部分数据 timestamp 为发起SQL查询的时间戳向前移5分钟,seq为000000;

2)落到哪个分区:我们根据binlog事件时间(executeTime) 判断数据所属哪个dt分区,mock的这部分数据 executeTime 为用户指定的一个值,默认为${yesterday}。

3.3 Write2HDFS

我们采用spark-streaming 将kafka消息持久化到HDFS,每5分钟一个批次,一个批次的数据处理完成(持久化到HDFS)后再提交consumer offset,保证消息被at-least-once处理;同时也考虑了分库分表问题、数据倾斜问题:

屏蔽分库分表:以订单表为例,mysql数据存储在ordercenter_00 ... ordercenter_99 100个库,每个库下面又有
orderinfo_00...orderinfo_99 100张表,库前缀schemaNamePrefix=ordercenter,表前缀tableNamePrefix=orderinfo,统一映射到tableName=${schemaNamePrefix}_${tableNamePrefix}里; 根据binlog executeTime字段生成对应的分区dt,确保同一个库表同一天的数据落到同一个分区目录里: base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}

防止数据倾斜: 系统上线初期经常出现数据倾斜问题,排查发现某些时间段个别表由于业务跑批等产生的binlog量特别大,一张表一个批次的数据需要写入同一个HDFS文件,单个HDFS文件的写入速度成为瓶颈。因此增加了一个环节(Step2),过滤出当前批次里的“大表",将这些大表的数据分散写入多个HDFS文件里。

base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}-[${randomInt}]

3.4 生成镜像

3.4.1 数据就绪检查

spark-streaming作业每5分钟一个批次将kafka simple_binlog消息持久化到HDFS,merge任务是每天执行一次。每天0点15分,开始进行数据就绪检查。我们对消息的全链路进行了监控,包括binlog采集延迟 t1 、kafka同步延迟 t2 、spark-streaming consumer 延迟 t3。假设当前时间为凌晨0点30分,设为t4,若t4>(t1+t2+t3) 说明 T-1日数据已全部落入HDFS,即可执行下游的ETL作业(merge)。

3.4.2 Merge

HDFS上的simple binlog数据就绪后,下一步就是对相应MySQL业务表数据进行还原。以下是Merge的执行流程,步骤如下:

1)加载T-1 分区的simple binlog数据

数据就绪检查通过后,通过 MSCK REPAIR PARTITION 加载T-1分区的simple_binlog数据,注意:这个表是原始的simple binlog数据,并未平铺具体mysql表的字段。如果是首次做mysql-hive镜像,历史数据重放的simple binlog也会落入T-1分区。

2)检查Schema ,并抽取T-1增量

请求mirror后台,获取最新的mysql schema,如果发生了变更则更新mysql-hive镜像表(snap),让下游无感知;同时根据mysql schema 的field列表 、以及"hive主键" 等配置信息,从上述simple_binlog分区抽取出mysql表的T-1日明细数据 (delta)。

3)判断业务库是否发生了归档操作,以决定后续合并时是否忽略DELETE事件。

业务DELETE数据有2种情况:业务修单等引起的正常DELETE,需要同步变更到Hive;业务库归档历史数据产生的DELETE,这类DELETE操作需要忽略掉。

系统上线初期,我们等待业务或DBA通知,然后手工处理,比较繁琐,很多时候会有通知不到位的情况,导致Hive数据缺失历史数据。为了解决这个问题,在Merge之前进行程自动判断,参考规则如下:

a)业务归档通常是大批量的DELETE(百万+),因此可以设置一个阈值,比如500W或日增量的7倍。

b)业务归档的时间段通常比较久,比如设置阈值为30天。如果满足了条件1,且删除的这些数据在30天以前,则属于归档产生的DELETE。

4)对增量数据(delta)和当前快照(snap T-2)进行合并去重,得到最新snap T-1。

下面通过一个例子说明merge的过程,假设订单order表共有id,order_no,amount三个字段,id是全局唯一建; snap表t3 是mysql-hive镜像,merge过程如图展示。

1)加载目标(dt=T-1)分区里的simple binlog数据,表格式如t1;

2)请求mirror后台获取mysql的最新schema,从t1 抽取数据到临时表t2;

3)snap表t3 与mysql schema进行适配(本例无变更);

4)对增量表t2、存量snap t3 进行union(对t3自动增加type列,值为INSERT),得到临时表t4;

5)对t4表按唯一键id进行row_number,分组按binlogOffset降序排序,序号为1的即为最新数据。

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java