【橙子老哥】C# 硬核ShardingCore执行流程源码解读

B站影视 2024-12-20 08:42 2

摘要:sharding-core已被NCC收编,足以证明项目的质量,薛家明先生一直在主力维护着,为EFCore在分表分库下提供一个成熟的解决方案薛家明先生撰写了非常多的有深度的文章博客,每一篇都可以学到很多东西,可以看出,是真心把知识分享了出去,但是本人却非常低调薛

hello,好久不见,大家好,欢迎来到橙子老哥的分享时刻,希望大家一起学习,一起进步。

欢迎加入.net意社区,第一时间了解我们的动态,文章第一时间分享至社区

社区官方地址:添加微信加入官方微信群:首先在这里,非常感谢薛家明前辈,本章分享的内容,就是他的开源巨作

sharding-core:https://github.com/dotnetcore/sharding-core

我很佩服的他的愿意有三个:

sharding-core已被NCC收编,足以证明项目的质量,薛家明先生一直在主力维护着,为EFCore在分表分库下提供一个成熟的解决方案

薛家明先生撰写了非常多的有深度的文章博客,每一篇都可以学到很多东西,可以看出,是真心把知识分享了出去,但是本人却非常低调

薛家明先生不仅仅局限于.net,在java领域,同样开源了一个重量级优雅ORM项目:easy-query,设计方式与我想法不谋而同

献给转java的c#和java程序员的数据库orm框架 easy-query:https://github.com/dromara/easy-query

771630778,非常推荐大家加入,可以学习很多,本篇文章,比较硬核,就一起来探究下ShardingCore的源码吧2、入口

众所皆知,efcore的代码出了名的难啃,不光量大,而且东西也特别抽象

对应的shardingcore的内容也非常非常多,而且很多地方借鉴了efcore的设计,所以阅读起来会比较有难度

本章只是对它的执行流程做一个大概的解读,不会精细到每一行

不过知道了大概的流程原理,等想看的时候或者出问题的时候,再去排查也是一个不错的选择

ShardingCore 一款efcore下高性能、轻量级针对分表分库读写分离的解决方案,具有零依赖、零学习成本、零业务代码入侵。

既然是基于EFcore的零业务代码入侵,那肯定要对Efcore进行扩展替换,我们直接从入口找找

在ShardingCoreExtension中,我们新增了shardingcode的服务,这里做了大量的依赖注入,其中UseDefaultSharding是对EFCORE的扩展
services.AddDbContext(UseDefaultSharding, contextLifetime,
optionsLifetime);
最终,我们执行到:

public static DbContextOptionsBuilder UseSharding(
this DbContextOptionsBuilder optionsBuilder, IShardingRuntimeContext shardingRuntimeContext)
{
return optionsBuilder
.UseShardingWrapMark
.UseShardingMigrator
.UseShardingOptions(shardingRuntimeContext)
.ReplaceService
.ReplaceService
.ReplaceService
.ReplaceService
.ReplaceService
.ReplaceService;
}
这里可以看到,替换了EfCore的非常多的服务,而今天,我们讲最核心的对象:IQueryCompiler这个对象,是Efcore查询进行编译执行的内容,所以我们想知道具体shardingcode在查询的时候执行了什么,就看看EfCore对外提供了,IQueryCompiler接口,ShardingQueryCompiler是他的实现

所以,请记住,无论做了什么,最终返回的结果,就是为了Execute方法返回的

public interfaceIQueryCompiler
///
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
///
TResultExecute(Expressionquery);

///
///This is an internal API that supports the Entity Framework Core infrastructure and not subject to
///the same compatibility standards as public APIs. It may be changed or removed without notice in
///any release. You should only use it directly in your code with extreme caution and knowing that
///doing so can result in application failures when updating to a new Entity Framework Core release.
///
TResult ExecuteAsync(Expression query, CancellationToken cancellationToken);

ShardingQueryCompiler包了一层,我们继续:

