AutoMQ Table Topic——流式数据入湖最佳实践

B站影视 韩国电影 2025-05-24 09:00 2

摘要:AutoMQ 是 Apache Kafka 的一个云原生版本,AutoMQ 基于云的共享存储(如对象存储、云盘)对 Apache Kafka 进行了云原生的重塑。AutoMQ 不仅完全兼容 Apache Kafka API,并且提供了最高可达十倍的降本优势。

导读AutoMQ 基于云的共享存储对 Apache Kafka 进行了云原生的重塑,AutoMQ 无需额外 ETL 任务,支持高效、低成本地将流数据无缝入湖。

本次对 AutoMQ 的介绍将围绕下面六点展开:

1. AutoMQ 介绍

2. 流与#数据湖

3. Table Topic 架构介绍

4. Table Topic 架构优势

5. 产品功能演示

6. 问答环节

分享嘉宾|韩旭 AutoMQ 内核技术负责人

编辑整理|旭锋

内容校对|李瑶

出品社区|DataFun

01

AutoMQ 介绍

AutoMQ 是 Apache Kafka 的一个云原生版本,AutoMQ 基于云的共享存储(如对象存储、云盘)对 Apache Kafka 进行了云原生的重塑。AutoMQ 不仅完全兼容 Apache Kafka API,并且提供了最高可达十倍的降本优势。

Apache Kafka 默认使用三副本存储在磁盘上,而 AutoMQ 依赖云的共享存储,其底层通过纠删码或多副本来存储数据,相当于 AutoMQ 层面只有一副本。此外,AutoMQ 依赖的对象存储比磁盘便宜,如在 AWS 上 S3 的价格只有 EBS 的 30%。基于三副本优化机制与对象存储天然的低成本优势,AutoMQ 在存储成本控制上实现了显著突破,相较于传统方案,存储成本降幅最高可达 90%。由于 AutoMQ 的数据全部卸载到共享存储(如 S3)上,其节点是无状态的。相比 Apache Kafka 扩容需要数据复制和数据均衡,AutoMQ 的扩容无需数据搬迁,将扩容的效率从小时级提升至秒级。

综上所述,通过云的共享存储,AutoMQ 既节省了存储成本,又因其弹性优势变相节省了计算成本。

Table Topic 是 AutoMQ 近期发布的一项重要能力,它将流和表存储进一步融合,提供了将流数据无缝入湖的能力。Table Topic 的推出,使得 AutoMQ 在存储成本优势的基础上,进一步挖掘存储数据的价值,为用户提供了更加丰富的#数据处理 和分析手段。

02

流与数据湖

1. 数据库数据的流转过程

#数据库 数据通常来源于 Log 或 BinLog,通过 Kafka 流转,再由 Spark 或 Flink 写到数据库。对数据库表进行增删改查(DML)操作,这些变更会被数据库的 BinLog 以流的方式追加记录。

2. 流表双写业务形态

使用数据库的 BinLog 作为 source of truth,通过事件驱动架构实现业务解耦。如订单系统触发积分系统,通过 Spark 或 Flink 搭建 ETL 任务,从 Kafka 中订阅流数据,转化成表格式写入数据库,用于报表分析。数据从数据库的表开始,通过 BinLog 事件记录,进入 Kafka 流系统,为实时计算提供服务,最后转换成数据湖表为报表分析提供服务。

3. 流与数据湖衔接中的成本问题

在流与数据湖的衔接中,通常会维护一套 ETL 任务,这带来三方面的成本:

开发成本:需要为每个数据库 BinLog 订阅编写一套 ETL 任务,对数据变化进行 Schema 管理。运维成本:包括两部分,一是要对搭建的 Spark 或者 Flink 的 ETL 集群进行监控运维,如 Worker 的 CPU、内存情况;二是要观测同步工作的延迟,调整资源水位、Worker 的规格和数量,来确保整个任务的实时性和稳定性。资源成本:Kafka 消费消息过程中存在宽带消耗。同时 Kafka Record 转 Parquet 时会占用内存空间,为此 Worker 通常要预留几百兆的内存空间。另外,AutoMQ 中 Stream Storage 以流格式存储数据,而 Iceberg 中以 Table Storage 存储数据,相同数据以不同格式存储。

这些成本问题无疑增加了流与数据湖衔接的复杂性与成本负担,为此,AutoMQ 推出了 Table Topic,为解决上述问题提供了新的思路和有效方案。

03

Table Topic 架构介绍

1. Table Topic 架构

AutoMQ 已将流数据全部存在共享存储 S3 上,数据湖的数据也存在 S3 上,Table Topic 实现了二者的整合: 统一流的存储和湖的存储,让用户以 Kafka 的协议来写入数据,AutoMQ 会将 Topic 的数据无感入湖。

