CompletableFuture

月伴飞鱼 2025-01-11 22:46:54
实战相关
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

img

创建异步任务

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "我是有结果的异步任务";
});

CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("我只是默默地执行,不给你返回值");
});

自定义线程池

ThreadPoolExecutor rightPool = new ThreadPoolExecutor(
    5,                      // 核心线程数
    10,                     // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100),  // 工作队列
    new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build(),  // 线程工厂
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "我是通过专属线程池执行的任务";
}, rightPool);

异步任务的取消和超时处理

取消任务时,cancel(true)表示允许中断正在执行的任务,cancel(false)表示仅取消还未执行的任务。

completeOnTimeout比直接使用get更优雅,因为它不会阻塞。

orTimeout适合那些超时必须处理的场景,比如支付操作。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时操作
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // 被中断时的处理
        return "我被中断了!";
    }
    return "正常完成";
});

// 设置超时
try {
    String result = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true);  // 超时就取消任务
    System.out.println("等太久了,不等了!");
}

// 更优雅的超时处理
future.completeOnTimeout("默认值", 3, TimeUnit.SECONDS)
      .thenAccept(result -> System.out.println("最终结果:" + result));

// 或者配合orTimeout使用
future.orTimeout(3, TimeUnit.SECONDS)  // 超时就抛异常
      .exceptionally(ex -> "超时默认值")
      .thenAccept(result -> System.out.println("最终结果:" + result));

链式调用

thenApply:当你需要转换结果并继续传递时使用。

thenAccept:当你只需要处理结果,不需要返回值时使用。

thenRun:当你只需要执行一个操作,不需要使用结果时使用。

CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> {
        // 我是加工工人,负责把材料加工后返回新成品
        return s + " World";
    })
    .thenAccept(result -> {
        // 我是检验工人,只负责验收,不返回东西
        System.out.println("收到结果: " + result);
    })
    .thenRun(() -> {
        // 我是打扫工人,不关心之前的结果,只负责收尾工作
        System.out.println("生产线工作完成,开始打扫");
    });

异步转换

CompletableFuture.supplyAsync(() -> {
    // 模拟获取用户信息
    return "用户基础信息";
}).thenApplyAsync(info -> {
    // 耗时的处理操作,在新的线程中执行
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return info + " + 附加信息";
}, customExecutor);  // 可以指定自己的线程池

组合多个异步操作

thenCompose - 串行操作(一个接一个)。

thenCombine - 并行操作(两个一起做)。

CompletableFuture<String> getUserEmail(String userId) {
    return CompletableFuture.supplyAsync(() -> "user@example.com");
}

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "userId")
    .thenCompose(userId -> getUserEmail(userId));  // 基于第一个结果去获取邮箱
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "价格信息");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "库存信息");

CompletableFuture<String> result = future1.thenCombine(future2, (price, stock) -> {
    // 同时处理价格和库存信息
    return String.format("价格: %s, 库存: %s", price, stock);
});

商品详情页的数据聚合:

public CompletableFuture<ProductDetails> getProductDetails(String productId) {
    CompletableFuture<Product> productFuture = getProduct(productId);
    
    return productFuture.thenCompose(product -> {
        // 基于商品信息获取促销信息
        CompletableFuture<Promotion> promotionFuture = getPromotion(product.getCategory());
        // 同时获取评论信息
        CompletableFuture<Reviews> reviewsFuture = getReviews(productId);
        
        // 组合促销和评论信息
        return promotionFuture.thenCombine(reviewsFuture, (promotion, reviews) -> {
            return new ProductDetails(product, promotion, reviews);
        });
    });
}
// 错误示范:用 thenApply 导致嵌套的 Future
CompletableFuture<CompletableFuture<String>> nested = 
    CompletableFuture.supplyAsync(() -> "Hello")
        .thenApply(s -> CompletableFuture.supplyAsync(() -> s + " World"));

// 正确做法:用 thenCompose 保持链式调用
CompletableFuture<String> better = 
    CompletableFuture.supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

如果你的转换函数返回的是另一个CompletableFuture,那就用 thenCompose,否则用 thenApply

异常处理

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("服务暂时不可用");
        }
        return "正常返回的数据";
    })
    .exceptionally(throwable -> {
        // 记录异常日志
        log.error("操作失败", throwable);
        // 返回默认值
        return "服务异常,返回默认数据";
    });
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> callExternalService())
    .exceptionally(throwable -> {
        if (throwable.getCause() instanceof TimeoutException) {
            return "服务超时,返回缓存数据";
        } else if (throwable.getCause() instanceof IllegalArgumentException) {
            return "参数异常,返回空结果";
        }
        return "其他异常,返回默认值";
    });

