1. 多线程之间需要协作的场景有哪些
- 生产者/消费者协作模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务。如果队列长度有限,在队列满的时候,生产者需要等待;在队列空的时候,消费者需要等待
- 同时开始:类似运动员比赛,在一些模拟仿真程序中,要求多个线程能同时开始
- 等待结束:可以理解为主从模式,主线程将任务分解为若干子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕
- 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦。一种常见的模式是将子线程的管理封装为异步调用。异步调用马上返回,但返回的不是最终的结果,而是一个一般称为
Future
的对象,通过它可以在随后获得最终的结果 - 集合点:在一些并行迭代计算程序中,每个线程负责一部分计算,然后在集合点等待其他线程完成。所有线程到齐后,交换数据和计算结果,再进行下一次迭代
2. Java 中多线程协作的基本机制是
wait()/notify()
- Java 在
Object
类而非Thread
类中定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类:一类是wait()
,另一类是notify()
wait()
方法在等待期间都可以被中断,如果被中断,会抛出InterruptedException
异常
3. wait()
方法实际上做了什么,它在等待什么
- 每个对象都有一把锁和一个等待队列,一个线程在进入
synchronized
代码块时,会尝试获取锁。如果获取不到就会把当前线程加入到等待队列中 - 除了用于锁的等待队列,每个对象还有另一个用于线程间的协作条件队列
- 调用
wait()
就会把当前线程放到条件队列上并阻塞,同时释放对象锁,表示当前线程执行不下去了,它需要等待一个条件。这个条件它自己改变不了,需要其他线程改变
4. notify()
实际上做了什么,notify()
和 notifyAll()
的区别是
notify()
做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,但不会释放锁。只有在包含notify()
的synchronized
代码块执行完后,等待的线程才会从wait()
调用中返回notifyAll()
会移除条件队列中所有的线程并全部唤醒wait()/notify()
方法只能在synchronized
代码块内被调用。如果调用wait()/notify()
方法时,当前线程没有持有对象锁,会抛出java.lang.IllegalMonitorStateException
异常
5. wait()
的具体过程是
- 把当前线程放入条件队列,释放对象锁,阻塞并等待,线程状态变为
WAITING
或TIMED-WAITING
- 等待时间到或被其他线程调用
notify()/notifyAll()
从条件队列中移除,这时,要重新竞争对象锁- 如果能够获得锁,线程状态变为
RUNNABLE
,并从wait()
调用中返回 - 否则,该线程加入对象锁等待队列,线程状态变为
BLOCKED
,只有在获得锁后才会从wait()
调用中返回
- 如果能够获得锁,线程状态变为
6. join()
方法的原理
join()
方法实际上就是调用了wait()
方法,主要代码是:while(isAlive()) {wait(0)}
- 只要线程是活着的,
isAlive()
返回true
,join()
就一直等待 - 当线程运行结束的时候,Java 系统调用
notify()
来通知
7. Java 提供的专门的阻塞队列实现有哪些
- 接口
BlockingQueue
和BlockingDeque
- 基于数组的实现类
ArrayBlockingQueue
- 基于链表的实现类
LinkedBlockingQueue
和LinkedBlockingDeque
- 基于堆的实现类
PriorityBlockingQueue
8 Java 中的框架 Executors
,相关的部分接口和类有
- 表示异步结果的接口
Future
和实现类FutureTask
- 用于执行异步任务的接口
Executor
,以及有更多功能的子接口ExecutorService
- 用于创建
Executor
和ExecutorService
的工厂方法类Executors
9. 写一个生产者/消费者模式 Demo
阻塞队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<> (limit); //ArrayDeque 的内部是一个动态扩展的逻辑上的循环数组
}
public synchronized void put(E e) throws InterruptedException { //
给生产者使用,往队列上放数据,满了就 wait
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll(); //通知所有的消费者
}
public synchronized E take() throws InterruptedException { //给消费者使用,从队列中取数据,如果为空就 wait
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll(); //通知所有的生产者
return e;
}
}put()
和take()
都调用了wait()
,但它们等待的条件是不一样的。put()
等待的是队列不为满,而take()
等待的是队列不为空,但它们都会加入相同的条件队列- 由于条件不同但又使用相同的条件队列,所以要调用
notifyAll()
而不能调用nofity()
,因为notify()
只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用了 - 只能有一个条件队列,这是 Java
wait()/notify()
机制的局限性。显示的锁和条件,可以解决该问题
一个简单的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
whiel(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch() {
}
}
}一个简单的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<Strign> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int) (Math.random() * 100));
}
} catch() {
}
}
}主程序
1
2
3
4
5public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<> (10);
new Producer(queue).start();
new Consumer(queue).start();
}
10. 写一个同时开始 Demo
类似于运动员比赛,思路:有一个主线程和 N 个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号
协作对象 FireFlag
1
2
3
4
5
6
7
8
9
10
11
12
13
14static class FireFlag {
private volatile boolean fired = false; //volatile: 相比 synchronized,更轻量级的保证内存可见性的一种方式
public synchronized void waitForFire() throws InterruptedException { //子线程应该调用 waitForFire() 等待枪响
while(!fired) {
wait();
}
}
public synchronized void fire() { //主线程应该调用 fire() 发射比赛开始信号
this.fire = fire;
notifyAll();
}
}表示比赛运动员的类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run " + Thread.currentThread().getName());
} catch() {
}
}
}主程序代码
1
2
3
4
5
6
7
8
9
10
11public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i<num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}
11. 写一个等待结束 Demo
同步协作的工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//Java 中有一个专门的同步类 CountDownLatch,在实际开发中应该使用它
public class MyLatch {
private int count;
public MyLatch(int count) {
this.count = count;
}
public synchronized void await() throws InterruptedException { //主线程调用
while(count > 0) {
wait();
}
}
public synchronized void countDown() { //子线程调用
count--;
if(count <= 0) {
notifyAll();
}
}
}使用
MyLatch
的工作子线程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
public void run() {
try {
// simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch(InterruptedException) {
}
}
}主线程
1
2
3
4
5
6
7
8
9
10
11
12
13public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i=0; i<workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
12. 使用上题中 MyLatch 实现同时开始 Demo
1 | public class RacerWithLatchDemo { |
13. 怎样理解异步结果/异步调用
- 在主从模式中,手工创建线程往往比较麻烦
- 一种常见的模式是异步调用,异步调用返回一个一般称为
Future
的对象,通过它可以获得最终的结果
14. 写一个异步结果/异步调用 Demo
准备代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//在 Java 中,表示子任务的接口是 Callable,声明为:
public interface Callable<V> {
V call() throws Exception;
}
//为表示异步调用的结果,定义一个接口 MyFuture,如下:
public interface MyFuture<V> {
//get() 方法返回真正的结果,如果结果还没有计算完成,get() 方法会阻塞直到计算完成
V get() throws Exception;
}
//为方便主线程调用子任务,定义一个类 MyExecutor,声明如下
//表示执行子任务并返回异步结果。利用该方法,对于主线程,就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果
public <V> MyFuture<V> execute(final Callable<V> task) {
}异步调用 Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
//子任务
Callable<Integer> subTask = new Callable<Integer> () {
@Override
public Integer call() throws Exception {
// ...执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
//异步调用,返回一个 MyFuture 对象
MyFuture<Integer> future = executor.execute(subTask); //execute() 方法封装了创建子线程、同步获取结果的过程,它会创建一个执行子线程
//执行其他操作
try {
//获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}执行子线程
ExecuteThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
public void run() {
try {
result = task.call(); //执行实际的子任务
} catch(Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll(); //唤醒可能在等待的主线程
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}异步执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public <V> MyFuture<V> execute(final Callable<V> task) { //execute() 方法启动一个线程,并返回 MyFuture 对象,MyFuture 的 get() 方法会阻塞等待直到线程运行结束
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<> (task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V> () {
public V get() throws Exception {
synchronized(lock) {
while(!thread.isDone()) {
try {
lock.wait();
} catch(InterruptedException e) {
}
}
if(thread.getException != null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}
15. 写一个集合点 Demo
协作对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//Java 中有一个专门的同步工具类 CyclicBarrier
public class AssemblePoint {
private int n;
public AssemblePoint(int n) {
this.n = n;
}
public synchronized void await() throws InterruptedException {
if(n > 0) {
n--;
if(n == 0) {
notifyAll();
} else {
while(n != 0) {
wait();
}
}
}
}
}集合点协作 Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
//模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
//集合
ap.await();
System.out.println("arrived");
//集合后执行其他操作
} catch(InterruptedException e) {
}
}
}
public static void main(String[] args) {
int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i=0; i<num; i++) {
thread[i] = new Tourist(ap);
threads[i].start();
}
}
}