0%

Java 并发基础知识(三):线程的基本协作机制

1. 多线程之间需要协作的场景有哪些

  • 生产者/消费者协作模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从队列上取数据或任务。如果队列长度有限,在队列满的时候,生产者需要等待;在队列空的时候,消费者需要等待
  • 同时开始:类似运动员比赛,在一些模拟仿真程序中,要求多个线程能同时开始
  • 等待结束:可以理解为主从模式,主线程将任务分解为若干子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕
  • 异步结果:在主从协作模式中,主线程手工创建子线程的写法往往比较麻烦。一种常见的模式是将子线程的管理封装为异步调用。异步调用马上返回,但返回的不是最终的结果,而是一个一般称为 Future 的对象,通过它可以在随后获得最终的结果
  • 集合点:在一些并行迭代计算程序中,每个线程负责一部分计算,然后在集合点等待其他线程完成。所有线程到齐后,交换数据和计算结果,再进行下一次迭代

2. Java 中多线程协作的基本机制是

  • wait()/notify()
  • Java 在 Object 类而非 Thread 类中定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类:一类是 wait(),另一类是 notify()
  • wait() 方法在等待期间都可以被中断,如果被中断,会抛出 InterruptedException 异常

3. wait() 方法实际上做了什么,它在等待什么

  • 每个对象都有一把和一个等待队列,一个线程在进入 synchronized 代码块时,会尝试获取锁。如果获取不到就会把当前线程加入到等待队列中
  • 除了用于锁的等待队列,每个对象还有另一个用于线程间的协作条件队列
  • 调用 wait() 就会把当前线程放到条件队列上并阻塞,同时释放对象锁,表示当前线程执行不下去了,它需要等待一个条件。这个条件它自己改变不了,需要其他线程改变

4. notify() 实际上做了什么,notify()notifyAll() 的区别是

  • notify() 做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,但不会释放锁。只有在包含 notify()synchronized 代码块执行完后,等待的线程才会从 wait() 调用中返回
  • notifyAll() 会移除条件队列中所有的线程并全部唤醒
  • wait()/notify() 方法只能在 synchronized 代码块内被调用。如果调用 wait()/notify() 方法时,当前线程没有持有对象锁,会抛出 java.lang.IllegalMonitorStateException 异常

5. wait() 的具体过程是

  1. 把当前线程放入条件队列,释放对象锁,阻塞并等待,线程状态变为 WAITINGTIMED-WAITING
  2. 等待时间到或被其他线程调用 notify()/notifyAll() 从条件队列中移除,这时,要重新竞争对象锁
    • 如果能够获得锁,线程状态变为 RUNNABLE,并从 wait() 调用中返回
    • 否则,该线程加入对象锁等待队列,线程状态变为 BLOCKED,只有在获得锁后才会从 wait() 调用中返回

6. join() 方法的原理

  • join() 方法实际上就是调用了 wait() 方法,主要代码是:while(isAlive()) {wait(0)}
  • 只要线程是活着的,isAlive() 返回 truejoin() 就一直等待
  • 当线程运行结束的时候,Java 系统调用 notify() 来通知

7. Java 提供的专门的阻塞队列实现有哪些

  • 接口 BlockingQueueBlockingDeque
  • 基于数组的实现类 ArrayBlockingQueue
  • 基于链表的实现类 LinkedBlockingQueueLinkedBlockingDeque
  • 基于的实现类 PriorityBlockingQueue

8 Java 中的框架 Executors,相关的部分接口和类有

  • 表示异步结果的接口 Future实现类 FutureTask
  • 用于执行异步任务的接口 Executor,以及有更多功能的子接口 ExecutorService
  • 用于创建 ExecutorExecutorService工厂方法类 Executors

