Java 并发基础知识(三):线程的基本协作机制

  1. 多线程之间需要协作的场景有哪些?
    答:

    • 生产者/消费者协作模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务。如果队列长度有限,在队列满的时候,生产者需要等待;在队列为空的时候,消费者需要等待。
    • 同时开始:类似运动员比赛,在一些模拟仿真程序中,要求多个线程能同时开始。
    • 等待结束:主从模式,主线程将任务分解为若干子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。
    • 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦。一种常见的模式是将子线程的管理封装为异步调用。异步调用马上返回,但返回的不是最终的结果,而是一个一般称为 Future 的对象,通过它可以在随后获得最终的结果。
    • 集合点:在一些并行迭代计算程序中,每个线程负责一部分计算,然后在集合点等待其他线程完成。所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
  2. Java 中多线程协作的基本机制是?
    答:

    • wait()/notify()
    • JavaObject 类而非 Thread 类中定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类:一类是 wait(),另一类是 notify()
    • wait() 方法在等待期间都可以被中断,如果被中断,会抛出 InterruptedException 异常。
  3. wait() 方法实际上做了什么?它在等待什么?
    答:

    • 每个对象都有一把锁和一个等待队列,一个线程在进入 synchronized 代码块时,会尝试获取锁。如果获取不到就会把当前线程加入到等待队列中。
    • 除了用于锁的等待队列,每个对象还有另一个用于线程间的协作条件队列
    • 调用 wait() 就会把当前线程放到条件队列上并阻塞,同时释放对象锁,表示当前线程执行不下去了,它需要等待一个条件。这个条件它自己改变不了,需要其他线程改变。
  4. notify() 实际上做了什么?notify()notifyAll() 的区别是?
    答:

    • notify() 做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,但不会释放锁。只有在包含 notify()synchronized 代码块执行完后,等待的线程才会从 wait() 调用中返回
      * notifyAll() 会移除条件队列中所有的线程并全部唤醒
    • wait()/notify() 方法只能在 synchronized 代码块内被调用。如果调用 wait()/notify() 方法时,当前线程没有持有对象锁,会抛出 java.lang.IllegalMonitorStateException 异常。
  5. wait() 的具体过程是?
    答:

    • 把当前线程放入条件等待队列,释放对象锁,阻塞并等待,线程状态变为 WAITINGTIMED-WAITING
    • 等待时间到或被其他线程调用 notify()/notifyAll() 从条件队列中移除,这时,要重新竞争对象锁

      • 如果能够获得锁,线程状态变为 RUNNABLE,并从 wait() 调用中返回。
      • 否则,该线程加入对象锁等待队列,线程状态变为 BLOCKED只有在获得锁后才会从 wait() 调用中返回
  6. 【笔试题】手写一个生产者/消费者模式 Demo
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 生产者/消费者队列
    static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;
    public MyBlockingQueue(int limit) {
    this.limit = limit;
    queue = new ArrayDeque<> (limit);
    }
    public synchronized void put(E e) throws InterruptedException { // 给生产者使用,往队列上放数据,满了就 wait
    while(queue.size() == limit) {
    wait();
    }
    queue.add(e);
    notifyAll(); // 通知可能的消费者
    }
    public synchronized E take() throws InterruptedException { // 给消费者使用,从队列中取数据,如果为空就 wait
    while(queue.isEmpty()) {
    wait();
    }
    E e = queue.poll();
    notifyAll(); // 通知可能的生产者
    return e;
    }
    }

    // put() 和 take() 都调用了 wait(),但它们等待的条件是不一样的。put() 等待的是队列不为满,而 take() 等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用 notifyAll() 而不能调用 nofity(),因为 notify() 只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用了。
    // 只能有一个条件等待队列,这是 Java wait()/notify() 机制的局限性。显示的锁和条件,可以解决该问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 一个简单的生产者
