9000字长文解析实时流计算应用案例之实时流数据特征提取引擎!

B站影视 电影资讯 2025-04-20 13:42 1

摘要:其中,第一个是以CompletableFuture框架实现的实时流数据特征提取引擎;第二个是基于当前主流的流计算框架Flink,实现一个具备特征提取和规则模型功能的风控引擎。

在本篇中,我们将其中部分知识点整合起来,以展示两个完整的实时流计算应用案例。

其中,第一个是以CompletableFuture框架实现的实时流数据特征提取引擎;第二个是基于当前主流的流计算框架Flink,实现一个具备特征提取和规则模型功能的风控引擎。

特征提取是大多数在线决策系统必须经过的步骤。不管是在风控、监控、预警等各种场景下,也不管我们使用的是最简单的规则系统,还是复杂的统计模型或机器学习模型,它们的输入都一定是已经量化的各种数据,我们把这种数据称为特征。由此可见,大多数场景需要提取各种特征,因此我们准备设计一个通用的实时流数据特征提取引擎。为了方便特征引擎的使用,我们还需要在这个引擎上面提供一层DSL语言,这一方面能够让特征引擎更加通用、灵活,另一方面也会简化特征引擎的使用。

我们在前面分别使用两种方式实现了实时流计算应用,其中一种是完全从零开发,而另一种则是基于Jav.8异步编程框架CompletableFuture实现的。由于CompletableFuture框架的灵活性和便利性,所以在本节中,我们将使用它来实现实时流数据特征提取引擎。我们还可以通过Kafka对流数据的分区功能,以及Apach.Ignite或Redis集群对状态的分布式管理功能,将这个实时流数据特征提取引擎扩展为集群。

我们在这里采取自顶向下的设计方式,首先定义特征提取引擎使用的DSL。设计的DSL包含7个主要概念:输入流(source)、输出流(sink)、字段(filed)、算子(OPERATOR)、函数(function)、宏函数(macro)和操作模式(mode)。

1.输入流

输入流定义了事件的输入流。我们的实时流计算特征提取引擎以Kafka等消息中间件作为事件的输入流。例如,下面定义了一个从Kafka的event-input主题读取数据的输入流。

set-source --source '{"zookeeper": "127.0.0.1:2181", "topic": "event-input",

"group": "test-group", "offset": "largest"}'

2.输出流

输出流定义了事件的输出流。与输入源对应,当特征提取引擎对事件提取完特征后,将特征附加(append)到事件上,再将附加了特征的事件输出到Kafka等消息中间中。例如,下面定义了一个将特征提取结果输出到Kafka的event-output主题的输出流。

set-sink --sink '{"broker": "127.0.0.1:9092", "topic": "event-output"}'

3.字段

我们以JSON的方式表示一个事件。由于并非事件中的每一个字段都会参与特征提取,并且原始事件中字段的名字并不一定与我们所预想的一致,所以需要通过字段映射功能来设定特征引擎感兴趣的字段与原始事件字段之间的对应关系。

下面代码定义了两个字段的映射关系,其中,c_timestamp对应原始消息中的$.event.timestamp字段,user_id对应原始消息中的$.event.user_id字段,另外两个与此类似。这里使用JsonPath的方式来表达原始事件中字段的位置。

add-field --event-type transaction --field-name c_timestamp --field-path

$.event.timestamp

add-field --event-type transaction --field-name user_id --field-path

$.event.user_id

add-field --event-type transaction --field-name device_id --field-path

$.event.device_id

add-field --event-type transaction --field-name amount --field-path $.event.amount

4.算子

我们的特征引擎所处理的是流数据,流数据是一种时间序列,因此我们针对时间序列定义了算子概念。

OPERATOR(window, event_type, target[=value], on1[=value], on2[=value], ...)

说明:

·OPERATOR表示要统计的类型,如计数(COUNT/COUNT_DISTINCT)、求和(SUM)、最大(MAX)、最小(MIN)、均值(AVG)、方差(VARIANCE)、集合(SET)、列表(LIST)等。·window表示统计的窗口。例如,“1d”表示过去一天,“5m”表示过去5分钟,“2h”表示过去2小时等。

