摘要:弹指一挥间,创业已经四年了,我上一次在知乎发表文章也是两年前的事了。其实这些年我的写作并没有停止,只是换了一个战场。如果你去看https://www.timEPLus.com/blog,那里有好多我写的博客,不过都是代表公司的英文博客。说实话写这样的博客真的很
弹指一挥间,创业已经四年了,我上一次在知乎发表文章也是两年前的事了。其实这些年我的写作并没有停止,只是换了一个战场。如果你去看https://www.timEPLus.com/blog,那里有好多我写的博客,不过都是代表公司的英文博客。说实话写这样的博客真的很挑战,要用非母语来表达,要代表公司的立场,考虑市场的反馈,衡量读者的接受程度,根据内部和外部的评论修改,瞻前顾后,实在不是一件易事。在这里就不一样了,这里是我的空间,我可以自由发挥,想说啥就说啥(当然还要遵纪守法)。总之还是想要写点什么,这四年里一直都在和团队打造实时数据流处理的工具,就写写这方面的东西,和技术爱好者和数据工程师们一起学习,进步。
这个系列的博客,我想跟大家一起分享一下流数据处理的开源软件,以发表大致时间为顺序,和大家一起来学习和回顾一下。
我们今天要讲的软件叫Esper https://github.com/espertechinc/esper ,最早发布于2006年。谈起流处理领域里的开源软件,大家大多数会想起Flink,Spark这些如雷贯耳的名字来。那么这个Esper,它是何方神圣?
在深入介绍Esper之前,我们先看看Esper背后的这家公司,这家公司非常有趣。EsperTech Inc. 是一家小型的私营专业软件公司,总部位于新泽西州韦恩市,自2006年以来一直在复杂事件处理(CEP)领域成功运营。该公司年收入约400万美元, 员工不到10人,是一个非常小的精简运营团队。公司完全依靠自筹资金,从未在市场上融过资,也没有销售团队,完全依赖开源的应用和口碑传播。该公司声称自己"可能是全球同类型平台中使用最广泛的",他的知名客户包括博世、花旗集团、PayPal、埃森哲等,重点市场涉及银行业、物联网、电信和安全。
Esper项目代表了当今最成熟的开源复杂事件处理(CEP)引擎之一。自8.0版本以来,Esper已从传统查询引擎转变为现代编译器-运行时架构,生成优化的JVM字节码,将其定位为一个高性能解决方案,号称能够以亚微秒级延迟处理超过每秒700万个事件(这个数据很厉害,我并没有去验证)。
先上代码
在分析该产品的设计和架构之前,我们先看看它的样例代码。看看如何使用先。
首先,Esper使用一种名叫EPL(Event Processing Language)的类SQL语言。在流处理的世界里,主要有两种代码形式,一种就是编程语言,像是Flink的Java/Scala,或是Bytewax的python,另一种呢,就是SQL或者类似SQL,例如Flink SQL,Materialize,RisingWave还有timeplus的流式SQL等。但是这个Esper是在Java代码里运行它的EPL,可谓两者兼而有之。
我们先看一个EPL的例子:
SELECT avg(temperature) as avgTemp FROM SensorReading#length(5) GROUP BY sensorId这个和SQL很像,但是有一些创新的地方,它的#length(5)是类似一个窗口函数,取5个事件为一个窗口,计算传感器的滚动平均温度。
EPL支持几种不同的窗口函数
#length(N) - 最近的N个事件#time(30 sec) - 最近的三十秒#batch(100) - 批处理 100 个模式匹配语句
FROM PATTERN [every a=TradeEvent -> b=TradeEvent(price > a.price * 1.05)]这里的模式匹配是复杂事件处理的核心功能,也是Esper的主要卖点。我们来详细说说这个例子
这个PATTERN语句用于检测交易价格突破模式
PATTERN [...]
模式匹配 - 不是简单的过滤,而是检测事件序列在事件流中寻找特定的事件发生顺序every a=TradeEvent
every = 每当有符合条件的事件就开始一个新的模式检测a=TradeEvent = 将第一个交易事件标记为 a(起始点)->
序列操作符 - 表示"然后发生"意思是先有事件a,接着有事件bb=TradeEvent(price > a.price * 1.05)
b=TradeEvent = 将后续交易事件标记为 bprice > a.price * 1.05 = b的价格必须比a的价格高5%以上实际运行示例:
假设有这样的交易流:
时间1: AAPL, $100 ← 设为 a 时间2: AAPL, $102 (只涨2%,不匹配) 时间3: AAPL, $98 (下跌,不匹配) 时间4: AAPL, $106 ← 设为 b (比$100涨6%,匹配!)模式匹配成功! 因为:
a.price = $100b.price = $106 > $100 × 1.05 ($105)这是一个典型的金融技术分析模式:
突破模式 - 价格向上突破5%趋势反转信号 - 可能表示上涨趋势开始交易信号 - 算法交易系统可能基于此买入类似的中文表达:"每当有交易事件时,寻找后续价格上涨超过5%的交易事件"。 这种模式匹配在股票交易、风险监控、异常检测等场景中非常有用!
Join
在流处理中,Join两个或者多个流可能是最复杂的操作了,要看一个流处理工具做的好不好,能不能提供高效,复杂的Join能力是一个非常重要的评判标准。Esper也支持Join操作
-- Inner JoinFROM OrderEvent#length(10) as o, CustomerEvent#length(50) as c WHERE o.customerId = c.customerId-- Left Outer Join FROM OrderEvent#length(10) as o LEFT OUTER JOIN CustomerEvent#length(50) as c ON o.customerId = c.customerId-- Multi-Stream Joins FROM OrderEvent#length(10) as o, CustomerEvent#length(50) as c, ProductEvent#length(100) as p WHERE o.customerId = c.customerId AND o.productId = p.productId-- Temporal Joins with Time WindowsFROM OrderEvent#time(30 sec) as o, PaymentEvent#time(30 sec) as p WHERE o.orderId = p.orderId-- Self-JoinsFROM OrderEvent#length(20) as o1, OrderEvent#length(20) as o2 WHERE o1.customerId = o2.customerId ANDo1.orderId!=o2.orderIdEsper主要使用EPL来描述数据处理的逻辑,但是运行EPL还是需要一个载体,那么这个载体是什么呢?
这里有两个例子供大家参考,Java的代码比较长,我就不贴在这里了
Esper Complex Event Processing - Financial Trading Example
Esper IoT Sensor Monitoring - Simple Example
简单的说,就是用Java代码来调用EPL的处理。
Esper的基本工作流程是这样的
创建Java对象(Pojo)调用运行时功能发送实时事件到这个对象创建EPL,编译执行变异结果,使用监听器回掉,处理EPL执行的实时结果// 1SensorReading reading = new SensorReading("TEMP_001", "Server Room", 78.5, 45.2);// 2runtime.getEventService.sendEventBean(reading, "SensorReading");// 3String temperatureAlert = "SELECT sensorId, location, temperature " +"FROM SensorReading " +"WHERE temperature > 75.0";EPCompiled compiled1 = EPCompilerProvider.getCompiler.compile(temperatureAlert, new CompilerArguments(config));// 4runtime.getDeploymentService.deploy(compiled1).getstatements[0].addListener(new UpdateListener {@Overridepublic void update(EventBean newEvents, EventBean oldEvents, EPStatement statement, EPRuntime runtime) {for (EventBean event : newEvents) {System.out.printf("️ HIGH TEMP ALERT: Sensor %s in %s reading %.1f°F%n",event.get("sensorId"), event.get("location"), event.get("temperature"));}}});支持Kafka是流数据处理必不可少的功能,Esper当然也不例外
// Kafka consumer loopconsumer.subscribe("sensor-readings");while (true) {ConsumerRecordsrecords = consumer.poll(100); for (ConsumerRecordrecord : records) { // Parse JSON message to Java object SensorReading reading = parseJson(record.value); // Send to Esper runtime.getEventService.sendEventBean(reading, "SensorReading"); } }同时Esper还支持HTTP,文件,数据库,MQTT等多种数据源的接入。
好了,下一步我们看看Esper的架构和设计。
Esper在8.0版本中的架构有了一个根本性的范式转变,该系统现在作为两阶段编译器-运行时架构运行,类似于现代JVM语言如Scala和Kotlin,其中事件处理语言(EPL)语句在执行前被编译成优化的JVM字节码。
它的主要模块有:
esper-common模块提供共享API和实用程序esper-compiler处理EPL到字节码的转换esper-runtime执行编译的字节码esperio为Kafka、AMQP和Spring JMS等系统提供I/O适配器这种分离使编译器能够作为无状态服务运行,允许并行编译和部署场景,其中编译与运行时执行分别进行。
编译流水线通过复杂的多阶段处理运行:
第1阶段使用ANTLR 4.13.1进行EPL解析和AST构建第2阶段执行语义分析和类型解析第3阶段处理查询优化和字节码生成规划这种方法消除了虚方法调用,移除了类型转换开销,并启用了JIT编译器优化,可以将字节码转换为本机机器代码。
事件处理遵循共享过滤器索引架构,其中传入的事件通过优化的过滤器树流向相关语句,上下文分区实现了细粒度的并发控制。运行时通过将编译的字节码存储在JVM类空间(堆外)中来维持最小的堆使用,同时通过复杂的数据窗口实现管理事件保留策略,仅在时间窗口、长度窗口或模式匹配要求明确需要时才保留事件。
这里我们看一下Esper的一些技术实现细节:
Esper的技术实现使用字节码生成框架创建自定义聚合行类,将所有聚合状态集中在单个对象中,大幅减少内存分配并改善缓存局部性。早期版本需要为每个聚合值使用单独的对象,而编译器现在生成带有原始字段的专用类,消除了包装对象开销。
模式匹配采用动态状态树而非传统的非确定性有限状态机(FSM),基于斯坦福大学的Rapide模式语言(发表于1996年,属于上古技术)。这种方法随着模式子表达式激活动态创建和销毁状态分支,带标记的事件仅在模式触发或子表达式终止之前保留在内存中。该实现支持复杂的嵌套模式,同时对未标记模式保持零内存使用,并对活动模式子表达式设置可配置限制。
动态状态树是一种实时构建和销毁的树形数据结构,用于跟踪复杂事件模式的匹配状态。与传统NFA的区别:
传统NFA(非确定性有限自动机):固定状态机 → 预定义所有可能状态 → 占用固定内存Esper的动态状态树:按需创建状态分支 → 动态构建 → 用完即销毁事件处理语言(EPL)扩展了SQL-92,增加了复杂的时间构造,包括用于时间关系的Allen区间代数、全面的窗口功能和高级模式匹配语法。语言编译过程将声明性EPL转换为优化的字节码,以最小的运行时解释执行,实现了通常与编译语言相关的性能特征,同时保持SQL的声明式简洁性。
内存管理遵循增量网络原理,只在处理组件之间传递数据变更,消除不必要的对象分配和垃圾收集压力。运行时采用线程本地缓冲区、惰性状态加载,以最大化并发处理能力。增量网络原理的核心思想是:只传递和处理数据的变化(Delta),而不是完整的数据集,这种设计理念在流数据处理里是一种常见的模式,别许多其他的流数据处理工具所采用。
Esper在8.2.0版本中引入的自定义JSON实现。编译器生成特定模式的JSON解析器类,而不是依赖外部库,提供直接的JSON到对象转换,无需中间表示。性能基准测试显示,这种方法每秒可处理625,105次操作,显著优于Gson。
Esper的并发架构提供应用程序控制的线程模型,引擎在运行时不强制应用程序采用特定的线程模型。该实现支持用于事件队列的入站线程、用于结果传递的出站线程、用于基于时间处理的定时器执行线程,以及用于语句级并行化的路由执行线程。
细粒度锁定在上下文分区级别运行,使无状态语句能够无锁执行,同时为不同数据分区提供隔离的处理上下文。这种方法能够有效扩展到多核系统,同时避免传统数据库系统中典型的过度同步开销。
Esper开源版本不支持原生集群,这个功能只在企业版提供。
作为一个最早做开源流处理工具的项目,这个工具能活到现在就已经是个奇迹。这也从侧面说明,这个项目不简单。我觉得他最大的成功就是在于专注在一个非常具体的领域CEP,当然这也可能是他最大的失败,因为他并没有成长为Flink,Spark这样的行业巨鳄。但每个企业都有自己的生存之道,CEP是Esper的舒适区,取得了相当的成功,避免了和其他竞争对手在主战场上的激烈竞争。对一个创业公司而言,有时候,活着就是硬道理。
有没有了解这个项目的同学,谈谈你对Esper的意见?
来源:闻数起舞