static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
whiel(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch() {

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 一个简单的消费者
static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<Strign> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int) (Math.random() * 100));
}
} catch() {

}
}
}
1
2
3
4
5
6
// 主程序
public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<> (10);
new Producer(queue).start();
new Consumer(queue).start();
}
  1. Java 提供的专门的阻塞队列实现有哪些?
    答:

    • 接口 BlockingQueueBlockingDeque
    • 基于数组的实现类 ArrayBlockingQueue
    • 基于链表的实现类 LinkedBlockingQueueLinkedBlockingDeque
    • 基于堆的实现类 PriorityBlockingQueue
  2. 【笔试题】手写一个同时开始 Demo?
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 类似于运动员比赛
    // 思路:有一个主线程和 N 个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号

    // 协作对象 FireFlag
    static class FireFlag {
    private volatile boolean fired = false; // volatile: 相比 synchronized,更轻量级的保证内存可见性的一种方式
    public synchronized void waitForFire() throws InterruptedException { // 子线程应该调用 waitForFire() 等待枪响
    while(!fired) {
    wait();
    }
    }
    public synchronized void fire() { // 主线程应该调用 fire() 发射比赛开始信号
    this.fire = fire;
    notifyAll();
    }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 表示比赛运动员的类
static class Racer extends Thread {
FireFlag fireFlag;
public Racer(FireFlag fireFlag) {
this.fireFlag = fireFlag;
}
@Override
public void run() {
try {
this.fireFlag.waitForFire();
System.out.println("start run " + Thread.currentThread().getName());
} catch() {

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
// 主程序代码
public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag fireFlag = new FireFlag();
Thread[] racers = new Thread[num];
for(int i = 0; i<num; i++) {
racers[i] = new Racer(fireFlag);
racers[i].start();
}
Thread.sleep(1000);
fireFlag.fire();
}
  1. join() 方法的原理?
    答:

    • join() 方法实际上就是调用了 wait() 方法,主要代码是:while(isAlive()) {wait(0)}
    • 只要线程是活着的,isAlive() 返回 truejoin() 就一直等待。
    • 当线程运行结束的时候,Java 系统调用 notify() 来通知
  2. 【笔试题】手写一个等待结束 Demo
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 同步协作的工具类
    // Java 中有一个专门的同步类 CountDownLatch,在实际开发中应该使用它
    public class MyLatch {
    private int count;
    public MyLatch(int count) {
    this.count = count;
    }
    public synchronized void await() throws InterruptedException { // 主线程调用
    while(count > 0) {
    wait();
    }
    }
    public synchronized void countDown() { // 子线程调用
    count--;
    if(count <= 0) {
    notifyAll();
    }
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 使用 MyLatch 的工作子线程
static class Worker extends Thread {
MyLatch latch;
public Worker(MyLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// simulate working on task
Thread.sleep((int) (Math.random() * 1000));
this.latch.countDown();
} catch(InterruptedException) {

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
// 主线程
public static void main(String[] args) throws InterruptedException {
int workerNum = 100;
MyLatch latch = new MyLatch(workerNum);
Worker[] workers = new Worker[workerNum];
for(int i=0; i<workerNum; i++) {
workers[i] = new Worker(latch);
workers[i].start();
}
latch.await();
System.out.println("collect worker results");
}
  1. 【笔试题】使用上题中 MyLatch 实现同时开始?
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class RacerWithLatchDemo {
    static class Racer extends Thread {
    MyLatch latch;
    public Racer(MyLatch latch) {
    this.latch = latch;
    }
    @Override
    public void run() {
    try {
    this.latch.await();
    System.out.println("start run " + Thread.currentThread().getName());
    } catch(InterruptedException e) {

    }
    }
    public static void main(String[] args) throws InterruptedException {
    int num = 10;
    MyLatch latch = new MyLatch(1);
    Thread[] racers = new Thread(num);
    for(int i = 0; i<num; i++) {
    racers[i] = new Racer(latch);
    racer[i].start();
    }
    Thread.sleep(1000);
    latch.countDown();
    }
    }
    }
  1. 怎样理解异步结果/异步调用
    答:在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为 Future 的对象,通过它可以获得最终的结果

  2. 【笔试题】手写一个异步结果/异步调用 Demo
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 在 Java 中,表示子任务的接口是 Callable,声明为:
    public interface Callable<V> {
    V call() throws Exception;
    }

    // 为表示异步调用的结果,定义一个接口 MyFuture,如下:
    public interface MyFuture<V> {
    V get() throws Exception; // get() 方法返回真正的结果,如果结果还没有计算完成,get() 方法会阻塞直到计算完成
    }

    // 为方便主线程调用子任务,定义一个类 MyExecutor,声明为:
    public <V> MyFuture<V> execute(final Callable<V> task) // 表示执行子任务并返回异步结果。利用该方法,对于主线程,就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 异步调用 Demo
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
// 子任务
Callable<Integer> subTask = new Callable<Integer> () {
@Override
public Integer call() throws Exception {
// ...执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
// 异步调用,返回一个 MyFuture 对象
MyFuture<Integer> future = executor.execute(subTask); // execute() 方法封装了创建子线程、同步获取结果的过程,它会创建一个执行子线程
// ...执行其他操作
try {
// 获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch(Exception e) {
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 执行子线程 ExecuteThread
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call(); // 执行实际的子任务
} catch(Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll(); // 唤醒可能在等待的主线程
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 异步执行任务
public <V> MyFuture<V> execute(final Callable<V> task) { // execute() 方法启动一个线程,并返回 MyFuture 对象,MyFuture 的 get() 方法会阻塞等待直到线程运行结束
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<> (task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V> () {
@Override
public V get() throws Exception {
synchronized(lock) {
while(!thread.isDone()) {
try {
lock.wait();
} catch(InterruptedException e) {

}
}
if(thread.getException != null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}
  1. Java 中的框架 Executors,相关的部分接口和类有?
    答:

    • 表示异步结果的接口 Future 和实现类 FutureTask
    • 用于执行异步任务的接口 Executor,以及有更多功能的子接口 ExecutorService
    • 用于创建 ExecutorExecutorService 的工厂方法类 Executors
  2. 【笔试题】手写一个集合点 Demo?
    答:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 协作对象。Java 中有一个专门的同步工具类 CyclicBarrier
    public class AssemblePoint {
    private int n;
    public AssemblePoint(int n) {
    this.n = n;
    }
    public synchronized void await() throws InterruptedException {
    if(n > 0) {
    n--;
    if(n == 0) {
    notifyAll();
    } else {
    while(n != 0) {
    wait();
    }
    }
    }
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 集合点协作 Demo
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint ap;
public Tourist(AssemblePoint ap) {
this.ap = ap;
}
@Override
public void run() {
try {
// 模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));
// 集合
ap.await();
System.out.println("arrived");
// 集合后执行其他操作
} catch(InterruptedException e) {

}
}
}
public static void main(String[] args) {
int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint ap = new AssemblePoint(num);
for(int i=0; i<num; i++) {
thread[i] = new Tourist(ap);
threads[i].start();
}
}
}