handle方法比exceptionally更强大,在于它能同时处理正常结果和异常情况。

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟服务异常");
        }
        return "原始数据";
    })
    .handle((result, throwable) -> {
        if (throwable != null) {
            log.error("处理异常", throwable);
            return "发生异常,返回备用数据";
        }
        return result + " - 正常处理完成";
    });
// whenComplete:只是旁观者,不能修改结果
CompletableFuture<String> future1 = CompletableFuture
    .supplyAsync(() -> "原始数据")
    .whenComplete((result, throwable) -> {
        // 只能查看结果,无法修改
        if (throwable != null) {
            log.error("发生异常", throwable);
        } else {
            log.info("处理完成: {}", result);
        }
    });

// handle:既是参与者又是修改者
CompletableFuture<String> future2 = CompletableFuture
    .supplyAsync(() -> "原始数据")
    .handle((result, throwable) -> {
        // 可以根据结果或异常,返回新的值
        if (throwable != null) {
            return "异常情况下的替代数据";
        }
        return result + " - 已处理";
    });

使用whenComplete:当只需要记录日志或执行一些清理工作,不需要改变结果时。

使用handle:当需要在异常发生时返回备用值,或者需要转换成正常的结果时。

使用exceptionally:当只关心异常情况,并且只需要提供一个替代值时。

// 错误示范
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("糟糕,出错了!");
    return "成功";
});
future.thenAccept(System.out::println);  // 这里异常被吞掉了,什么都看不到
future
    .thenAccept(System.out::println)
    .exceptionally(throwable -> {
        System.err.println("捕获到异常:" + throwable.getMessage());
        return null;
    });

多任务编排

等待所有任务完成:

CompletableFuture<Void> allOf = CompletableFuture.allOf(
    future1, future2, future3
);
CompletableFuture<String> cookRice = CompletableFuture
    .supplyAsync(() -> {
        // 模拟煮饭
        sleep(1000);
        return "饭已就位";
    });

CompletableFuture<String> stirFry = CompletableFuture
    .supplyAsync(() -> {
        // 模拟炒菜
        sleep(2000);
        return "炒完了";
    });

CompletableFuture<String> makeSoup = CompletableFuture
    .supplyAsync(() -> {
        // 模拟煲汤
        sleep(3000);
        return "汤煮好了";
    });

// 等待所有任务完成
CompletableFuture.allOf(cookRice, stirFry, makeSoup)
    .thenRun(() -> {
        System.out.println("所有准备工作已完成,可以开始吃饭了!");
    });

只要有一个任务完成:

CompletableFuture<Object> taxi = CompletableFuture.anyOf(
    callDidi(),         // 叫滴滴
    callGaode(),        // 叫高德
    callMeituan()       // 叫美团
);

taxi.thenAccept(platform -> System.out.println(platform + "接单了,出发!"));

并行请求多个微服务:

public ProductDetailVO getProductDetail(Long productId) {
    // 1. 创建多个异步任务
    CompletableFuture<ProductInfo> productFuture = CompletableFuture
        .supplyAsync(() -> productService.getProductInfo(productId));
        
    CompletableFuture<Stock> stockFuture = CompletableFuture
        .supplyAsync(() -> stockService.getStock(productId));
        
    CompletableFuture<Price> priceFuture = CompletableFuture
        .supplyAsync(() -> priceService.getPrice(productId));
        
    CompletableFuture<List<Review>> reviewsFuture = CompletableFuture
        .supplyAsync(() -> reviewService.getReviews(productId));

    // 2. 等待所有数据都准备好
    return CompletableFuture.allOf(
            productFuture, stockFuture, priceFuture, reviewsFuture)
        .thenApply(v -> {
            // 3. 组装最终结果
            return new ProductDetailVO(
                productFuture.join(),  // 获取每个任务的结果
                stockFuture.join(),
                priceFuture.join(),
                reviewsFuture.join()
            );
        }).join();  // 4. 等待最终结果
}

可以使用orTimeout()completeOnTimeout()来优雅地处理超时情况。

这样,即使某个服务出现问题,也不会影响整个页面的展示。

// 为每个请求添加3秒超时
CompletableFuture<ProductInfo> productFuture = CompletableFuture
    .supplyAsync(() -> productService.getProductInfo(productId))
    .orTimeout(3, TimeUnit.SECONDS)  // 超时控制
    .exceptionally(ex -> {
        log.error("获取商品信息失败", ex);
        return new ProductInfo();  // 返回默认值
    });

批量任务

通过Semaphore实现最大并发数限制,避免线程资源被用完。

通过计数器实时跟踪活动的任务数量和完成进度。

当系统负载较高时,通过信号量自动降低并发度。

提供简单的处理进度日志,便于问题定位和后续的诊断。