·event_type表示事件类型。例如,“transaction”表示交易事件,“loan_application”表示贷款申请等。可以根据具体业务场景设置事件类型。

·target表示统计的目标变量,on1、on2等则是对target进行划分的维度。例如,“过去一天同一用户的总交易金额”中,target为“交易金额”,on为“用户”。而在“过去一周同一用户在同一I.C段的总交易金额”中,target为“交易金额”,on为“用户”和“I.C段”。

另外,target和on后面可以通过等号指定一个值,用于指定变量为特定值进行计算。target和on还可以递归地定义为算子或函数。

下面是几个算子的例子:

# 过去一周内在同一个设备上交易次数

COUNT(7d, transaction, device_id)

# 过去一周内在设备"d000001"上交易次数

COUNT(7d, transaction, device_id=d000001)

# 过去一天同一用户的总交易金额

SUM(1d, transaction, amount, userid)

# 过去一天用户"ud000001"的总交易金额

SUM(1d, transaction, amount, userid=ud000001)

# 过去一周内在同一个设备上注册的用户登录过的设备数

FLAT_COUNT_DISTINCT(7d, login, device_id, SET(7d, create_account, userid,

device_id))

# 过去一周内在设备"d000001"上注册的用户登录过的设备数

device_id=d000001))

5.函数

相比算子是对时间序列的操作,函数则用于对事件的字段进行转换操作。其定义如下:

F_FUNCTION(on1[=value], on2[=value], ...)

说明:

·F_FUNCTION表示函数的名字,必须以“F_”作为前缀,如加(F_ADD)、减(F_MINUS)、乘(F_MULTIPLY)、除(F_DIVIDE)、求和(F_SUM)、指数(F_EXP)、对数(F_LOG)等。

·on1、on2等用于指定作为函数输入的字段。on后面可以通过等号指定一个值,表示指定输入参数为指定值。on也可以递归地定义为算子或函数。

下面是几个函数的例子:

# 将事件中的amount1字段和amount2字段的值相加

F_ADD(amount1, amount)

# 将事件中的amount字段的值开方

F_POW(amount, n=0.5)

6.宏函数

与C语言中宏函数的作用类似,宏函数可以用一个更简单的式子替换一段具有更复杂功能的代码片段。其定义如下:

M_MACRO(arg1, arg2, ...)说明:

·M_MACRO表示函数的名字,必须以“M_”作为前缀。

·arg1、arg2等用于指定宏函数的参数,在使用宏展开时,这些参数会被实际变量替换掉。

下面是几个宏函数的例子。

# 定义宏M_CORRELATION用于计算on的a和b变量之间相关系数

add-macro --name "M_CORRELATION(time, type, a, b, on)" --replace

"F_DIVIDE(F_MINUS(AVG(time, type, F_MULTIPLY(a, b), on), F_MULTIPLY(AVG(time,

type, a, on), AVG(time, type, b, on))), F_SQRT(F_MULTIPLY(VARIANCE(time, type, a,

on),VARIANCE(time, type, b, on))))"

# 定义宏M_DEVICE_ON_USER_ONE_DAY用于计算一天内同一个user使用的不同device数add-macro --name "M_DEVICE_ON_USER_ONE_DAY(device, user)" --replace"COUNT_DISTINCT(1d, transaction, device, user)"定义好后,宏函数的用法与算子和函数相同。

另外,算子、函数、宏之间可以相互嵌套使用。

7.操作模式

我们特意将更新操作与查询操作分开,因此设定了3种计算模式:

update、get和upget。update模式和get模式分别对应更新模式和查询模式。其中,update模式会更新状态,get模式不会更新状态。upget模式同时包含了update模式和get模式,从而在一些更新并查询的场景下,减少调用特征引擎的次数。

