0%

Java 异步任务执行服务(二):线程池

1. 线程池的概念

  • 顾名思义,就是一个线程的池子,里面有若干线程,它们的目的就是执行提交给线程池的任务,执行完一个任务后不会退出,而是继续等待或是执行新任务
  • 线程池主要由两个概念组成:一个是任务队列;一个是工作者线程
  • 工作者线程主体就是一个循环,循环从队列中接收任务并执行,任务队列保存待执行的任务。

2. 线程池的优点

  • 可以重用线程,避免线程创建的开销。
  • 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。

3. Java 并发包中线程池的实现类是

  • Java 并发包中线程池的实现类是 ThreadPoolExecutor,继承自 AbstractExecutorService,实现了 ExecutorService

    • ThreadPoolExecutor 有一些重要的参数,理解这些参数对于合理使用线程池非常重要。
    • ThreadPoolExecutor 实现了生产者/消费者模式,工作者线程就是消费者,任务提交者就是生产者,线程池自己维护任务队列
  • 当碰到类似生产者/消费者问题时,应优先考虑直接使用线程池,自己管理和维护消费者线程及任务队列。

4. ThreadPoolExecutor 线程池的主要构造方法有

  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

  • public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    • 该构造方法多了两个参数 threadFactoryhandler,这两个参数一般不需要,第一个构造方法会设置默认值。
  • 参数 corePoolSizemaximumPoolSizekeepAliveTimeunit 用于控制线程池中线程个数

    • workQueue 表示任务队列。
    • threadFactory 用于对创建的线程进行一些配置。
    • handler 用于任务拒绝策略。

5. 控制线程池 ThreadPoolExecutor 的大小主要与哪四个参数有关,分别是什么意思

  • 主要与 corePoolSizemaximumPoolSizekeepAliveTimeunit 四个参数有关,含义如下:

    • corePoolSize核心线程个数
    • maximumPoolSize最大线程个数
    • keepAliveTimeunit空闲线程存活时间
  • maximumPoolSize 表示线程池中的最大线程数,线程的个数会动态变化,但这是最大值,不管有多少任务,都不会创建比这个值还大的线程个数。

  • corePoolSize 表示线程池中的核心线程个数。不过,并不是一开始就创建这么多线程,刚创建一个线程池后,实际上并不会创建任何线程

    • 一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSize,就会创建一个新线程来执行该任务。需要说明的是,即使其他线程现在也是空闲的,也会创建新线程
    • 不过,如果线程个数大于等于 corePoolSize,那就不会立即创建新线程了,它会先尝试排队
    • 需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不排队,而是检查线程个数是否达到了 maximumPoolSize。如果没有,就会继续创建线程,直到线程个数达到 maximumPoolSize
  • keepAliveTime 的目的是为了释放多余的线程资源,它的含义是:当线程池中的线程个数大于 corePoolSize额外空闲线程的存活时间

    • 即,一个非核心线程,在空闲等待新任务时,会有一个最长等待时间,即 keepAliveTime
    • 如果到了时间还是没有新任务,就会被终止。如果该值为 0,则表示所有线程都不会超时终止。
  • 这几个参数除了可以在构造方法中进行指定外,还可以通过 getter()/setter() 方法进行查看和修改。

6. 除了构造方法里的几个静态参数,线程池 ThreadPoolExecutor 还可以查看关于线程和任务数的动态数字有哪些

  • public int getPoolSize():返回当前线程个数。
  • public int getLargestPoolSize():返回线程池曾经达到过的最大线程个数。
  • public long getCompletedTaskCount():返回线程池自创建以来所有已完成的任务数。
  • public long getTaskCount():返回所有任务数,包括所有已完成的加上所有排队待执行的。

7. 可以用作线程池 ThreadPoolExecutor 的队列有哪些

  • 从构造方法中可以看出,线程池 TheadPoolExecutor 要求的队列类型是阻塞队列 BlockingQueue,可以有:

    • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的
    • ArrayBlockingQueue:基于数组有界阻塞队列。
    • PriorityBlockingQueue:基于无界阻塞优先级队列。
    • SynchronousQueue没有实际存储空间的同步阻塞队列。
  • 如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize。到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。

  • 对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功。否则,总是会创建新线程来接受任务,直到达到 maximumPoolSize

8. 如果队列有界,且 maximumPoolSize 有限,则当队列排满任务,线程个数也达到了 maximumPoolSize,这时,新任务来了,如何处理呢

 此时,会触发线程池的任务拒绝策略