// 不推荐:直接转换为CompletableFuture列表
List<CompletableFuture<Result>> futures = tasks.stream()
    .map(task -> CompletableFuture.supplyAsync(() -> task.execute()))
    .collect(Collectors.toList());

// 推荐:控制并发度
public class BatchProcessor {
    private final ExecutorService executor;
    private final int maxConcurrency;
    
    // 优化版本:带有限流和监控的批处理执行器
    public <T> List<T> executeBatch(List<Supplier<T>> tasks) {
        Semaphore semaphore = new Semaphore(maxConcurrency);
        AtomicInteger activeTaskCount = new AtomicInteger(0);
        AtomicInteger completedTaskCount = new AtomicInteger(0);
        
        List<CompletableFuture<T>> futures = tasks.stream()
            .map(task -> CompletableFuture.supplyAsync(() -> {
                try {
                    semaphore.acquire();
                    activeTaskCount.incrementAndGet();
                    return task.get();
                } catch (Exception e) {
                    throw new CompletionException(e);
                } finally {
                    semaphore.release();
                    activeTaskCount.decrementAndGet();
                    completedTaskCount.incrementAndGet();
                }
            }, executor))
            .collect(Collectors.toList());
        
        // 添加批处理进度监控
        CompletableFuture.runAsync(() -> {
            while (completedTaskCount.get() < tasks.size()) {
                log.info("批处理进度: {}/{}, 当前活动任务数: {}",
                    completedTaskCount.get(), tasks.size(), activeTaskCount.get());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        
        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}

超时控制

支持配置最大重试次数。

采用指数退避算法,避免立即重试对系统造成冲击。

每次重试都有独立的超时控制,实现了基础的隔离。

支持自定义降级策略,可以对结果进行降级处理。

// 原始的写法:基础的超时控制
CompletableFuture<Result> future = CompletableFuture
    .supplyAsync(() -> doSomething())
    .orTimeout(3, TimeUnit.SECONDS);

public class TimeoutHandler {
    // 带有动态超时和重试的处理器
    public <T> CompletableFuture<T> executeWithSmartTimeout(
            Supplier<T> task,
            long timeout,
            TimeUnit unit,
            int maxRetries) {
        
        AtomicInteger retryCount = new AtomicInteger(0);
        
        CompletableFuture<T> future = new CompletableFuture<>();
        
        CompletableFuture.supplyAsync(() -> {
            while (retryCount.get() < maxRetries) {
                try {
                    CompletableFuture<T> attempt = CompletableFuture
                        .supplyAsync(task)
                        .orTimeout(timeout, unit);
                    
                    return attempt.get();
                } catch (TimeoutException e) {
                    log.warn("第{}次尝试超时", retryCount.incrementAndGet());
                    // 指数退避
                    Thread.sleep(Math.min(1000 * (1 << retryCount.get()), 10000));
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }
            throw new TimeoutException("重试次数耗尽");
        }).handle((result, ex) -> {
            if (ex != null) {
                future.completeExceptionally(ex);
            } else {
                future.complete(result);
            }
            return null;
        });
        
        return future;
    }
}
TimeoutHandler handler = new TimeoutHandler();
CompletableFuture<UserProfile> future = handler.executeWithSmartTimeout(
    () -> remoteService.getUserProfile(userId),
    500, // 500ms超时
    TimeUnit.MILLISECONDS,
    3    // 最多重试3次
);

异步HTTP请求

public class AsyncHttpClient {
    private final ExecutorService executor;
    private final int timeout;

    public AsyncHttpClient(int threadPoolSize, int timeoutSeconds) {
        this.executor = Executors.newFixedThreadPool(threadPoolSize);
        this.timeout = timeoutSeconds;
    }

    public CompletableFuture<String> asyncGet(String url) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟HTTP请求
                if (Math.random() < 0.1) { // 10%概率模拟超时
                    Thread.sleep(timeout * 1000 + 1000);
                }
                return "Response from " + url;
            } catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor)
        .orTimeout(timeout, TimeUnit.SECONDS)
        .exceptionally(throwable -> {
            if (throwable instanceof TimeoutException) {
                return "请求超时了:" + url;
            }
            return "请求出错:" + throwable.getMessage();
        });
    }

    // 批量处理请求
    public List<String> batchGet(List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
            .map(this::asyncGet)
            .collect(Collectors.toList());

        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public void shutdown() {
        executor.shutdown();
    }
}
AsyncHttpClient client = new AsyncHttpClient(10, 5);  // 10个线程,5秒超时
List<String> urls = Arrays.asList(
    "http://api1.example.com",
    "http://api2.example.com",
    "http://api3.example.com"
);

// 异步处理多个请求
List<String> results = client.batchGet(urls);
results.forEach(System.out::println);

client.shutdown();
img
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!