0%

Android 的线程和线程池(三):Android 中的线程池

1. 线程池的优点

  • 重用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销
  • 能有效控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致的阻塞现象
  • 能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能

2. ThreadPoolExecutor 概述

  • Android 中的线程池的概念来源于 Java 中的 Executor。Executor 是一个接口,真正的线程池的实现类为 ThreadPoolExecutor,ThreadPoolExecutor 提供了一系列参数来配置线程池,通过不同的参数可以创建不同的线程池

  • ThreadPoolExecutor 常用构造方法源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    /**
    * All user control parameters are declared as volatiles so that
    * ongoing actions are based on freshest values, but without need
    * for locking, since no internal invariants depend on them
    * changing synchronously with respect to other actions.
    */

    /**
    * Core pool size is the minimum number of workers to keep alive
    * (and not allow to time out etc) unless allowCoreThreadTimeOut
    * is set, in which case the minimum is zero.
    */
    private volatile int corePoolSize;

    /**
    * Maximum pool size. Note that the actual maximum is internally
    * bounded by CAPACITY.
    */
    private volatile int maximumPoolSize;

    /**
    * The queue used for holding tasks and handling off to worker
    * threads. We do not require that workQueue.poll() returning
    * null necessarily means that workQueue.isEmpty(), so rely
    * solely on isEmpty to see if the queue is empty() (which we must
    * do for example when deciding whether to transition from
    * SHUTDOWN to TIDYING). This accommodates special-purpose
    * queues such as DelayQueues for which poll() is allowed to
    * return null even if it may later return non-null when delays
    * expire.
    */
    private final BlockingQueue<Runnable> workQueue;

    /**
    * Timeout in nanoseconds for idle threads waiting for work.
    * Threads use this timeout when there are more than corePoolSize
    * present or if allowCoreThreadTimeOut. Otherwise they wait
    * forever for new work.
    */
    private volatile long keepAliveTime;

    /**
    * If false (default), core threads stay alive even when idle.
    * If true, core threads use keepAliveTime to time out waiting
    * for work.
    */
    private volatile boolean allowCoreThreadTimeOut;

    /**
    * Factory for new threads. All threads are created using this
    * factory (via method addWorker). All callers must be prepared
    * for addWorker to fail, which may reflect a system or user's
    * policy limiting the number of threads. Even though it is not
    * treated as an error, failure to create threads may result in
    * new tasks being rejected or existing ones remaining stuck in
    * the queue.
    *
    * We go further and preserve pool invariants even in the face of
    * errors such as OutOfMemoryError, that might be thrown while
    * trying to create threads. Such errors are rather common due to
    * the need to allocate a native stack in Thread.start, and users
    * will want to perform clean pool shutdown to clean up. There
    * will likely be enough memory available for the cleanup code to
    * complete without encountering yet another OutOfMemoryError.
    */
    private volatile ThreadFactory threadFactory;

    /**
    * Handler called when saturated or shutdown in execute.
    */
    private volatile RejectedExecutionHandler handler;

    /**
    * Creates a new {@code ThreadPoolExecutor} with the given initial
    * parameters.
    *
    * @param corePoolSize the number of threads to keep in the pool, even
    * if they are idle, unless {@code allowCoreThreadTimeOut} is set
    * @param maximumPoolSize the maximum number of threads to allow in the
    * pool
    * @param keepAliveTime when the number of threads is greater than
    * the core, this is the maximum time that excess idle threads
    * will wait for new tasks before terminating.
    * @param unit the time unit for the {@code keepAliveTime} argument
    * @param workQueue the queue to use for holding tasks before they are
    * executed. This queue will hold only the {@code Runnable}
    * tasks submitted by the {@code execute} method.
    * @param threadFactory the factory to use when the executor
    * creates a new thread
    * @param handler the handler to use when execution is blocked
    * because the thread bounds and queue capacities are reached
    * @throws IllegalArgumentException if one of the following holds:<br>
    * {@code corePoolSize < 0}<br>
    * {@code keepAliveTime < 0}<br>
    * {@code maximumPoolSize <= 0}<br>
    * {@code maximumPoolSize < corePoolSize}
    * @throws NullPointerException if {@code workQueue}
    * or {@code threadFactory} or {@code handler} is null
    */
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0) ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package java.util.concurrent;

    import java.util.Objects;

    /**
    * bla bla bla
    */
    public enum TimeUnit {
    // bla bla bla

    /**
    * Equivalent to
    * {@link #convert(long, TimeUnit) NANOSECONDS.convert(duration, this)}.
    * @param duration the duration
    * @return the converted duration,
    * or {@code Long.MIN_VALUE} if conversion would negatively
    * overflow, or {@code Long. MAX_VALUE} if it would positively overflow.
    */
    public long toNanos(long duration) {
    throw new AbstractMethodError();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    package java.util.concurrent;

    import java.util.Collection;
    import java.util.Queue;

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    * @param <E> the type of elements held in this queue
    */
    public interface BlockingQueue<E> extends Queue<E> {
    // bla bla bla
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package java.util.concurrent;

    /**
    * An object that creates new threads on demand. Using thread factories
    * removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread},
    * enabling applications to use special thread subclasses, priorities, etc.
    *
    * <p>
    * The simplest implementation of this interface is just
    * <pre> {@code
    * class SimpleThreadFactory implements ThreadFactory {
    * public Thread newThread(Runnable r) {
    * return new Thread(r);
    * }
    * }}</pre>
    *
    * The {@link Executor#defaultThreadFactory} method provides a more
    * useful simple implementation, that sets the created thread context
    * to known values before rturning it.
    * @since 1.5
    * @author Doug Lea
    */
    public interface ThreadFactory {

    /**
    * Constructs a new {@code Thread}. Implementations may also initialize
    * priority, name, daemon status, {@code ThreadGroup}, etc.
    *
    * @param r a runnable to be executed by new thread instance
    * @return constructed thread, or {@code null} if the request to
    * create a thread is rejected
    */
    Thread newThread(Runnable r);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package java.util.concurrent;

    /**
    * A handler for tasks cannot be executed by a {@link ThreadPoolExecutor}.
    *
    * @since 1.5
    * @author Doug Lea
    */
    public interface RejectedExecutionHandler {

    /**
    * Method that may be invoked by a {@link ThreadPoolExecutor} when
    * {@link ThreadPoolExecutor#execute execute} cannot accept a
    * task. This may occur when no more threads or queue slots are
    * available because their bounds would be exceeded, or upon
    * shutdown of the Executor.
    * d
    * <p>In the absence of other alternatives, the method may throw
    * an unchecked {@link RejectedExecutionException}, which will be
    * propagated to the caller of {@code execute}.
    * d
    * @param r the runnable task requested to be executed
    * @param executor the executor attempting to execute this task
    * @throws RejectedExecutionException if there is no remedy
    */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
  • 构造方法中各个参数的含义

    参数 含义
    int corePoolSize 线程池的核心线程数。默认情况下,核心线程会在线程池中一直存活,即使它们处于闲置状态。如果将 ThreadPoolExecutor 的 allowCoreThreadTimeOut 属性设置为 true,那么闲置的核心线程在等待新任务到来时会有超时策略,这个时间间隔由 keepAliveTime 所指定,当等待时间超过 keepAliveTime 所指定的时长后,核心线程就会被终止
    int maximum 线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞
    long keepAliveTime 非核心线程闲置时的超时时长。超过这个时长,非核心线程就会被回收。当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 属性设置为 true 时,keepAliveTime 同样会作用于核心线程
    TimeUnit unit 用于指定 keepAliveTime 参数的时间单位。这个一个枚举,常用的有 TimeUnit.MILLISECONDS(毫秒)TimeUnit.SECONDS(秒)以及 TimeUnit.MINUTES(分钟)
    BlockingQueue<Runnable> workQueue 线程池中的任务队列。通过线程池的 execute() 方法提交的 Runnable 对象会存储在这个参数中
    ThreadFactory threadFactory 线程工厂,为线程池提供创建新线程的功能。ThreadFactory 是一个接口,只有一个方法:newThread(Runnable r)
    RejectedExecutionHandler handler 当线程池无法执行新任务时会触发任务拒绝策略,这可能是由于任务队列已满或者是无法成功执行任务,此时 ThreadPoolExecutor 会调用 handler 的 rejectedExecution() 方法来通知调用者,默认情况下,rejectedExecution() 方法会直接抛出一个 RejectedExecutionException。ThreadPoolExecutor 为 RejectedExecutionHandler 提供了几个可选值:CallRunsPolicy、AbortPolicy、DiscardPolicy 和 DiscardOldestPolicy,其中 AbortPolicy 是默认值,它会直接抛出 RejectedExecutionException
  • ThreadPoolExecutor 执行任务时大致遵循如下规则

    1. 如果线程池中的线程数量未达到核心线程的数量,那么会直接启动一个核心线程来执行任务
    2. 如果线程池中的线程数量已经达到或者超过核心线程的数量,那么任务会被插入到任务队列中排队等待执行
    3. 如果在步骤 2 中无法将任务插入到任务队列中,这往往是由于任务队列已满,此时如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务
    4. 如果步骤 3 中线程数量已经达到线程池规定的最大值,那么就拒绝执行此任务,ThreadPoolExecutor 会调用 RejectedExecutionHandler 的 rejectedExecution() 方法来通知调用者
  • Demo: ThreadPoolExecutor 的参数配置在 AsyncTask 中有明显的体现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    /**
    * bla bla bla
    */
    @Deprecated
    public abstract class AsyncTask<Params, Progress, Result> {
    // bla bla bla

    // We keep only a single pool thread around all the time.
    // We let the pool grow to a fairly large number of threads if necessary,
    // but let them time out quickly. In the unlikely case that we run out of threads,
    // we fall back to a simple unbounded-queue executor.
    // This combination ensure that:
    // 1. We normally keep few threads (1) around.
    // 2. We queue only after launching a significantly larger, but still bounded, set of threads.
    // 3. We keep the total number of threads bounded, but still allow an unbounded set
    // of tasks to be queued.
    private static final int CORE_POOL_SIZE = 1;
    private static final int MAXIMUM_POOL_SIZE = 20;
    private static final int BACKUP_POOL_SIZE = 5;
    private static final int KEEP_ALIVE_SECOND = 3;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
    return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
    };

    // Used only for rejected executions.
    // Initialization protected by sRunOnSerialPolicy lock.
    private static ThreadPoolExecutor sBackupExecutor;
    private static LinkedBlockingQueue<Runnable> sBackupExecutorQueue;

    private static final RejectedExecutionHandler sRunOnSerialPolicy =
    new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
    // As a last ditch fallback, run it on an executor with an unbounded queue.
    // Create this executor lazily, hopefully almost never.
    synchronized (this) {
    if (sBackupExecutor == null) {
    sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
    sBackupExecutor = new ThreadPoolExecutor(
    BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
    TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
    sBackupExecutor.allowCoreThreadTimeOut(true);
    }
    }
    sBackupExecutor.execute(r);
    }
    };

    /**
    * An {@link Executor} that can be used to execute tasks in parallel.
    *
    * @deprecated Using a single thread pool for a general purpose results in suboptimal behavior
    * for different tasks. Small, CPU-bound tasks benefit from a bounded pool and queueing, and
    * long-running blocking tasks, such as network operations, benefit from many threads. Use or
    * create an {@link Executor} configured for your use case.
    */
    @Deprecated
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (
    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(), sThreadFactory);
    threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    }

3. 线程池的分类

3.1 FixedThreadPool
  • 通过 ExecutorsnewFixedThreadPool() 方法来创建

  • 它是一种线程数量固定的线程池,当线程处于空闲状态时,它们并不会被回收(核心线程没有超时机制),除非线程池被关闭了

  • 当所有的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。另外,任务队列没有大小限制

  • 由于 FixedThreadPool 只有核心线程并且这些核心线程不会被回收,这意味着它能更加快速地响应外界的请求

  • newFixedThreadPool() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    package java.util.concurrent;

    import dalvik.annotation.optimization.ReachabilitySensitive;
    import java.security.AccessControlContext;
    import java.security.AccessControlException;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.security.PrivilegedActionException;
    import java.security.PrivilegedExceptionAction;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import sun.security.util.SecurityConstants;

    // BEGIN anroid-note
    // removed security manager docs
    // END android-note

    /**
    * Factory and utility methods for {@link Executor}, {@link
    * ExecutorService}, {@link ScheduledExecutorService}, {@link
    * ThreadFactory}, and {@link Callable} classes defined in this
    * package. This class supports the following kinds of methods:
    *
    * <ul>
    * <li>Methods that create and return an {@link ExecutorService}
    * set up with commonly useful configuration settings.
    * <li>Methods that create and return a {@link ScheduledExecutorService}
    * set up with commonly useful configuration settings.
    * <li>Methods that create and return a "wrapped" ExecutorService, that
    * disables reconfiguration by making implementation-specific methods
    * inaccessible.
    * <li>Methods that create and return a {@link ThreadFactory}
    * that sets newly created threads to a known state.
    * <li>Methods that create and return a {@link Callable}
    * out of other closure-like forms, so they can be used
    * in execution methods requiring {@code Callable}.
    * </ul>
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {

    /**
    * Creates a thread pool that reuses a fixed number of threads
    * operating off a shared unbounded queue. At any point, at most
    * {@code nThreads} threads will be active processing tasks.
    * If additional tasks are submitted when all threads are active,
    * they will wait in the queue until a thread is available.
    * If any thread terminates due to a failure during execution
    * prior to shutdown, a new one will take its place if needed to
    * execute subsequent tasks. The threads in the pool will exist
    * until it is explicitly {@link ExecutorService#shutdown shutdown}.
    *
    * @param nThreads the number of threads in the pool
    * @return the newly created thread pool
    * @throws IllegalArgumentException if {@code nThread <= 0}
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    /**
    * Creates a thread pool that reuses a fixed number of threads
    * operating off a shared unbounded queue, using the provided
    * ThreadFactory to create new threads when needed. At any point,
    * at most {@code nThreads} threads will be active processing
    * tasks. If additional tasks are submitted when all threads are
    * active, they will wait in the queue until a thread is
    * avaliable. If any thread terminates due to a failure during
    * execution prior to shutdown, a new one will take its place if
    * needed to execute subsequent tasks. The threads in the pool will
    * exist until it is explicitly {@link ExecutorService#shutdown
    * shutdown}.
    *
    * @param nThreads the number of threads in the pool
    * @param threadFactory the factory to use when creating new threads
    * @return the newly created thread pool
    * @throws NullPointerException if threadFactory is null
    * @throws IllegalArgumentException if {@code nThreads <= 0}
    */
    public static ExecutionService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
    }
3.2 CachedThreadPool
  • 通过 ExecutorsnewCachedThreadPool() 方法来创建

  • 它是一种线程数量不定的线程池,它只有非核心线程,并且其最大线程数为 Integer.MAX_VALUE。由于 Integer.MAX_VALUE = 0x7fffffff(2 的 31 次方 - 1) 是一个很大的数,实际上就相当于最大线程数可以任意大

  • 当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。线程池中的空闲线程都有超时机制,这个超时时长为 60 秒,超过 60 秒空闲线程就会被回收

  • CachedThreadPool 的任务队列其实相当于一个空集合,这将导致任何任务都会立即被执行,因为这种场景下 SynchronousQueue 是无法插入任务的。SynchronousQueue 是一个非常特殊的队列,在很多情况下可以把它简单理解为一个无法存储元素的队列,实际中使用较少

  • CachedThreadPool 线程池比较适合执行大量的耗时较少的异步任务。当整个线程池都处于闲置状态时,线程池中的线程都会超时而被停止,此时 CachedThreadPool 之中实际上是没有任何线程的,它几乎是不占用任何系统资源

  • newCachedThreadPool() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {
    // bla bla bla

    /**
    * Creates a thread pool that creates new threads as needed, but
    * will reuse previously constructed threads when they are
    * abailable. These pools will typically improve the performance
    * of programs that execute many short-lived asynchronous tasks.
    * Call to {@code execute} will reuse previously constructed
    * threads if available. If no existing thread is available, a new
    * thread will be created and added to the pool. Threads that have
    * not been used for sixty seconds are terminated and removed from
    * the cache. Thus, a pool that remains idle for long enough will
    * not consume any resources. Note that pools with similar
    * properties but different details (for example, timeout parameters)
    * may be created using {@link ThreadPoolExecutor} constructors.
    *
    * @return the newly created thread pool
    */
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integet.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    /**
    * Creates a thread pool that creates new threads as needed, but
    * will reuse previously constructed threads when they are
    * available, and uses the provided
    * ThreadFactory to create new threads when needed.
    * @param threadFactory the factory to use when creating new threads
    * @return the newly created thread pool
    * @throws NullPointerException if threadFactory is null
    */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    threadFactory);
    }
    }
