.NET 工作流Elsa - 书签

B站影视 2025-01-17 08:50 3

摘要:另外如果你硬要在步骤中直接把某件事做了也行,这种情况一般是在自定义的Activity中去完成具体的业务功能,通常这种参杂特定业务逻辑的Activity复用性不高罢了。

流程引擎的核心关注点是安排流程,

如:第1步做什么 → 第2步做什么 → 第n步做什么...,

至于各步骤具体是怎么做的,是由你来决定的,这不是流程引擎关注的重点。

流程安排可能会涉及到分叉、并行多个线路执行,但这只是流程安排的一种形式。

另外如果你硬要在步骤中直接把某件事做了也行,这种情况一般是在自定义的Activity中去完成具体的业务功能,通常这种参杂特定业务逻辑的Activity复用性不高罢了。

有时候需要在执行到某个Activity时停住,等待另外某个事情完成后,流程再继续执行。 举个简单的请假审批流程的例子:

定义流程

员工发起一个请假单的审核流程,先由部门主管审核,然后部门经理审核,最后流程结束。 流程引擎主要关注流程,即这里的:主管审核 → 部门经理审核,主管审核和经理审核的具体动作可以回调业务系统的api或实现自定义的activity去实现, 比如更新数据库中关联请假单的状态。

填写表单 这跟流程引擎无关,一般是在业务系统中填写表单提交保存到数据库

提交流程 在业务系统中,首先将请假单的状态改为:已进入审核流程,此时跟流程引擎无关。然后调用流程引擎,发起一个执行请假流程,(通常会将请假单的id作为流程引擎的相关性id参数传入)

流程引擎执行

[流程引擎] 开始节点啥也没做,只是表明流程开始执行了,这个好像不是必须的

⭐[流程引擎] 流程进入【主管审核】这个节点,关键就在这里,它做两件事情,1创建一个书签使流程卡在这里,不要继续流转了;2回调业务系统实现主管审核的具体逻辑

⭐[业务系统] 在业务系统去做主管审核的事,也就是将状态改为【主管已审核等待经理审核】然后携带书签调用流程引擎,让流程恢复继续执行辖区

[流程引擎] 流程继续执行,一样的道理会通过书签卡住在经理审核步骤,经理审核后流程继续,最终完成。

在执行某个Activity时,若需要先卡这里等着,就创建一个书签,然后等待外部事情做完后,用刚才这个书签告诉流程引擎继续执行后续步骤。

如果把一个流程看成一本书、把流程中的Activity看成书中每一页,那么书签就是跟真实世界的书签差不多,我看到某一页时,需要去做另一件事时,就在当前页插入一个书签,等我忙完回来后 我翻到书签所在也继续看后续的内容。

书签应用基本流程

Activity核心执行方法定义如下:

ValueTask ExecuteAsync(ActivityExecutionContext context);

当elsa预定义的Activity或我们自定义的Activity执行时,若需要卡住流程则创建书签,调用context.CreateBookmark即可。

在ExecuteAsync执行完成后,context中的书签会被复制到WorkflowExecutionContext中,由于当前Activity是卡住的,所以当前流程的本次执行就结束(虽然整个流程未结束,还在等下次从书签恢复)

流程引擎有个书签持久化中间件,会从当前流程的WorkflowExecutionContext中取出书签,并做持久化存储。

当业务系统完成工作后,携带书签id再次执行此流程时,流程引擎会从持久化中获取书签,并根据书签信息从上次卡住的Activity恢复流程实例继续执行。

携带书签恢复流程继续执行有多种方式,本文【恢复】节会说明。

上面基本流程提到,书签有两种数据格式,一种是存储在流程执行上下文WorkflowExecutionContext中的书签,它是用Bookmark类表示的;另一种是持久化存储的用StoredBookmark表示。 这两种格式差不多,比较持久化是通过上下文中的转换来的, 下面描述书签的几个关键属性:

属性描述id书签实例的唯一idWorkflowInstanceId书签所属的工作流实例的idActivityTypeName书签卡住的Activity类型的名称ActivityInstanceId书签卡住的Activity实例的idHash书签的hash值CorrelationId相关性id(通常关联到业务系统的业务实体id,如:请假单的id)Payload书签数据,不同的activity在创建和恢复书签之间传递数据Metadata自定义的一些额外数据CreatedAt创建时间

创建书签时,可以在Payload和Metadata存储复杂数据,以便在后续书签恢复时,从书签中获取这些数据,属于一种参数传递手段。

它俩的区别我也没看懂,不过猜测Payload是跟所在Activity要完成的任务的业务相关的,而Metadata应该是对书签本身属性的一种扩充手段。 意思是我们自定义Activity时,通常使用Payload,而流程引擎可能会使用Metadata来实现一些特殊功能。

