多线程数据核对实战指南:文件 vs 数据库

B站影视 2025-02-05 18:15 3

摘要:曾经手里有一份超大的数据文件,我需要解析这个文件并提取每行特定的数据,文件是无法打开的。直接上实操,首先是设计出一个流程,推敲演练,最终总结出如下4大过程

曾经手里有一份 超大的数据文件,我需要解析这个文件并提取每行特定的数据,文件是无法打开的。直接上实操,首先是设计出一个流程,推敲演练,最终总结出如下4大过程

解析文件内容,提取关键数据。从数据库中读取对应数据。将两者进行核对。异常处理。将核对结果批量入库。

但问题是:文件太大,内存有限!别担心,我们可以使用用 多线程 + 文件分块 的魔法来解决!

方案设计图

给出关键代码片段

1.文件切分

目标:将大文件切分成小块,确保每块内容完整。方法:按行切分,避免切割到半行数据。每块大小根据内存限制动态调整。public List splitFile(String filePath, int blockSize) throws IOException { List blocks = new ArrayList; try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { String line; StringBuilder buffer = new StringBuilder; int currentSize = 0; while ((line = reader.readLine) != null) { buffer.append(line).append("\n"); currentSize += line.length; if (currentSize >= blockSize) { blocks.add(new FileBlock(buffer.toString)); buffer = new StringBuilder; currentSize = 0; } } if (buffer.length > 0) { blocks.add(new FileBlock(buffer.toString)); } } return blocks;}

2.多线程解析文件

目标:用多线程解析文件块,提取关键数据。方法:每个线程处理一个文件块。将解析结果放入核对队列。public class FileParser implements runnable { private String blockContent; private BlockingQueue queue; public FileParser(String blockContent, BlockingQueue queue) { this.blockContent = blockContent; this.queue = queue; } @Override public void run { String lines = blockContent.split("\n"); for (String line : lines) { Data data = parseLine(line); // 解析每行数据 queue.put(data); // 放入核对队列 } } private Data parseLine(String line) { // 解析逻辑 return new Data; }}

3.多线程读取数据库

目标:用多线程从数据库读取数据。方法:每个线程读取一部分数据。将读取结果放入核对队列。public class DBReader implements Runnable { private BlockingQueue queue; private int startId; private int endId; public DBReader(BlockingQueue queue, int startId, int endId) { this.queue = queue; this.startId = startId; this.endId = endId; } @Override public void run { List dbData = fetchDataFromDB(startId, endId); // 从数据库读取数据 for (Data data : dbData) { queue.put(data); // 放入核对队列 } } private List fetchDataFromDB(int startId, int endId) { // 数据库查询逻辑 return new ArrayList; }}

4.数据核对

目标:核对文件数据和数据库数据。方法:从队列中取出数据,进行比对。将核对结果放入批量入库队列。public class DataChecker implements Runnable { private BlockingQueue fileQueue; private BlockingQueue dbQueue; private BlockingQueue resultQueue; public DataChecker(BlockingQueue fileQueue, BlockingQueue dbQueue, BlockingQueue resultQueue) { this.fileQueue = fileQueue; this.dbQueue = dbQueue; this.resultQueue = resultQueue; } @Override public void run { while (true) { Data fileData = fileQueue.take; Data dbData = dbQueue.take; if (fileData.equals(dbData)) { resultQueue.put(fileData); // 核对通过,放入结果队列 } } }}

5.批量入库

目标:将核对结果批量写入数据库。方法:从结果队列中取出数据,批量插入。public class BatchInserter implements Runnable { private BlockingQueue resultQueue; public BatchInserter(BlockingQueue resultQueue) { this.resultQueue = resultQueue; } @Override public void run { List batch = new ArrayList; while (true) { Data data = resultQueue.take; batch.add(data); if (batch.size >= 1000) { // 每1000条批量插入一次 insertBatch(batch); batch.clear; } } } private void insertBatch(List batch) { // 批量插入逻辑 }}

异常防御机制是防弹装甲,抵御各种意外攻击。

1.文件解析异常捕获

public class FileParser implements Runnable { @Override public void run { try { // 解析逻辑... } catch (Exception e) { ErrorTracker.log("文件解析异常", e); ErrorQueue.put(new ErrorData(blockId, e)); // 记录错误块 } finally { CompletionCounter.fileBlockDone; // 完成计数器 } }}

2.数据库查询重试机制

public List fetchDataWithRetry(int page, int size) { int retry = 0; while (retry

3.核对完整性保障

public class DataChecker { private AtomicInteger fileCount = new AtomicInteger(0); private AtomicInteger dbCount = new AtomicInteger(0); public void run { while (!isDone) { // 双重判断 Data fileData = fileQueue.poll(1, TimeUnit.SECONDS); Data dbData = dbQueue.poll(1, TimeUnit.SECONDS); if (fileData != null) fileCount.incrementAndGet; if (dbData != null) dbCount.incrementAndGet; // 核对逻辑... } // 最终校验 if (fileCount.get != dbCount.get) { ErrorTracker.log("数据总量不匹配: 文件数据=" + fileCount + " 数据库数据=" + dbCount); } }}

4.批量插入容错设计

public class BatchInserter { public void insertWithFallback(List batch) { try { jdbcTemplate.batchUpdate("INSERT...", batch); } catch (DataAccessException e) { ErrorTracker.log("批量插入失败", e); // 分片重试:将大分片拆成小分片 if (batch.size > 1) { insertWithFallback(batch.subList(0, batch.size/2)); insertWithFallback(batch.subList(batch.size/2, batch.size)); } else { ErrorQueue.put(batch.get(0)); // 单条进入错误队列 } } }}保障机制实现方式原子计数器使用AtomicInteger统计文件/数据库数据量双重完成检测1. 生产者完成标记
2. 队列空状态检测最终一致性校验核对结束后对比文件行数与数据库记录数错误数据重试错误队列数据定时重新投入核对流程水位线监控实时监控各队列数据积压情况,动态调整线程数

智能优化策略 是涡轮增压引擎,让处理速度持续飙升

动态分块策略// 根据系统实时状态自动调整分块大小int dynamicBlockSize = Runtime.getRuntime.freeMemory > 512MB ? 64MB : 16MB;双缓冲队列设计// 主队列 + 溢出磁盘队列(防止内存溢出)BlockingQueue overflowQueue = new DiskBackedBlockingQueue; 智能线程池管理// 根据任务类型动态调整线程数ExecutorService executor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new SmartThreadFactory // 监控线程负载);

运行效果

文件切分:确保每块内容完整。多线程解析:高效提取文件数据。多线程读取数据库:快速获取数据库数据。数据核对:通过队列实现高效比对。批量入库:将核对结果高效写入数据库。

通过 文件切分 + 多线程 + 队列 的方案,我们可以轻松解决了 大文件解析 和 数据核对 的难题!

这个方案像给数据处理流程装上了 防弹衣 + GPS追踪器,并且这套方案的思路对以下极端情况:

文件解析中途崩溃、数据库连接闪断、核对数据量级差异、网络波动导致插入失败,系统仍能保证:

零数据丢失完整核对覆盖自动恢复能力实时状态可观测

欢迎讨论并提出建议!

来源:微笑火龙果

相关推荐