3.3 ScheduledThreadPool
  • 通过 ExecutorsnewScheduledThreadPool() 方法来创建

  • 它的核心线程数量是固定的,而非核心线程数是没有限制的(Integer.MAX_VALUE),并且当非核心线程闲置时会被立即回收

  • ScheduledThreadPool 这类线程池主要用于执行定时任务和具有固定周期的重复任务

  • newScheduledThreadPool() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {
    // bla bla bla

    /**
    * Creates a thread pool that can schedule commands to run after a
    * given delay, or to execute periodically.
    * @param corePoolSzie the number of threads to keep in the pool,
    * even if they are idle
    * @return a newly created scheduled thread pool
    * @throws IllegalArgumentExcepion if {@code corePoolSize < 0}
    */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
    * Creates a thread pool that can schedule commands to run after a
    * given delay, or to execute periodically.
    * @param corePoolSize the number of threads to keep in the pool,
    * even if they are idle
    * @param threadFactory the factory to use when the executor
    * creates a new thread
    * @return a newly created scheduled thread pool
    * @throws IllegalArgumentException if {@code corePoolSize < 0}
    * @throws NullPointerException if threadFactory is null
    */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    * d
    * @since 1.5
    * @author Doug Lea
    */
    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    // bla bla bla

    /**
    * The default keep-alive time for pool threads.
    *
    * Normally, this value is unused because all pool threads will be
    * core threads, but if a user creates a pool with a corePoolSize
    * of zero (against our advice), we keep a thread alive as long as
    * there are queued tasks. If the keep alive time is zero (the
    * historic value), we end up hot-spinning in getTask, wasting a
    * CPU. But on the other hand, if we set the value too high, and
    * users create a one-shot pool which they don't cleanly shutdown,
    * the pool's non-daemon threads will prevent JVM termination. A
    * small but non-zero value (relative to a JVM's lifetime) seems
    * best.
    */
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10l;

    /**
    * Creates a new {@code ScheduledThreadPoolExecutor} with the
    * given initial parameters.
    * d
    * @param corePoolSize the number of threads to keep in the pool, even
    * if they are idle, unless {@code allowCoreThreadTimeOut} is set
    * @param threadFactory the factory to use when the executor
    * creates a new thread
    * @throws IllegalArgumentException if {@code corePoolSize < 0}
    * @throws NullPointerException if {@code threadFactory} is null
    */
    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Interger.MAX_VALUE,
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
    new DelayedWorkQueue(), threadFactory);
    }

    /**
    * Specialized delay queue. To mesh with TPE declarations, this
    * class must be declared as a BlockingQueue<Runnable> even though
    * it can only hold RunnableScheduledFutures.
    */
    static class DelayWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
    // bla bla bla
    }
    }