创建书签

书签是在流程实例执行到某个Activity时创建的。

一些elsa预定义的Activity本身就会创建书签,如:RunTask,当流程流转到 这种类型的节点时,它会创建一个书签,使流程卡在这里,然后会触发一个事件,我们的代码可以订阅此事件执行业务逻辑,之后通过书签使流程继续。

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
// Create bookmark.
var taskName = TaskName.Get(context); //从设计器中配置的任务名称获取任务名称字符串
var identityGenerator = context.GetRequiredService;
var taskId = identityGenerator.GenerateId;//创建一个任务的唯一id
var stimulus = new RunTaskStimulus(taskId, taskName);//暂时理解此对象表示一个要执行的任务,作为bookmark的payload存储起来了。
context.CreateBookmark(stimulus, ResumeAsync, includeActivityInstanceId: false);//重点是这里指定了 当有人携带一个书签请求此流程时,将恢复执行的方法

// Dispatch task request. 后续会触发事件,不是描述书签的重点,请查看官方文档
var taskParams = Payload.GetOrDefault(context);
var runTaskRequest = new RunTaskRequest(context, taskId, taskName, taskParams);
var dispatcher = context.GetRequiredService;

await dispatcher.DispatchAsync(runTaskRequest, context.CancellationToken);
}
//通过书签恢复此流程时,流程会从这里继续执行下去
private async ValueTask ResumeAsync(ActivityExecutionContext context)
{
var input = context.GetWorkflowinput(InputKey);
context.Set(Result, input);
await context.CompleteActivityAsync;
}

另外就是我们自定义的Activity了,如果需要让流程卡这里,后面再通过书签恢复流程继续,可以在其ExecuteAsync方法中调用 ActivityExecutionContext的CreateBookmark方法创建书签,此方法有多个重载,不过最核心的重载定义如下,注意查看注释:

public Bookmark CreateBookmark(CreateBookmarkArgs? options = default)
{
var payload = options?.Stimulus;//通过存储参数到这里,以便在通过书签恢复流程时从中获取此数据,属于一种参数传递方式
var callback = options?.Callback;//当后续通过此书签恢复流程继续时,回调这里指定的方法,不指定的话,默认执行流程书签所在Activity的下一个Activity
var bookmarkName = options?.BookmarkName ?? Activity.Type;//书签名称
var bookmarkHasher = GetRequiredService;
var identityGenerator = GetRequiredService;
var includeActivityInstanceId = options?.IncludeActivityInstanceId ?? true;
var hash = bookmarkHasher.Hash(bookmarkName, payload, includeActivityInstanceId ? Id : );//书签hash值
var bookmarkId = options?.BookmarkId ?? identityGenerator.GenerateId;//书签唯一id

var bookmark = new Bookmark(
bookmarkId,
bookmarkName,
hash,
payload,
Activity.Id,
ActivityNode.NodeId,
Id,
_systemClock.UtcNow,
options?.AutoBurn ?? true,
callback?.Method.Name,
options?.AutoComplete ?? true,
options?.Metadata);

AddBookmark(bookmark);//存储到当前上下文中
return bookmark;
}

这里注意,可以指定将来流程恢复时,从callback指定的委托处开始执行,否则会直接执行书签所在Activity的下一个Activity。

删除书签

Activity无论是自动完成还是外部调用完成,总会调用ActivityExecutionContext的CompleteActivityAsync方法,内部会清空当前Activity的书签, 之后书签持久化中间件PersistBookmarkMiddleware会删除书签。

书签存储(持久化)

通过上述步骤创建的书签对象开始存储在当前ActivityExecutionContext中,之后会被转存到WorkflowExecutionContext。 elsa的执行引擎类似asp.net core的中间件管道模型,在执行流程 【前/后】 会执行一系列中间件,其中PersistBookmarkMiddleware中间件就是在流程执行后 从WorkflowExecutionContext提取并存储书签到数据库或其它持久化,同时还会触发书签变动事件WorkflowBookmarksIndexed,后续的书签调度会订阅此事件。

PersistBookmarkMiddleware会同时存储新的书签,也会删除已经从WorkflowExecutionContext移除的书签。

我们可以携带书签信息主动调用流程引擎来让指定流程实例从书签处继续执行,某些时候可以安排一个后台任务,让书签在指定时间自动恢复执行。 这种在后台安排一个作业,以在指定时间通过书签恢复指定流程实例继续执行的事就是书签调度。

书签调度的基本流程如下:

在流程定义中放一个Activity节点,它必须是Delay、StartAt、Timer、Cron中的一种

这种Activity被执行时会创建书签,并在书签的payload携带跟时间相关参数

