摘要:时间轮出自Netty,是一个环形结构,可以用时钟来类比,钟面上有很多bucket,每一个bucket上可以存放多个任务,使用一个List保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应bucket决定应该放入哪个bucket。和Has
这是一个或许对你有用的开源项目
国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。
废话少说,直接进入正题。
相信大家对 XXL-JOB 都很了解,故本文对源码不进行过多介绍,侧重的是 看源码过程中想到的几个知识点 ,不一定都对,请大神们批评指正。基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
时间轮出自 Netty ,是一个环形结构,可以用时钟来类比,钟面上有很多 bucket ,每一个 bucket 上可以存放多个任务,使用一个 List 保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应 bucket 决定应该放入哪个 bucket 。和 HashMap 的原理类似, newTask 对应 put ,使用 List 来解决 Hash 冲突。 以上图为例,假设一个 bucket 是1秒,则指针转动一轮表示的时间段为8s,假设当前指针指向 0,此时需要调度一个3s后执行的任务,显然应该加入到(0+3=3)的方格中,指针再走3s次就可以执行了;如果任务要在10s后执行,应该等指针走完一轮零2格再执行,因此应放入2,同时将 round(1) 保存到任务中。检查到期任务时只执行 round 为0的, bucket 上其他任务的 round 减1。❝
即将触发的任务,放入时间轮。2、ringThread:对当前 bucket 和前一个 bucket 中的任务取出并执行。 // 环状结构private volatile static Map> ringData = new ConcurrentHashMap;
// 任务下次启动时间(单位为秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime/1000)%60);
// 任务放进时间轮
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList;
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
}
// 同时取两个时间刻度的任务
List ringItemData = new ArrayList;
int nowSecond = Calendar.getInstance.get(Calendar.SECOND);
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 运行
for (int jobId: ringItemData) {
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
可见 ,一致性Hash算法的关键在于 hash算法 ,保证 虚拟节点 及 Hash结果 的均匀性,而均匀性可以理解为 减少Hash冲突 ,Hash冲突的知识点本文暂不扩展,历史文章中有。或者将来我再抽时间写。
// jobId转换为md5// 不直接用hashCode 是因为扩大hash取值范围,减少冲突
byte digest = md5.digest;
// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;f(key) = hash(key) & (table.length - 1)
// 使用>>> 16的原因,hashCode的高位和低位都对f(key)有了一定影响力,使得分布更加均匀,散列冲突的几率就小了。
hash(key) = (h = key.hashCode) ^ (h >>> 16)public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover),
// 说好的实现呢???竟然是null
Sharding_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);...
// 如果是分片路由,走的是这段逻辑
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy, null)
&& group.getRegistryList != null && !group.getRegistryList.isEmpty
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList.size; i++) {
// 最后两个参数,i是当前机器在执行器集群当中的index,group.getRegistryList.size为执行器总数
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList.size);
}
}
...// 分片广播的参数比set进了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.shardingVO(triggerParam.getBroadcastIndex, triggerParam.getBroadcastTotal));
...
// 将执行参数传递给jobHandler执行
handler.execute(triggerParamTmp.getExecutorParams)public class ShardingUtil {
// 线程上下文
private static InheritableThreadLocal contextHolder = new InheritableThreadLocal;
// 分片参数对象
public static class ShardingVO {
private int index; // sharding index
private int total; // sharding total
// 次数省略 get/set
}
// 参数对象注入上下文
public static void setShardingVo(ShardingVO shardingVo){
contextHolder.set(shardingVo);
}
// 从上下文中取出参数对象
public static ShardingVO getShardingVo{
return contextHolder.get;
}
}@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo;
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex, shardingVO.getTotal);
// 业务逻辑
for (int i = 0; i < shardingVO.getTotal; i++) {
if (i == shardingVO.getIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
"第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
❝
1、可能是因为只有分片任务才用到这两个参数
2、IJobHandler只有String类型参数
文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
来源:寂寞的咖啡