摘要:Matei (https://github.com/mateiz)是Databricks的CTO和联合创始人。大学在滑铁卢学习,然后在谷歌和脸书做过一段时间的实习生,之后就在伯克利读博士。正是在他读博士期间,他在AMPLab开始了这个创新性的数据处理工具Spa
在数据处理领域里,Apache Spark可谓如雷贯耳,它是一个用于大规模数据处理的统一分析引擎,自诞生以来就彻底革命了大数据处理领域。
在介绍Spark之前,我想先向大家介绍一些Spark的最初作者,Matei Zaharia。
source :https://www.dwih-newyork.org/files/2023/05/2023-06-HPI-KISZ_16-9-954x537.jpg
Matei (https://github.com/mateiz)是Databricks的CTO和联合创始人。大学在滑铁卢学习,然后在谷歌和脸书做过一段时间的实习生,之后就在伯克利读博士。正是在他读博士期间,他在AMPLab开始了这个创新性的数据处理工具Spark。
除了Spark,Matei还参与了以下的开源项目:
MLflow 22k star 开源的机器学习生命周期管理平台,用于实验跟踪、模型打包和部署Delta Lake 8.3k star构建在数据湖之上的开源存储框架,提供ACID事务和数据版本控制Dolly 20.8k star Databricks开源的指令微调大语言模型,基于Pythia模型训练而成DSPy 28.5k star 用于优化语言模型提示和权重的编程框架,将提示工程算法化MegaBlocks 1.5k star 高效的稀疏Transformer实现,专为大规模专家混合(MoE)模型设计ColBERT 3.6k star 高效的神经信息检索模型,结合BERT的表示能力和倒排索引的效率这些项目, 以Spark,MLFlow和Delta Lake为主,日后成为了Datebricks构建数据帝国的主要武器。
Spark
从github上我们能够看到,Matei在15年之前贡献了Spark的主要代码。
好了言归正传,Spark究竟带来了什么呢?
我们先讲讲数据处理的核心,无论什么样的数据处理技术,其核心都是围绕着两个维度来开展的,一个是存储,一个是计算。所谓的大数据无非就是要解决数据怎么存,怎么访问,怎么计算的问题。最先针对大数据领域的这两个核心问题开始攻击的是谷歌。谷歌分别在2003年和2004年发表了两篇论文:
《The Google File System》 - Sanjay Ghemawat, Howard Gobioff, Shun-Tak Leung《Mapreduce: Simplified Data Processing on Large Clusters》 - Jeffrey Dean, Sanjay Ghemawat前者启发了Hadoop分布式文件系统(HDFS)的设计,解决了大数据存储的问题。而后者开创了Hadoop MapReduce框架的实现,解决了大数据计算的问题。
MapReduce将复杂的数据处理任务分解为两个阶段:Map(映射)和Reduce(归约), 简单的说,就是分而治之,你数据不是很多吗,那我把你们都切成一块一块的,用多个机器,每一个处理一部分,然后把处理结果合并到一起,不就好了。
MapReduce的处理流程如下
Map阶段将输入数据分割成独立的块并行处理每个数据块输出中间键值对(key-value pairs)Shuffle阶段对Map输出进行排序和分组相同key的数据发送到同一个ReducerReduce阶段接收相同key的所有value执行聚合/汇总操作输出最终结果source https://datascientest.com/en/files/2023/09/illu_Schema_mapreduce-04.png
MapReduce的计算模型成就了Hadoop的辉煌,它简单易懂,很容易扩展,把计算移动到数据,可以减少网络传输,最重要的用户可以用非常便宜的计算硬件来处理海量数据。很快Hadoop就流行起来了。
然而MapReduce的计算模型并非没有缺点。
高延迟磁盘I/O密集,每个阶段都要读写磁盘作业启动开销大不适合实时处理编程复杂需要手动编写Map和Reduce函数复杂逻辑需要多个MapReduce作业调试困难资源利用率低CPU经常等待磁盘I/O内存利用不充分Reduce阶段必须等待所有Map完成不适合迭代算法机器学习算法需要多轮迭代每轮都要重新读取数据中间结果无法缓存只支持批处理无法处理流式数据, 因为Map需要对所有数据分片不支持交互式查询实时性要求高的场景无法满足Spark的出现恰恰能够解决一部分Hadoop的MapReduce的问题,让我们来看看
Spark实现了几个核心的抽象概念,完全改变了数据处理的体验。
弹性分布式数据集(RDDs)不可变的分布式对象集合通过血缘跟踪实现容错支持惰性求值以进行优化支持函数式编程范式DataFrames和Datasets构建在RDDs之上的高级API提供模式信息和类型安全支持类SQL操作和优化利用Catalyst优化器提升性能有向无环图(DAG)表示逻辑执行计划支持基于阶段的执行和优化促进故障恢复和高效资源利用首先基于RDD,用户操作的抽象的对象,而非mapper和reducer,这样的操作好比面向对象和用汇编。用户可以更聚焦在业务逻辑,而不是想方设法把业务逻辑映射成mapper和reducer。
Dataframe就更进一步了,类似Pandas的Dataframe,这个概念是广大数据工程耳熟能详的东西,亲切可爱。对于用户而言,就像是从石器时代跃迁到了蒸汽时代,写代码的效率得到了极大的改进。
而DAG是Spark相较于传统MapReduce的核心优势之一,它通过全局视角的优化、智能的容错机制和灵活的执行策略,显著提升了大数据处理的效率和可靠性。
这里看代码我们做一个简单的对比
// MapReduce写WordCount需要大量样板代码public class WordCountMapper extends Mapper{ private final static IntWritable one = new IntWritable(1); private Text word = new Text; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString); while (tokenizer.hasMoreTokens) { word.set(tokenizer.nextToken); context.write(word, one); } } } public class WordCountReducer extends Reducer{ private IntWritable result = new IntWritable; public void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get; } result.set(sum); context.write(key, result); } } # Spark只需几行代码text_rdd = spark.textFile("hdfs://path/to/file")word_counts = (text_rdd.flatMap(lambda line: line.split).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b))除了显而易见的代码层面的革新,Spark在内存管理,磁盘IO,Shuffle的优化,代码生成和向量化,网络传输和序列化等方面都有了不同的创新和进步。
在功能上,Spark也支持Spark SQL, Spark ML机器学习(解决了Mapreduce不方便迭代计算的问题)和Spark Graph图处理等更高级的功能接口。
基于这些优势,Spark的快速流行可想而知。
驱动程序(主节点)协调整个Spark应用程序创建SparkContext/SparkSession作为入口点将用户代码转换为有向无环图(DAGs)包含DAG调度器、任务调度器、后端调度器和块管理器管理作业执行和资源分配集群管理器管理整个集群的资源支持多种部署模式:Standalone: 内置集群管理器YARN: Hadoop的资源管理器Mesos: 通用集群管理器Kubernetes: 容器编排平台(自Spark 3.1)执行器(工作节点)执行驱动程序分配的任务在内存中存储中间结果使用线程并发运行多个任务管理本地缓存和数据持久化source https://techvidvan.com/tutorials/wp-content/uploads/sites/2/2019/11/Internals-of-Job-Execution-In-Spark.jpg
可是以上我们提到的东西,还有一个缺点,那就是流处理,Spark仍然是一个批处理的计算模式,不能被有效的使用在流计算的场景下。
于是Spark就推出了Spark Stream
Spark在2013年和2016年分别推出了基于RDD的微批处理的第一代流处理和基于DataFrame/Dataset API的统一流批的第二代流处理实现。
本质上,Spark利用把批数据分成很小的批次的方式来实现流处理。从量子力学的角度,这样的设计无可厚非,因为当你把物质分的足够小,你就会遇到量子这个极限,这个时候批和流就统一在一起了。
实时数据流 → [batch1][batch2][batch3][batch4] → 批处理引擎时间轴: 0-1秒 1-2秒 2-3秒 3-4秒// DStream = 一系列按时间分布的RDDclass DStream[T] {// 每个时间间隔对应一个RDDdef compute(time: Time): Option[RDD[T]]// 转换操作def map[U](func: T => U): DStream[U]def filter(func: T => Boolean): DStream[T]def window(windowDuration: Duration): DStream[T]}之后的第二代,Spark统一了流批的概念,将流看成是不断增长的表,这样的设计看上去更自然。
# 将流看作不断增长的表静态表: [Row1, Row2, Row3]流式表: [Row1, Row2, Row3] → [Row1, Row2, Row3, Row4] → ...这样做的好处是统一了流批处理的接口
# 同样的代码,批流通用def process_data(df):return df.groupBy("category").count# 批处理batch_df = spark.read.json("batch_data.json")result = process_data(batch_df)# 流处理 stream_df = spark.readStream.json("stream_data")query = process_data(stream_df).writeStream.start我们看一个ETL管道的例子
# 实时数据清洗和转换def etl_pipeline:raw_stream = spark.readStream.Kafka.format("kafka").load# 数据清洗cleaned = raw_stream \.filter(col("value").isNotNull) \.withColumn("parsed", from_json(col("value"), schema))# 数据转换transformed = cleaned \.withColumn("processed_time", current_timestamp) \.withColumn("category", when(col("amount") > 1000, "high").otherwise("low"))# 写入多个目的地# 实时仪表板dashboard_query = transformed.writeStream \.format("kafka") \.option("topic", "dashboard") \.start# 数据仓库warehouse_query = transformed.writeStream \.format("delta") \.option("path", "/data/warehouse/transactions") \.startreturn [dashboard_query, warehouse_query]除了编程接口,Spark提供了流计算中必备的各项功能,包含
时间和窗口管理水位线技术(Watermarks) - 处理延迟数据和确定窗口关闭时间窗口函数滑动窗口 (Sliding Windows) - 重叠时间窗口计算滚动窗口 (Tumbling Windows) - 不重叠固定时间窗口会话窗口 (Session Windows) - 基于活动间隔的动态窗口事件时间处理 (Event-time Processing) - 基于数据中的时间戳进行计算处理时间处理 (Processing-time Processing) - 基于系统处理时间状态管理checkpoint机制 - 定期保存应用状态和元数据增量Checkpoint - 只保存状态变化部分,提高效率状态存储后端 - 支持内存、HDFS、S3等多种存储状态TTL - 自动清理过期状态数据有状态操作 - mapGroupsWithState, flatMapGroupsWithState聚合状态 - 内置聚合函数的状态管理容错和可靠性精确一次语义 (Exactly-once Semantics) - 端到端数据一致性保证Write-Ahead Log (WAL) - 预写日志确保数据不丢失自动故障恢复 - 基于checkpoint自动重启和状态恢复动态容错 - 实时检测和处理节点故障数据去重 - 自动处理重复数据数据源和输出多数据源支持 - Kafka, Kinesis, Socket, File, Rate等Schema演化 - 自动适应数据结构变化多输出模式 - Append, Complete, Update输出模式自定义数据源 - 支持用户自定义Source和Sink性能优化背压控制 (Backpressure) - 根据处理能力自动调节输入速率速率限制 - 限制每秒处理的记录数动态背压 - 自动根据处理能力调整消费速率批次大小控制 - 动态调整每批处理的数据量动态分区调整 - 自适应调整并行度缓存优化 - 智能缓存频繁访问的数据代码生成 - Tungsten引擎的代码生成优化向量化执行 - 批量数据处理优化连续处理模式 - 低延迟流处理模式(最近推出的实验性功能, Realtime Mode)内存管理 - 防止内存溢出的保护机制触发器和调度处理时间触发器 - 基于固定时间间隔触发连续触发器 - 尽可能快地连续处理一次性触发器 - 处理一个批次后停止自定义触发器 - 用户自定义触发逻辑高级分析功能流式JOIN - 流与流、流与静态数据的连接流式聚合 - 实时计算sum、count、avg等聚合指标复杂事件处理 - 模式匹配和事件序列检测流式机器学习 - 实时模型训练和预测时间序列分析 - 专门的时间序列处理函数这里我就不一一介绍,有兴趣的同学,可以去看相关的资料。
总结Apache Spark代表了大数据处理技术的重大进步,提供了一个集速度、灵活性和易用性于一体的统一平台。其复杂的架构采用了Catalyst优化器和Project Tungsten优化技术,为大规模数据处理工作负载提供卓越的性能。
该框架的优势在于能够在单一、连贯的生态系统中处理多样化的工作负载——从批处理到机器学习。其主从架构配合容错设计,使组织能够可靠高效地处理PB级数据集。
然而,Spark并非没有局限性。其内存密集型特性需要大量基础设施投资,系统的复杂性需要专业知识才能实现最佳性能。微批处理模型虽然适用于许多用例,但无法满足真正的实时处理要求。这也给了我们将要谈到的另一个出色的流数据处理工具Apache Flink足够的成长空间。
Spark的适用场景
批流混合处理需求复杂的数据转换和聚合机器学习集成已有Spark生态系统对延迟要求不严格 (秒级可接受)毫秒级延迟要求简单的事件路由轻量级部署需求Spark的江湖地位类似屠龙宝刀,杀伤力极大。但是在流处理的领域里,有另一个神一样的存在,Apache Flink,倚天不出,谁与争锋?
来源:闻数起舞