9. 写一个生产者/消费者模式 Demo

  • 阻塞队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;

    public MyBlockingQueue(int limit) {
    this.limit = limit;
    queue = new ArrayDeque<> (limit); //ArrayDeque 的内部是一个动态扩展的逻辑上的循环数组
    }

    public synchronized void put(E e) throws InterruptedException { //
    给生产者使用,往队列上放数据,满了就 wait
    while(queue.size() == limit) {
    wait();
    }
    queue.add(e);
    notifyAll(); //通知所有的消费者
    }

    public synchronized E take() throws InterruptedException { //给消费者使用,从队列中取数据,如果为空就 wait
    while(queue.isEmpty()) {
    wait();
    }
    E e = queue.poll();
    notifyAll(); //通知所有的生产者
    return e;
    }
    }
    • put()take() 都调用了 wait(),但它们等待的条件是不一样的put() 等待的是队列不为满,而 take() 等待的是队列不为空,但它们都会加入相同的条件队列
    • 由于条件不同但又使用相同的条件队列,所以要调用 notifyAll() 而不能调用 nofity(),因为 notify() 只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用了
    • 只能有一个条件队列,这是 Java wait()/notify() 机制的局限性。显示的锁和条件,可以解决该问题
  • 一个简单的生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static class Producer extends Thread {
    MyBlockingQueue<String> queue;

    public Producer(MyBlockingQueue<String> queue) {
    this.queue = queue;
    }

    @Override
    public void run() {
    int num = 0;
    try {
    whiel(true) {
    String task = String.valueOf(num);
    queue.put(task);
    System.out.println("produce task " + task);
    num++;
    Thread.sleep((int) (Math.random() * 100));
    }
    } catch() {

    }
    }
    }
  • 一个简单的消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static class Consumer extends Thread {
    MyBlockingQueue<String> queue;

    public Consumer(MyBlockingQueue<Strign> queue) {
    this.queue = queue;
    }

    @Override
    public void run() {
    try {
    while(true) {
    String task = queue.take();
    System.out.println("handle task " + task);
    Thread.sleep((int) (Math.random() * 100));
    }
    } catch() {

    }
    }
    }
  • 主程序

    1
    2
    3
    4
    5
    public static void main(String[] args) {
    MyBlockingQueue<String> queue = new MyBlockingQueue<> (10);
    new Producer(queue).start();
    new Consumer(queue).start();
    }

10. 写一个同时开始 Demo

  • 类似于运动员比赛,思路:有一个主线程和 N 个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号

  • 协作对象 FireFlag

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    static class FireFlag {
    private volatile boolean fired = false; //volatile: 相比 synchronized,更轻量级的保证内存可见性的一种方式

    public synchronized void waitForFire() throws InterruptedException { //子线程应该调用 waitForFire() 等待枪响
    while(!fired) {
    wait();
    }
    }

    public synchronized void fire() { //主线程应该调用 fire() 发射比赛开始信号
    this.fire = fire;
    notifyAll();
    }
    }
  • 表示比赛运动员的类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    static class Racer extends Thread {
    FireFlag fireFlag;

    public Racer(FireFlag fireFlag) {
    this.fireFlag = fireFlag;
    }

    @Override
    public void run() {
    try {
    this.fireFlag.waitForFire();
    System.out.println("start run " + Thread.currentThread().getName());
    } catch() {

    }
    }
    }
  • 主程序代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public static void main(String[] args) throws InterruptedException {
    int num = 10;
    FireFlag fireFlag = new FireFlag();
    Thread[] racers = new Thread[num];
    for(int i = 0; i<num; i++) {
    racers[i] = new Racer(fireFlag);
    racers[i].start();
    }
    Thread.sleep(1000);
    fireFlag.fire();
    }

11. 写一个等待结束 Demo

  • 同步协作的工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    //Java 中有一个专门的同步类 CountDownLatch,在实际开发中应该使用它
    public class MyLatch {
    private int count;

    public MyLatch(int count) {
    this.count = count;
    }

    public synchronized void await() throws InterruptedException { //主线程调用
    while(count > 0) {
    wait();
    }
    }

    public synchronized void countDown() { //子线程调用
    count--;
    if(count <= 0) {
    notifyAll();
    }
    }
    }
  • 使用 MyLatch 的工作子线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    static class Worker extends Thread {
    MyLatch latch;

    public Worker(MyLatch latch) {
    this.latch = latch;
    }

    @Override
    public void run() {
    try {
    // simulate working on task
    Thread.sleep((int) (Math.random() * 1000));
    this.latch.countDown();
    } catch(InterruptedException) {

    }
    }
    }
  • 主线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static void main(String[] args) throws InterruptedException {
    int workerNum = 100;
    MyLatch latch = new MyLatch(workerNum);
    Worker[] workers = new Worker[workerNum];

    for(int i=0; i<workerNum; i++) {
    workers[i] = new Worker(latch);
    workers[i].start();
    }

    latch.await();
    System.out.println("collect worker results");
    }

12. 使用上题中 MyLatch 实现同时开始 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class RacerWithLatchDemo {
static class Racer extends Thread {
MyLatch latch;

public Racer(MyLatch latch) {
this.latch = latch;
}

@Override
public void run() {
try {
this.latch.await();
System.out.println("start run " + Thread.currentThread().getName());
} catch(InterruptedException e) {

}
}

public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch latch = new MyLatch(1);
Thread[] racers = new Thread(num);
for(int i = 0; i<num; i++) {
racers[i] = new Racer(latch);
racer[i].start();
}
Thread.sleep(1000);
latch.countDown();
}
}
}