将前面的各个DSL概念汇集起来,可以得到一个完整的流数据特征提取引擎所需要的配置,或者说是脚本。下面就是一个完整的流数据特征提取引擎DSL脚本示例。// 指定应用的名称

config-application app001

// 定义流数据特征提取引擎的输入流

set-source --source '{"zookeeper": "127.0.0.1:2181", "topic": "app001-input",

"group": "test-group", "offset": "largest"}'

// 定义流数据特征提取引擎的输出流

set-sink --sink '{"broker": "127.0.0.1:9092", "topic": "app001-output"}'

// 定义流数据特征提取引擎需要使用的字段,以及这些字段对应在原始消息中的位置

add-field --event-type transaction --field-name c_timestamp --field-path

$.event.timestamp

add-field --event-type transaction --field-name user_id --field-path

$.event.user_id

add-field --event-type transaction --field-name device_id --field-path

$.event.device_id

add-field --event-type transaction --field-name amount --field-path $.event.amount

// 定义一个宏函数

add-macro --name "M_DEVICE_ON_USER_ONE_DAY(device, user)" --replace

"COUNT_DISTINCT(1d, transaction, device, user)"

// 定义流数据特征提取引擎需要计算的特征列表

add-feature --event-type transaction --feature "COUNT(1d, transaction, user_id)" -

-mode upget

add-feature --event-type transaction --feature

"M_DEVICE_ON_USER_ONE_DAY(device_id, user_id)" --mode upget

add-feature --event-type transaction --feature "SUM(1h, transaction, amount,

user_id)" --mode upget

// 激活前面的设置

activate

// 启动流数据特征提取引擎

start

图11-1展示了实时流数据特征提取引擎的工作原理,就像大多数的数据库系统由SQL解析层、执行计划执行层和存储引擎层构成一样,我们的特征引擎也包含3层:DSL解析层、执行计划执行层和状态存储层。

接下来我们具体讨论各层的实现原理。

1.DSL解析层

DSL解析层将DSL解析为执行计划。其中,每个特征被解析为一棵单独的树,因此多个特征被解析为多棵单独的树。每棵树的节点代表一个特征,树上节点之间的父子关系表示特征计算时的依赖关系。执行计划最初由多棵独立的树构成,每个特征定义语句都会生成一棵树。为了避免重复计算相同的特征,需要将所有这些树中的特征按照对应节点在各自树中的深度分组。相同深度的特征被划分到相同的组。

如果同一个特征在不同的树中有不同的深度,就将该特征的深度设定为最大的那个深度值。这样,我们最终得到一个按照深度分组,每组有若干特征的执行计划。之后,我们就可以按照深度由大到小的顺序执行这个执行计划了。

以图11-2展示的特征依赖关系为例,其中定义了4个特征:特征4、特征5、特征6和特征7。它们分别如下:

特征4(特征2)

特征5(特征2, 特征3)

特征6(特征3)

特征7(特征1,特征4(特征2))

其中,括号代表了依赖关系。例如,"特征4(特征2)"表示特征4的计算依赖于先计算出特征2。

这样会生成4个单独的特征依赖树:

特征4: depth=1, 特征2: depth=2

特征5: depth=1, 特征2: depth=2, 特征3: depth=2

特征6: depth=1, 特征3: depth=2

特征7: depth=1, 特征1: depth=2, 特征4: depth=2, 特征2: depth=3

接下来根据深度分组:

depth=1:特征5、特征6、特征7

depth=2:特征1、特征3、特征4

depth=3:特征2

其中,特征2由于被特征7依赖的深度为3,比被特征5依赖的深度(2)更大,故最终设定其深度为3。特征4由于被特征7依赖的深度为2,比其自身的深度(1)更大,故最终设定其深度为2。

至此,我们得到了最终的执行计划。接下来进入执行计划执行层按照这个执行计划计算各个特征的过程。

2.执行计划执行层

执行计划是一个按深度分组的特征集合。执行计划的执行过程是按照深度由大到小依次计算各个分组中的所有特征的。很明显,我们完全可以将这个过程与流计算对应起来。换句话说,这个执行计划不就是一个结构简单的DAG吗?