private readonly IShardingCompilerExecutor _shardingCompilerExecutor;
public override TResult Execute(Expression query)
{
return _shardingCompilerExecutor.Execute(_shardingDbContext, query);
}
看看IShardingCompilerExecutor public TResult Execute(IShardingDbContext shardingDbContext, Expression query)
{
//预解析表达式,通过ShardingQueryPrepareVisitor解析Expression query
var prepareParseResult = _prepareParser.Parse(shardingDbContext,query);
_logger.LogDebug($"compile parameter:{prepareParseResult}");
using (new CustomerQueryScope(prepareParseResult,_shardingRouteManager))
{
//根据上面预解析表达式,获取出上下文
//这里可以解析出QueryCompilerContext、MergeQueryCompilerContext两种类型
//后面会根据类型的不一样,选择对应的处理逻辑
var queryCompilerContext = _queryCompilerContextFactory.Create(prepareParseResult);
//再包一层,IShardingTrackQueryExecutor,把解析的上下文给了它
return _shardingTrackQueryExecutor.Execute(queryCompilerContext);
}
}

上面,我们可以看到,将解析的结果上下文传入IShardingTrackQueryExecutor,它的实现是:DefaultShardingTrackQueryExecutor,我们看看做了什么

public TResult Execute(IQueryCompilerContext queryCompilerContext)
{
var queryCompilerExecutor = queryCompilerContext.GetQueryCompilerExecutor;
if (queryCompilerExecutor == )
{
//上下文类型是需要合并的
if (queryCompilerContext is IMergeQueryCompilerContext mergeQueryCompilerContext)
{
//走IShardingQueryExecutor的处理逻辑
return _shardingQueryExecutor.Execute(mergeQueryCompilerContext);
}
thrownew ShardingCoreNotFoundException(queryCompilerContext.GetQueryExpression.ShardingPrint);
}

//如果有原生的queryCompilerExecutor,直接走原生的EFCORE,不需要走shardingcode了
//由于上面有return,上面的逻辑和下面的逻辑只会走一个
//native query
var result = queryCompilerExecutor.GetQueryCompiler.Execute(queryCompilerExecutor.GetReplaceQueryExpression);
//native query track
return ResultTrackExecute(result, queryCompilerContext, TrackEnumerable, Track);

}
到这里,我们需要处理的合并表的逻辑,就要看 public TResult Execute(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
//如果根表达式为tolist toarray getenumerator等表示需要迭代
if (mergeQueryCompilerContext.IsEnumerableQuery)
{
//需要迭代的,走迭代的处理
return EnumerableExecute(mergeQueryCompilerContext);
}
//其他走单独处理
return DoExecute(mergeQueryCompilerContext, false, default);
}

上面的逻辑中,对执行语句进行区分,像tolist、toarray,与first、max等处理是不一样的,我们先看一下简单的单个操作

