0%

Java 异步任务执行服务(一):基本概念和原理

1. 异步任务执行服务是什么意思

  • 线程 Thread 既表示要执行的任务(run() 方法),又表示执行的机制(start() 方法)
  • Java 并发包提供了一套框架,大大简化了执行异步任务所需的开发,这套框架引入了一个“执行服务”概念
  • 执行服务任务的提交任务的执行相分离。“执行服务”封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务获取结果取消任务;而不需要关注任务执行的细节,如线程的创建任务调度线程关闭
  • 任务执行服务体现了并发异步开发中关注点分离的思想,使用者只需要通过 ExecutorService 提交任务,通过 Future 操作任务和结果即可,不需要关注线程创建和协调的细节

2. 任务执行服务涉及的基本接口有哪些

  • RunnableCallable:表示要执行的异步任务
  • ExecutorExecutorService:表示执行服务
  • Future:表示异步任务的结果

3. RunnableCallable 的区别是什么

  • 二者都表示任务
    • Runnable 没有返回结果,而 Callable 有返回结果
    • Runnable 不会抛出异常,而 Callable 会抛出异常

4. 怎样理解 Executor

  • Executor 表示最简单的执行服务
  • 定义:public interface Executor { void execute(Runnable command); }就是可以执行一个 Runnable,没有返回结果
  • 没有限定任务如何执行可能是创建一个新线程,可能是复用线程池中的某个线程,也可能是在调用者线程中执行

5. 怎样理解 ExecutorService

  • ExecutorService 扩展了 Executor,定义了更多服务,基本方法有

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    	public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
  • 三个 submit() 都表示提交一个任务,返回值类型都是 Future。返回后,只是表示任务已提交,不代表已执行,通过 Future 可以查询异步任务的状态、获取最终结果、取消任务等

    • 对于 Callable,任务最终有个返回值,对于 Runnable 是没有返回值的
    • 第二个提交 Runnable 的方法可以同时提供一个结果,在异步任务结束时返回
    • 第三个方法异步任务的最终返回值为 null
  • 有两个关闭方法:shutdown()shutdownNow()

    • 区别是,shutdown() 表示不再接受新任务,但已提交的任务会继续执行,即使任务还未开始执行
    • shutdownNow() 不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的 interrupt() 方法尝试中断。不过,线程可能不响应中断,shutdownNow() 会返回已提交但尚未执行的任务列表
    • shutdown()shutdownNow() 不会阻塞等待,它们返回后不代表所有任务都已结束。不过 isShutdown() 方法会返回 true。调用者可以通过 awaitTermination() 等待所有任务结束,它可以限定等待的时间,如果超时前所有任务都结束了,即 isTerminated() 方法返回 true,则返回 true,否则返回 false
  • ExecutorService 有两组批量提交任务的方法:invokeAll()invokeAny(),它们都有两个版本,其中一个限定等待时间

    • invokeAll() 方法等待所有任务完成,返回的 Future 列表中,每个 FutureisDone() 方法都返回 true。不过 isDone()true 不代表任务就执行成功了,可能是被取消了。invokeAll() 可以指定等待时间,如果超时后有的任务没完成,就会被取消
    • 对于 invokeAny(),只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消;如果没有任务能在限时内成功返回,抛出 TimeoutException 异常;如果限时内所有任务都结束了,但都发生了异常,抛出 ExecutionException 异常
  • 使用 ExecutorService,编写并发异步任务的代码就像写顺序程序一样,不用关心线程的创建和协调,只需要提交任务、处理结果就可以了,大大简化了开发工作

6. 怎样理解 Future

  • Future 接口定义

    1
    2
    3
    4
    5
    6
    7
    public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
  • get() 方法用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待;带参数的 get() 方法可以限定阻塞等待的时间,如果超时任务还未结束,会抛出 TimeoutException 异常

  • cancel() 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消,cancel() 返回 false,否则返回 true。如果任务还未开始,则不再运行。但如果任务已经在运行,则不一定能取消,参数 mayInterruptIfRunning 表示,如果任务正在执行,是否调用 interrupt() 方法中断线程,如果为 false,就不会,如果为 true,就会尝试中断线程,虽然中断不一定能取消线程

  • isDone()isCancelled() 用于查询任务状态。isCancelled() 表示任务是否被取消,只要 cancel() 方法返回了 true,随后的 isCancelled() 方法都会返回 true,即使执行任务的线程还未真正结束。isDone() 表示任务是否结束,不管什么原因都算,可能是任务正常结束,可能是任务抛出了异常,也可能是任务被取消

  • 对于 get() 方法,如果调用get() 方法的线程被中断了,get() 方法会抛出 InterruptedException 异常,任务最终大概有三种结果

    • 正常完成get() 方法会返回其执行结果,如果任务是 Runnable 且没有提供结果,返回 null
    • 任务执行抛出了异常get() 方法会将异常包装为 ExecutionException 重新抛出,通过异常的 getCause() 方法可以获取原异常
    • 任务被取消了get() 方法会抛出异常 CancellationException
  • Future 是一个重要的概念,是实现“任务的提交”“任务的执行”相分离的关键,是其中的“纽带”,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作

7. 写一个任务执行服务 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BasicDemo {
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sleepSeconds = new Random().nextInt(1000);
Thread.sleep(sleepSeconds);
return sleepSeconds;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor(); //使用工厂类 Executors 创建了一个任务执行服务,表示使用一个线程执行所有服务
Future<Integer> future = executor.submit(new Task());
//模拟执行其他任务
Thread.sleep(100);
try {
System.out.println(future.get());
} catch(ExecutionException e) {
e.printStackTrace();
}
executor.shutdown(); //关闭任务执行服务
}
}

8. ExecutorService 的基本实现原理

  • ExecutorService 的主要实现类是 ThreadPoolExecutor,它是基于线程池实现的
  • ExecutorService 还有一个抽象实现类 AbstractExecutorService
  • AbstractExecutorService 提供了 submit()invokeAll()invokeAny() 的默认实现,子类需要实现其他方法
    • 除了 execute(),其他方法都与执行服务的生命周期管理有关
    • submit()invokeAll()invokeAny() 最终都会调用 execute()execute() 决定了到底如何执行任务

9. Future 的基本实现原理

  • Future 的主要实现类是 FutureTask
  • FutureTask 实现了 RunnableFuture 接口
  • RunnableFuture 接口既扩展了 Runnable,又扩展了 Future,没有定义新方法
    • 作为 Runnable,它表示要执行的任务,传递给 execute() 方法进行执行
    • 作为 Future,它又表示任务执行的异步结果

10. 写一个简单的 ExecutorService 实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SimpleExecutorService extends AbstractExecutorService {
@Override
public void shutdown() {
}

@Override
public List<Runnable> shutdownNow() {
return null;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long itmeout, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
-------------------- 本文结束感谢您的阅读 --------------------