Demo1: 写一个生产者/消费者模式
阻塞队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private static class MyBlockingQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyBlockingQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<> (limit); //ArrayDeque 的内部是一个动态扩展的逻辑上的循环数组
}
public synchronized void put(E e) throws InterruptedException { //
给生产者使用,往队列上放数据,满了就 wait
while(queue.size() == limit) {
wait();
}
queue.add(e);
notifyAll(); //通知所有的消费者
}
public synchronized E take() throws InterruptedException { //给消费者使用,从队列中取数据,如果为空就 wait
while(queue.isEmpty()) {
wait();
}
E e = queue.poll();
notifyAll(); //通知所有的生产者
return e;
}
}put()
和take()
都调用了wait()
,但它们等待的条件是不一样的。put()
等待的是队列不为满,而take()
等待的是队列不为空,但它们都会加入相同的条件队列- 由于条件不同但又使用相同的条件队列,所以要调用
notifyAll()
而不能调用nofity()
,因为notify()
只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用了 - 只能有一个条件队列,这是 Java
wait()/notify()
机制的局限性。显示的锁和条件,可以解决该问题
一个简单的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private static class Producer extends Thread {
MyBlockingQueue<String> queue;
public Producer(MyBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
whiel(true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce task " + task);
num++;
Thread.sleep((int) (Math.random() * 100));
}
} catch() {
}
}
}一个简单的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private static class Consumer extends Thread {
MyBlockingQueue<String> queue;
public Consumer(MyBlockingQueue<Strign> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
String task = queue.take();
System.out.println("handle task " + task);
Thread.sleep((int) (Math.random() * 100));
}
} catch() {
}
}
}主程序
1
2
3
4
5public static void main(String[] args) {
MyBlockingQueue<String> queue = new MyBlockingQueue<> (10);
new Producer(queue).start();
new Consumer(queue).start();
}
Demo2: 写一个使用显示锁/条件实现的阻塞队列(生产者/消费者)
1 | //wait/notify 的局限是它只能有一个条件等待队列,分析等待条件也很复杂 |
- 清晰易读,同时避免了不必要的唤醒和检查,提高了效率
- Java 并发包中的类
ArrayBlockingQueue
就采用了类似的方式实现 - 参考:Java 并发包的基石(三):显示条件