private TResult DoExecute(IMergeQueryCompilerContext mergeQueryCompilerContext, boolasync,
CancellationToken cancellationToken = new CancellationToken)
{
//根据表达式解析的方法结果,直接找到对应的处理引擎,这里我们挑一个,max为例子,看看它是怎么处理的
var queryMethodName = mergeQueryCompilerContext.GetQueryMethodName;
switch (queryMethodName)
{
case nameof(Enumerable.First):
return EnsureResultTypeMergeExecute(typeof(FirstSkipAsyncInMemoryMergeEngine),
mergeQueryCompilerContext, async, cancellationToken);
case nameof(Enumerable.FirstOrDefault):
return EnsureResultTypeMergeExecute(typeof(FirstOrDefaultSkipAsyncInMemoryMergeEngine),

case nameof(Enumerable.Last):
return EnsureResultTypeMergeExecute(typeof(LastSkipAsyncInMemoryMergeEngine),

case nameof(Enumerable.LastOrDefault):
return EnsureResultTypeMergeExecute(typeof(LastOrDefaultSkipAsyncInMemoryMergeEngine),

case nameof(Enumerable.Single):
return EnsureResultTypeMergeExecute(typeof(SingleSkipAsyncInMemoryMergeEngine),

case nameof(Enumerable.SingleOrDefault):
return EnsureResultTypeMergeExecute(typeof(SingleOrDefaultSkipAsyncInMemoryMergeEngine),

case nameof(Enumerable.Count):
return EnsureResultTypeMergeExecute(typeof(CountAsyncInMemoryMergeEngine),

case nameof(Enumerable.LongCount):
return EnsureResultTypeMergeExecute(typeof(LongCountAsyncInMemoryMergeEngine),

case nameof(Enumerable.Any):
return EnsureResultTypeMergeExecute(typeof(AnyAsyncInMemoryMergeEngine),

case nameof(Enumerable.All):
return EnsureResultTypeMergeExecute(typeof(AllAsyncInMemoryMergeEngine),

case nameof(Enumerable.Max):
return EnsureResultTypeMergeExecute2(typeof(MaxAsyncInMemoryMergeEngine),

case nameof(Enumerable.Min):
return EnsureResultTypeMergeExecute2(typeof(MinAsyncInMemoryMergeEngine),

case nameof(Enumerable.Sum):
return EnsureResultTypeMergeExecute2(typeof(SumAsyncInMemoryMergeEngine),

case nameof(Enumerable.Average):
return EnsureResultTypeMergeExecute3(typeof(AverageAsyncInMemoryMergeEngine),

case nameof(Enumerable.Contains):
return EnsureResultTypeMergeExecute(typeof(ContainsAsyncInMemoryMergeEngine),

#if EFCORE7|| EFCORE8
case nameof(RelationalQueryableExtensions.ExecuteUpdate):
return EnsureResultTypeMergeExecute(typeof(ExecuteUpdateAsyncMemoryMergeEngine),

case nameof(RelationalQueryableExtensions.ExecuteDelete):
return EnsureResultTypeMergeExecute(typeof(ExecuteDeleteAsyncMemoryMergeEngine),

#endif
}

thrownew ShardingCoreException(
$"db context operator not support query expression:[{mergeQueryCompilerContext.GetQueryExpression.ShardingPrint}] result type:[{typeof(TResult).FullName}]");
}

上面,根据表达式解析的方法结果,直接找到对应的处理引擎,这里我们挑一个,max为例子,不过先不着急,因为到了这里,我们都没有到shardingcode的核心逻辑

是的,这就是比较难啃的原因之一,套娃非常多,你以为马上出来了,结果是还没进入

上面,我们先记着,获取到了max的处理引擎,至于真正shardingcode核心最重的逻辑,在EnumerableExecute

private TResult EnumerableExecute(IMergeQueryCompilerContext mergeQueryCompilerContext)
{
var combineQueryable = mergeQueryCompilerContext.GetQueryCombineResult.GetCombineQueryable;
var queryEntityType = combineQueryable.ElementType;
var streamMergeContext = GetStreamMergeContext(mergeQueryCompilerContext);

Type streamMergeEngineType = typeof(AsyncEnumeratorStreamMergeEngine);
streamMergeEngineType = streamMergeEngineType.MakeGenericType(queryEntityType);
return (TResult)Activator.CreateInstance(streamMergeEngineType, streamMergeContext);
}
同样是返回一个处理引擎,但是这里又比较特殊,又交给AsyncEnumeratorStreamMergeEngine上面,我们为了获取到可枚举的引擎,在中的GetAsyncEnumerator方法 public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken)
{
//做个校验
cancellationToken.ThrowIfCancellationRequested;
if (!_mergeContext.TryPrepareExecuteContinueQuery( => new EmptyQueryEnumerator,
outvar emptyQueryEnumerator))
{
return emptyQueryEnumerator;
}
//再包一层,通过EnumeratorStreamMergeEngineFactory去创建
var asyncEnumerator = EnumeratorStreamMergeEngineFactory.Create(_mergeContext).GetStreamEnumerable
.GetAsyncEnumerator(cancellationToken);

if (_mergeContext.IsUseShardingTrack(typeof(T)))
{
returnnew AsyncTrackerEnumerator(_mergeContext.GetShardingDbContext, asyncEnumerator);
}

return asyncEnumerator;
}

我们继续看,EnumeratorStreamMergeEngineFactory下如何创建的,也是在它的.GetStreamEnumerable方法中

以下,便是获取处理可枚举的执行引擎逻辑,下面的流程大致可以简化为:

是否存在空路由

本次查询是否要聚合

是否启用高性能分页

是否存在order字段

是否顺序分片配置

是否符合顺序分片

最后都走完,执行DefaultEnumerableShardingMerger

public IStreamEnumerable GetStreamEnumerable
{
if (_streamMergeContext.IsRouteNotMatch)
{
returnnew EmptyShardingEnumerable(_streamMergeContext);
}
if (_streamMergeContext.UseUnionAllMerge)
{
returnnew DefaultShardingEnumerable(_streamMergeContext);
}
var queryMethodName = _streamMergeContext.MergeQueryCompilerContext.GetQueryMethodName;
switch (queryMethodName)
{
case nameof(Enumerable.First):
case nameof(Enumerable.FirstOrDefault):
return new FirstOrDefaultShardingEnumerable(_streamMergeContext);
case nameof(Enumerable.Single):
case nameof(Enumerable.SingleOrDefault):
return new SingleOrDefaultShardingEnumerable(_streamMergeContext);
case nameof(Enumerable.Last):
case nameof(Enumerable.LastOrDefault):
return new LastOrDefaultShardingEnumerable(_streamMergeContext);
}

//未开启系统分表或者本次查询涉及多张分表
if (_streamMergeContext.IsPaginationQuery && _streamMergeContext.IsSingleShardingEntityQuery &&
_shardingPageManager.Current != )
{
//获取虚拟表判断是否启用了分页配置
var shardingEntityType = _streamMergeContext.GetSingleShardingEntityType;
if (shardingEntityType == )
thrownew ShardingCoreException($"query not found sharding data source or sharding table entity");

if (_streamMergeContext.Orders.IsEmpty)
{
//自动添加属性顺序排序
//除了判断属性名还要判断所属关系
var mergeEngine = DoNoOrderAppendEnumeratorStreamMergeEngine(shardingEntityType);
if (mergeEngine != )
return mergeEngine;
}
else
{
var mergeEngine = DoOrderSequencePaginationEnumeratorStreamMergeEngine(shardingEntityType);

if (mergeEngine != )
return mergeEngine;
}
}

}

埋个坑,如果你能看懂到这里,你已经战胜很多人了,确实是比较硬核

我们把前面的过程理一理,说简单一点,它通过预编译结果表达式解析,组装了预解析结果,然后通过一层一层一层一层的套娃,层层将各种情况进行解析,最后!只是为了返回一个对应处理的引擎

所以!真正执行的逻辑,我们还没有找到,我们走完的,只是一个创建引擎的大型工厂

接下来,我们前面预留的max引擎来看看,引擎是做了啥的

private async Task ExecuteAsync(CancellationToken cancellationToken = new CancellationToken)
{
cancellationToken.ThrowIfCancellationRequested;
if (!GetStreamMergeContext.TryPrepareExecuteContinueQuery( => default(TR), outvar tr))
{
return tr;
}
//获取sql执行的路由最小单元
var defaultSqlRouteUnits = GetDefaultSqlRouteUnits;
//创建执行者
var executor = CreateExecutor;
//执行
var result = await ShardingExecutor.ExecuteAsync>(GetStreamMergeContext, executor, true, defaultSqlRouteUnits, cancellationToken).ConfigureAwait(false);
//返回真正的,处理结果,就是我们要的max
return result.QueryResult;
}

在上面的引擎执行中,结果已经查询出来了,我们要叮住两个点

创建执行者CreateExecutor

ShardingExecutor执行ExecuteAsync

上面的第二步,把第一步创建的executor 传入了,我们要看看在ShardingExecutor.ExecuteAsync中,对executor 做了什么

紧盯executor,看看

public staticasync Task ExecuteAsync(StreamMergeContext streamMergeContext,
IExecutor executor, boolasync, IEnumerable sqlRouteUnits,
CancellationToken cancellationToken = new CancellationToken)
{
//创建分组结果,返回的是多个task,这里把执行者丢到这里了
var resultGroups =
Execute0(streamMergeContext, executor, async, sqlRouteUnits, cancellationToken).ToArray;
//看到了,TaskHelper.WhenAllFastFail,开启多线程,将上面的多个task结果进行执行
var results = (await TaskHelper.WhenAllFastFail(resultGroups).ConfigureAwait(false)).SelectMany(o => o)
.ToList;
if (results.IsEmpty)
thrownew ShardingCoreException("sharding execute result empty");
//再次通过executor,去执行合并,然后返回实际结果
var streamMerge = executor.GetShardingMerger.StreamMerge(results);
return streamMerge;
}

在上面代码中,executor先丢给了Execute0,获取到多个执行task,随后又执行.GetShardingMerger.StreamMerge,将多个结果进行合并

别晕,挺住,马上了,我们继续看Execute0,下面核心是做了

private static Task> Execute0(StreamMergeContext streamMergeContext,
IExecutor executor, boolasync, IEnumerable sqlRouteUnits,
CancellationToken cancellationToken = new CancellationToken)
{
//此处省略一大堆代码,下面这里,是执行了executor的ExecuteAsync
var waitTaskQueue = ReOrderTableTails(streamMergeContext, sqlRouteUnits)
.GroupBy(o => o.DataSourceName)
.Select(o => GetSqlExecutorGroups(streamMergeContext, o))
.Select(dataSourceSqlExecutorUnit =>
{
return Task.Run(async =>
{
returnawait executor.ExecuteAsync(async, dataSourceSqlExecutorUnit,cancellationToken)}).ToArray;
return waitTaskQueue;
}

到这里,基本大致不差,还有两步

executor开线程执行ExecuteAsync,最后通过

executor.GetShardingMerger.StreamMerge(results)

将多个结果进行合并

第一步:executor去执行ExecuteAsync,我们看看Max的 这里执行了一堆东西之后最终返回的是,MaxMethodExecutor,没错,执行引擎里面还要套多个方法执行器,它是AbstractMethodWrapExecutor的子类

它的执行流程,这里由于非常多的套娃操作,这里由于文章限制,我们带过一下

ExecuteAsync0

GroupExecuteAsync

ExecuteUnitAsync

EFCoreQueryAsync

最终,是执行的,这个方法在是:

没错,执行查询的原理,就是efcore IQueryable直接查询,下面不过是对类型进行封装下

protected override Task EFCoreQueryAsync(IQueryable queryable, CancellationToken cancellationToken = new CancellationToken)
{
var resultType = typeof(TEntity);
if (!resultType.IsableType)
{
if (typeof(decimal) == resultType)
{
return queryable.As>.Select(o => (decimal?)o).MaxAsync(cancellationToken).As>;
}
if (typeof(float) == resultType)
{
return queryable.As>.Select(o => (float?)o).MaxAsync(cancellationToken).As>;
}
if (typeof(int) == resultType)
{
return queryable.As>.Select(o => (int?)o).MaxAsync(cancellationToken).As>;
}
if (typeof(long) == resultType)
{
return queryable.As>.Select(o => (long?)o).MaxAsync(cancellationToken).As>;
}
if (typeof(double) == resultType)
{
return queryable.As>.Select(o => (double?)o).MaxAsync(cancellationToken).As>;
}

thrownew ShardingCoreException($"cant calc max value, type:[{resultType}]");
}
else
{
return queryable.As>.MaxAsync(cancellationToken).As>;
}
}

好了,接下来,我们看看下一步

第二步:一样的我们看看max的StreamMerge public RouteQueryResult StreamMerge(List> parallelResults)
{
var routeQueryResults = parallelResults.Where(o => o.HasQueryResult).ToList;
if (routeQueryResults.IsEmpty)
throw new InvalidOperationException("Sequence contains no elements.");
//将多个线程查询的所有结果,max下
var min = routeQueryResults.Max(o => o.QueryResult);
return new RouteQueryResult(, , min);
}

莫得了,看到这里,已经完成了max查询的一个闭环,就是这么的朴素无华

7、心得

非常感谢,你能花费大量的时间,阅读到这里,能坚持阅读到这里的人,并且将内容看完的人真不多,东西也确实比较硬核,不一定要全部弄懂每一处细节,能学习到一些设计核思想,就算进步了

也许当下,囫囵吞枣,望之后能想起,有这么一篇文章解读了SharingCode的源码,有这么一个人值得尊敬:薛家明前辈,那就够了

SharingCode的源码比较优秀,设计的也非常巧妙,值得一读

来源:opendotnet

相关推荐