3.4 SingleThreadExecutor
  • 通过 ExecutorsnewSingleThreadExecutor() 方法来创建

  • 这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行

  • SingleThreadExecutor 的意义在于统一所有的外界任务到一个线程中,这使得在这些任务之间不需要处理线程同步的问题

  • newSingleThreadExecutor() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {
    // bla bla bla

    /**
    * Creates an Executor that uses a single worker thread operating
    * off an unbounded queue. (Note however that if this single
    * thread terminates due to a failure during execution prior to
    * shutdown, a new one will take its place if needed to execute
    * subsequent tasks.) Tasks are guaranteed to execute
    * sequentially, and no more than one task will be active at any
    * given time. Unlike the otherwise equivalent
    * {@code newFixedThreadPool(1)} the returned executor is
    * guaranteed not to be reconfigurable to use additional threads.
    *
    * @return the newly created single-threaded Executor
    */
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    /**
    * Creates an Executor that uses a single worker thread operating
    * off an unbounded queue, and uses the provided ThreadFactory to
    * create a new thread when needed. Unlike the otherwise
    * equivalent {@code newFixedThreadPool(1, threadFactory)} the
    * returned executor is guaranteed not to be reconfigurable to use
    * additional threads.
    *
    * @param threadFactory the factory to use when creating new
    * threads
    *
    * @return the newly created single-threaded Executor
    * @throws NullPointerException if threadFactory is null
    */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }
    }
