Java - 并发队列使用详解1(阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue等)
在并发队列上 JDK 提供了两套实现: 一个是以 ConcurrentLinkedQueue 为代表的高性能队列,一个是以 BlockingQueue 接口为代表的阻塞队列。无论哪种都继承自 Queue,并且都是线程安全的,都可以进行并发编程。 本文先介绍阻塞队列并通过样例进行演示。
一、阻塞队列介绍
1,什么是阻塞队列?
(1)阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
(2)阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2,阻塞队列有哪些?
(1)ArrayBlockingQueue
- 一个由数组结构组成的有界阻塞队列。
- 在创建 ArrayBlockingQueue 对象时必须指定容量大小。
- 并且创建时可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
- 它是一个先进先出队列。
(2)LinkedBlockingQueue
- 一个由链表结构组成的有界阻塞队列。
- 在创建 LinkedBlockingQueue 对象时可以指定容量大小。如果不指定容量大小,则默认大小为 Integer.MAX_VALUE。
- 它是一个先进先出队列。
(3)PriorityBlockingQueue
- 一个支持优先级排序的无界阻塞队列,即容量没有上限。
- 它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。
- 所有插入 PriorityBlockingQueue 的对象必须实现 java.lang.Comparable 接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。
- PriorityBlockingQueue 中允许插入 null 对象。
(4)DelayQueue
- 一个使用优先级队列实现的无界阻塞队列,它是基于 PriorityQueue。
- DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。
- DelayQueue 也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
(5)SynchronousQueue
- 一个不存储元素的阻塞队列。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。
(6)LinkedTransferQueue
- 一个由链表结构组成的无界阻塞队列。
(7)LinkedBlockingDeque
- 一个由链表结构组成的双向阻塞队列。
3,阻塞队列的几个常用方法
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 |
add(e)
|
offer(e)
|
put(e)
|
offer(e,time,unit)
|
移除方法 |
remove()
|
poll()
|
take()
|
poll(time,unit)
|
检查方法 | element() | peek() | 不可用 | 不可用 |
附:使用 BlockingQueue 模拟生产者与消费者
1,一直阻塞方式
(1)首先我们创建一个生产者,代码如下。我们使用 put() 方法插入元素,当队列未满时,该方法会直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
提示:put() 方法的实现原理是先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用 notFull.await() 进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。当被其他线程唤醒时,通过 enqueue(e) 方法插入元素,最后解锁。
public class ProducerThread implements Runnable { private String name; private BlockingQueue queue; private volatile boolean flag = true; private static AtomicInteger count = new AtomicInteger(); public ProducerThread(String name, BlockingQueue queue) { this.name = name; this.queue = queue; } @Override public void run() { try { System.out.println(now() + this.name + ":线程启动。"); while (flag) { String data = count.incrementAndGet()+""; System.out.println(now() + this.name + ":开始存入" + data + "到队列中......"); // 将数据存入队列中 queue.put(data); System.out.println(now() + this.name + ":成功存入" + data + "到队列中。"); } } catch (Exception e) { } finally { System.out.println(now() + this.name + ":退出线程。"); } } public void stopThread() { this.flag = false; } // 获取当前时间(分:秒) public String now() { Calendar now = Calendar.getInstance(); return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] "; } }
(2)接着创建一个消费者,代码如下。我们使用 take() 方法获取元素,当队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。同时每次获取到元素之后会等待个 1 秒钟,模拟实际业务处理耗时,也便于观察队列情况。
提示:
take() 方法在获取元素时加锁,生产者无法操作 queue,取到元素并移除元素,然后释放锁。
public class ConsumerThread implements Runnable { private String name; private BlockingQueue<String> queue; private volatile boolean flag = true; public ConsumerThread(String name, BlockingQueue<String> queue) { this.name = name; this.queue = queue; } @Override public void run() { System.out.println(now() + this.name + ":线程启动。"); try { while (flag) { System.out.println(now() + this.name + ":正在从队列中获取数据......"); String data = queue.take(); System.out.println(now() + this.name + ":拿到队列中的数据:" + data); //等待1秒钟(模拟实际业务处理) Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(now() + this.name + ":退出线程。"); } } // 获取当前时间(分:秒) public String now() { Calendar now = Calendar.getInstance(); return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] "; } }
(3)最后分别创建两个生产者以及一个消费者进行测试,并且 3 秒种之后通知生产者线程退出。
public class ProducerAndConsumer { public static void main(String[] args) throws InterruptedException { // 创建一个ArrayBlockingQueue类型的阻塞队列(容量大小为3,公平性为true) BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 创建两个生产者和一一个消费者 ProducerThread producerThread1 = new ProducerThread("11111生产者1", queue); ProducerThread producerThread2 = new ProducerThread("22222生产者2", queue); ConsumerThread consumerThread1 = new ConsumerThread("---->消费者1", queue); Thread t1 = new Thread(producerThread1); Thread t2 = new Thread(producerThread2); Thread c1 = new Thread(consumerThread1); t1.start(); t2.start(); c1.start(); // 执行3s后,生产者不再生产 Thread.sleep(3 * 1000); producerThread1.stopThread(); producerThread2.stopThread(); } }
(4)运行结果如下,可以看到整个过程两个生产者前后一共生产了 8 个元素,并由消费者消费掉:

2,超时退出方式
(1)我们对生产者代码稍作改变,这次我们使用 offer() 方法插入元素,并设定等待的时间,当队列未满时,该方法会直接插入然后返回 true;队列满时会阻塞等待设定的时间,如果在指定时间内还不能往队列中插入数据则返回 false(不会一直阻塞下去)
public class ProducerThread implements Runnable { private String name; private BlockingQueue queue; private volatile boolean flag = true; private static AtomicInteger count = new AtomicInteger(); public ProducerThread(String name, BlockingQueue queue) { this.name = name; this.queue = queue; } @Override public void run() { try { System.out.println(now() + this.name + ":线程启动。"); while (flag) { String data = count.incrementAndGet()+""; System.out.println(now() + this.name + ":开始存入" + data + "到队列中......"); // 将数据存入队列中 boolean offer = queue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println(now() + this.name + ":成功存入" + data + "到队列中。"); } else { System.out.println(now() + this.name + ":存入" + data + "失败!!!"); } } } catch (Exception e) { } finally { System.out.println(now() + this.name + ":退出线程。"); } } public void stopThread() { this.flag = false; } // 获取当前时间(分:秒) public String now() { Calendar now = Calendar.getInstance(); return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] "; } }
(2)对消费者代码同样稍作改变,这次我们使用 poll() 方法获取 元素,并设定等待的时间,当队列不为空时,会返回队首值;当队列为空时阻塞等待设定的时间,如果在指定时间内队列还未空则返回 null(不会一直阻塞下去)
public class ConsumerThread implements Runnable { private String name; private BlockingQueue<String> queue; private volatile boolean flag = true; public ConsumerThread(String name, BlockingQueue<String> queue) { this.name = name; this.queue = queue; } @Override public void run() { System.out.println(now() + this.name + ":线程启动。"); try { while (flag) { System.out.println(now() + this.name + ":正在从队列中获取数据......"); String data = queue.poll(2, TimeUnit.SECONDS); if (data != null) { System.out.println(now() + this.name + ":拿到队列中的数据:" + data); //等待1秒钟 Thread.sleep(1000); } else { System.out.println(now() + this.name + ":超过2秒未获取到数据!!!"); flag = false; } } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(now() + this.name + ":消退出线程。"); } } // 获取当前时间(分:秒) public String now() { Calendar now = Calendar.getInstance(); return "[" + now.get(Calendar.MINUTE) + ":" + now.get(Calendar.SECOND) + "] "; } }
(3)测试代码 ProducerAndConsumer.java 内容同之前一样没有变化,运行结果如下,可以看到整个过程两个生产者前后一共生产了 8 个元素(但其中有 2 个元素因为超时没有进入队列),消费者最终一个消费了 6 个元素:
