0%

Java 并发总结(五):生产者/消费者模式

Demo1: 写一个生产者/消费者模式

  • 阻塞队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;

    public MyBlockingQueue(int limit) {
    this.limit = limit;
    queue = new ArrayDeque<> (limit); //ArrayDeque 的内部是一个动态扩展的逻辑上的循环数组
    }

    public synchronized void put(E e) throws InterruptedException { //
    给生产者使用,往队列上放数据,满了就 wait
    while(queue.size() == limit) {
    wait();
    }
    queue.add(e);
    notifyAll(); //通知所有的消费者
    }

    public synchronized E take() throws InterruptedException { //给消费者使用,从队列中取数据,如果为空就 wait
    while(queue.isEmpty()) {
    wait();
    }
    E e = queue.poll();
    notifyAll(); //通知所有的生产者
    return e;
    }
    }
    • put()take() 都调用了 wait(),但它们等待的条件是不一样的put() 等待的是队列不为满,而 take() 等待的是队列不为空,但它们都会加入相同的条件队列
    • 由于条件不同但又使用相同的条件队列,所以要调用 notifyAll() 而不能调用 nofity(),因为 notify() 只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用了
    • 只能有一个条件队列,这是 Java wait()/notify() 机制的局限性。显示的锁和条件,可以解决该问题
  • 一个简单的生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static class Producer extends Thread {
    MyBlockingQueue<String> queue;

    public Producer(MyBlockingQueue<String> queue) {
    this.queue = queue;
    }

    @Override
    public void run() {
    int num = 0;
    try {
    whiel(true) {
    String task = String.valueOf(num);
    queue.put(task);
    System.out.println("produce task " + task);
    num++;
    Thread.sleep((int) (Math.random() * 100));
    }
    } catch() {

    }
    }
    }
  • 一个简单的消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static class Consumer extends Thread {
    MyBlockingQueue<String> queue;

    public Consumer(MyBlockingQueue<Strign> queue) {
    this.queue = queue;
    }

    @Override
    public void run() {
    try {
    while(true) {
    String task = queue.take();
    System.out.println("handle task " + task);
    Thread.sleep((int) (Math.random() * 100));
    }
    } catch() {

    }
    }
    }
  • 主程序

    1
    2
    3
    4
    5
    public static void main(String[] args) {
    MyBlockingQueue<String> queue = new MyBlockingQueue<> (10);
    new Producer(queue).start();
    new Consumer(queue).start();
    }
  • 参考:Java 并发基础知识(三):线程的基本协作机制

Demo2: 写一个使用显示锁/条件实现的阻塞队列(生产者/消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//wait/notify 的局限是它只能有一个条件等待队列,分析等待条件也很复杂
//在生产者/消费者模式中,其实有两个条件,一个与队列满有关,一个与队列空有关
//使用显示锁,可以创建多个条件等待队列
static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<> (limit);
}
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while(queue.size() == limit) {
notFull.await();
}
queue.add(e);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
while(queue.isEmpty()) {
notEmpty.await();
}
E e = queue.poll();
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}

Demo3: 使用线程池实现生产者/消费者

-------------------- 本文结束感谢您的阅读 --------------------