Table Topic 的架构中,最上方是数据源,包括 Log(BinLog 或日志数据)、汽车或智能家居 IoT 数据、ClickStreams、订单数据以及财务数据等,通过 Kafka 协议发送到 AutoMQ。为支持写入低延迟和 S3 的低成本,AutoMQ 首先将写入的热数据存储到 Stream Storage 中(低延迟、高吞吐的存储),之后 AutoMQ 后台定期将 Stream Storage 数据转化成 Table Storage 数据,即 Parquet 格式。

在线任务和实时计算仍由 Kafka Consumers API 消费数据,AutoMQ 收到 Fetch API 的请求以后,会结合 Stream Storage 的数据和 Table Storage 数据一起返回给用户。

数据湖分析引擎(如 Spark、Flink 或 Presto)无需额外的 ETL 任务就可以直接访问 Iceberg 表格式的数据。

AutoMQ 借助多元存储,获得了存算分离的极致弹性能力,同时 Iceberg 这种标准表格式又让 AutoMQ 有直接共享底层存储数据的机会。通过 Table Topic,AutoMQ 从基于 S3 的 Shared Storage 架构变成了 Shared Data 架构。

2. Table Topic 工作原理

Table Topic 由三部分组成:

Builtin Schema Registry:是 AutoMQ 内置能力,确保写入到 Kafka 的数据已经是一个结构化并且合法的数据,明确字段类型,并确保其演进策略。TableCoordinator:相当于 Table Topic 的大脑,负责定期协调和触发 TableWorker 数据上传,并进行中心化 Iceberg Snapshot 提交。防止所有 Worker 提交产生冲突,并减少 Snapshot 产生量。每个 Table Topic 对应一个 Coordinator。TableWorker:可以理解为数据面,负责将 Kafka Record 转换成 Parquet,上传到 S3。并解析 Kafka Record 中携带的 Schema,与 table Schema 对比,进行 table Schema 演进。每个节点 Topic 维度一个 Worker,随着 Kafka 分区的打开关闭动态变更。

一次 Commit 的简要流程如下,首先是 TableCoordinator 到了用户设置的提交时间和 COMMIT 间隔(比如放开 1 分钟到 15 分钟),通过内部系统被广播请求到各个节点,Worker 收到请求后会根据 commit 请求里面携带的起始同步位置,进行从 Kafka Record 到 Parquet 的转换和上传。上传完成后 Worker 再将 DataFile 文件句柄返回给 Coordinator。最后 Coordinator 再去提交 Iceberg Snapshot,完成整个 Commit,此时数据即在数据湖中可见。

3. 架构优势

(1)Schema 管理

前面提到传统的数据湖入湖方式中,Schema 散落在各个系统中的,需要数据的生产方和 ETL 入湖的任务管理方之间进行协调,否则易出现数据丢失。以 PB 序列化为例,假设上游新增一个字段,但是 ETL 任务里面定义的 Schema 未及时更新,ETL 任务就会将新增的一列数据丢失。Table Topic 如何解决这个问题呢?

首先 Table Topic 将 Schema 的定义收敛到数据的生产方,即 Kafka Client。Kafka Client 发送消息时,序列化器解析 Payload 中 Schema 信息,并将 Schema 注册到 ShemaRegistry,并校验是否符合向后、向前兼容的演进策略。序列化器会将 Schema 的唯一标识 SchemaID 和序列号作为整体的 Record 发送到 AutoMQ。AutoMQ 在同步的过程中,TableWorker 从 partition 里拉取 Record,解析 Record 中的 SchemaID,并通过 SchemaID,从 SchemaRegistry 加载所需的 Schema,与 Iceberg 中 table 的 Schema 进行对比,如果 table 不存在,则自动创建一个 table;如果 SchemaID 上升,比 Iceberg table 多了一列,将更新 Schema。整个过程无需用户配置,Schema 演进实现了完全自动化。

(2)Exactly Once

Table Topic 的另一特性是 Exactly Once。

Table Topic 通过 Coordinator 进行同步管理,并最终 commit 到 Iceberg。假设 Coordinator 在发起 Iceberg commit 后异常宕机,但实际 commit 已经成功,此时若 Coordinator 重新拉起,会尝试重新提交,此时会有重复提交风险。

为解决这一问题,引入了两阶段提交机制。Table Topic 的 Coordinator 在发起 Spark commit 前,会先进入 PRE_COMMIT 状态,记录 commit ID 和回滚位点。如果 commit 成功,则推进到 COMMIT 状态;如果失败,则根据回滚位点进行回滚。

通过两阶段提交机制,Table Topic 确保了在同步过程中数据的 Exactly Once,即数据不会重复也不会丢失。

(3)同步机制优化

Coordinator 作为控制面负责定期触发 commit,commit 间隔由用户指定,范围是 1 分钟到 15 分钟。假设在 15 分钟这个场景:

若 Coordinator 触发时同步,此时 Worker 需要在有限的时间内完成这 15 分钟,造成数据大概率不在缓存中,Worker 会大量地从 S3 读取历史数据,并产生大量的 CPU 尖刺和网络流量。若 Coordinator 触发之前同步,Worker 随消息写入实时写 Parquet 数据文件,假设 Parquet 文件大小为 32M,意味着假设服务器需要承担 100 要同步的 Table Topic,额外需要 3G 的内存处理,内存压力较大。

Table Topic 对两种同步机制进行了整合,既有实时同步、又有触发式同步。首先,攒批实时同步,会持续监测未同步的数据大小,当数据超过一定阈值(如 32M)时,即触发上传,上传时分时复用内存。另外,Coordinator 按用户设定的时间间隔触发同步,仅需在实时同步基础上,同步末尾一小部分。实时同步减少延迟,触发式同步降低系统开销,两者结合实现了同步效率和系统性能的提升。

图中绿色是 Topic T1 的 Worker,黄色是 Topic T2 的 Worker。在 T0 时间点,Topic T1 同步的数据到达 32M 时会直接生成一个 DataFile0,在 T1 时间点,Topic T2 也达到了 32M,也会生成一个 DataFile2。

T2 时间点时,Topic T1 的 Coordinator 触发了 commit。此时 Worker 发现还有两小块未同步,此时 Worker 会把这两小块数据生成一个新的 DataFile1,并将 DataFile0 和 DataFile1 的数据一起返回给 commit。这就是同步的分时复用优化。

实测下,在 8C16G 机器上可以同时支持 80MiB/s 的 Table Topic 写入,并且可提供 260w Field/s 的转换能力。

(4)Watermark 机制解决数据就绪问题

Kafka 流数据无法回答“昨天数据是否就已全部就绪”的业务问题,Table Topic 借鉴 Flink 的 Watermark 机制解决了这个问题。

Worker 完成 commit 数据上传后,会将 event time 作为 watermark 返回。 Coordinator 汇总所有 Worker 的 watermark 信息,并取最小值作为整个同步的 watermark。同样,watermark 信息也会写入 Snapshot summary 中,下游调度框架可以定期检查 Snapshot summary 的 watermark 是否超过分区时间,如果超过了,就可以开启下游作业,实现流转批。

04

Table Topic 架构优势

Zero ETL

通过使用 Table Topic,无需再搭建额外的 Spark 和 Flink 集群来处理数据入湖,简化了数据处理流程。在创建 Topic 时,可以选择开启 Table Topic 功能,并指定相关属性(如在 upsert 情况下指定主键等),即可实现流数据的自动入湖。

Auto Scaling & Auto Balancing

Table Topic 的同步吞吐能力随着 Kafka 集群规模线性伸缩,无需单独扩展开发集群或任务集群。AutoMQ 自带负载均衡机制,能够根据流量进行负载均衡,确保同步能力的均衡性。

Schema

通过 Kafka Schema 确保数据质量,使数据一进入即符合标准。根据 Record 的 Schema 变更,自动对 Iceberg Table 进行 DDL 操作,并实现了 Zero Downtime。

低成本

无 ETL 消费 Kafka 的网络开销,也无需搭建 ETL 集群。另外,内存分时复用,占用空间固定,同步的内存开销不随 Table Topic 数量的线性增长而增长,仅与写入流量相关。

05

产品功能演示

通过 Kafka API 发送消息。如上图所示,使用 Avro Console Producer 发送 Avro 格式的消息。指定 Schema Registry 的地址和 Schema 的描述,编写对应的 PayLoad 进行发送。

发送消息后,AutoMQ 会自动将数据上传到 Iceberg。本例中采用 AWS 的 S3 Table 作为 catalog 存储,借助 Athena(类似于 Spark 查询引擎)执行搜索语句,即可查询到已写入的数据。

整个流程中,用户仅需像普通发送消息一样写到 Kafka,AutoMQ 会负责无痛地将数据写入到数据湖。

目前,Table Topic 仍处于 AutoMQ 商业版的打磨和完善阶段。团队计划在优化成熟后,将该功能模块完整开源至 AutoMQ 社区版,助力开发者生态共享这一云原生流数据处理创新成果。

06

问答环节

A1:在电商场景的大促期间,流量峰值会突然增高,数据需要逐步被消化。此时开发集群需要扩容,随着集群扩容,同步的性能也会线性增长。用户只需评估写入流量,无需再考虑其他因素。

A2:AutoMQ 目前优先支持 Iceberg,因为 Iceberg 在国外社区是一个标准且比较流行。未来,将持续倾听用户诉求,可能会纳入对 Paimon、Hudi 等其他平台的支持。

A3:如果数据同时要被在线使用和入湖,基于 Table Topic 这种方案更合适。Fluss 更多模拟的是 Kafka 的写 API,希望用户可以直接通过 Kafka API 写入数据,但 Kafka 还要很复杂的一部分是消费。

以上就是本次分享的内容,谢谢大家。

来源:DataFunTalk

相关推荐