根据本文前面说的“存储(持久化)”会保存书签到数据库,并触发WorkflowBookmarksIndexed事件

事件订阅器ScheduleWorkflows会调用书签调度器IWorkflowScheduler去后台作业中安排书签恢复任务

安排时从payload中获取安排时间,并调用elsa的后台作业框架安排后台作业。

当后台作业执行时,会根据书签信息恢复流程继续,这个在文章中的“恢复”节描述。

elsa内置的能自动恢复的4种Activity

Delay:当执行到此类型的Activity时,内部会创建一个书签,并在延迟指定时长后自动根据此书签恢复。

StartAt:当执行到此类型的Activity时,内部会创建一个书签,并在到达指定时间点时自动根据此书签恢复。

Timer:当执行到此类型的Activity时,内部会创建一个书签,并在到达指定时间点时自动根据此书签恢复。

Cron:与Timer类似,只不过是按Cron表达式的周期携带书签恢复流程执行。

这几个Activity挺特殊,其中Timer、StartAt、Cron是触发器,同时触发器本就是特殊的Activity,而Delay仅仅是普通的Activity

书签调度器

由IBookmarkScheduler表示,默认实现为DefaultBookmarkScheduler,它仅关注前面说的4种类型的Activity创建的书签,书签调度器做如下两件事:

安排后台作业,在指定时间点执行 根据书签恢复流程实例

删除指定书签的后台作业的安排

触发器调度仅关注Timer、StartAt、Cron,并且总是调度执行一个新的流程实例;而书签调度除了关注这3个外还多一个Delay,并且总是调度作业,通过书签恢复现有流程实例的执行触发器是在流程定义发布后调度的;而书签调度是Activity执行后,持久化书签时触发调度的。

Timer、Cron、StartAt

触发器是一个特殊的Activity,以Timer为例,它就是一个触发器,当定义的流程发布时,elsa会从流程中提取所有触发器节点,并保存在数据库中, 以便将来需要触发器时直接从数据库中获取,而不是又去找到流程并提取一次,这也可以称为触发器索引化。

Timer触发器也需要安排回台作业的,以便在固定时间到达时自动触发流程执行,关键在于这里的触发流程执行是每次都会创建新流程实例。

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
//当前流程被执行,是否是因为当前节点触发的
if(context.IsTriggerOfWorkflow)
{
//若是,则直接完成,以便流程后续节点立即执行
await context.CompleteActivityAsync;
return;
}

var clock = context.ExpressionExecutionContext.GetRequiredService;
var timeSpan = context.ExpressionExecutionContext.Get(Interval);
var resumeAt = clock.UtcNow.Add(timeSpan);

context.JournalData.Add("ResumeAt", resumeAt);
context.CreateBookmark(new TimerBookmarkPayload(resumeAt));
}

若当前流程就是当前触发器自己触发执行的,则直接让当前Activity完成,以便流程后续节点立即执行。if(context.IsTriggerOfWorkflow)就是做这事的。假设Timer是流程首个节点,每隔指定时间, 流程都会被执行,且每次都会创建新的流程实例。

若当前执行不是被Timer自己这个节点触发的,说明流程是流转到此节点的,此时会创建书签,按本文之前的说明,此书签会被安排后台任务,以便在指定时长后自动恢复,此时就跟延迟固定时间后恢复一样,所以Timer在书签中并不会一直执行,而是每次仅仅恢复自己这个流程实例。

书签匹配模式

有时候我们想触发书签恢复流程,但又不想或不方便直接调用api,可以定义一个书签的filter参数(BookmarkQueueItem)来描述我们希望恢复的书签,

书签参数BookmarkQueueItem

它其实定义的时书签filter参数,当系统中出现书签时,若与这里定义的过滤条件匹配时,则自动恢复书签。

入队IBookmarkQueue

默认实现StoreBookmarkQueue

