创建异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "我是有结果的异步任务";
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("我只是默默地执行,不给你返回值");
});
自定义线程池
ThreadPoolExecutor rightPool = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "我是通过专属线程池执行的任务";
}, rightPool);
异步任务的取消和超时处理
取消任务时,
cancel(true)
表示允许中断正在执行的任务,cancel(false)
表示仅取消还未执行的任务。
completeOnTimeout
比直接使用get
更优雅,因为它不会阻塞。
orTimeout
适合那些超时必须处理的场景,比如支付操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// 被中断时的处理
return "我被中断了!";
}
return "正常完成";
});
// 设置超时
try {
String result = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 超时就取消任务
System.out.println("等太久了,不等了!");
}
// 更优雅的超时处理
future.completeOnTimeout("默认值", 3, TimeUnit.SECONDS)
.thenAccept(result -> System.out.println("最终结果:" + result));
// 或者配合orTimeout使用
future.orTimeout(3, TimeUnit.SECONDS) // 超时就抛异常
.exceptionally(ex -> "超时默认值")
.thenAccept(result -> System.out.println("最终结果:" + result));
链式调用
thenApply
:当你需要转换结果并继续传递时使用。
thenAccept
:当你只需要处理结果,不需要返回值时使用。
thenRun
:当你只需要执行一个操作,不需要使用结果时使用。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> {
// 我是加工工人,负责把材料加工后返回新成品
return s + " World";
})
.thenAccept(result -> {
// 我是检验工人,只负责验收,不返回东西
System.out.println("收到结果: " + result);
})
.thenRun(() -> {
// 我是打扫工人,不关心之前的结果,只负责收尾工作
System.out.println("生产线工作完成,开始打扫");
});
异步转换
CompletableFuture.supplyAsync(() -> {
// 模拟获取用户信息
return "用户基础信息";
}).thenApplyAsync(info -> {
// 耗时的处理操作,在新的线程中执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return info + " + 附加信息";
}, customExecutor); // 可以指定自己的线程池
组合多个异步操作
thenCompose
- 串行操作(一个接一个)。
thenCombine
- 并行操作(两个一起做)。
CompletableFuture<String> getUserEmail(String userId) {
return CompletableFuture.supplyAsync(() -> "user@example.com");
}
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "userId")
.thenCompose(userId -> getUserEmail(userId)); // 基于第一个结果去获取邮箱
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "价格信息");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "库存信息");
CompletableFuture<String> result = future1.thenCombine(future2, (price, stock) -> {
// 同时处理价格和库存信息
return String.format("价格: %s, 库存: %s", price, stock);
});
商品详情页的数据聚合:
public CompletableFuture<ProductDetails> getProductDetails(String productId) {
CompletableFuture<Product> productFuture = getProduct(productId);
return productFuture.thenCompose(product -> {
// 基于商品信息获取促销信息
CompletableFuture<Promotion> promotionFuture = getPromotion(product.getCategory());
// 同时获取评论信息
CompletableFuture<Reviews> reviewsFuture = getReviews(productId);
// 组合促销和评论信息
return promotionFuture.thenCombine(reviewsFuture, (promotion, reviews) -> {
return new ProductDetails(product, promotion, reviews);
});
});
}
// 错误示范:用 thenApply 导致嵌套的 Future
CompletableFuture<CompletableFuture<String>> nested =
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> CompletableFuture.supplyAsync(() -> s + " World"));
// 正确做法:用 thenCompose 保持链式调用
CompletableFuture<String> better =
CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
如果你的转换函数返回的是另一个
CompletableFuture
,那就用thenCompose
,否则用thenApply
。
异常处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("服务暂时不可用");
}
return "正常返回的数据";
})
.exceptionally(throwable -> {
// 记录异常日志
log.error("操作失败", throwable);
// 返回默认值
return "服务异常,返回默认数据";
});
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> callExternalService())
.exceptionally(throwable -> {
if (throwable.getCause() instanceof TimeoutException) {
return "服务超时,返回缓存数据";
} else if (throwable.getCause() instanceof IllegalArgumentException) {
return "参数异常,返回空结果";
}
return "其他异常,返回默认值";
});
handle
方法比exceptionally
更强大,在于它能同时处理正常结果和异常情况。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("模拟服务异常");
}
return "原始数据";
})
.handle((result, throwable) -> {
if (throwable != null) {
log.error("处理异常", throwable);
return "发生异常,返回备用数据";
}
return result + " - 正常处理完成";
});
// whenComplete:只是旁观者,不能修改结果
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "原始数据")
.whenComplete((result, throwable) -> {
// 只能查看结果,无法修改
if (throwable != null) {
log.error("发生异常", throwable);
} else {
log.info("处理完成: {}", result);
}
});
// handle:既是参与者又是修改者
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "原始数据")
.handle((result, throwable) -> {
// 可以根据结果或异常,返回新的值
if (throwable != null) {
return "异常情况下的替代数据";
}
return result + " - 已处理";
});
使用
whenComplete
:当只需要记录日志或执行一些清理工作,不需要改变结果时。使用
handle
:当需要在异常发生时返回备用值,或者需要转换成正常的结果时。使用
exceptionally
:当只关心异常情况,并且只需要提供一个替代值时。
// 错误示范
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("糟糕,出错了!");
return "成功";
});
future.thenAccept(System.out::println); // 这里异常被吞掉了,什么都看不到
future
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("捕获到异常:" + throwable.getMessage());
return null;
});
多任务编排
等待所有任务完成:
CompletableFuture<Void> allOf = CompletableFuture.allOf(
future1, future2, future3
);
CompletableFuture<String> cookRice = CompletableFuture
.supplyAsync(() -> {
// 模拟煮饭
sleep(1000);
return "饭已就位";
});
CompletableFuture<String> stirFry = CompletableFuture
.supplyAsync(() -> {
// 模拟炒菜
sleep(2000);
return "炒完了";
});
CompletableFuture<String> makeSoup = CompletableFuture
.supplyAsync(() -> {
// 模拟煲汤
sleep(3000);
return "汤煮好了";
});
// 等待所有任务完成
CompletableFuture.allOf(cookRice, stirFry, makeSoup)
.thenRun(() -> {
System.out.println("所有准备工作已完成,可以开始吃饭了!");
});
只要有一个任务完成:
CompletableFuture<Object> taxi = CompletableFuture.anyOf(
callDidi(), // 叫滴滴
callGaode(), // 叫高德
callMeituan() // 叫美团
);
taxi.thenAccept(platform -> System.out.println(platform + "接单了,出发!"));
并行请求多个微服务:
public ProductDetailVO getProductDetail(Long productId) {
// 1. 创建多个异步任务
CompletableFuture<ProductInfo> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProductInfo(productId));
CompletableFuture<Stock> stockFuture = CompletableFuture
.supplyAsync(() -> stockService.getStock(productId));
CompletableFuture<Price> priceFuture = CompletableFuture
.supplyAsync(() -> priceService.getPrice(productId));
CompletableFuture<List<Review>> reviewsFuture = CompletableFuture
.supplyAsync(() -> reviewService.getReviews(productId));
// 2. 等待所有数据都准备好
return CompletableFuture.allOf(
productFuture, stockFuture, priceFuture, reviewsFuture)
.thenApply(v -> {
// 3. 组装最终结果
return new ProductDetailVO(
productFuture.join(), // 获取每个任务的结果
stockFuture.join(),
priceFuture.join(),
reviewsFuture.join()
);
}).join(); // 4. 等待最终结果
}
可以使用
orTimeout()
或completeOnTimeout()
来优雅地处理超时情况。这样,即使某个服务出现问题,也不会影响整个页面的展示。
// 为每个请求添加3秒超时
CompletableFuture<ProductInfo> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProductInfo(productId))
.orTimeout(3, TimeUnit.SECONDS) // 超时控制
.exceptionally(ex -> {
log.error("获取商品信息失败", ex);
return new ProductInfo(); // 返回默认值
});
批量任务
通过
Semaphore
实现最大并发数限制,避免线程资源被用完。通过计数器实时跟踪活动的任务数量和完成进度。
当系统负载较高时,通过信号量自动降低并发度。
提供简单的处理进度日志,便于问题定位和后续的诊断。
// 不推荐:直接转换为CompletableFuture列表
List<CompletableFuture<Result>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> task.execute()))
.collect(Collectors.toList());
// 推荐:控制并发度
public class BatchProcessor {
private final ExecutorService executor;
private final int maxConcurrency;
// 优化版本:带有限流和监控的批处理执行器
public <T> List<T> executeBatch(List<Supplier<T>> tasks) {
Semaphore semaphore = new Semaphore(maxConcurrency);
AtomicInteger activeTaskCount = new AtomicInteger(0);
AtomicInteger completedTaskCount = new AtomicInteger(0);
List<CompletableFuture<T>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire();
activeTaskCount.incrementAndGet();
return task.get();
} catch (Exception e) {
throw new CompletionException(e);
} finally {
semaphore.release();
activeTaskCount.decrementAndGet();
completedTaskCount.incrementAndGet();
}
}, executor))
.collect(Collectors.toList());
// 添加批处理进度监控
CompletableFuture.runAsync(() -> {
while (completedTaskCount.get() < tasks.size()) {
log.info("批处理进度: {}/{}, 当前活动任务数: {}",
completedTaskCount.get(), tasks.size(), activeTaskCount.get());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
超时控制
支持配置最大重试次数。
采用指数退避算法,避免立即重试对系统造成冲击。
每次重试都有独立的超时控制,实现了基础的隔离。
支持自定义降级策略,可以对结果进行降级处理。
// 原始的写法:基础的超时控制
CompletableFuture<Result> future = CompletableFuture
.supplyAsync(() -> doSomething())
.orTimeout(3, TimeUnit.SECONDS);
public class TimeoutHandler {
// 带有动态超时和重试的处理器
public <T> CompletableFuture<T> executeWithSmartTimeout(
Supplier<T> task,
long timeout,
TimeUnit unit,
int maxRetries) {
AtomicInteger retryCount = new AtomicInteger(0);
CompletableFuture<T> future = new CompletableFuture<>();
CompletableFuture.supplyAsync(() -> {
while (retryCount.get() < maxRetries) {
try {
CompletableFuture<T> attempt = CompletableFuture
.supplyAsync(task)
.orTimeout(timeout, unit);
return attempt.get();
} catch (TimeoutException e) {
log.warn("第{}次尝试超时", retryCount.incrementAndGet());
// 指数退避
Thread.sleep(Math.min(1000 * (1 << retryCount.get()), 10000));
} catch (Exception e) {
throw new CompletionException(e);
}
}
throw new TimeoutException("重试次数耗尽");
}).handle((result, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(result);
}
return null;
});
return future;
}
}
TimeoutHandler handler = new TimeoutHandler();
CompletableFuture<UserProfile> future = handler.executeWithSmartTimeout(
() -> remoteService.getUserProfile(userId),
500, // 500ms超时
TimeUnit.MILLISECONDS,
3 // 最多重试3次
);
异步HTTP请求
public class AsyncHttpClient {
private final ExecutorService executor;
private final int timeout;
public AsyncHttpClient(int threadPoolSize, int timeoutSeconds) {
this.executor = Executors.newFixedThreadPool(threadPoolSize);
this.timeout = timeoutSeconds;
}
public CompletableFuture<String> asyncGet(String url) {
return CompletableFuture.supplyAsync(() -> {
try {
// 模拟HTTP请求
if (Math.random() < 0.1) { // 10%概率模拟超时
Thread.sleep(timeout * 1000 + 1000);
}
return "Response from " + url;
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}, executor)
.orTimeout(timeout, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
return "请求超时了:" + url;
}
return "请求出错:" + throwable.getMessage();
});
}
// 批量处理请求
public List<String> batchGet(List<String> urls) {
List<CompletableFuture<String>> futures = urls.stream()
.map(this::asyncGet)
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public void shutdown() {
executor.shutdown();
}
}
AsyncHttpClient client = new AsyncHttpClient(10, 5); // 10个线程,5秒超时
List<String> urls = Arrays.asList(
"http://api1.example.com",
"http://api2.example.com",
"http://api3.example.com"
);
// 异步处理多个请求
List<String> results = client.batchGet(urls);
results.forEach(System.out::println);
client.shutdown();