摘要:如果你曾花时间在生产环境中构建机器学习(ML)系统,你肯定深有体会。你的模型在开发阶段中表现优异,通过了所有的离线测试,但在生产环境中却莫名其妙地表现糟糕。这听起来熟悉吗?
如果你曾花时间在生产环境中构建机器学习(ML)系统,你肯定深有体会。你的模型在开发阶段中表现优异,通过了所有的离线测试,但在生产环境中却莫名其妙地表现糟糕。这听起来熟悉吗?
十有八九,这是数据问题。而且不是一般的数据问题:这是一个让数据工程师夜不能寐的可重复性的噩梦。我们谈论的是几个月后重新创建训练数据集,追踪特征为何突然看起来不同了,或者弄清楚到底哪个版本的数据生成了那个神奇的模型,而大家又都要求你“赶紧快速重建”。
传统的数据湖?它们在存储海量数据方面表现出色,但在机器学习工作负载迫切需要的事务保证和版本控制方面却表现糟糕。这就好比试图用大锤做精细的外科手术。
这时,Apache Iceberg 就派上用场了。与 SparkSQL 结合使用,它为你的数据湖带来了真正的数据库般的可靠性。时间旅行、模式演变和 ACID 事务这些本应从一开始就显而易见的需求,但不知怎的在追求“大规模大数据”的过程中被忽略了。
机器学习数据的可重复性问题 非常嫌疑人让我们坦诚面对大多数机器学习团队中真正发生的情况。数据漂移悄然出现:随着时间的推移,你的特征分布会发生变化,但直到模型开始做出毫无意义的预测时,才会有人注意到。特征管道本应是确定性的,但实际上并非如此;运行相同的管道两次,由于时间戳逻辑或仅仅是普通的竞争条件,你会得到略有不同的输出。
然后是版本控制的情况。我们在代码版本控制方面做得相当不错(尽管我们不谈三个月前的那个“临时修复”至今仍在生产环境中)。但数据版本控制呢?它仍然主要是手动流程、电子表格和祈祷。
下面这个场景听起来是不是很熟悉:你的队友在周一运行特征工程,你在周二运行,突然你从本应相同的源数据中得到了不同的结果。为什么?因为在周一和周二之间,底层的表发生了变化,而你所谓的“时间点”逻辑并没有你想象的那么精准。
真正棘手的问题出现在模型训练期间。多人访问相同的数据库,模式变更会在毫无预警的情况下破坏数据管道,同时写入操作会产生竞争条件,破坏你精心设计的特征。这就好比在有人不断改变地基的情况下建造房屋。
为何传统数据湖表现不佳数据湖的设计初衷是为了应对这样一个世界:分析工作只需运行批处理报告,或许再做些 ETL 任务。重点在于存储的可扩展性,而非事务完整性。在你最大的担忧只是生成季度报告时,这倒也无妨。
但机器学习则不同。它是迭代的、实验性的,并且对数据一致性有着传统分析从未有过的严格要求。当你的模型训练任务读取了部分写入的数据(因为其他人正在更新同一张表),你得到的不只是错误的报告,而是从垃圾数据中学习并会做出垃圾预测的模型。
模式灵活性在理论上听起来不错,但在实践中往往会导致模式混乱。如果没有适当的演进控制,善意的数据科学家在向现有表添加“仅此一项新特征”时,会意外破坏下游系统。祝你好运去弄清楚是什么时候出了问题。
元数据的情况更糟。传统的数据湖追踪文件,而非逻辑数据集。所以当你需要了解特征的血缘关系或实施数据质量检查时,基本上就是在盲目摸索。
隐性成本糟糕的数据基础会带来一些预算中没有体现的成本。你的数据科学家们把大部分时间都花在处理数据上,而不是改进模型。我曾看到有研究指出,他们 60% 到 80% 的时间都花在了数据处理上。这……显然不是最佳状态。
当生产环节出现问题时——这是必然的——调试工作就会变成一场考古探险。模型是基于哪个版本的数据训练的?从那时到现在发生了什么变化?有没有没人记录的模式修改?这些问题可能要花数周时间才能回答,而且还不一定能回答得出来。
让我们来谈一谈合规性问题。试着向审计员解释为什么你无法重现用于训练做出贷款决策的模型的精确数据集。这可不是什么愉快的对话。
Iceberg 机器学习的基础知识 真正可行的时间旅行Iceberg 的时间旅行是一种基于快照的架构,它为每次写入操作都维护完整的表元数据。每个快照都代表了你的表在特定时间点的一致视图,包括模式、分区以及所有内容。
对于机器学习人员来说,这具有变革性意义。你可以查询历史表状态,从而实现对过去数据的分析和利用。
对于 ML 爱好者来说,这是改变游戏规则的。你可以使用简单的 SQL 查询历史表的状态:
-- 这就是你实际上解决可复现性问题的方法FOR SYSTEM_TIME AS OF '2024-01-15 10:30:00'SELECT * FROM ml_features FOR SYSTEM_VERSION AS OF 1234567890无需再猜测是哪个版本的数据产生了良好的结果。不再有“上周还行得通”的讨论。你可以跨时间段比较特征分布,通过检查历史数据状态来分析模型性能下降情况,并构建真正能给你带来一致结果的 A/B 测试框架。
元数据包含文件级统计信息,因此查询优化器能够就扫描效率做出明智决策。这不仅仅是关乎正确性的,也是关乎性能的。
轻松应对模式演变以下是一些应该是基础操作但却并非如此的情况:向你的特征表添加一个新列不应该需要团队会议和迁移计划。Iceberg 的模式演进功能让你能够根据不断变化的需求调整表,同时不会破坏现有的读取器或写入器。
你可以添加列、重命名列、重新排列列以及提升数据类型,同时保持向后和向前的兼容性。对于 ML 管道,数据科学家可以安全地添加新特征,而无需在多个团队之间协调复杂的迁移流程。
系统通过唯一的字段 ID 跟踪列的标识,因此重命名列不会破坏现有的查询。类型提升遵循 SQL 标准(整数到长整数,浮点数到双精度浮点数),因此你无需担心数据丢失。
-- 添加一个特征实际上就是这么简单ALTER TABLE customer_features ADD COLUMN lifetime_value DOUBLE-- 重命名不会破坏下游任何东西RENAME COLUMN purchase_frequency TO avg_purchase_frequencyACID 事务(终于!)ACID 支持使 ML 工作负载能够安全地在共享数据集上运行,而不会损坏数据或造成不一致的读取。Iceberg 使用乐观并发控制:多个写入者可以同时工作,但冲突会自动检测和解决。
隔离级别可防止读取者看到不完整的写入。因此,当你的训练任务启动时,它保证能看到数据的一致快照,即使有人正在实时更新特征。
事务边界与逻辑操作对齐,而非单个文件写入。你可以实现跨越多个表和分区的复杂特征工程工作流,同时保持一致性保证。再也不用担心“哎呀,我的更新只成功了一半”的情况。
构建可重现的特征管道 真正有意义的分区让我们来谈谈分区策略,因为这是很多团队自找麻烦的地方。秘诀其实并不复杂:按照与实际查询数据方式一致的维度进行分区。
大多数 ML 工作负载都遵循时间模式,即基于历史数据进行训练,并对近期数据进行预测。因此,使用日期或日期时间列进行时间分区通常是最佳选择。分区的粒度取决于数据量。对于高数据量的系统,使用每日分区;对于较小的数据集,使用每周或每月分区。
CREATE TABLE customer_features ( customer_id BIGINT, feature_timestamp TIMESTAMP, demographic_features MAP, behavioral_features MAP, target_label DOUBLE) USING ICEBERGPARTITIONED BY (days(feature_timestamp))如果存在自然的业务分组,多维分区也能发挥良好效果。客户细分、产品类别、地理区域等都能反映模型实际对数据的切分方式。
Iceberg 的隐式分区特别好用,因为它能自动维护分区结构,无需在查询中显式指定分区列。编写更简单的 SQL 语句,却能获得同样的性能优势。
但分区也不能过度。我见过一些团队创建了数千个极小的分区,以为这样能提升性能,结果却发现元数据开销严重影响了查询规划。保持分区大小适中(考虑几百兆字节到几吉字节),并监控分区统计信息。
实验的数据版本控制可重复的实验需要数据版本与模型工件之间的紧密耦合。这正是 Iceberg 快照大放异彩之处。它们为实现强大的实验跟踪奠定了基础,这种跟踪实际上将模型性能与特定的数据状态关联起来。
与 MLflow 或类似的跟踪系统的集成创建了模型运行与数据版本之间的可审计连接。每个训练任务都会记录输入数据集的快照 ID,从而精确重现实验条件。
import mlflowfrom pyspark.sql import SparkSessiondef train_model_with_versioning(spark, snapshot_id): # 从特定快照加载数据 df = spark.read \ .option("snapshot-id", snapshot_id) \ .table("ml_features.customer_features") # 在 MLflow 中记录数据版本 mlflow.log_param("data_snapshot_id", snapshot_id)"data_row_count", df.count) # 继续模型训练...分支和标记功能支持更复杂的工作流。为生产模型训练创建稳定的数据分支,同时在实验分支上继续开发。使用标签标记重要的里程碑,例如季度功能发布、监管检查点等。
特征存储集成现代机器学习平台需要数据基础设施与实验管理之间实现无缝集成。Iceberg 表与特征存储配合良好,利用时间旅行功能为训练提供历史特征,为推理提供特定时间点的特征。
这种组合提供了训练和服务之间一致的特征定义,同时保持了实时推理所需的性能特征。由于批处理和流处理的特征逻辑不再出现分歧,因此不会再有训练与服务之间的偏差。
from feast import FeatureStoreimport pandas as pddef get_training_features(entity_df, feature_refs, timestamp_col): try: fs = FeatureStore(repo_path=".") # Point-in-time join using Iceberg backing store training_df = fs.get_historical_features( entity_df=entity_df, features=feature_refs, full_feature_names=True ).to_df return training_df生产实施 真实案例:客户流失预测让我为你介绍一个在实际生产中切实可行的客户流失预测系统。该系统每天处理数百万次客户交互,同时保持完全可重复性。
其数据架构使用多个按新鲜度和访问模式组织的 Iceberg 表。原始事件流入暂存表,经过验证和清理,然后聚合到针对机器学习访问模式进行优化的特征表中。
-- 原始事件的暂存表CREATE TABLE customer_events_staging ( event_id STRING, customer_id BIGINT, event_type STRING, event_timestamp TIMESTAMP, event_properties MAP, ingestion_timestamp TIMESTAMP) USING ICEBERGPARTITIONED BY (days(event_timestamp))TBLPROPERTIES ( 'write.format.default' = 'parquet', 'write.parquet.compression-codec' = 'snappy')-- 具有优化布局的特征表 customer_id BIGINT, feature_date DATE, recency_days INT, frequency_30d INT, monetary_value_30d DOUBLE, support_tickets_30d INT, churn_probability DOUBLE, feature_version STRING) USING ICEBERGPARTITIONED BY (feature_date)CLUSTERED BY (customer_id) INTO 16 BUCKETS特征工程管道利用 Iceberg 的合并功能实现增量处理。这种增量方法在保持不同处理计划间数据一致性的同时,最大限度地减少了重新计算。
def incremental_feature_engineering(spark, processing_date): # 读取上次处理以来的新事件 new_events = spark.read.table("customer_events_staging") \ .filter(f"event_timestamp >= '{processing_date}'") # 计算增量特征(实现取决于业务逻辑) new_features = compute_customer_features(new_events, processing_date) # 合并到特征表,使用 upsert 语义 new_features.writeTo("customer_features") \ .option("merge-schema", "true") \ .overwritePartitionsdef compute_customer_features(events_df, processing_date): """ 用户定义的函数,根据事件计算客户特征。实现将包括业务特定的特征工程逻辑。 """ # 示例特征工程逻辑将在这里 return events_df.groupBy("customer_id") \ .agg( count("event_id").alias("event_count"), max("event_timestamp").alias("last_activity") ) \ .withColumn("feature_date", lit(processing_date))性能优化Iceberg 中的查询性能得益于多种互补技术。文件大小很重要。根据你的访问模式,目标文件大小为 128MB 至 1GB,对于高度选择性查询使用较小文件,对于全表扫描使用较大文件。
Parquet 对 ML 工作负载具有天然优势,因为你通常选择列子集。压缩选择取决于你的优先事项。对于频繁访问的数据使用 Snappy(更快的解压缩),对于存档数据使用 Gzip(更好的压缩比)。
使用聚类或 Z 序排列进行数据布局优化能够显著提升多维访问模式下的性能。这些技术将相关数据放置在文件内,从而减少典型 ML 查询的扫描开销。
-- 为典型访问模式优化表SET TBLPROPERTIES ( 'write.target-file-size-bytes' = '134217728', -- 128MB 'write.parquet.bloom-filter-enabled.customer_id' = 'true', 'write.parquet.bloom-filter-enabled.feature_date' = 'true')元数据缓存极大地提高了查询规划性能,特别是对于具有众多分区的表。Iceberg 的元数据层支持分布式缓存(Redis)或在 Spark 执行器内的内存缓存。
监控与运维生产环境中的机器学习系统需要超越传统基础设施指标的监控。Iceberg 丰富的元数据支持复杂的监控方法,真正帮助您了解数据的情况。
数据质量监控利用元数据检测数据量、模式变更和统计分布方面的异常情况。与 Great Expectations 等框架集成,可提供自动化验证工作流,当质量阈值被违反时可停止处理。
import great_expectations as gefrom great_expectations.dataset import SparkDFDatasetdef validate_feature_quality(spark, table_name): df = spark.read.table(table_name) ge_df = SparkDFDataset(df) # 定义期望 ge_df.expect_table_row_count_to_be_between(min_value=1000) ge_df.expect_column_values_to_not_be_null("customer_id") ge_df.expect_column_values_to_be_between("recency_days", 0, 365) # 验证并返回结果 validation_result = ge_df.validate return validation_result.success性能监控跟踪查询执行指标、文件扫描效率和资源利用模式。Iceberg 的查询规划指标可提供分区剪枝效果和文件级扫描统计信息的洞察。
别忘了操作维护(例如文件压缩、过期快照清理和元数据优化)。这些操作可保持查询性能并控制长期存储成本。为这些操作设置自动化任务——相信我,你不会想手动做这些的。
最佳实践和经验教训 选择你的表格格式究竟在什么情况下应该使用 Iceberg 而不是其他选项呢?这并非总是显而易见的,而且营销材料也不会给你提供直接的答案。
当你需要强大的一致性保证、复杂的模式演变和时间旅行能力时,Iceberg 表现出色。由于其实验性质和可复制性要求,因此机器学习工作负载能特别从这些特性中受益。
Delta Lake 提供了类似的功能,并且与 Databricks 生态系统有更紧密的集成。如果你主要在 Databricks 环境中操作,或者需要诸如液态集群之类的高级功能,那么 Delta 可能是更好的选择。
Apache Hudi 针对具有复杂索引的流处理用例进行了优化。对于具有大量流处理需求或复杂更新插入模式的机器学习系统,可以考虑使用它。
还有呢?有时候普通的 Parquet 表就足够了。如果你有简单的追加操作工作负载且模式稳定,那么使用这些表格式所带来的操作开销可能并不值得。不要为实际上不存在的问题过度设计解决方案。
常见陷阱过度分区可能是我见到的最常见的错误。创建分区时,如果分区数据量少于 100MB 或每个分区包含超过一万份文件,都会损害查询规划性能。监控分区统计信息,并根据实际使用模式调整策略,而非依据理论上的理想状态。
即使有 Iceberg 的安全特性,模式演进错误仍可能破坏下游消费者。在 CI/CD 管道中实现模式验证,以便在部署前捕获不兼容的更改。使用列映射功能将逻辑列名与物理存储解耦。这会为你日后省去不少麻烦。
当团队未充分利用 Iceberg 的优化功能时,查询反模式往往就会出现。在 WHERE 子句中包含分区谓词以避免不必要的扫描。通过仅选择所需的列来使用列裁剪,而非使用 SELECT *(我知道这很方便,但你的查询性能会感谢您你)。
迁移策略从遗留数据湖迁移需要精心规划。你不能指望一蹴而就,所有事情都能顺利进行。在过渡期间实施并行系统,这样你就可以将基于 Iceberg 的管道与现有系统进行验证。
首先优先迁移关键的机器学习数据集,重点关注能从 Iceberg 功能中获益最大的表。使用导入功能迁移现有的 Parquet 数据集,而无需重写数据文件。
-- 将现有表迁移到 Iceberg 格式CALL system.migrate('legacy_db.customer_features_parquet')查询迁移意味着更新 SQL 以利用 Iceberg 的功能,同时保持向后兼容性。功能标志或配置驱动的表选择可实现逐步推出。
管道迁移应分阶段进行。先从批处理开始,然后再转向流处理工作流。Iceberg 与现有 Spark API 的兼容性可将迁移期间的代码更改降至最低。
总 结Apache Iceberg 和 SparkSQL 为构建在生产环境中真正可靠运行的机器学习系统奠定了坚实的基础。时间旅行、模式演进和 ACID 事务的结合解决了多年来一直困扰机器学习基础设施的基本数据管理难题。
通过提高开发速度、减少调试时间以及增强对系统可靠性的信心,投资得以回报。团队一致报告称实验的可重复性更好,新模型的投产速度更快。
但要取得成功,需要围绕分区、模式设计和操作流程做出深思熟虑的设计决策。这项技术提供了强大的功能,但你需要了解其底层架构以及机器学习工作负载的特定要求,才能实现其优势。
随着机器学习系统的复杂性和业务关键性不断提高,可靠的数据基础变得愈发重要。Iceberg 代表了一种成熟且可投入生产的解决方案,有助于组织构建与传统企业应用程序具有相同可靠性预期的机器学习系统。
老实说?是时候拥有那些真正符合我们需求的工具了。
来源:商财洞察君