摘要:导读在数据技术快速发展的当下,小米在数据领域积极探索,针对 Iceberg 在内部应用中存在的不足,通过引入 PAImon 对湖仓一体的数据湖底座进行升级重构,并自研数据集成引擎,以此降低使用门槛,更好地满足业务需求与多样化的个性化需求。接下来,让我们一同走进
导读在数据技术快速发展的当下,小米在数据领域积极探索,针对 Iceberg 在内部应用中存在的不足,通过引入 PAImon 对湖仓一体的数据湖底座进行升级重构,并自研数据集成引擎,以此降低使用门槛,更好地满足业务需求与多样化的个性化需求。接下来,让我们一同走进小米,深入探寻多源湖仓数据集成在小米的应用与实践。
主要内容包括以下几个部分:
1. 现状介绍
2. 多源湖仓架构
3. 应用实践
4. 未来计划
5. 问答环节
分享嘉宾|孙建强 小米科技 大数据研发工程师
编辑整理|曾晓辉
内容校对|李瑶
出品社区|DataFun
01
现状介绍
1. 数据开发平台整体架构
在小米的数据生态体系中,构建了一套层次清晰、功能完备的数据开发平台——数据工场。该平台不仅提供统一的开发与编排调度工具、元数据服务及权限管理体系等底层平台能力,还集成了数据集成、数据开发、作业运维、作业查询等基础服务模块,并同步构建了质量监控、AI 助手等智能化能力组件,形成了覆盖数据全生命周期管理的一站式解决方案。
在存算引擎方面,围绕 Iceberg 和 Paimon 建设了湖仓一体底座,其上有数据集成引擎,Flink 引擎、Spark 引擎以及 OLAP 引擎。
关于湖仓建设,小米在 2021 年正式引入 Iceberg,并围绕 Iceberg 打造了湖仓一体的技术底座。随着业务的发展,尤其广告类业务,对数据的实时性要求不断提高,Iceberg 开始显得捉襟见肘,普遍情况下它更适合离线场景。针对数据实时要求,小米在 2024 年引入 Paimon 来进行湖仓中部分数据表的开发工作。
2. 数据开发平台关注的核心问题
平台关注的核心问题包括:
多源异构:小米内部数据源丰富(覆盖十余种数据系统),与合作开源项目侧重规范化能力建设不同,更聚焦提升用户使用体验、满足个性化需求。安全合规:企业内部数据跨团队、业务、区域使用,会涉及安全合规问题,如国内人员限用海外数据;米家设备用户数据在分析中需规避法律风险,均为需考量的安全合规要点。数据质量:需保证数据同步的时效性、准确性、完整性,确保下游获取的数据契合实际需求。3. 数据集成服务
围绕上述关注点,小米基于 Flink 自研了数据集成引擎,并在其上搭建元数据服务,以降低数据集成工作的门槛,方便业务做数据同步。另外,也借助 Flink 的埋点能力,实时上报数据同步的流量进展情况,通过数据比对、监控告警等,及时发现数据异常。
目前,集成服务已覆盖内部 17 种数据系统,线上托管了 5,000+数据源,其中关联的数据集成任务达 5 万+,实时入湖日均流量超过 100 亿。
02
多源湖仓架构
1. 统一元数据
面对众多数据源与业务部门,小米首先着力于统一元数据管理,通过全面梳理与整合各类元数据,构建清晰的数据资产脉络;同时,强化权限隔离机制,确保不同业务场景下的数据访问合规可控。
小米通过双重机制保障数据的合规使用与开发效率:
审批流管控:各类数据源使用前需通过业务方、数据使用方、数据提供方及 DBA 团队共同参与的审批流程,确保数据使用行为合法合规且经过全链路授权;权限隔离体系:数据开发平台支持表级、字段级、空间级等多层级权限隔离,用户无需接触数据库连接信息及账号密码,仅通过简单 SQL 即可完成数据查询与开发,在强化安全管控的同时大幅降低操作门槛。以下是统一元数据的一个示例。
在采用元数据服务前,基于 Flink CDC 的数据同步存在明显的安全隐患:需通过 create table 手动定义字段、填写数据库明文连接信息(含账号密码),且 Binlog 同步账号权限过高,导致无法追溯用户访问行为、敏感信息泄露风险突出。
有了元数据服务后,数据同步和数据查询安全性得到了更好的保障。以 MySQL 为例,注册成数据源之后有一个统一的 catalog,表通过 catalog.db.table 三元组的方式来定位,可直接通过 select * from catalog.db.table,元数据服务就会通过访问 catalog 拿到这个数据库的连接信息,并通过表拿到这个表的字段信息、审计信息。
诸如此类的问题,用户只需要写一条简单的 SQL,底层开发平台会自动对接元数据服务,这样数据开发、权限审计、数据集成工作得到大幅简化。例如,用户可直接 insert into select,像写 SQL 一样完成数据集成操作;也可在其上做库表封装,用户只需要填选源表、目标表,即可完成一键入湖。
2. 分库分表一键同步
分库分表一键同步也是通过元数据服务来实现的。
用户只需要写一个 select * from catalog.db.table 语句,数据开发平台会自动对接元数据服务,获取它背后的拓扑结构,自动进行拆分,之后对结果进行汇总,并且带上 db_name、tb_name 这两个标识字段后一键入湖,以此实现分库分表的一键同步。
3. Binlog 采集稳定性
原先通过 CDC 去采集 MySQL 的 Binlog 时,需要直连到 MySQL 的从库,一旦从库宕机、或者发生主从切换、DBA 换机器的情况,CDC 连接到的 IP 地址就会不可用,此时 CDC 会被动失败,无限重试,需要人工介入。
为解决此问题,我们与 DBA 平台共建:DBA 平台有元数据服务实时采集 MySQL 集群状态信息,以 API 形式返回对应的拓扑结构,这样每次建立连接时去获取集群的连接信息,并自动路由到一个可用的从库。一旦 MySQL 发生异常,这个任务会自动 failover 到其他实例上,这样可实现 Flink CDC 的高可用。
另一方面,DBA 平台也可通过其暴露的 API 来决定开发平台任务连接到哪台实例上,以做到大数据平台与业务平台访问数据的隔离,避免相互干扰。
4. 数据清洗、转换
在数据清洗、转换方面,内置了一些计算逻辑,比如给用户提供值替换、时间类型转换以及自定义计算字段的功能。用户可通过 Flink 内置的一些 SQL 函数,实现对原表做简单的计算,并生成到目标表的一个新字段。这方面小米支持计算字段、FlinkSQL 的内置函数,同时平台也会结合用户的日常需求开发一些轻量级的 UDF 函数,并注册到平台以供用户使用。对于过于复杂的工作内容,还是推荐用户在 DWD 层用数据开发的 SQL 来完成。
5. 隐私计算
在数据价值挖掘与隐私保护的平衡中,小米面临跨区域、跨集群团队协作分析、对外合作场景下的原始数据与业务细节暴露风险,常因合规问题导致数据价值难以释放。为此,小米通过构建可信隐私计算环境与标准化流程,探索隐私计算实践,力求在合规框架下激活数据价值潜能。
隐私计算流程为:对高隐私表 A 的数据以密文存储;在做数据开发以及调试的过程中,会根据原表的逻辑,通过一些数据合成的方法生成一个明文的影子表,B 表;然后用户就可以在 B 表上进行数据开发工作,生成加工后的 C 表;验证 C 表是否符合用户数据开发的预期;验证完之后,用户发起上线审批,负责把控隐私的团队来决定上线是否合理;若审批驳回,用户继续在 B 表到 C 表的开发过程中做整改;若审核通过,会授予对应 A 表的权限,用户作业上线通过后,自动会把 B 到 C 的加工逻辑转换成 A 到 D 的计算逻辑,这个 D 表就是真正的 A 表数据,再配合上 B 到 C 的数据开发,最终生成可用的数据。
这一方案可大幅降低用户接触原始隐私数据的风险,有效防范隐私泄露问题。
03
应用实践
1. 数据质量监控
(1)采集任务稳定性
采集任务稳定通过以下三方面来保障:
比如机器替换、重复宕机情形,CDC 任务跟从库心跳失联后,会自动 failover 到其他节点上,最大可能避免人工介入,尽快触发任务失败的重试恢复。
增加巡检机制,主动检查任务的元数据异常情况自动路由机制带来的一个弊端是通过连接代理从代理拿到从库,若从库被 DBA 做了拆库的操作,它已经不属于这个集群了,若任务没有主动失败,可能会出现断流的情况。因此增加定时的巡检机制,主动检查元数据不匹配的情况,后主动更新任务,触发重启。
多级监控告警对 CDC 任务失败、积压等问题进行监控,对流量变化做出预警,提前告知用户介入修复。
(2)同步队列
消息队列方面,MySQL 的 Binlog,通过 CDC 写入到消息队列,然后视情况进行二级转发,这个过程需要避免丢数据和乱序的情况。
小米的做法是先通过 CDC 把每个 DB 同步到对应 DB 级别的一个 Binlog topic 里去,后根据业务需求将这个 DB 级别的 topic 下发到各对应的表级别的 topic,做多级存储,用户可根据需求决定是消费 DB 级别的 topic 还是消费表级别的 topic。另一方面,为了避免出现消息乱序的情况,把消息队列的分区数设置为 1,强制保证其顺序性。
把消息队列设置成多分区做哈希处理的方式理论上也可以保证单条消息按主键区分后不乱序,但消息队列做扩缩容的情况下,还是有一定的极端风险,会导致一些数据异常情况。综合考虑后,还是暂时保留单分区设计。未来会考虑提升吞吐,解决多分区扩缩容带来的问题。
消息队列中一般会有失败重试的情况,为了不阻塞,消息投递失败后,会加到 buffer 队列里做重试。以 Kafka 为例,其有一个配置,默认参数值是 5,Kafka 在消息推送失败之后可能会出现消息乱序的情况。为保证消息不丢不乱序,小米将这一参数设置为 1,相当于消息一旦投递失败,就会卡在这里做无限的重试,以保证消息不会乱序。无限重试可能会造成积压,需要配合监控来处理。
消息队列使用同步生产者、消费者,避免异步产生带来额外问题。
(3)Schema 自动同步
目前,Schema 自动同步的解决方案是直接去 CDC 采集 MySQL 的 alter table 的 DDL,然后下发到 Binlog topic,下游收到 Binlog topic 后,会根据情况做 Schema 自动同步的操作。
采集 DDL 分两种情况,一种是直接 Schema change 的方式,在原表上执行 altertable 的操作,类似的 SQL 会直接下发到 Binlog topic;另一种是 online 的 Schema change 的方式,有些业务会用 ghost 对表进行在线编辑,ghost 编辑大致分为两个阶段,第一个阶段做 alter table,在影子表上执行一个 add column 的操作,然后做数据搬迁,搬迁完成后 alter table,rename 成正式的表,这时才真正完成了一次 alter table 操作。对于 online 的 Schema change,真正的 Schema change 发生在 rename 这条 SQL 上。
小米的做法是在 CDC 任务里加一个状态,若收到影子表的 alter table 的 DDL 语句,会把它存储在 Flink 的状态里,当收到 rename 这条 SQL 时,会进行状态的匹配,看哪些表执行了什么样的操作,这时再把 alter table 那条 SQL 下发下去。这是 CDC 在采集 DDL 方面的处理。
以数据集成实时入湖链路为例,做了自动同步的设计。当集成作业收到 alter table 语句后,重启前预先做 savepoint,把数据当前消费的位点,已经消费的数据做一次 flush,然后执行重启任务,重启的过程以离线的方式完成 Schema 对齐,具体会基于上游的 Schema 去生成下游湖仓表的 Schema,然后把对应的 Schema 发给湖仓去执行 alter table,做离线 Schema 的变更。这样就完成了一次自动的 Schema 同步。
(4)数据监控
数据监控工作,在 MySQL to 数据湖这条链路多个阶段做了埋点:
第一个埋点在 CDC 采集 MySQL 的 Binlog 上,记录 CDC 实际采集到了多少数据。第二个埋点在消息队列的接收端,看 CDC 下发了多少条数据,对比验证 CDC 有没有漏发数据,采集到的数据量以及下发到消息队列的数据量是否一致。第三个埋点是在消费 MQ 数据时,验证从 MQ 读到了多少条数据,用此时的 metric 跟前一个 metric 进行对比,看消息队列生产和消费阶段是否有丢失数据的情况。第三个埋点是写到数据湖的时候,在湖仓侧检测最终收到了多少条数据。这些 metric 会带上 event time,以屏蔽延迟或积压数据带来的影响,同时也可以借此发现积压存在的问题,比如上游发现这个时间点已经有很多条数据,但下游这个点没有数据,但是历史数据又对得上,就会发现是由积压导致的。以上就是实时链路监控。
离线数据对比,包含三级对比:
第一级对比原表和目标表的表结构是否是一致。第二级对比原表跟目标表的数据量是否一致。第三级对比数据内容是否一致,通过对原表的数据,目标表数据加哈希计算 MD5 的签名,看二者签名是否一致。上图展示了从 MySQL 到消息队列到 Iceberg 的链路监控,可以实时查看作业同步速率、各种 DDL 速率,以及 Flink 积压、告警等。
由于 Flink 实时同步,可能存在 checkpoint 带来的影响,湖仓的数据总是比 MySQL 的数据存在一个 checkpoint 的延迟,导致难以确认数据差异到底是来自数据同步还是由 checkpoint 延迟造成的。为解决这一问题,可以利用原表和目标表的时间字段进行对比,先排除 checkpoint 带来的影响。同时,利用时间字段对比也可以避免查全表对在线 MySQL 造成影响。
通过数据对比,一旦发现数据不一致,会提供数据采样,通过对原始数据和目标表的数据做 MD5 签名,发现不一致的数据,将其采样出来展示给用户,以方便用户快速定位问题。
(5)数据修正
针对数据不一致或一些异常情况,为用户提供了两种产品化的解决方案:
2. 非结构化数据入湖
针对非结构化数据的使用,小米早期将消息队列的数据直接通过 HDFS Sink 落到 HDFS 的目录上,用户通过套 Hive 的外表来做 TTL 的管理,以及一些简易的审计工作。用户可通过 Spark SQL 去读这个外表,也可直接读写 HDFS 目录,整体上使用方式不太好管理。为了解决上述问题,统一规范用户的非结构化数据使用,我们引入了 FileSet。
经过元数据服务,已经把表规范为 catalog.database. table,实现了对表的唯一定位。非结构化数据通过引入 FileSet 对文件做抽象。FileSet即一组非表格化文件集合,类似于表格数据的 table 的概念。再次结合元数据服务,当访问非表格数据时,通过/catalog/database/fileset/path 即可实现对目录的访问与定位。
消息队列数据经过数据集成入湖,可根据数据本身是否有格式来决定是落到 Iceberg 这样的表格数据里,还是落到 FileSet 非表格数据上,用户在其上进行数据开发工作。通过统一使用方式,可避免用户对各种存储的随意访问,做到统一管理、可追溯、可治理。
04
未来计划
目前正在推进中和计划中的主要工作包括:
结合 AI 大模型,优化异常诊断,智能提供诊断建议,协助用户完成异常修复、调优等工作。建设更加高可用的 CDC 实时采集,具备全量路异常快速定位、数据补偿的应急预案。构建更加集成化的应用产品,一键完成数据同步入湖无需关注配置调优、技术选型。调研、对比 SeaTunnel 引擎,考虑丰富引擎可选方案,提供更丰富的数据同步能力。05
问答环节
Q:在数据集成方面,Paimon 主要应用在哪些场景呢?
A:小米在 2024 年的时候正式引入 Paimon。当时面临两大主要问题,一是,Iceberg 很多设计整体还是比较偏 Hadoop 离线体系,例如:不支持产生 Changelog。这导致业务在搭建实时数仓时,会存在很多技术卡点。另一方面Iceberg V2 存在数据重复的问题,对业务而言理解和使用成本较高。
最后小米数据湖团队综合多方的调研,以及业务需要(广告类相关业务需求),共同调研发现 Paimon 能够很好满足需求,例如其支持主键表、可以生成 changelog、支持 partial update 等等,发现很多 feature 能更好的匹配相应的业务场景,最终才把它作为实时湖仓的技术选型方案。
具体的数据开发,偏实时的场景,倾向于用户用 Paimon 这样的主键表来做数据开发工作;偏离线的场景,如日志的备份记录,倾向于让用户写到 Iceberg 来处理。
以上就是本次分享的内容,谢谢大家。
来源:DataFunTalk