0%

Java 函数式编程(四):组合式异步编程

1. 什么是组合式异步编程

  • 组合式异步编程主要是 Java 8 中的一个新类 CompletableFuture

    • 它是对并发编程的增强
    • 它可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发
  • CompletableFuture 是一个具体的类,实现了两个接口,一个是 Future,另一个是 CompletionStage

    • Future 表示异步任务的结果,而 CompletionStage 的字面意思是完成阶段
    • 多个 CompletionStage 可以以流水线的方式组合起来,对于其中一个 CompletionStage,它有一个计算任务,但可能需要等待其他一个或多个阶段完成才能开始,它完成后,可能会触发其他阶段开始运行
    • CompletionStage 提供了大量方法,使用它们,可以方便地响应任务事件,构建任务流水线,实现组合式异步编程

2. 怎样理解基本的 CompletableFuture

  • CompletableFuture 不支持使用 Callable 表示异步任务,而支持 RunnableSupplier

    • Supplier 代替 Callable 表示有返回结果的异步任务。与 Callable 的区别是,它不能抛出受检异常,如果会发生异常,可以抛出运行时异常
  • CompletableFuture 有一个静态方法 supplyAsync,定义为:public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    • 它接受两个参数 supplierexecutor,使用 executor 执行 supplier 表示的任务,返回一个 CompletableFuture,调用后,任务被异步执行,这个方法立即返回
    • supplyAsync 还有一个不带 executor 参数的方法:public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)。没有 executor,任务的执行与系统环境和配置有关,一般来说,如果可用的 CPU 核数大于 2,会使用 Java 7 引入的 Fork/Join 任务执行服务,即:ForkJoinPool.commonPool(),该任务执行服务背后的工作线程数一般为 CPU 核数减 1,即:Runtime.getRuntime().availableProcessors() - 1。否则,会使用 ThreadPerTaskExecutor,它会为每个任务创建一个线程
  • 对于 CPU 密集型的运算任务,使用 Fork/Join 任务执行服务是合适的,但对于一般的调用外部服务的异步任务,Fork/Join 可能是不合适的,因为它的并发度比较低,可能会让本可以并发的多任务串行运行。这时,应该提供 Executor 参数。很多以 Async 结尾命名的方法,一般都有两个版本,一个带 Executor 参数,另一个不带,其含义是相同的

  • 对于类型为 Runnable 的任务,构建 CompletableFuture 的方法为

    • public static CompletableFuture<Void> runAsync(Runnable runnable)
    • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

3. CompletableFutureFuture 的增强有哪些

  • Future 有的方法,CompletableFuture 都是支持的,不过,CompletableFuture 还有一些额外的相关方法
    • public T join():与 get() 方法类似,也会等待任务结束,但它不会抛出受检异常。如果任务异常结束了,join() 会将异常包装为运行时异常 CompletionException 抛出
    • public boolean isCompletedExceptionally()FutureisDone() 方法检查任务是否结束了,但不知道任务是正常结束还是异常结束,isCompletedExceptionally() 方法可以判断任务是否是异常结束
    • public T getNow(T valueIfAbsent):与 join() 类似,区别是,如果任务还没有结束,getNow() 不会等待,而是会返回传入的参数 valueIfAbsent

4. 进一步理解 Future/CompletableFuture

  • 任务执行服务与异步结果 Future 不是绑在一起的,可以自己创建线程返回异步结果
  • CompletableFuture 有两个方法:public boolean complete(T value)public boolean completeExceptionally(Throwable ex)

5. 怎样理解响应结果或异常

  • 使用 Future,我们只能通过 get() 获取结果,而 get() 可能会需要阻塞等待,而通过 CompletionStage,可以注册回调函数,当任务完成或异常结束时自动触发执行
  • 有两类注册方法:whenComplete()handle()

6. 构建依赖一个或多个阶段的任务流的方法有

  • 构建依赖单一阶段的任务流:thenRunthenAcceptthenApplythenCompose
  • 构建依赖两个阶段的任务流:runAfterBoththenCombinethenAcceptBoth
  • 构建依赖多个阶段的任务流:allOfanyOf

7. 总结一下 Java 8 中的组合式异步编程 CompletableFuture

  • 它是对 Future 的增强,但可以响应结果或异常事件,有很多方法构建异步任务流

  • 根据任务由谁执行,一般有三类对应方法

    • 名称不带 Async 的方法由当前线程或前一个阶段的线程执行
    • Async 但没有指定 Executor 的方法由默认 Executor(ForkJoinPool.commonPool() 或 ThreadPerTaskExecutor)执行
    • Async 且指定 Executor 参数的方法由指定的 Executor 执行
  • 根据任务类型,一般也有三类对应方法

    • 名称带 run 的对应 Runnable
    • accept 的对应 Consumer
    • apply 的对应 Function
  • 使用 CompletableFuture,可以简洁自然地表达多个异步任务之间的依赖关系和执行流程,大大简化代码,提高可读性

-------------------- 本文结束感谢您的阅读 --------------------