1. 线程池的优点
- 重用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销
- 能有效控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致的阻塞现象
- 能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能
2. ThreadPoolExecutor 概述
Android 中的线程池的概念来源于 Java 中的 Executor。Executor 是一个接口,真正的线程池的实现类为 ThreadPoolExecutor,ThreadPoolExecutor 提供了一系列参数来配置线程池,通过不同的参数可以创建不同的线程池
ThreadPoolExecutor 常用构造方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120/**
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The queue used for holding tasks and handling off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty() (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0) ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package java.util.concurrent;
import java.util.Objects;
/**
* bla bla bla
*/
public enum TimeUnit {
// bla bla bla
/**
* Equivalent to
* {@link #convert(long, TimeUnit) NANOSECONDS.convert(duration, this)}.
* @param duration the duration
* @return the converted duration,
* or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or {@code Long. MAX_VALUE} if it would positively overflow.
*/
public long toNanos(long duration) {
throw new AbstractMethodError();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this queue
*/
public interface BlockingQueue<E> extends Queue<E> {
// bla bla bla
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package java.util.concurrent;
/**
* An object that creates new threads on demand. Using thread factories
* removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread},
* enabling applications to use special thread subclasses, priorities, etc.
*
* <p>
* The simplest implementation of this interface is just:
* <pre> {@code
* class SimpleThreadFactory implements ThreadFactory {
* public Thread newThread(Runnable r) {
* return new Thread(r);
* }
* }}</pre>
*
* The {@link Executor#defaultThreadFactory} method provides a more
* useful simple implementation, that sets the created thread context
* to known values before rturning it.
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package java.util.concurrent;
/**
* A handler for tasks cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
* d
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
* d
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}构造方法中各个参数的含义
参数 含义 int corePoolSize
线程池的核心线程数。默认情况下,核心线程会在线程池中一直存活,即使它们处于闲置状态。如果将 ThreadPoolExecutor 的 allowCoreThreadTimeOut
属性设置为 true,那么闲置的核心线程在等待新任务到来时会有超时策略,这个时间间隔由 keepAliveTime 所指定,当等待时间超过 keepAliveTime 所指定的时长后,核心线程就会被终止int maximum
线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞 long keepAliveTime
非核心线程闲置时的超时时长。超过这个时长,非核心线程就会被回收。当 ThreadPoolExecutor 的 allowCoreThreadTimeOut
属性设置为 true 时,keepAliveTime 同样会作用于核心线程TimeUnit unit
用于指定 keepAliveTime 参数的时间单位。这个一个枚举,常用的有 TimeUnit.MILLISECONDS(毫秒)
、TimeUnit.SECONDS(秒)
以及TimeUnit.MINUTES(分钟)
等BlockingQueue<Runnable> workQueue
线程池中的任务队列。通过线程池的 execute()
方法提交的 Runnable 对象会存储在这个参数中ThreadFactory threadFactory
线程工厂,为线程池提供创建新线程的功能。ThreadFactory 是一个接口,只有一个方法: newThread(Runnable r)
RejectedExecutionHandler handler
当线程池无法执行新任务时会触发任务拒绝策略,这可能是由于任务队列已满或者是无法成功执行任务,此时 ThreadPoolExecutor 会调用 handler 的 rejectedExecution()
方法来通知调用者,默认情况下,rejectedExecution()
方法会直接抛出一个 RejectedExecutionException。ThreadPoolExecutor 为 RejectedExecutionHandler 提供了几个可选值:CallRunsPolicy、AbortPolicy、DiscardPolicy 和 DiscardOldestPolicy,其中 AbortPolicy 是默认值,它会直接抛出 RejectedExecutionExceptionThreadPoolExecutor 执行任务时大致遵循如下规则:
- 如果线程池中的线程数量未达到核心线程的数量,那么会直接启动一个核心线程来执行任务
- 如果线程池中的线程数量已经达到或者超过核心线程的数量,那么任务会被插入到任务队列中排队等待执行
- 如果在步骤 2 中无法将任务插入到任务队列中,这往往是由于任务队列已满,此时如果线程数量未达到线程池规定的最大值,那么会立刻启动一个非核心线程来执行任务
- 如果步骤 3 中线程数量已经达到线程池规定的最大值,那么就拒绝执行此任务,ThreadPoolExecutor 会调用 RejectedExecutionHandler 的
rejectedExecution()
方法来通知调用者
Demo: ThreadPoolExecutor 的参数配置在 AsyncTask 中有明显的体现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72/**
* bla bla bla
*/
public abstract class AsyncTask<Params, Progress, Result> {
// bla bla bla
// We keep only a single pool thread around all the time.
// We let the pool grow to a fairly large number of threads if necessary,
// but let them time out quickly. In the unlikely case that we run out of threads,
// we fall back to a simple unbounded-queue executor.
// This combination ensure that:
// 1. We normally keep few threads (1) around.
// 2. We queue only after launching a significantly larger, but still bounded, set of threads.
// 3. We keep the total number of threads bounded, but still allow an unbounded set
// of tasks to be queued.
private static final int CORE_POOL_SIZE = 1;
private static final int MAXIMUM_POOL_SIZE = 20;
private static final int BACKUP_POOL_SIZE = 5;
private static final int KEEP_ALIVE_SECOND = 3;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
// Used only for rejected executions.
// Initialization protected by sRunOnSerialPolicy lock.
private static ThreadPoolExecutor sBackupExecutor;
private static LinkedBlockingQueue<Runnable> sBackupExecutorQueue;
private static final RejectedExecutionHandler sRunOnSerialPolicy =
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
// As a last ditch fallback, run it on an executor with an unbounded queue.
// Create this executor lazily, hopefully almost never.
synchronized (this) {
if (sBackupExecutor == null) {
sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
sBackupExecutor = new ThreadPoolExecutor(
BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
sBackupExecutor.allowCoreThreadTimeOut(true);
}
}
sBackupExecutor.execute(r);
}
};
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*
* @deprecated Using a single thread pool for a general purpose results in suboptimal behavior
* for different tasks. Small, CPU-bound tasks benefit from a bounded pool and queueing, and
* long-running blocking tasks, such as network operations, benefit from many threads. Use or
* create an {@link Executor} configured for your use case.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
3. 线程池的分类
3.1 FixedThreadPool
通过 Executors 的
newFixedThreadPool()
方法来创建它是一种线程数量固定的线程池,当线程处于空闲状态时,它们并不会被回收(核心线程没有超时机制),除非线程池被关闭了
当所有的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。另外,任务队列没有大小限制
由于 FixedThreadPool 只有核心线程并且这些核心线程不会被回收,这意味着它能更加快速地响应外界的请求
newFixedThreadPool()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91package java.util.concurrent;
import dalvik.annotation.optimization.ReachabilitySensitive;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import sun.security.util.SecurityConstants;
// BEGIN anroid-note
// removed security manager docs
// END android-note
/**
* Factory and utility methods for {@link Executor}, {@link
* ExecutorService}, {@link ScheduledExecutorService}, {@link
* ThreadFactory}, and {@link Callable} classes defined in this
* package. This class supports the following kinds of methods:
*
* <ul>
* <li>Methods that create and return an {@link ExecutorService}
* set up with commonly useful configuration settings.
* <li>Methods that create and return a {@link ScheduledExecutorService}
* set up with commonly useful configuration settings.
* <li>Methods that create and return a "wrapped" ExecutorService, that
* disables reconfiguration by making implementation-specific methods
* inaccessible.
* <li>Methods that create and return a {@link ThreadFactory}
* that sets newly created threads to a known state.
* <li>Methods that create and return a {@link Callable}
* out of other closure-like forms, so they can be used
* in execution methods requiring {@code Callable}.
* </ul>
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThread <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* avaliable. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutionService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
}
3.2 CachedThreadPool
通过 Executors 的
newCachedThreadPool()
方法来创建它是一种线程数量不定的线程池,它只有非核心线程,并且其最大线程数为 Integer.MAX_VALUE。由于
Integer.MAX_VALUE = 0x7fffffff(2 的 31 次方 - 1)
是一个很大的数,实际上就相当于最大线程数可以任意大当线程池中的线程都处于活动状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。线程池中的空闲线程都有超时机制,这个超时时长为 60 秒,超过 60 秒空闲线程就会被回收
CachedThreadPool 的任务队列其实相当于一个空集合,这将导致任何任务都会立即被执行,因为这种场景下 SynchronousQueue 是无法插入任务的。SynchronousQueue 是一个非常特殊的队列,在很多情况下可以把它简单理解为一个无法存储元素的队列,实际中使用较少
CachedThreadPool 线程池比较适合执行大量的耗时较少的异步任务。当整个线程池都处于闲置状态时,线程池中的线程都会超时而被停止,此时 CachedThreadPool 之中实际上是没有任何线程的,它几乎是不占用任何系统资源的
newCachedThreadPool()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
// bla bla bla
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* abailable. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Call to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integet.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
}
3.3 ScheduledThreadPool
通过 Executors 的
newScheduledThreadPool()
方法来创建它的核心线程数量是固定的,而非核心线程数是没有限制的(Integer.MAX_VALUE),并且当非核心线程闲置时会被立即回收
ScheduledThreadPool 这类线程池主要用于执行定时任务和具有固定周期的重复任务
newScheduledThreadPool()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
// bla bla bla
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSzie the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentExcepion if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
* d
* @since 1.5
* @author Doug Lea
*/
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
// bla bla bla
/**
* The default keep-alive time for pool threads.
*
* Normally, this value is unused because all pool threads will be
* core threads, but if a user creates a pool with a corePoolSize
* of zero (against our advice), we keep a thread alive as long as
* there are queued tasks. If the keep alive time is zero (the
* historic value), we end up hot-spinning in getTask, wasting a
* CPU. But on the other hand, if we set the value too high, and
* users create a one-shot pool which they don't cleanly shutdown,
* the pool's non-daemon threads will prevent JVM termination. A
* small but non-zero value (relative to a JVM's lifetime) seems
* best.
*/
private static final long DEFAULT_KEEPALIVE_MILLIS = 10l;
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
* d
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Interger.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
/**
* Specialized delay queue. To mesh with TPE declarations, this
* class must be declared as a BlockingQueue<Runnable> even though
* it can only hold RunnableScheduledFutures.
*/
static class DelayWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// bla bla bla
}
}
3.4 SingleThreadExecutor
通过 Executors 的
newSingleThreadExecutor()
方法来创建这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行
SingleThreadExecutor 的意义在于统一所有的外界任务到一个线程中,这使得在这些任务之间不需要处理线程同步的问题
newSingleThreadExecutor()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
// bla bla bla
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
}
3.5 WorkStealingPool
1.8 版本新增
newWorkStealingPool()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
// bla bla bla
/**
* Creates a work-stealing thread pool using the number of
* {@linkplain Runtime#availableProcessors available processors}
* as its target parallelism level.
*
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
return new ForJoinPool
(Runtime.getRuntime().availableProcessor(),
ForJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queue to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
}
3.6 SingleThreadScheduledExecutor
1.8 版本新增
newSingleThreadScheduledExecutor()
方法的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53package java.util.concurrent;
// import bla bla bla
/**
* bla bla bla
*
* @since 1.5
* @author Doug Lea
*/
public class Executors {
// bla bla bla
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPll(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
}
4. 线程池典型用法及参考
典型用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Runnable command = new Runnable() {
@Override
public void run() {
SystemClock.sleep(2000);
}
};
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
fixedThreadPool.execute(command);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.execute(command);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
// 2000 ms 后执行 command
scheduledThreadPool.schedule(command, 2000, TimeUnit.MILLISECONDS);
// 延迟 10 ms 后,每隔 1000 ms 执行一次 command
scheduledThreadPool.scheduleAtFixedRate(command, 10, 1000, TimeUnit.MILLISECONDS);
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleTheadExecutor.execute(command);