Project repository: github.com/starslink/d…
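A high-performance, multi-strategy data reconciliation framework that supports several processing modes: in-memory, streaming, parallel, Redis-backed, and database-backed.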
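As a rough picture of what a reconciliation run computes, whichever processing mode is used, the following standalone sketch matches records by key and flags amounts that differ by more than a tolerance. It is not the framework's implementation: `ReconcileSketch`, `diff`, the sample keys, and the rule that a difference equal to the tolerance still counts as a match are all assumptions made for illustration.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Standalone illustration; none of these names belong to the data-check framework. */
final class ReconcileSketch {

    /** Compares two key -> amount maps and describes every difference found. */
    static List<String> diff(Map<String, BigDecimal> source,
                             Map<String, BigDecimal> target,
                             BigDecimal tolerance) {
        List<String> diffs = new ArrayList<>();
        // Pass 1: every source record must exist in the target with a close enough amount.
        for (Map.Entry<String, BigDecimal> e : source.entrySet()) {
            BigDecimal other = target.get(e.getKey());
            if (other == null) {
                diffs.add("missing in target: " + e.getKey());
            } else if (e.getValue().subtract(other).abs().compareTo(tolerance) > 0) {
                // Assumption: only a gap strictly larger than the tolerance is a mismatch.
                diffs.add("amount mismatch: " + e.getKey());
            }
        }
        // Pass 2: records that exist only in the target.
        for (String key : target.keySet()) {
            if (!source.containsKey(key)) {
                diffs.add("missing in source: " + key);
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        Map<String, BigDecimal> source = new LinkedHashMap<>();
        source.put("A1", new BigDecimal("100.00"));
        source.put("A2", new BigDecimal("50.00"));
        source.put("A3", new BigDecimal("10.00"));

        Map<String, BigDecimal> target = new LinkedHashMap<>();
        target.put("A1", new BigDecimal("100.01")); // within tolerance
        target.put("A2", new BigDecimal("49.50"));  // outside tolerance
        target.put("A4", new BigDecimal("7.00"));   // only in target

        diff(source, target, new BigDecimal("0.01")).forEach(System.out::println);
    }
}
```

Holding both sides in maps like this corresponds to the in-memory mode only; the other modes exist because this approach stops scaling once both data sets no longer fit in the heap.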
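Maven dependency:

```xml
<dependency>
    <groupId>com.starslink</groupId>
    <artifactId>data-check</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
```

Basic usage:

```java
// Describe one reconciliation job: where the two data sets come from,
// how much numeric deviation is tolerated, and what to do afterwards.
CheckConfig config = CheckConfig.builder()
        .id("order-check")
        .name("Order reconciliation")
        .srcLoader(date -> loadSourceData(date))
        .targetLoader(date -> loadTargetData(date))
        .allowableErrorRange(BigDecimal.valueOf(0.01))
        .checkAfter(context -> {
            System.out.println("Differences found: "
                    + context.getCheckResult().getDiffDetails().size());
        })
        .build();

// Run the job for a given date.
CheckExecutor executor = CheckExecutor.buildExecutor(config);
CheckContext result = executor.process("2023-12-01");
```

Defining the data to compare with annotations:

```java
public class Order {
    // Key used to match a source record with its target record
    @CheckIdentity
    private String orderId;

    // Fields that take part in the comparison
    @CheckField(order = 1)
    private BigDecimal amount;

    @CheckField(order = 2)
    private String status;
}
```

Defining the data to compare with an adapter:

```java
public class OrderAdapter implements CheckAdapter {
    // Fields referenced by the two methods below
    private String orderId;
    private BigDecimal amount;
    private String status;

    @Override
    public String getKey() {
        return orderId;
    }

    @Override
    public Map getCheckData() {
        return MapUtil.of("amount", amount, "status", status);
    }
}
```

| Processor type | Applicable data volume | Key characteristics | Use case |
| --- | --- | --- | --- |
| Memory | … | … | … |
| … | 1 million | SQL optimization | Very large data volumes |
| Incremental | Any | Incremental processing | Periodic reconciliation with less computation |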
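The processor table treats data volume as the main selection criterion. One way such a rule could be written down is sketched below as standalone Java. The cut-off values (`SMALL_LIMIT`, `LARGE_LIMIT`), the `Strategy` enum, and the `multiCore` flag are hypothetical and chosen only for illustration; they are not the framework's thresholds or API.

```java
/** Standalone illustration; thresholds and names are hypothetical, not taken from the framework. */
final class ProcessorSelectionSketch {

    enum Strategy { MEMORY, STREAM, PARALLEL, DATABASE }

    // Hypothetical cut-off points; tune them against your own heap size and data shape.
    static final long SMALL_LIMIT = 100_000L;
    static final long LARGE_LIMIT = 1_000_000L;

    /** Picks a strategy from the expected record count and CPU availability. */
    static Strategy select(long recordCount, boolean multiCore) {
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must be >= 0");
        }
        if (recordCount < SMALL_LIMIT) {
            return Strategy.MEMORY;      // small enough to hold both sides in the heap
        }
        if (recordCount < LARGE_LIMIT) {
            // Mid-sized data: trade memory for speed only when spare cores exist.
            return multiCore ? Strategy.PARALLEL : Strategy.STREAM;
        }
        return Strategy.DATABASE;        // push the comparison down to SQL
    }

    public static void main(String[] args) {
        boolean multiCore = Runtime.getRuntime().availableProcessors() > 1;
        for (long n : new long[] {5_000L, 500_000L, 5_000_000L}) {
            System.out.println(n + " records -> " + select(n, multiCore));
        }
    }
}
```

Keeping the rule in one small function makes it straightforward to adjust the limits after measuring time and memory use on real data.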
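Stream processor:

```java
// Stream processing with a batch size of 5,000
CheckConfig config = CheckConfig.builder()
        .id("stream-check")
        .batchSize(5000)
        .checkProcessor(new StreamCheckProcessor())
        .build();
```

Parallel processor:

```java
// Parallel processing with asynchronous execution enabled
CheckConfig config = CheckConfig.builder()
        .id("parallel-check")
        .isAsync(true)
        .checkProcessor(new ParallelCheckProcessor())
        .build();
```

Redis cache:

```java
// Use Redis as the cache backend
CheckConfig config = CheckConfig.builder()
        .id("redis-check")
        .checkCacheEnum(CheckCacheEnum.REDIS)
        .redisConfig(redisConfig)
        .build();
```

Database processor:

```java
// Run the comparison against the given data source
CheckConfig config = CheckConfig.builder()
        .id("database-check")
        .checkProcessor(new DatabaseCheckProcessor(dataSource))
        .build();
```

Incremental processor:

```java
// The provider supplies the incremental data to reconcile
IncrementalDataProvider provider = new MyIncrementalDataProvider();

CheckConfig config = CheckConfig.builder()
        .id("incremental-check")
        .checkProcessor(new IncrementalCheckProcessor(provider))
        .build();
```

Smart selection strategy: `public CheckProcessor selectOptimalProcessor(int dataSize)` chooses a processor based on the data size (`if (dataSize …`).

Data volume: 1 million records. Test results:

| Processor | Result | Time | Memory usage |
| --- | --- | --- | --- |
| Memory | ❌ OutOfMemoryError | n/a | n/a |
| Stream | ✅ | 45 s | 200 MB |
| Parallel | ✅ | 28 s | 800 MB |
| Database | ✅ | 35 s | 50 MB |

Complete example:

```java
@Test
public void testOrderReconciliation() {
    // Load the same business day from both systems
    List sourceOrders = loadOrdersFromSystem1("2023-12-01");
    List targetOrders = loadOrdersFromSystem2("2023-12-01");

    CheckConfig config = CheckConfig.builder()
            .id("daily-order-check")
            .name("Daily order reconciliation")
            .srcLoader(date -> CheckEntry.wrap(sourceOrders))
            .targetLoader(date -> CheckEntry.wrap(targetOrders))
            .allowableErrorRange(BigDecimal.valueOf(0.01))
            // Choose the processor from the size of the source data
            .checkProcessor(selectOptimalProcessor(sourceOrders.size()))
            // Validate the data before the comparison starts
            .checkPre(context -> validateData(context))
            // Report and notify once the comparison has finished
            .checkAfter(context -> {
                CheckResult result = context.getCheckResult();
                generateReport(result);
                sendNotification(result);
            })
            .build();

    CheckExecutor executor = CheckExecutor.buildExecutor(config);
    CheckContext result = executor.process("2023-12-01");

    System.out.println("Reconciliation complete:");
    System.out.println("- Source data: " + result.getSource().size());
    System.out.println("- Target data: " + result.getTarget().size());
    // …
}
```

Source: 墨码行者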
