使用CompletableFuture实现异步编程!
发表于更新于
中间件Java使用CompletableFuture实现异步编程!
月伴飞鱼创建异步任务
1 2 3 4 5 6 7
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { return "我是有结果的异步任务"; });
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { System.out.println("我只是默默地执行,不给你返回值"); });
|
自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ThreadPoolExecutor rightPool = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy() );
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "我是通过专属线程池执行的任务"; }, rightPool);
|
异步任务的取消和超时处理
取消任务时,cancel(true)
表示允许中断正在执行的任务,cancel(false)
表示仅取消还未执行的任务。
completeOnTimeout
比直接使用get
更优雅,因为它不会阻塞。
orTimeout
适合那些超时必须处理的场景,比如支付操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { return "我被中断了!"; } return "正常完成"; });
try { String result = future.get(3, TimeUnit.SECONDS); } catch (TimeoutException e) { future.cancel(true); System.out.println("等太久了,不等了!"); }
future.completeOnTimeout("默认值", 3, TimeUnit.SECONDS) .thenAccept(result -> System.out.println("最终结果:" + result));
future.orTimeout(3, TimeUnit.SECONDS) .exceptionally(ex -> "超时默认值") .thenAccept(result -> System.out.println("最终结果:" + result));
|
链式调用
thenApply
:当你需要转换结果并继续传递时使用。
thenAccept
:当你只需要处理结果,不需要返回值时使用。
thenRun
:当你只需要执行一个操作,不需要使用结果时使用。
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> { return s + " World"; }) .thenAccept(result -> { System.out.println("收到结果: " + result); }) .thenRun(() -> { System.out.println("生产线工作完成,开始打扫"); });
|
异步转换
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture.supplyAsync(() -> { return "用户基础信息"; }).thenApplyAsync(info -> { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return info + " + 附加信息"; }, customExecutor);
|
组合多个异步操作
thenCompose
- 串行操作(一个接一个)。
thenCombine
- 并行操作(两个一起做)。
1 2 3 4 5 6 7
| CompletableFuture<String> getUserEmail(String userId) { return CompletableFuture.supplyAsync(() -> "user@example.com"); }
CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "userId") .thenCompose(userId -> getUserEmail(userId));
|
1 2 3 4 5 6 7
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "价格信息"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "库存信息");
CompletableFuture<String> result = future1.thenCombine(future2, (price, stock) -> { return String.format("价格: %s, 库存: %s", price, stock); });
|
商品详情页的数据聚合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public CompletableFuture<ProductDetails> getProductDetails(String productId) { CompletableFuture<Product> productFuture = getProduct(productId); return productFuture.thenCompose(product -> { CompletableFuture<Promotion> promotionFuture = getPromotion(product.getCategory()); CompletableFuture<Reviews> reviewsFuture = getReviews(productId); return promotionFuture.thenCombine(reviewsFuture, (promotion, reviews) -> { return new ProductDetails(product, promotion, reviews); }); }); }
|
1 2 3 4 5 6 7 8 9
| CompletableFuture<CompletableFuture<String>> nested = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> CompletableFuture.supplyAsync(() -> s + " World"));
CompletableFuture<String> better = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
|
如果你的转换函数返回的是另一个CompletableFuture
,那就用 thenCompose
,否则用 thenApply
。
异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("服务暂时不可用"); } return "正常返回的数据"; }) .exceptionally(throwable -> { log.error("操作失败", throwable); return "服务异常,返回默认数据"; });
|
1 2 3 4 5 6 7 8 9 10
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> callExternalService()) .exceptionally(throwable -> { if (throwable.getCause() instanceof TimeoutException) { return "服务超时,返回缓存数据"; } else if (throwable.getCause() instanceof IllegalArgumentException) { return "参数异常,返回空结果"; } return "其他异常,返回默认值"; });
|
handle
方法比exceptionally
更强大,在于它能同时处理正常结果和异常情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟服务异常"); } return "原始数据"; }) .handle((result, throwable) -> { if (throwable != null) { log.error("处理异常", throwable); return "发生异常,返回备用数据"; } return result + " - 正常处理完成"; });
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> "原始数据") .whenComplete((result, throwable) -> { if (throwable != null) { log.error("发生异常", throwable); } else { log.info("处理完成: {}", result); } });
CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> "原始数据") .handle((result, throwable) -> { if (throwable != null) { return "异常情况下的替代数据"; } return result + " - 已处理"; });
|
使用whenComplete
:当只需要记录日志或执行一些清理工作,不需要改变结果时。
使用handle
:当需要在异常发生时返回备用值,或者需要转换成正常的结果时。
使用exceptionally
:当只关心异常情况,并且只需要提供一个替代值时。
1 2 3 4 5 6
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException("糟糕,出错了!"); return "成功"; }); future.thenAccept(System.out::println);
|
1 2 3 4 5 6
| future .thenAccept(System.out::println) .exceptionally(throwable -> { System.err.println("捕获到异常:" + throwable.getMessage()); return null; });
|
多任务编排
等待所有任务完成:
1 2 3
| CompletableFuture<Void> allOf = CompletableFuture.allOf( future1, future2, future3 );
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| CompletableFuture<String> cookRice = CompletableFuture .supplyAsync(() -> { sleep(1000); return "饭已就位"; });
CompletableFuture<String> stirFry = CompletableFuture .supplyAsync(() -> { sleep(2000); return "炒完了"; });
CompletableFuture<String> makeSoup = CompletableFuture .supplyAsync(() -> { sleep(3000); return "汤煮好了"; });
CompletableFuture.allOf(cookRice, stirFry, makeSoup) .thenRun(() -> { System.out.println("所有准备工作已完成,可以开始吃饭了!"); });
|
只要有一个任务完成:
1 2 3 4 5 6 7
| CompletableFuture<Object> taxi = CompletableFuture.anyOf( callDidi(), callGaode(), callMeituan() );
taxi.thenAccept(platform -> System.out.println(platform + "接单了,出发!"));
|
并行请求多个微服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public ProductDetailVO getProductDetail(Long productId) { CompletableFuture<ProductInfo> productFuture = CompletableFuture .supplyAsync(() -> productService.getProductInfo(productId)); CompletableFuture<Stock> stockFuture = CompletableFuture .supplyAsync(() -> stockService.getStock(productId)); CompletableFuture<Price> priceFuture = CompletableFuture .supplyAsync(() -> priceService.getPrice(productId)); CompletableFuture<List<Review>> reviewsFuture = CompletableFuture .supplyAsync(() -> reviewService.getReviews(productId));
return CompletableFuture.allOf( productFuture, stockFuture, priceFuture, reviewsFuture) .thenApply(v -> { return new ProductDetailVO( productFuture.join(), stockFuture.join(), priceFuture.join(), reviewsFuture.join() ); }).join(); }
|
可以使用orTimeout()
或completeOnTimeout()
来优雅地处理超时情况。
这样,即使某个服务出现问题,也不会影响整个页面的展示。
1 2 3 4 5 6 7 8
| CompletableFuture<ProductInfo> productFuture = CompletableFuture .supplyAsync(() -> productService.getProductInfo(productId)) .orTimeout(3, TimeUnit.SECONDS) .exceptionally(ex -> { log.error("获取商品信息失败", ex); return new ProductInfo(); });
|
批量任务
通过Semaphore
实现最大并发数限制,避免线程资源被用完。
通过计数器实时跟踪活动的任务数量和完成进度。
当系统负载较高时,通过信号量自动降低并发度。
提供简单的处理进度日志,便于问题定位和后续的诊断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| List<CompletableFuture<Result>> futures = tasks.stream() .map(task -> CompletableFuture.supplyAsync(() -> task.execute())) .collect(Collectors.toList());
public class BatchProcessor { private final ExecutorService executor; private final int maxConcurrency; public <T> List<T> executeBatch(List<Supplier<T>> tasks) { Semaphore semaphore = new Semaphore(maxConcurrency); AtomicInteger activeTaskCount = new AtomicInteger(0); AtomicInteger completedTaskCount = new AtomicInteger(0); List<CompletableFuture<T>> futures = tasks.stream() .map(task -> CompletableFuture.supplyAsync(() -> { try { semaphore.acquire(); activeTaskCount.incrementAndGet(); return task.get(); } catch (Exception e) { throw new CompletionException(e); } finally { semaphore.release(); activeTaskCount.decrementAndGet(); completedTaskCount.incrementAndGet(); } }, executor)) .collect(Collectors.toList()); CompletableFuture.runAsync(() -> { while (completedTaskCount.get() < tasks.size()) { log.info("批处理进度: {}/{}, 当前活动任务数: {}", completedTaskCount.get(), tasks.size(), activeTaskCount.get()); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } }); return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } }
|
超时控制
支持配置最大重试次数。
采用指数退避算法,避免立即重试对系统造成冲击。
每次重试都有独立的超时控制,实现了基础的隔离。
支持自定义降级策略,可以对结果进行降级处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| CompletableFuture<Result> future = CompletableFuture .supplyAsync(() -> doSomething()) .orTimeout(3, TimeUnit.SECONDS);
public class TimeoutHandler { public <T> CompletableFuture<T> executeWithSmartTimeout( Supplier<T> task, long timeout, TimeUnit unit, int maxRetries) { AtomicInteger retryCount = new AtomicInteger(0); CompletableFuture<T> future = new CompletableFuture<>(); CompletableFuture.supplyAsync(() -> { while (retryCount.get() < maxRetries) { try { CompletableFuture<T> attempt = CompletableFuture .supplyAsync(task) .orTimeout(timeout, unit); return attempt.get(); } catch (TimeoutException e) { log.warn("第{}次尝试超时", retryCount.incrementAndGet()); Thread.sleep(Math.min(1000 * (1 << retryCount.get()), 10000)); } catch (Exception e) { throw new CompletionException(e); } } throw new TimeoutException("重试次数耗尽"); }).handle((result, ex) -> { if (ex != null) { future.completeExceptionally(ex); } else { future.complete(result); } return null; }); return future; } }
|
1 2 3 4 5 6 7
| TimeoutHandler handler = new TimeoutHandler(); CompletableFuture<UserProfile> future = handler.executeWithSmartTimeout( () -> remoteService.getUserProfile(userId), 500, TimeUnit.MILLISECONDS, 3 );
|
异步HTTP请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class AsyncHttpClient { private final ExecutorService executor; private final int timeout;
public AsyncHttpClient(int threadPoolSize, int timeoutSeconds) { this.executor = Executors.newFixedThreadPool(threadPoolSize); this.timeout = timeoutSeconds; }
public CompletableFuture<String> asyncGet(String url) { return CompletableFuture.supplyAsync(() -> { try { if (Math.random() < 0.1) { Thread.sleep(timeout * 1000 + 1000); } return "Response from " + url; } catch (InterruptedException e) { throw new CompletionException(e); } }, executor) .orTimeout(timeout, TimeUnit.SECONDS) .exceptionally(throwable -> { if (throwable instanceof TimeoutException) { return "请求超时了:" + url; } return "请求出错:" + throwable.getMessage(); }); }
public List<String> batchGet(List<String> urls) { List<CompletableFuture<String>> futures = urls.stream() .map(this::asyncGet) .collect(Collectors.toList());
return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }
public void shutdown() { executor.shutdown(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| AsyncHttpClient client = new AsyncHttpClient(10, 5); List<String> urls = Arrays.asList( "http://api1.example.com", "http://api2.example.com", "http://api3.example.com" );
List<String> results = client.batchGet(urls); results.forEach(System.out::println);
client.shutdown();
|