13. 怎样理解异步结果/异步调用

  • 在主从模式中,手工创建线程往往比较麻烦
  • 一种常见的模式是异步调用,异步调用返回一个一般称为 Future 的对象,通过它可以获得最终的结果

14. 写一个异步结果/异步调用 Demo

  • 准备代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //在 Java 中,表示子任务的接口是 Callable,声明为:
    public interface Callable<V> {
    V call() throws Exception;
    }

    //为表示异步调用的结果,定义一个接口 MyFuture,如下:
    public interface MyFuture<V> {
    //get() 方法返回真正的结果,如果结果还没有计算完成,get() 方法会阻塞直到计算完成
    V get() throws Exception;
    }

    //为方便主线程调用子任务,定义一个类 MyExecutor,声明如下
    //表示执行子任务并返回异步结果。利用该方法,对于主线程,就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果
    public <V> MyFuture<V> execute(final Callable<V> task) {

    }
  • 异步调用 Demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public static void main(String[] args) {
    MyExecutor executor = new MyExecutor();
    //子任务
    Callable<Integer> subTask = new Callable<Integer> () {
    @Override
    public Integer call() throws Exception {
    // ...执行异步任务
    int millis = (int) (Math.random() * 1000);
    Thread.sleep(millis);
    return millis;
    }
    };
    //异步调用,返回一个 MyFuture 对象
    MyFuture<Integer> future = executor.execute(subTask); //execute() 方法封装了创建子线程、同步获取结果的过程,它会创建一个执行子线程
    //执行其他操作
    try {
    //获取异步调用的结果
    Integer result = future.get();
    System.out.println(result);
    } catch(Exception e) {
    e.printStackTrace();
    }
    }
  • 执行子线程 ExecuteThread

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    static class ExecuteThread<V> extends Thread {
    private V result = null;
    private Exception exception = null;
    private boolean done = false;
    private Callable<V> task;
    private Object lock;

    public ExecuteThread(Callable<V> task, Object lock) {
    this.task = task;
    this.lock = lock;
    }

    @Override
    public void run() {
    try {
    result = task.call(); //执行实际的子任务
    } catch(Exception e) {
    exception = e;
    } finally {
    synchronized (lock) {
    done = true;
    lock.notifyAll(); //唤醒可能在等待的主线程
    }
    }
    }

    public V getResult() {
    return result;
    }

    public boolean isDone() {
    return done;
    }

    public Exception getException() {
    return exception;
    }
    }
  • 异步执行任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public <V> MyFuture<V> execute(final Callable<V> task) { //execute() 方法启动一个线程,并返回 MyFuture 对象,MyFuture 的 get() 方法会阻塞等待直到线程运行结束
    final Object lock = new Object();
    final ExecuteThread<V> thread = new ExecuteThread<> (task, lock);
    thread.start();

    MyFuture<V> future = new MyFuture<V> () {
    @Override
    public V get() throws Exception {
    synchronized(lock) {
    while(!thread.isDone()) {
    try {
    lock.wait();
    } catch(InterruptedException e) {

    }
    }
    if(thread.getException != null) {
    throw thread.getException();
    }
    return thread.getResult();
    }
    }
    };
    return future;
    }

15. 写一个集合点 Demo

  • 协作对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    //Java 中有一个专门的同步工具类 CyclicBarrier
    public class AssemblePoint {
    private int n;

    public AssemblePoint(int n) {
    this.n = n;
    }

    public synchronized void await() throws InterruptedException {
    if(n > 0) {
    n--;
    if(n == 0) {
    notifyAll();
    } else {
    while(n != 0) {
    wait();
    }
    }
    }
    }
    }
  • 集合点协作 Demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class AssemblePointDemo {
    static class Tourist extends Thread {
    AssemblePoint ap;

    public Tourist(AssemblePoint ap) {
    this.ap = ap;
    }

    @Override
    public void run() {
    try {
    //模拟先各自独立运行
    Thread.sleep((int) (Math.random() * 1000));
    //集合
    ap.await();
    System.out.println("arrived");
    //集合后执行其他操作
    } catch(InterruptedException e) {

    }
    }
    }

    public static void main(String[] args) {
    int num = 10;
    Tourist[] threads = new Tourist[num];
    AssemblePoint ap = new AssemblePoint(num);
    for(int i=0; i<num; i++) {
    thread[i] = new Tourist(ap);
    threads[i].start();
    }
    }
    }
-------------------- 本文结束感谢您的阅读 --------------------