3.5 WorkStealingPool
  • 1.8 版本新增

  • newWorkStealingPool() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {
    // bla bla bla

    /**
    * Creates a work-stealing thread pool using the number of
    * {@linkplain Runtime#availableProcessors available processors}
    * as its target parallelism level.
    *
    * @return the newly created thread pool
    * @see #newWorkStealingPool(int)
    * @since 1.8
    */
    public static ExecutorService newWorkStealingPool() {
    return new ForJoinPool
    (Runtime.getRuntime().availableProcessor(),
    ForJoinPool.defaultForkJoinWorkerThreadFactory,
    null, true);
    }

    /**
    * Creates a thread pool that maintains enough threads to support
    * the given parallelism level, and may use multiple queue to
    * reduce contention. The parallelism level corresponds to the
    * maximum number of threads actively engaged in, or available to
    * engage in, task processing. The actual number of threads may
    * grow and shrink dynamically. A work-stealing pool makes no
    * guarantees about the order in which submitted tasks are
    * executed.
    *
    * @param parallelism the targeted parallelism level
    * @return the newly created thread pool
    * @throws IllegalArgumentException if {@code parallelism <= 0}
    * @since 1.8
    */
    public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
    (parallelism,
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null, true);
    }
    }
3.6 SingleThreadScheduledExecutor
  • 1.8 版本新增

  • newSingleThreadScheduledExecutor() 方法的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    package java.util.concurrent;

    // import bla bla bla

    /**
    * bla bla bla
    *
    * @since 1.5
    * @author Doug Lea
    */
    public class Executors {
    // bla bla bla

    /**
    * Creates a single-threaded executor that can schedule commands
    * to run after a given delay, or to execute periodically.
    * (Note however that if this single
    * thread terminates due to a failure during execution prior to
    * shutdown, a new one will take its place if needed to execute
    * subsequent tasks.) Tasks are guaranteed to execute
    * sequentially, and no more than one task will be active at any
    * given time. Unlike the otherwise equivalent
    * {@code newScheduledThreadPll(1)} the returned executor is
    * guaranteed not to be reconfigurable to use additional threads.
    * @return the newly created scheduled executor
    */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1));
    }

    /**
    * Creates a single-threaded executor that can schedule commands
    * to run after a given delay, or to execute periodically. (Note
    * however that if this single thread terminates due to a failure
    * during execution prior to shutdown, a new one will take its
    * place if needed to execute subsequent tasks.) Tasks are
    * guaranteed to execute subsequent tasks.) Tasks are
    * guaranteed to execute sequentially, and no more than one task
    * will be active at any given time. Unlike the otherwise
    * equivalent {@code newScheduledThreadPool(1, threadFactory)}
    * the returned executor is guaranteed not to be reconfigurable to
    * use additional threads.
    * @param threadFactory the factory to use when creating new
    * threads
    * @return a newly created scheduled executor
    * @throws NullPointerException if threadFactory is null
    */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
    }

4. 线程池典型用法及参考

  • 典型用法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    Runnable command = new Runnable() {
    @Override
    public void run() {
    SystemClock.sleep(2000);
    }
    };

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
    fixedThreadPool.execute(command);

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    cachedThreadPool.execute(command);

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
    // 2000 ms 后执行 command
    scheduledThreadPool.schedule(command, 2000, TimeUnit.MILLISECONDS);
    // 延迟 10 ms 后,每隔 1000 ms 执行一次 command
    scheduledThreadPool.scheduleAtFixedRate(command, 10, 1000, TimeUnit.MILLISECONDS);

    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    singleTheadExecutor.execute(command);
  • 参考:Java 线程池实现原理及其在美团业务中的实践

-------------------- 本文结束感谢您的阅读 --------------------