图11-3执行计划对应的DAG

接下来就是实时流计算发挥作用的时刻了。只需要将每个深度特征集的计算过程设置为流计算过程的一个步骤,就可以用CompletableFuture框架构建基于实时流计算技术的执行计划执行层了。

整个特征提取引擎的实现过程是比较复杂的,在本书中展示全部代码不太可能,所以这里只对主要执行流程进行说明,并略去了许多支线代码。完整的代码参见本书配套源代码。

首先对特征提取引擎DSL的解析,其中最主要的部分是对特征定义语句的解析。特征定义语句是指由算子、函数和宏组成的用于描述一个特征的语句。例如:

COUNT(7d, transaction, device_id)

FLAT_COUNT_DISTINCT(7d, login, device_id, SET(7d, create_account, userid,

device_id))

我们从特征定义语句解析出分词,具体如下:

private static final String TOKEN_SPLIT_PATTERN = "(\\s+)|(\\s*,\\s*)";

public List parseTokens(String dsl, boolean dslNormalized) {

String normDSL = dsl; // 步骤a

// 步骤b。步骤a和步骤b主要是对特征定义字符串dsl进行规整化,如去掉多余的空格

String parserDSL = genDSL4Parser(normDSL);

// 步骤c。进行分词

Scanner s = new Scanner(parserDSL).useDelimiter(TOKEN_SPLIT_PATTERN);

List tokens = new LinkedList;

while (s.hasNext) {

tokens.add(s.next);

}

tokens =

tokens.stream.filter(StringUtils::isNotBlank).collect(Collectors.toList);

return tokens;

}

经过parseTokens后,一个特征定义语句被解析为一组分词。接下来将分词解析为执行树。

public String parseFromTokens(

JSONObject globalSetting,

List tokens,

Map> functionTokens,

Map featureDSLMap,

Map functions) {

List> functionList = new LinkedList;

Stack stack = new Stack; // 步骤a。使用栈进行语法解析

int depth = 0;

for (String token : tokens) {

// 步骤b。当遇到反括号时,说明某个特征(可以是嵌套特征)的定义结束

if (")".equals(token)) {

List newFunctionTokens = new LinkedList;

while (true) {

String lastToken = stack.pop; // 步骤c。取出该特征的所有分词

if ("(".equals(lastToken)) {

break;

}

newFunctionTokens.add(lastToken);

}

newFunctionTokens.add(stack.pop);

// 步骤d。得到某个特征的全部分词

newFunctionTokens = Lists.reverse(newFunctionTokens);

// 步骤e。通过分词解析出单个特征的语法结构对象StreamFQL

StreamFQL newFunction = genFunctionDSL(globalSetting,

newFunctionTokens);

// 步骤f。记录解析出的单个特征的语法结构对象,以及它被最外层特征依赖的深度

functionList.add(new Tuple2(newFunction, depth));

newFunctionTokens.add(1, "(");

newFunctionTokens.add(")");

functionTokens.put(newFunction.getName, newFunctionTokens);

// 步骤g。输出解析出的单个特征语法结构对象

featureDSLMap.put(newFunction.getName, newFunction);

stack.push(newFunction.getName);

depth -= 1; // 步骤h。当遇到正括号时depth加1,当遇到反括号时depth减1

} else {

stack.push(token);

if ("(".equals(token)) {

depth += 1; // 步骤i。当遇到正括号时depth加1,当遇到反括号时depth减1

}

}

}

String topFunctionName = stack.pop;

// ……

for (Tuple2 tuple : functionList) {

Integer fDepth = functions.get(tuple._1);

//步骤j。如果一个特征被最外层特征多次使用,就将该特征的深度设置为最大的深度值

if (fDepth == null || tuple._2 > fDepth) {

functions.put(tuple._1, tuple._2);

}

}

return topFunctionName;

}

