public static void withFlatMapUsingJDK {...var virtualThreadExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual.name("jdk21-vt-", 0).factory);try (virtualThreadExecutor) {// Submit tasks for parallel processingList> futures =users.stream.map(user -> CompletableFuture.runAsync( -> {try {log.info("Processing user: {}", user);processSomeBizLogic(user);successCount.incrementAndGet;} catch (Exception e) {log.error("Error occurred while processing user {}: {}", user, e.getMessage);failureCount.incrementAndGet;}}, virtualThreadExecutor)).toList; // Collect CompletableFuture for each user// Wait for all tasks to completeCompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));try {allOf.join;} catch (Exception e) {log.error("Error waiting for all tasks to complete: {}", e.getMessage);}}...}摘要:public static void withFlatMapUsingJDK {...var virtualThreadExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual.name("j
2、基于 Spring Core Reactor 的虚拟线程实现
public static void withFlatMapUsingJDK {...// Custom executor with virtual threadsvar virtualThreadExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual.name("rx-vt-", 0).factory);try (virtualThreadExecutor) {Flux.fromIterable(objectList).flatMap(obj ->Mono.fromCallable( -> {log.info("Entering processUser in virtual thread: {}", obj);processSomeBizLogic(obj);log.info("Leaving processUser in virtual thread: {}", obj);successCount.incrementAndGet;return obj;}).doOnError(error -> {log.error("Error occurred while processing user {}: {}", obj, error.getMessage);failureCount.incrementAndGet;}).onErrorResume(error -> {log.info("Skipping user due to error: {}", obj);return Mono.empty; // Skip errored objects}).subscribeOn(Schedulers.fromExecutor(virtualThreadExecutor)) // Use virtual threads).doOnComplete( -> {log.info("Processing completed");log.info("Success count: {}", successCount.get);log.info("Failure count: {}", failureCount.get);}).blockLast;}...}发处理列表中以下数量的对象
10 万对象25万 个对象50 万 个对象结果:
处理整个列表所花费的总时间:
内存占用:
对于 10 万个对象,与基于 Spring Reactor 的实现相比,基于 JDK 的实现需要在旧代Old Gen 中分配 33 倍的内存对于 50 万个对象,与基于 Spring Reactor 的实现相比,基于 JDK 的实现在 旧代Old Gen 中使用的峰值内存是后者的 81 倍GC 暂停:
,基于 JDK 的实现的 GC 暂停时间更长。尽管基于 JDK 的实现的 GC 暂停时间更长,但这对应用程序的延迟没有任何显著影响。尽管基于 JDK 的实现需要更长的 CPU 时间来进行 GC 活动,但它不会对应用程序性能产生任何负面影响。对于基于虚拟线程的实现,JDK 应该是显而易见的选择,因为它们比 Spring Core Reactor快得多。对于基于平台线程的实现,Spring Core Reactor比基于 JDK 的实现相对更快两种虚拟线程对比:JDK vs. Spring Core Reactor性能对比 - 极道
来源:解道Jdon
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!