public async Task EnqueueAsync(NewBookmarkQueueItem item, CancellationToken cancellationToken = default)
{
//使用书签过滤条件描述对象item创建一个书签过滤器
var filter = new BookmarkFilter
{
BookmarkId = item.BookmarkId,
Hash = item.StimulusHash,
WorkflowInstanceId = item.WorkflowInstanceId,
ActivityTypeName = item.ActivityTypeName
};
//尝试立即恢复此书签
var result = await resumer.ResumeAsync(filter, item.Options, cancellationToken);
if (result.Matched)
{
//若确实执行成功说明,说明已经直接恢复书签了,这里直接返回即可
logger.LogDebug("Successfully resumed workflow instance {WorkflowInstance} using bookmark {BookmarkId} for activity type {ActivityType}", item.WorkflowInstanceId, item.BookmarkId, item.ActivityTypeName);
return;
}

// There was no matching bookmark yet. Store the queue item for the system to pick up whenever the bookmark becomes present.
logger.LogDebug("No bookmark with ID {BookmarkId} found for workflow {WorkflowInstance} for activity type {ActivityType}. Adding the request to the bookmark queue", item.BookmarkId, item.WorkflowInstanceId, item.ActivityTypeName);

var entity = new BookmarkQueueItem
{
Id = identityGenerator.GenerateId,

StimulusHash = item.StimulusHash,
ActivityInstanceId = item.ActivityInstanceId,
ActivityTypeName = item.ActivityTypeName,
Options = item.Options,
CreatedAt = systemClock.UtcNow,
};
//否则持久化此书签过滤条件,IBookmarkQueueProcessor会在后台线程中来消费这个队列
await store.AddAsync(entity, cancellationToken);

// Trigger the bookmark queue processor.
// 触发一个信号,通知消费端,有新的书签恢复请求
await bookmarkQueueSignaler.TriggerAsync(cancellationToken);
}
消费IBookmarkQueueProcessor

IBookmarkQueueWorker是个死循环后台任务,它里面等待书签变动信号,一旦有信号,它就开始调用BookmarkQueueProcessor

private async Task ProcessItemAsync(BookmarkQueueItem item, CancellationToken cancellationToken = default)
{
//根据参数创建过滤器
var filter = item.CreateBookmarkFilter;
var options = item.Options;

logger.LogDebug("Processing bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} for activity type {ActivityType}", item.Id, item.WorkflowInstanceId, item.ActivityTypeName);
//尝试恢复书签
var result = await bookmarkResumer.ResumeAsync(filter, options, cancellationToken);

if (result.Matched)
{
//若成功则从书签参数队列中删除
logger.LogDebug("Successfully resumed workflow instance {WorkflowInstance} using bookmark {BookmarkId} for activity type {ActivityType}", item.WorkflowInstanceId, item.BookmarkId, item.ActivityTypeName);
await store.DeleteAsync(item.Id, cancellationToken);
}
else
{
//否则,等待下次尝试
logger.LogDebug("No matching bookmark found for bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} for activity type {ActivityType}", item.Id, item.WorkflowInstanceId, item.ActivityTypeName);
}
}
书签队列工作者IBookmarkQueueWorker

它内部搞个死循环等信号,信号一来,立即调用IBookmarkQueueProcessor消费。

书签队列信号IBookmarkQueueSignaler

貌似是书签入队与消费之间的一个信号器,在入队后触发这个信号,消费看到信号就马上消费下。 书签本身变动时,也会触发它

队列清理IBookmarkQueuePurger

它也是个后台任务,默认实现是DefaultBookmarkQueuePurger,它不管啥情况,太老的数据直接清理,无论是否成功恢复书签

多种持久化方式IBookmarkQueueStore

内存、ef、dapper等方式

elsa内部的使用场景

直接看哪些地方引用IBookmarkQueue就晓得,下面只说一个 ResumeExecuteWorkflowActivity它订阅事件,当一个流程执行完成时,它尝试去恢复ExecuteWorkflow这个Activity卡住的活动,也就是子流程执行完成后,通知父活动继续。

书签恢复

如论是执行一个新的流程实例,还是使用书签恢复执行一个已有流程实例,最底层的接口都是IWorkflowRunner.RunAsync,只不过若是通过书签恢复流程实例继续执行时传递参数中要包含BookmarkId。

如下方式内部本质都是调用IWorkflowRunner

通过IWorkflowRuntime创建IWorkflowClient,默认实现LocalWorkflowClient(额外模块的实现:DistributedWorkflowClient、ProtoActorWorkflowClient )后调用其RunInstanceAsync

通过IWorkflowStarter的默认实现DefaultWorkflowStarter,内部是调用的IWorkflowRuntime,然后创建IWorkflowClient,最后调用执行流程

通过IBookmarkResumer的默认实现BookmarkResumer,内部也是调用的IWorkflowRuntime,然后创建IWorkflowClient,最后调用执行流程

调用elsa的http api 也可以使用书签恢复流程执行,内部也一样

前面提到的“书签匹配模式”

前面提到的“调度”会自动恢复书签

看命名也懂,若我们要编程的方式恢复,可以优先考虑使用IBookmarkResumer,或者用“书签匹配模式”中的IBookmarkQueue若我们是独立部署elsa,业务系统与elsa服务器分开的,则可以使用http api方式通过书签通知elsa恢复已有的流程实例。

另外在Activity中创建书签时,若未指定书签恢复时要执行的委托,则自动执行书签所在Activity的下一个Activity,否则就执行这个指定的委托。

来源:opendotnet

相关推荐