private StreamFQL genFunctionDSL(JSONObject globalSetting, List

newFunctionTokens) {

StreamFQL fql = new StreamFQL;

fql.setOp(op);

if (op.startsWith("M_")) { // 步骤a。解析宏函数

// ......

} else if (op.startsWith("F_")) { // 步骤b。解析函数

// ......

}

// 步骤c。 解析算子。newFunctionTokens是除去括号后的分词列表。第0个分词是函数名

else if (newFunctionTokens.size >= 4) {

// 步骤d。根据算子定义,第1个分词time是时间窗口

fql.setWindow(newFunctionTokens.get(1)); // 步骤e。根据算子定义,第2个分词是事件类型

fql.setEvent_type(newFunctionTokens.get(2));

// 步骤f。根据算子定义,第3个分词是target字段

fql.setTarget(parseField(newFunctionTokens.get(3)));

List onList = new ArrayList;

// 步骤g。根据算子定义,target之后接下来是一个或多个on字段

for (int i = 4; i

Field newField = parseField(newFunctionTokens.get(i));

onList.add(newField);

}

// 步骤h。允许设置一些全局默认的on字段

List onDefaults = getOnDefault(globalSetting);

for (Field onDefault : onDefaults) {

onList.add(onDefault);

}

fql.setOn(onList);

} else {

// ......

}

// 步骤i。将分词连接起来进行Base64编码,再加上前缀和后缀,即形成这个特征的名称,供内部使用

String name = String.format("___B___%s___F___",

BaseEncoder.encodeBase64(Joiner.on("&").join(newFunctionTokens)));

fql.setName(name);

List tokens =

newFunctionTokens.stream.map(this::decode).collect(Collectors.toList);

// 步骤j。将分词按字面连接起来,也就是这个特征的完整字面表达式,供阅读使用

String textName = String.format("%s(%s)", tokens.get(0),

Joiner.on(",").join(tokens.subList(1, tokens.size)));

fql.setText_name(textName);

return fql;

}

然后,将多个执行树合并起来,形成最后的执行计划。

public Map> parseExecutionTree(JSONObject globalSetting,

List dsls,

boolean dslNormalized) {

Map macros = new HashMap;

// …… 步骤a。解析宏函数。

// parse dsl

Map> allFunctions = new HashMap;

for (String elem : dsls) {

PreParseInfo preParseInfo = preParse(macros, elem, dslNormalized);

Map functions = new HashMap;

Map featureDSLMap = new HashMap;

String topFunction = parseFromTokens( // 步骤b。将特征定义语句解析为特征语法结构

对象

globalSetting, preParseInfo.tokens, null, featureDSLMap,

functions);

for (Map.Entry entry : functions.entrySet) {

Tuple2 oldFeatureDSL =

allFunctions.get(entry.getKey);

if (oldFeatureDSL == null) {

// 步骤c。将所有特征定义语句解析出来的特征语法结构对象合并起来

allFunctions.put(entry.getKey, new Tuple2(entry.getKey,

entry.getValue)); } else {

// 步骤d。如果某个特征被多次依赖,就将其depth设置为最大的那个depth值

if (entry.getValue > oldFeatureDSL._2) {

entry.getValue));

}

}

}

}

// 步骤e。将所有特征语法结构对象按照depth分组,至此形成最后的执行计划

Map> result = new HashMap;

for (Tuple2 entry : allFunctions.values) {

result.putIfAbsent(String.valueOf(entry._2), new HashSet);

result.get(String.valueOf(entry._2)).add(entry._1);

}

return result; // 步骤f。result就是最后的执行计划

}

至此,我们得到了由DSL解析出来的执行计划。为了方便理解,下面用具体的例子来说明DSL解析过程中各个阶段的输入和输出。

特征定义语句:COUNT(7d, transaction, device_id)

parseTokens输出分词数组:[COUNT, (, 7d, transaction, device_id, )]

parseFromTokens输出:{COUNT:1} // 键为特征,值为特征被依赖的深度

特征定义语句: FLAT_COUNT_DISTINCT(7d, login, device_id, SET(7d, create_account,

userid, device_id))