9. 线程池 ThreadPoolExecutor 的任务拒绝策略是怎样的

  • 默认情况下,提交任务的方法,如 execute()/submit()/invokeAll() 等会抛出 RejectedExecutionException 异常。

  • 不过,拒绝策略是可以自定义的ThreadPoolExecutor 实现了 4 种方式:

    • ThreadPoolExecutor.AbortPolicy默认的方式,抛出异常。
    • ThreadPoolExecutor.DiscardPolicy静默处理,忽略新任务,不抛出异常,也不执行。
    • ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
    • ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。
  • 上面 4 种都是 ThreadPoolExecutorpublic 静态内部类,都实现了 RejectedExecutionHandler 接口。

    • 这个接口的定义为:public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
    • 当线程池不能接受任务时,调用其拒绝策略的 rejectedExecution() 方法。拒绝策略可以在构造方法中进行指定,也可以通过对应的 set() 方法进行指定。
  • 查看源码,默认的 RejectedExecutionHandler 是一个 AbortPolicy 实例,而 AbortPolicyrejectedExecution() 实现就是抛出异常。

  • 需要强调的是,拒绝策略只有在队列有界,且 maximumPoolSize 有限的情况下才会触发。

    • 如果队列无界,服务不了的任务总是会排队,但这不一定是期望的结果,因为请求处理队列可能会消耗非常大的内存,甚至引发内存不够的异常。
    • 如果队列有界但 maximumPoolSize 无限,可能会创建过多的线程,占满 CPU 和内存,使得任何任务都难以完成。
    • 所以,在任务量非常大的场景中,让拒绝策略有机会执行是保证系统稳定运行很重要的方面。

10. 怎样理解线程工厂 ThreadFactory

  • 线程工厂 ThreadFactory 是一个接口,定义为:public interface ThreadFactory { Thread newThread(Runnable r); }
  • 这个接口根据 Runnable 创建一个 ThreadThreadPoolExecutor默认实现Executors 类中的静态内部类 DefaultThreadFactory
    • 主要就是创建一个线程,给线程设置一个名称,设置 daemon 属性为 false,设置线程优先级为标准默认优先级 5。
    • 线程名称的格式为:pool-<线程池编号>-thread-<线程编号>
    • 如果需要自定义一些线程的属性,比如名称,可以实现自定义ThreadFactory

11. 核心线程是什么意思,核心线程有哪些配置

  • 当线程个数小于等于 corePoolSize 时,这些线程被称为核心线程

  • 默认情况下:

    • 核心线程不会预先创建,只有当有任务时才会创建
    • 核心线程不会因为空闲而被终止,keepAliveTime 参数不适用于它
  • 不过,ThreadPoolExecutor 有如下方法,可以改变这个默认行为

    • public int prestartAllCoreThreads()预先创建所有的核心线程。
    • public boolean prestartCoreThread()创建一个核心线程,如果所有核心线程都已创建,则返回 false
    • public void allowCoreThreadTimeout(boolean value):如果参数为 true,则参数 keepAliveTime 也适用于核心线程。

12. 类 Executors 的含义是

  • Executors 是一个工厂类,提供了一些静态工厂方法
  • 可以方便地创建一些预配置的线程池

13. 工厂类 Executors 的主要方法有哪些,含义分别是

  • public static ExecutorService newSingleThreadPool() { return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

    • 只使用一个线程,使用无界队列 LinkedBlockingQueue
    • 线程创建后不会超时终止,该线程顺序执行所有任务
    • 该线程池适用于需要确保所有任务被顺序执行的场合。
  • public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreadsn nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

    • 使用固定数目的 n 个线程,使用无界队列 LinkedBlockingQueue
    • 线程创建后不会超时终止
    • newSingleThreadExecutor 一样,由于是无界队列,如果排队任务过多,可能会消耗过多的内存。
  • public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

    • 新任务到来时,如果正好有空闲线程在等待任务,则其中一个空闲线程接受该任务。
    • 否则就总是创建一个新线程,创建的总线程个数不受限制
    • 对任一空闲线程,如果 60 秒内没有新任务,就终止。

14. 在实际开发中,应该使用 newFixedThreadPool() 还是 newCachedThreadPool() 呢

  • 线程的创建以及线程上下文的切换、线程的调度等是有成本的。

  • 在系统负载很高的情况下:

    • newFixedThreadPool() 可以通过队列对新任务排队,保证有足够的资源处理实际的任务;
    • newCachedThreadPool() 会为每个任务创建一个线程,导致创建过多的线程竞争 CPU 和内存资源,使得任何实际任务都难以完成。
    • 这时,newFixedThreadPool() 更为适用
  • 如果系统负载不太高

    • 单个任务的执行时间也比较短,newCachedThreadPool() 的效率可能更高,因为任务可以不经排队,直接交给某一个空闲线程。
  • 在系统负载可能极高的情况下,两者都不是好的选择

    • newFixedThreadPool() 的问题是*队列过长 *
    • newCachedThreadPool() 的问题是线程过多
    • 这时,应根据具体情况自定义 ThreadPoolExecutor,传递合适的参数。

15. 线程池有没有可能出现死锁,场景是

  • 当提交给线程池的任务,任务之间有依赖时,这种情况下可能出现死锁

    • 举个例子:比如任务 A,在它的执行过程中,它给同样的任务执行服务提交了一个任务 B,但需要等待任务 B 结束。
    • 如果任务 A 是提交给了一个单线程线程池,一定会出现死锁,A 在等待 B 的结果,而 B 在队列中等待被调度。
  • 如果是提交给了一个限定线程个数的线程池,也有可能因线程数限制出现死锁。

16. 怎样解决线程池可能出现的死锁

  • 可以使用 newCachedThreadPool 创建线程池,让线程数不受限制
  • 使用 SynchronousQueue:
    • newCachedThreadPool 本质上就是使用了 SynchronousQueue
    • 对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue入队成功就意味着已有线程接受处理。
    • 如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发任务拒绝策略机制,不管怎么样,都不会死锁。
-------------------- 本文结束感谢您的阅读 --------------------