摘要:最近准备面试的小伙伴,可以看一下这个宝藏网站(Java 突击队):www.susan.net.cn,里面:面试八股文、场景设计题、面试真题、7个项目实战、工作内推什么都有 。
CompletableFuture 在并发编程中非常实用,但如果用不好,也很容易踩坑。
今天这篇文章跟大家一起聊聊,CompletableFuture 在使用过程中最常见的那些坑,希望对你会有所帮助。
最近准备面试的小伙伴,可以看一下这个宝藏网站(Java 突击队):www.susan.net.cn,里面:面试八股文、场景设计题、面试真题、7个项目实战、工作内推什么都有 。
加苏三的工作内推群
有些小伙伴在工作中刚开始接触 CompletableFuture 时,可能会被它强大的功能所吸引。
确实,CompletableFuture 为我们提供了非常优雅 #后端 #金石焕新程的异步编程方式,但正如武侠小说中的神兵利器,如果使用不当,反而会伤到自己。
先来看一个简单的 CompletableFuture 使用示例:
public class BasicCompletableFutureDemo {public static void main(String args) throws Exception {// 简单的异步计算CompletableFuture future = CompletableFuture.supplyAsync( -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace;}return "Hello, CompletableFuture!";});// 获取结果(阻塞)String result = future.get;System.out.println(result);}}看起来很简单对吧?但正是这种表面上的简单,掩盖了很多潜在的复杂性。
让我们通过一个架构图来理解 CompletableFuture 的完整生态:
现在,让我们开始深入探讨各个坑点。
有些小伙伴在使用 CompletableFuture 时,往往忽略了线程池的配置,这可能是最容易被忽视但影响最大的坑。
public class ThreadPoolPitfall {// 危险的用法:大量使用默认线程池public void processBatchData(List dataList) {List> futures = new ArrayList;for (String data : dataList) {// 使用默认的ForkJoinPool.commonPoolCompletableFuture future = CompletableFuture.supplyAsync( -> {return processData(data);});futures.add(future);}// 等待所有任务完成CompletableFuture.allOf(fatures.toArray(new CompletableFuture[0])).join;}private String processData(String data) {// 模拟数据处理try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread.interrupt;}return data.toUpperCase;}}问题分析:
默认线程池大小是CPU核心数-1在IO密集型任务中,这会导致大量任务排队等待如果任务提交速度 > 任务处理速度,会造成内存溢出public class ProperThreadPoolUsage {private final ExecutorService ioBoundExecutor;private final ExecutorService cpuBoundExecutor;public ProperThreadPoolUsage {// IO密集型任务 - 使用较大的线程池this.ioBoundExecutor = new ThreadPoolExecutor(50, // 核心线程数100, // 最大线程数60L, TimeUnit.SECONDS, // 空闲线程存活时间new LinkedBlockingQueue(1000), // 工作队列new ThreadFactoryBuilder.setNameFormat("io-pool-%d").build,new ThreadPoolExecutor.CallerRunsPolicy // 拒绝策略);// CPU密集型任务 - 使用较小的线程池this.cpuBoundExecutor = new ThreadPoolExecutor(Runtime.getRuntime.availableProcessors, // CPU核心数Runtime.getRuntime.availableProcessors * 2,60L, TimeUnit.SECONDS,new LinkedBlockingQueue(100),new ThreadFactoryBuilder.setNameFormat("cpu-pool-%d").build,new ThreadPoolExecutor.AbortPolicy);}public CompletableFuture processWithProperPool(String data) {return CompletableFuture.supplyAsync( -> {// IO操作,使用IO线程池return fetchFromDatabase(data);}, ioBoundExecutor);}public CompletableFuture computeWithProperPool(String data) {return CompletableFuture.supplyAsync( -> {// CPU密集型计算,使用CPU线程池return heavyComputation(data);}, cpuBoundExecutor);}// 资源清理@PreDestroypublic void destroy {ioBoundExecutor.shutdown;cpuBoundExecutor.shutdown;try {if (!ioBoundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {ioBoundExecutor.shutdownNow;}if (!cpuBoundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {cpuBoundExecutor.shutdownNow;}} catch (InterruptedException e) {ioBoundExecutor.shutdownNow;cpuBoundExecutor.shutdownNow;Thread.currentThread.interrupt;}}}有些小伙伴在调试 CompletableFuture 时,经常会发现异常"神秘消失"了,这其实是 CompletableFuture 异常处理机制的一个特性。
public class ExceptionDisappearance {public void testExceptionLost {CompletableFuture future = CompletableFuture.supplyAsync( -> {// 这里会抛出异常return dangerousOperation;});// 添加转换链CompletableFuture resultFuture = future.thenApply(result -> {System.out.println("处理结果: " + result);return result + " processed";});try {// 这里不会抛出异常!String result = resultFuture.get;System.out.println("最终结果: " + result);} catch (Exception e) {// 异常被包装在ExecutionException中System.out.println("捕获到异常: " + e.getClass.getName);System.out.println("根本原因: " + e.getCause.getMessage);}}private String dangerousOperation {throw new RuntimeException("业务操作失败!");}// 更隐蔽的异常丢失public void testHiddenExceptionLoss {CompletableFuture.supplyAsync( -> {throw new BusinessException("重要异常");}).thenAccept(result -> {// 如果上游有异常,这里不会执行System.out.println("处理结果: " + result);});// 程序继续执行,异常被忽略!System.out.println("程序正常结束,但异常丢失了!");}static class BusinessException extends RuntimeException {public BusinessException(String message) {super(message);}}}public class ProperExceptionHandling {// 方法1:使用exceptionally进行恢复public CompletableFuture handleWithRecovery {return CompletableFuture.supplyAsync( -> {return riskyOperation;}).exceptionally(throwable -> {// 异常恢复System.err.println("操作失败,使用默认值: " + throwable.getMessage);return "default-value";});}// 方法2:使用handle统一处理public CompletableFuture handleWithUnified {return CompletableFuture.supplyAsync( -> {return riskyOperation;}).handle((result, throwable) -> {if (throwable != null) {// 处理异常System.err.println("操作异常: " + throwable.getMessage);return "error-value";}return result + "-processed";});}// 方法3:使用whenComplete进行副作用处理public CompletableFuture handleWithSideEffect {return CompletableFuture.supplyAsync( -> {return riskyOperation;}).whenComplete((result, throwable) -> {if (throwable != null) {// 记录日志、发送告警等logError(throwable);sendAlert(throwable);} else {// 正常业务处理processResult(result);}});}// 方法4:组合操作中的异常处理public CompletableFuture handleInComposition {CompletableFuture future1 = CompletableFuture.supplyAsync( -> {return operation1;});CompletableFuture future2 = future1.thenCompose(result1 -> {return CompletableFuture.supplyAsync( -> {return operation2(result1);});});// 在整个链的末尾处理异常return future2.exceptionally(throwable -> {Throwable rootCause = getRootCause(throwable);if (rootCause instanceof BusinessException) {return "business-fallback";} else if (rootCause instanceof TimeoutException) {return "timeout-fallback";} else {return "unknown-error";}});}private void logError(Throwable throwable) {// 记录错误日志System.err.println("错误记录: " + throwable.getMessage);}private void sendAlert(Throwable throwable) {// 发送告警System.out.println("发送告警: " + throwable.getMessage);}private Throwable getRootCause(Throwable throwable) {Throwable cause = throwable;while (cause.getCause != null) {cause = cause.getCause;}return cause;}}有些小伙伴在复杂业务场景中使用 CompletableFuture 时,很容易陷入回调地狱,代码变得难以理解和维护。
public class CallbackHell {public CompletableFuture processUserOrder(String userId) {return getUserInfo(userId).thenCompose(userInfo -> {return getOrderHistory(userInfo.getId).thenCompose(orderHistory -> {return calculateDiscount(userInfo, orderHistory).thenCompose(discount -> {return createOrder(userInfo, discount).thenCompose(order -> {return sendConfirmation(userInfo, order);});});});});}// 上述代码的"平铺"版本,同样难以阅读public CompletableFuture processUserOrderFlat(String userId) {return getUserInfo(userId).thenCompose(userInfo -> getOrderHistory(userInfo.getId)).thenCompose(orderHistory -> getUserInfo(userId)).thenCompose(userInfo -> calculateDiscount(userInfo, orderHistory)).thenCompose(discount -> getUserInfo(userId)).thenCompose(userInfo -> createOrder(userInfo, discount)).thenCompose(order -> getUserInfo(userId)).thenCompose(userInfo -> sendConfirmation(userInfo, order));}}public class StructuredAsyncProgramming {// 定义业务数据类@Data@AllArgsConstructorpublic static class OrderContext {private String userId;private UserInfo userInfo;private List orderHistory;private Discount discount;private Order order;private String result;}public CompletableFuture processUserOrderStructured(String userId) {OrderContext context = new OrderContext(userId, null, null, null, null, null);return getUserInfo(context.getUserId).thenCompose(userInfo -> {context.setUserInfo(userInfo);return getOrderHistory(userInfo.getId);}).thenCompose(orderHistory -> {context.setOrderHistory(orderHistory);return calculateDiscount(context.getUserInfo, orderHistory);}).thenCompose(discount -> {context.setDiscount(discount);return createOrder(context.getUserInfo, discount);}).thenCompose(order -> {context.setOrder(order);return sendConfirmation(context.getUserInfo, order);}).thenApply(result -> {context.setResult(result);return result;}).exceptionally(throwable -> {// 统一异常处理return handleOrderError(context, throwable);});}// 使用thenCombine处理并行任务public CompletableFuture getUserProfile(String userId) {CompletableFuture userInfoFuture = getUserInfo(userId);CompletableFuture> orderHistoryFuture = getOrderHistory(userId);CompletableFuture> addressesFuture = getUserAddresses(userId);return userInfoFuture.thenCombine(orderHistoryFuture, (userInfo, orders) -> {return new UserProfile(userInfo, orders, null);}).thenCombine(addressesFuture, (profile, addresses) -> {profile.setAddresses(addresses);return profile;});}// 使用allOf处理多个独立任务public CompletableFuture> getDashboardData(String userId) {CompletableFuture userInfoFuture = getUserInfo(userId);CompletableFuture> ordersFuture = getOrderHistory(userId);CompletableFuture> notificationsFuture = getNotifications(userId);CompletableFuture preferencesFuture = getPreferences(userId);CompletableFuture allFutures = CompletableFuture.allOf(userInfoFuture, ordersFuture, notificationsFuture, preferencesFuture);return allFutures.thenApply(v -> {Map dashboard = new HashMap;try {dashboard.put("userInfo", userInfoFuture.get);dashboard.put("orders", ordersFuture.get);dashboard.put("notifications", notificationsFuture.get);dashboard.put("preferences", preferencesFuture.get);} catch (Exception e) {throw new CompletionException(e);}return dashboard;});}}有些小伙伴可能没有意识到,不当使用 CompletableFuture 会导致内存泄漏,特别是在长时间运行的应用中。
public class MemoryLeakDemo {private final Map> cache = new ConcurrentHashMap;// 场景1:无限增长的缓存public CompletableFuture getDataWithLeak(String key) {return cache.computeIfAbsent(key, k -> {return CompletableFuture.supplyAsync( -> fetchData(k));});}// 场景2:未完成的Future积累public void processWithUnfinishedFutures {for (int i = 0; i future = CompletableFuture.supplyAsync( -> {// 模拟长时间运行或阻塞的任务try {Thread.sleep(Long.MAX_VALUE); // 几乎永久阻塞} catch (InterruptedException e) {Thread.currentThread.interrupt;}return "result";});// future永远不会完成,但一直存在于内存中}}// 场景3:循环引用public class TaskManager {private CompletableFuture currentTask;private String status = "INIT";public void startTask {currentTask = CompletableFuture.supplyAsync( -> {// 任务持有Manager的引用while (!"COMPLETED".equals(status)) {// 处理任务processTask;}return "done";});}// Manager也持有任务的引用public CompletableFuture getCurrentTask {return currentTask;}}}public class MemoryLeakPrevention {private final Cache> cache;public MemoryLeakPrevention {// 使用Guava Cache自动清理this.cache = CacheBuilder.newBuilder.maximumSize(1000).expireAfterAccess(10, TimeUnit.MINUTES).removalListener((RemovalListener>) notification -> {if (notification.getCause == RemovalCause.SIZE ||notification.getCause == RemovalCause.EXPIRED) {// 取消未完成的任务CompletableFuture future = notification.getValue;if (!future.isDone) {future.cancel(true);}}}).build;}// 安全的缓存用法public CompletableFuture getDataSafely(String key) {try {return cache.get(key, -> {CompletableFuture future = CompletableFuture.supplyAsync( -> fetchData(key));// 添加超时控制return future.orTimeout(30, TimeUnit.SECONDS).exceptionally(throwable -> {// 发生异常时从缓存中移除cache.invalidate(key);return "fallback-data";});});} catch (ExecutionException e) {throw new RuntimeException(e);}}// 使用WeakReference避免循环引用public static class SafeTaskManager {private WeakReference> currentTaskRef;public void startTask {CompletableFuture task = CompletableFuture.supplyAsync( -> {return performTask;});currentTaskRef = new WeakReference(task);// 任务完成后自动清理task.whenComplete((result, error) -> {currentTaskRef = null;});}}// 监控和诊断工具public void monitorFutures {// 定期检查未完成的FutureTimer timer = new Timer(true);timer.scheduleAtFixedRate(new TimerTask {@Overridepublic void run {int unfinishedCount = 0;for (CompletableFuture future : cache.asMap.values) {if (!future.isDone) {unfinishedCount++;// 记录长时间运行的任务if (future.isDoneExceptionally) {// 处理异常任务handleExceptionalFuture(future);}}}if (unfinishedCount > 100) {// 发出警告System.err.println("警告: 有 " + unfinishedCount + " 个未完成的任务");}}}, 0, 60000); // 每分钟检查一次}private void handleExceptionalFuture(CompletableFuture future) {// 处理异常Future,避免它们一直存在future.exceptionally(throwable -> {// 记录异常日志System.err.println("任务异常: " + throwable.getMessage);return null;});}}内存泄漏检测流程最近为了帮助大家找工作,专门建了一些工作内推群,各大城市都有,欢迎各位 HR 和找工作的小伙伴进群交流,群里目前已经收集了不少的工作内推岗位。_su223,备注:掘金+所在城市,即可进群。
六、超时控制缺失超时问题的严重性public class TimeoutPitfalls {// 危险的代码:没有超时控制public String dangerousGet {CompletableFuture future = CompletableFuture.supplyAsync( -> {// 模拟网络问题导致的无限阻塞return blockingNetworkCall;});try {// 如果任务永远不完成,这里会永远阻塞return future.get;} catch (Exception e) {return "error";}}// 资源泄漏的示例public void resourceLeakExample {ExecutorService executor = Executors.newFixedThreadPool(10);for (int i = 0; i {try {// 长时间运行的任务Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e) {Thread.currentThread.interrupt;}}, executor);}// 线程池中的线程都被占用,无法执行新任务}private String blockingNetworkCall {// 模拟网络问题try {Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e) {Thread.currentThread.interrupt;}return "response";}}public class CompleteTimeoutSolution {private final ScheduledExecutorService timeoutExecutor;public CompleteTimeoutSolution {this.timeoutExecutor = Executors.newScheduledThreadPool(2);}// 方法1:使用orTimeout(Java 9+)public CompletableFuture withOrTimeout {return CompletableFuture.supplyAsync( -> {return externalServiceCall;}).orTimeout(5, TimeUnit.SECONDS) // 5秒超时.exceptionally(throwable -> {if (throwable instanceof TimeoutException) {return "timeout-fallback";}return "error-fallback";});}// 方法2:使用completeOnTimeout(Java 9+)public CompletableFuture withCompleteOnTimeout {return CompletableFuture.supplyAsync( -> {return externalServiceCall;}).completeOnTimeout("timeout-default", 3, TimeUnit.SECONDS);}// 方法3:手动超时控制(Java 8兼容)public CompletableFuture withManualTimeout {CompletableFuture taskFuture = CompletableFuture.supplyAsync( -> {return externalServiceCall;});CompletableFuture timeoutFuture = new CompletableFuture;// 设置超时timeoutExecutor.schedule( -> {timeoutFuture.completeExceptionally(new TimeoutException("操作超时"));}, 5, TimeUnit.SECONDS);// 哪个先完成就返回哪个return taskFuture.applyToEither(timeoutFuture, Function.identity).exceptionally(throwable -> {if (throwable instanceof TimeoutException) {return "manual-timeout-fallback";}return "other-error-fallback";});}// 方法4:分层超时控制public CompletableFuture withLayeredTimeout {return CompletableFuture.supplyAsync( -> {return phase1Operation;}).orTimeout(2, TimeUnit.SECONDS).thenCompose(phase1Result -> {return CompletableFuture.supplyAsync( -> {return phase2Operation(phase1Result);}).orTimeout(3, TimeUnit.SECONDS);}).thenCompose(phase2Result -> {return CompletableFuture.supplyAsync( -> {return phase3Operation(phase2Result);}).orTimeout(5, TimeUnit.SECONDS);}).exceptionally(throwable -> {Throwable rootCause = getRootCause(throwable);if (rootCause instanceof TimeoutException) {// 根据超时阶段提供不同的降级策略return "timeout-in-phase";}return "general-fallback";});}// 方法5:可配置的超时策略public CompletableFuture withConfigurableTimeout(String operationType) {TimeoutConfig config = getTimeoutConfig(operationType);return CompletableFuture.supplyAsync( -> {return performOperation(operationType);}).orTimeout(config.getTimeout, config.getTimeUnit).exceptionally(throwable -> {return config.getFallbackStrategy.apply(throwable);});}@PreDestroypublic void destroy {timeoutExecutor.shutdown;try {if (!timeoutExecutor.awaitTermination(5, TimeUnit.SECONDS)) {timeoutExecutor.shutdownNow;}} catch (InterruptedException e) {timeoutExecutor.shutdownNow;Thread.currentThread.interrupt;}}// 超时配置类@Datapublic static class TimeoutConfig {private final long timeout;private final TimeUnit timeUnit;private final Function fallbackStrategy;}private TimeoutConfig getTimeoutConfig(String operationType) {switch (operationType) {case "fast":return new TimeoutConfig(1, TimeUnit.SECONDS,t -> "fast-timeout");case "normal":return new TimeoutConfig(5, TimeUnit.SECONDS,t -> "normal-timeout");case "slow":return new TimeoutConfig(30, TimeUnit.SECONDS,t -> "slow-timeout");default:return new TimeoutConfig(10, TimeUnit.SECONDS,t -> "default-timeout");}}}超时控制策略总结通过上面的详细分析,我们可以看到 CompletableFuture 虽然强大,但也确实存在不少陷阱。
记住,工具是为了提高生产力,而不是制造问题。
掌握了这些避坑技巧,CompletableFuture 将成为你手中强大的并发编程利器!
如果这篇文章对您有所帮助,或者有所启发的话,,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
来源:墨码行者一点号