parseTokens输出分词数组: [FLAT_COUNT_DISTINCT, (, 7d, login, device_id, SET,

(, 7d, create_account, userid, device_id, ), )]

parseFromTokens输出:{FLAT_COUNT_DISTINCT:1, SET:2} // 键为特征,值为特征被依赖的深度

parseExecutionTree输出执行计划:{2:[SET], 1:[COUNT, FLAT_COUNT_DISTINCT]}

// 键为深度,值为属于该深度组的特征集合

接下来实现执行计划执行层。我们使用CompletableFuture框架构建一个实时流计算过程来执行上面得到的执行树。

private CompletableFuture executeAsync(Map>

dslTree,

JSONObject event,

Map helper,

String mode,

String depth,

Map

CompletableFuture>> functionFuturesContainer) {

Map>> currentDepthFunction

Futures = new HashMap;

// 步骤a。将属于同一depth的特征提取任务都提交给专门负责该depth特征计算的执行器执行

for (StreamFQL function : dslTree.get(depth)) {

CompletableFuture> future =CompletableFuture.supplyAsync( -> {

try {

// 步骤b。在此调用具体的特征计算方法

return execute(function, event, helper, mode);

} catch (Exception e) {

return new HashMap;

}

},

// 步骤c。根据depth创建或获取负责该depth特征计算的执行器

ServiceExecutorHolder.getExtracExecutorService(depth));

currentDepthFunctionFutures.put(function, future);

functionFuturesContainer.put(function, future);

}

// 步骤d。相当于fork/join模式中的join部分,将多个并行特征提取任务的结果合并起来

CompletableFuture allFutures = CompletableFuture.allOf(

currentDepthFunctionFutures.values.toArray(new

CompletableFuture[0]));

// 步骤e。FIXED_CONTENT字段用于存储特征提取的结果

CompletableFuture result = allFutures.thenApply(v -> {

event.putIfAbsent(FIXED_CONTENT, new JSONObject);

for (Map.Entry>> entry :

currentDepthFunctionFutures.entrySet) {

StreamFQL function = entry.getKey;

// 步骤f。获取特征提取的结果

Map functionResult = entry.getValue.join;

event.getJSONObject(FIXED_CONTENT).put(function.getName,

functionResult.get("value")); // 步骤g。将特征提取结果附加到消息上

}

return event;

});

if ("1".equals(depth)) {

return result; // 步骤h。如果depth为1,说明这就是DAG中最后一步了

} else {

// 步骤i。如果depth不为1,说明这还不是DAG的最后一步,继续递归下去

// 继续构建DAG的后续步骤,直至最终depth为1为止

return result.thenCompose(v -> {

String newDepth = String.valueOf(Integer.parseInt(depth) - 1);

return executeAsync(dslTree, event, helper, mode, newDepth,

functionFuturesContainer);

});

}

}

如果结合执行计划执行层代码对图11-3中的DAG做更加精细的描述,那么执行计划执行层实现的是图11-4所示的实时流计算过程。

在执行计划执行层流计算过程的每一个步骤中,我们都使用了带反向压力功能的执行器,从而避免了不同深度的特征在计算速度不一致时造成的OOM问题。

private static final Map EXECUTOR_SERVICE_MAP = new

ConcurrentHashMap;public static ExecutorService getExtracExecutorService(String depth) {

if (EXECUTOR_SERVICE_MAP.get(depth) != null) {

return EXECUTOR_SERVICE_MAP.get(depth);

}

synchronized (ServiceExecutorHolder.class) {

}

EXECUTOR_SERVICE_MAP.put(depth, ExecutorHolder.createMultiQueue

ThreadPool(

String.format("extract_service_depth_%s", depth),

getInt("extract_service.executor_number"),

getInt("extract_service.coreSize"),

getInt("extract_service.maxSize"),

getInt("extract_service.executor_queue_capacity"),

getLong("extract_service.reject_sleep_mills")));

}

}

图11-4 特征提取执行层

来源:大数据架构师

相关推荐