摘要:导读随着大数据的不断发展,数据体量越来越大,为了更好地支持内部湖仓一体的使用场景,虎牙基于 Iceberg + Paimon 构建了实时湖仓架构。本文将对这一架构设计和实践过程中的痛点及优化进行介绍。
导读随着大数据的不断发展,数据体量越来越大,为了更好地支持内部湖仓一体的使用场景,虎牙基于 Iceberg + Paimon 构建了实时湖仓架构。本文将对这一架构设计和实践过程中的痛点及优化进行介绍。
主要内容包括以下几大部分:
1. 背景介绍
2. 整体架构设计
3. 任务实时入湖难点以及解决方案
4. 分钟级别微批调度
5. 未来展望
分享嘉宾|胡源峰 虎牙 大数据平台开发工程师
编辑整理|刘闰丰
内容校对|李瑶
出品社区|DataFun
01
背景介绍
首先来介绍一下为什么会选择使用 Iceberg + Paimon 的建设方式。
从实时性方面来看,由于现阶段离线数仓为主的流程建设已经不能满足很多现有的场景,我们希望通过 Iceberg 、Paimon 等湖格式增强虎牙在实时场景的能力。湖格式拥有很好的 data skip file 机制,可以提供更高的性能。同时在使用灵活性上也表现出色。另外,由于公司的开发人员大部分以离线作业为主,面对实时需求需要学习 Flink 等一系列技术,学习和使用成本较高,所以希望以更低成本的方式应对实时需求。数据入湖后直接以 Spark 为链路继续延续,并完成实时指标的计算,这样统一实时和离线两条链路,使用一条链路即可完成所有需求的开发。
02
整体架构设计
下面分享 Iceberg + Paimon 模式在虎牙的架构设计。
首先从源头开始,入湖的源头包括三类:
结构化数据源:主要来自于虎牙内部的 Bizer 系统,包括上游技术或者业务方实时产生的一些指标数据,这些指标数据是结构化的,已经定义好数据结构,所以我们接入的时候非常方便。异构数据:来自各个业务线负责方自己的维护,业务方自己负责从 Kafka 链路接入到 Iceberg/Paimon,他们自己通过写业务 ETL 逻辑接入。DB 业务数据:第三类就是 MySQL 这种 CDC 数据,直接通过 Flink CDC 接入到 Iceberg/Paimon 表。最终,我们全部使用 Spark 作为计算引擎来增量消费。这里不选择 Flink 的原因是由于我们太多的历史业务是在 HiveSQL 里面在做的,如果全部迁移到 Flink,成本会很高,并且用户会不是很有把握,所以我们还是沿用了 Spark 这一套模式。最终会产生推荐产品、业务报表、数据产品、数据分析等输出。
03
任务实时入湖难点以及解决方案
接下来介绍在 Paimon + Iceberg 的应用过程中遇到的一些问题和解决方案。
1. Iceberg 和 Paimon 介绍对比
Iceberg 是当前业界,特别是北美地区,统一的数据湖格式标准。而 Paimon 则是国内新兴的一个成员。二者的优劣势非常明显,具体体现在以下方面。
(1)实时/离线能力
Iceberg 本身的 batch 能力非常优秀,因为社区里面的主力开发者基本上是 Spark 社区的人为主。而对于 Paimon,基本上是来自于 Flink 团队,Paimon 和 Flink 的集成度非常高,比如 Paimon 的新 feature,那么一定会优先适配 Flink 引擎,而 Iceberg 的新 feature,一定是先适配 Spark 引擎。
(2)更新能力
更新能力方面,Iceberg 的 V2 表也支持行级更新,Paimon 也支持更新,但 Paimon 表由于底层是 LSM 结构,Paimon 的更新能力会比 Iceberg 中 V2 的格式好很多。
(3)生态能力
生态方面,Iceberg 的生态是非常好的,因为 Iceberg 的数据格式本身认可度很高,Trino、Spark 各种引擎都会优先和 Iceberg 适配。
但是在 Paimon 中很多引擎适配度并没有那么高,一些功能会比较弱,比如说很多场景下 push down 的可能不如 Iceberg 做得好。
(4)小文件处理能力
无论是用 Hive 还是 Iceberg,都会面临非常多小文件的问题。Iceberg 社区提供了 procedures 来给用户提供 compact,不过这些都是手动的,需要用户自己维护,这里虎牙会用 Amoro 来进行管控。
对于 Paimon 的小文件,社区默认会在引擎中提供 inline Compact,用户不需要自己去维护这些小文件。
(5)迭代成熟度方面
在迭代成熟度方面, Iceberg 本身是一个非常成熟的社区,但是迭代可能没那么快;而 Paimon 是近一两年活跃起来的,功能增加得非常快。
综合这两个数据湖的特点,我们在 CDC 场景优先选择了 Paimon,在纯粹的 append 的场景,则是选择 Iceberg。
2. 难点场景解决及效果
接下来分享结合过程中难点场景的解决和效果。
(1)cdc 场景对比
在 CDC 场景下我们做了一些对比,分别使用 Iceberg 和 Paimon 进行 CDC 入湖,测试的场景是比较苛刻的,主要体现在数据量,我们有 200 亿的 MySQL 数据,并且主键不是自增的,更新频率也非常高,这种情形下我们通过 Iceberg 和 Paimon 用同等资源下进行入湖的尝试。
我们发现如果用 Iceberg,要写一个 Flink writer,另外我们的 Amoro 的 optimizer 也会起来。在 optimizer 起来之后,可以发现 CPU 会定期的去做 Compact,会导致 CPU 飙得很高,而且由于 Amoro 需要把 EQ delete file 全部通过 Compact 之后变成 positional delete file,所以这个过程需要对文件进行全文件的扫描,因此导致整个 IO 的压力是非常大的。
但如果使用 Paimon,我们只需要写一个 Flink 任务,并且不需要去管 Compact 的动作,由内部进行 Compact,整体资源使用量是非常少的。目前来看,在 CDC 场景选择 Paimon 还是最合适的。
(2)Schema change
另外我们还遇到了 Schema change 的问题,不管是用 Iceberg 还是 Paimon 的实时入湖,都会遇到 schema change 的问题。
由于 Iceberg 本身没有社区提供 CDC 的 action,其次 Iceberg 没有动态 DDL 变更的能力,比如在 Iceberg 入湖的时候,通常是用户加了字段,用户需要自己去修改这个入湖任务,然后将该任务重启才能生效。在重启过程中如果使用的是 Flink,重启时长会非常长,可能停掉任务再重启这个过程,就需要 5 分钟,如果任务很大,甚至需要十几分钟都有可能。所以我们使用 Flink session 模式运行,当更新表结构的时候,不需要把这个 Flink 集群给销毁掉,只用对任务做一个 savepoint,然后继续在这个 session 集群进行任务提交。整体来说,更新字段的操作的延迟只需 5 秒钟甚至更短。
而 Paimon 本身社区提供了入湖的 action,可以自动识别 MySQL 的 DDL 变更,自动在任务里面将加字段等 DDL 动作完成。
另外,使用 Paimon 的 schema change,如果公司的 DBA 针对这种在线变更的动作,比如用了 gh-ost 在线变更工具,可能会导致任务失败,因为它会把 MySQL 表先建一张新的表,然后新的表里面把字段加进去,再把旧表覆盖掉,最后读取 binlog 进行恢复。
这种在线变更工具目前还没在 Paimon 的社区版本里实现,不过这个能力在 cdc3.x 已经有一批 PR 在跟进,值得期待。
在我们内部如果使用这种在线变更能力,我们是参考了 inlong 的设计,通过识别正则表达式,然后把 schema change 在 Paimon 的 action 内部完成。
(3)Flink 写入倾斜
我们在使用过程中还遇到了一个非常棘手的问题,就是 Iceberg 在写入的时候会产生倾斜,因为我们的数量非常大。当在英雄联盟 s 赛比赛,或者春季赛、秋季赛的时候,整体写入量峰值可能达到上百万、几百万的 RPS。
由于在写入 Iceberg 表时,我们需要进行分区,不但针对数据的时间进行分区,还需要根据其中某个字段进行分区,如果该字段存在部分值特别多,部分值数据很少,就会产生问题了,数据写入到 Iceberg 表中会存在两个策略,一个是 NONE 策略,就是在写入的过程中,会把这些数据分散到 writer 里面,这个 writer 就会导致写入的文件数量是等于这个分区的数量乘以这个 writer 数量。如果这个字段里面有 100 个分区,而且同时用了 100 个写入并行度,那在一次的 checkpoint 中就会产生 1 万个文件,这是不可接受的。
另一个策略就是 HASH。但是如果把分区字段进行 HASH,就会产生单机的瓶颈,比如该字段某一个值超过了总量的 50%,那这一个值完全只会使用一个 writer 来写入,这个时候单机的瓶颈就会导致这个任务的瓶颈,每一次 checkpoint 产生的文件数量正好就等于这个分区里面的这个值的数量。但是会导致特别的集中,这样的话写入吞吐量会非常低,在我们的测验下,一个并行度最多能够支撑 5 万的 RPS 的量,这在我们的场景下是完全不能接受的。
我们的解决方案如上图所示,需要统计一下这个分区里面每一个值大概占的比例,然后按照这个比例往下游分发,如果占了 50%,那下游有 200 个 writer 的情况下,这 50% 的占比会写到下游 100 个 writer 里面去。但是这样并不能完全解决问题,因为这个数据的占比不是固定的,而是随时在变,可能某一天的早上某个值的占比很高,但晚上凌晨的时候这个值占比很低,无法根据实际场景动态化。
最后我们在 Flink 的写入链路中加了一个数据静态统计的 operator, 这个 operator 会统计每一个分区最近一个 checkpoint 里面的占比,operator 的 event 会将这个 taskmanager 内的占比往 jobmanager 进行发送,在 checkpoint 完成之后,jobmanager 会对所有的统计进行汇总,再广播到 subtask 中,并保存。Operater 内保存这个状态 state,分区比例会往下发到每一个 subtask 里面。后面的 operator 完全根据这个统计信息往下分发,就能保证每一个 subtask 的写入都是均匀的,而且文件数量是最少的。我们内部的场景大概有 200 万的 RPS 的场景下,一次 checkpoint 只会产生 500 个文件,并且不会产生任何的数据倾斜。
最终,基于上述解决方案,吞吐力整体比社区提供的两种方案提升了 4 ~ 15 倍,小文件问题也降低了数百倍,同类任务我们配置的资源也会降低 4 倍以上。这个方案已经经过了我们两年 s 赛大流量数百万 RPS 的验证,达到了预期效果。
社区在 2021 年底已经发现写入倾斜这个问题并开始跟进,最终在 Iceberg 1.6 的版本中作为一个实验新功能发布,并且能力更强,如果指定了 Iceberg 的 order by,也可以帮你完成均匀的分配。
因为它是试验性功能,所以还存在一些小问题。比如序列化性能很低,某些对象并没有配置它的序列化器,会导致它退化成 kryo 的序列化方式,其中还有一些小 bug,对应的 issue 已经附在上图中,并且已经修复,大家可以尝试去社区拉取最新的 patch 以应用。
3. Yaml 配置文件一键入湖
另外还有一键入湖的功能,我们参考了 cdc3.x 等数据同步工具,相比于用 Flink CDC,用户更想用一些简单的配置,就能把数据直接入湖。学习成本也很低,因为用户根本不懂什么是 Flink,同时也希望开发非常简单,我们这里的配置完全兼容 cdc3.x,如果需要迁移将现在的模式迁移到 3.x,也就是只需要把 3.x 的引擎兼容进来,直接使用这套配图文件就可以。
相比于做成一个页面来说,这种方式非常方便,开发的过程也会非常迅速,如果有新功能的话可以直接添加,不需要做任何的页面功能,非常迅速,对于这种快速迭代的场景是非常有用的。
4. Flink Autoscaler 动态资源调配
另外一方面,我们在 Flink 写入到 Iceberg 的时候,资源的调配也是很重要的,因为数据量一定是变化的,从早上到下午到晚上的数据量是不一样的,所以在控制资源的过程中,如何应对数据洪峰是关键。
当然伴随着还有小文件的问题,因为 Flink 进行 Iceberg 的写入过程中,一个并行度在一次 checkpoint 会生成一个文件,并行度过高,那么生成的小文件无疑就会更多。对于这种数据集成的任务,在 Flink 中所需的资源量和数据量几乎是完全线性的,所以数据量涨了一倍,资源量就应该扩一倍。所以我们使用了 Flink Autoscaler 来解决这一问题。
当然,我们在内部应用 Flink Autoscalr 时也遇到了一些问题。
(1)重复消费
第一个就是 Rescale 后存在大的 lag,因为在资源扩容之后任务会从前面一个 checkpoint 进行恢复,当重新去消费上一个 checkpoint 时。会消费提交成功的数据,这就会导致每一次扩容之后需要重新消费一部分数据。目前这个问题在 FLIP-416 中已经解决掉了,大家可以去手动 cp,或者等到 Flink2.0 之后去应用。
(2)分区不均匀
不过除了重复消费,也可能存在 Kafka 的分区消费不均匀。举个例子,现在有三个 Kafka 分区,在写入 Iceberg 的时候,在扩容过程中并行度是从 1 扩容到 2 呢还是 3 呢?我们希望它是扩到 3,因为这样可以和 Kafka 的分区形成一个对应,每个并行度的消费的过程中都是均匀的。在我们本地验证之后,我们也将这个功能给贡献给了社区。
(3)资源收缩容
还有一点就是在缩容之后资源不释放,这个场景就是在一个 task manager 内有多个 slot 的情况下,在缩容中之后并不一定选择需要该 task manager 内所有的 slot 都释放掉。
解决方案就是每一个 task manager 只留一个 slot。当缩容完成后,在遇到扩容场景时,同样也会面临着任务重启非常频繁的情况,因为在 Flink 的 Adapter 调度器内,每一次进行资源增加之后,都会触发一次 rescale 动作,它也不会等到目标并行度,对应的解决方案大家也可以去把配图中的两个 flip 详细看一下,可进行合并使用或者等到配合 Flink2.0。上述这些问题和功能在 2.0 后续版本都会发布。
(4)动态上下游并行度
另外一点就是在写入湖的过程中由于上游分区数的设置,下游写入能力有限导致下游写入缓慢,从 Kafka 消费有三个分区,但是消费完成后进行写入时的速度跟不上上游的速度,这个时候三个分区限定了下游的上限。
所以我们对 Kafka 的 Connector 进行了改造,在 Kafka 往下分发的时候,主动把这个 Kafka 的算子链和写入链断开,然后通过 autosacler 就可以动态的去调整 writer 的并行度和 Kafka 读取的并行度。
Scan 的并行度在 Flink 的一个 flip 中。有 source 的一个设定,在 Kafka 里面,假设设定了 Kafka 读取的并行度,然后默认的并行度如果对比这个 Kafka 并行度是不一样的,就会主动的把链断开,不过写入的瓶颈除了上述并行度的设置,还是需要优先去判定是属于 IO 的瓶颈还是 CPU 的瓶颈导致了写入的问题,如果是 IO 瓶颈导致的,我们在疯狂扩容后,也会导致产生特别多的文件,并行度变大,文件就会变多,此刻也可以通过配置写入任务的并行上限,避免其扩大到非常大的程度,导致 HDFS 的压力。
实施上述解决方案后,我们资源利用率降低了 50% 以上,同时小文件数量也有所降低,因为在夜间数据量很少的时候并行度可以调下来,从而减少小文件。另外 optimizer 的 task 也降低了,因为我们不再需要启动这些任务去合并文件,任务也不会有延时,一旦有数据量增长,任务会自动扩容,把 Flink 的并行的也扩起来,上图展示了一个生产案例的扩缩曲线和延迟,可以看到效果是非常不错。
04
分钟级别微批调度
在讲完数据入湖的难点场景以及任务使用上的优化,下面讲解一下如何利用 Spark 来完成近实时的调度。
由于我们内部的场景大部分使用 Spark,大家很少写 Flink SQL。如果使用 Spark,那么数据写入到 Iceberg 的过程就不能像 Flink 一样实时进行调度。这里我们引用了 watermark 的概念,我们会在 Flink 写入数据到 Iceberg 时将 watermark 信息写入到 Iceberg 表的 commit 信息当中,每个 Iceberg 表的 snapshot 内都会保存它当前的这些 watermark 的信息。
比如此刻到达 10 点的时候,这条流用 Flink 写入后,每一次快照都会生成一个 watermark 的时间。假设触发任务设置的 5 分钟级别,当 watermark 到达了 10 点之后就会触发一个 Spark 任务,任务消费处理 9:55 到 10 点钟的数据。到下一个 snapshot 如果 watermark 大于了 10 点零五分,就又会触发下一个 Spark 作业,这个 Spark 作业消费的是 10 点钟到 10:05 的数据。这种五分钟级别的微批调度,完全满足了我们分钟级别的实时场景需求。
当数据写入生成 snapshot 时,同时会使用 Iceberg 社区提供的 commit report 功能。我们会把 commit 的 watermark 信息给 report 到外部的一个 Kafka 流,然后我们的调度系统会去实时消费这个流,假设发现了某一张表,并且配置了 5 分钟级别的调度。当从流中获取到 watermark 之后发现它满足了我们的要求,就会直接触发下游的 Spark 任务启动。任务中会把该 5 分钟内的数据读过来,完成后续所有的离线智能的调度。这一套完成之后,我们业务方几乎不需要修改任何一个 Spark sql 代码,就可以将所有离线 t +1 级别的任务完全迁移到现在 5 分钟级别。
Paimon 也同理,不过针对 cdc 这种实时更新的情况,因为 5 点钟的数据和 10 点钟的数据是不一样的,期间可能已经做了更新。所以需要在 select * from Paimon_table 时,找到这个 watermark 时间点的数据,通过找到对应的 Watermark 之后的第一个快照来实现处理。这一功能我们也已贡献给社区,大家可以在社区的 0.9 及之后版本中进行使用。
05
未来展望
目前我们在 Paimon 的场景只做了 CDC 一键入湖,用户可以在页面上勾选需要写入的 Paimon 表,就可以直接写入。当然,Paimon 还有很多的玩法,包括它的 merge engine 引擎:first-row、deduplicate、aggregation、partial-update 一系列东西,实际上在实时数仓领域还是有很多应用场景的,之后我们也会对这一部分进行尝试。
另外 Iceberg 本身我们在使用的时候仅仅是把数据入湖,然后通过 Spark 离线的方式去用,但实际上它也有很多特性,比如二级索引、NDV、order 聚簇优化, 以及 statistics 配合 Spark 进行 CBO 优化等一系列能力都是值得尝试的。
还有一个场景就是 CDC 的千表分钟级入湖,因为目前我们是用 CDC 来做 Paimon 入湖,但实际上 Flink CDC 来做 Paimon 入湖还是存在一定的局限。因为在我们整个公司的体量上,可能 MySQL 的库表接近 1 万张表,那是否每一个库都要建一个 Flink CDC 任务来维护呢?我觉得这可能对于我们的维护成本会比较高,所以说我们会探索一下如何在这种分钟级别场景下,用一些特殊的方式来完成 Paimon 千表万表入湖。
以上就是虎牙基于 Iceberg 和 Paimon 的一些实践,谢谢大家。
以上就是本次分享的内容,谢谢大家。
来源:DataFunTalk