在前面的三篇文章中,我们一起陆续地研究了AQS的底层事理,同时研究了AQS在不同场景下的三个运用工具类(ReentrantLock、CountDownLatch、Semaphore)的事情事理,之以是和大家一起剖析它们,是由于这三个类是我们平时事情中运用最多的类了,其实在J.U.C中,还有好多直接或者间接通过AQS实现的工具类,比如读写锁ReentrantReadWriteLock、循环栅栏CyclicBarrier等。
本篇我们将一起对AQS的另一类的运用处景的实现类——壅塞行列步队(实现生产者/消费者模型的经典方法) 做一次深入浅出的剖析。
从上面的类图构造和源码的注释剖析来看,我总结如下:

从上面总结的第一和第二点我们知道,BlockingQueue的接口方法包含四种类型的:会抛非常的、返回分外值的、会壅塞的和壅塞有超时的;但是按照操作结果大体可分为三种(官方区分的):插入型的、删除型的、读取型的。如下面的表格总结(实在也是源码的注释内容):
下面我们就此表格解释下(我又多分了一个种类->读取删除型,这样更清晰些):
常规操作 插入型: boolean add(e),向行列步队插入元素,成功返回true,如果行列步队容量不足了,则抛出IllegalStateException boolean offer(e),向行列步队插入元素,成功true,失落败false,行列步队容量不足,不会抛出非常 删除型: boolean remove(o),从行列步队删除指定的元素o,如果行列步队里存在多个e,且o.equals(e),则所有e都删除,返回值boolean 读取型: E element(),读取队头元素数据,但是不删除元素,如果行列步队为空,则会抛出非常 E peek(),和element一样,但是行列步队为空,它就返回null 读取删除型: E poll(),读取并删除队头数据,如果行列步队为空,则返回空,否则返转头节点元素,即此方法既是取又是删BlockingQueue扩展操作 (壅塞超时)(生产者/消费者实现的核心) 插入型: void put(e),壅塞插入,将元素插入行列步队,如果行列步队满了,则会壅塞线程等待,直到行列步队有空于的节点可供插入 boolean offer(e,timeout,unit),和上面put一样,但是等待有韶光限定,到了超时的韶光,就会退出 读取删除型: E take(),壅塞读取删除队头元素数据,如果行列步队为空,则会壅塞线程等待,直到行列步队有数据添加了,才会连续实行 E poll(timeout,unit),和上面take一样,但是壅塞等待也有韶光的限定,超出韶光就会退出BlockingQueue实现类简介直接上图:
从图中可以知道:
继续接口:BlockingDeque, TransferQueue实现类:ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue,DelayQueue,DelayedWorkQueue(线程池的)。ArrayBlockingQueue && LinkedBlockingQueue 先容ArrayBlockingQueue是一个通过数组实现的有界壅塞行列步队。通过FIFO前辈先出的办法操作元素,即队头的元素在行列步队中勾留的韶光最长,队尾则最短。新元素以尾插入的形式入队,读取将从队头开始。
ArrayBlockingQueue可以作为一个“有界缓冲区”,是实现生产者/消费者场景的主要手段。生产者生产元素放入行列步队,消费者从行列步队读取进行消费。行列步队满的时候,生产者就不能连续往里面放入元素,则必须壅塞等待;同样的,当行列步队为空时,消费者就取不到元素而壅塞等待。ArrayBlockingQueue一旦在创建时初始化了容量大小,就不会变革了(如果随时扩容/减少怎么可以壅塞等待呢)。
同时,ArrayBlockingQueue可以支持生产消费过程的是否公正。默认情形是非公正的,当然你也可以自己设置。至于公正和非公正的优缺陷这里就岂论述了,看这里。
LinkedBlockingQueue从名字就可以看出,它是一个以链脸色势实现的行列步队,它同样是个有界壅塞行列步队,可以在初始化的时候指定大小,如果不指定,则默认为Integer.MAX_VALUE。比较较ArrayBlockingQueue而言,LinkedBlockingQueue的吞吐量表现更好点,除了这个其他特性险些和ArrayBlockingQueue差不多。
BlockingDeque及实在现LinkedBlockingDeque先容BlockingDeque继续于BlockingQueue,是一个壅塞有界双端行列步队,即行列步队的头和尾都可以进行插入,删除,读取操作。同样的,我贴出官方的表格:(细节就不再阐述,实在都差不多,从方法名字也可以看出来是干嘛的了)
和BlockingQueue的紧张差异就在于,BlockingDeque是两头都可以操作。
LinkedBlockingDeque便是对BlockingDeque的详细实现,基于链脸色势的实现。其详细实现的理念和LinkedBlockingQueue也差不多,也不再阐述了。
PriorityBlockingQueue先容PriorityBlockingQueue从名字来看,是一个跟优先级有关系的壅塞行列步队,准确的说,它是一个支持优先级的无界壅塞行列步队,其内部定义了Comparator属性,我们可以通过这个自定义元素的排列顺序而改变出队的顺序,从而实现优先级,不雅观其布局器就可知一二:
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity];}
由于是无界,以是利用时小心资源耗尽,发生OOM。
SynchronousQueue先容Synchronous从英文翻译来看很故意思,同步的行列步队。乍一看,不懂。。。。源码注释里写道:这是一个壅塞行列步队,它的每个插入操作必须等待另一个线程实行相应的删除操作,反之亦然。你无法查看同步队列,由于只有当你试图删除一个元素时,它才会涌现;你不能插入一个元素(利用任何方法),除非另一个线程试图删除它;你不能迭代,由于没有东西可以迭代。行列步队的头部是第一个插入行列步队的线程试图添加到行列步队中的元素;如果没有这样的行列步队线程,那么就没有可以删除的元素,poll()将返回null。
觉得有点绕,大略来说,该行列步队无法对元素数据进行存储,无法查看行列步队里的情形,也无法进行元素数据的迭代,缘故原由便是,一个线程插入完成,就会被另一个线程取走;一个线程取走了,又会再有一个线程连续插入。顾名思义,实在这个行列步队的数据,觉得像是同步的,插入了就会被取走,取走后就会被再次再次插入。(除非是非常非常巧合的情形下说不定会查看到行列步队的元素数据)
TransferQueue及实在现LinkedTransferQueue先容TransferQueue除了继续了BlockingQueue的方法之外,还补充了符合该类特性的方法->Transfer:转移传输的意思,详细哪些呢?如下图接口定义:
从源码贴图可知,紧张多了上面几个方法:
boolean tryTransfer(E e)/ boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException:如果可能直接传输一个元素给正在等待的消费者,如果没有消费者,则返回false退出。void transfer(E e):和上面一样,传输一个元素给一个正在等待的消费者。和上面的try不同的是,try是如果没有正在等待的消费者,就直接返回退出,而此方法则是等待,直到有消费者将之取走。boolean hasWaitingConsumer():判断是否有等待的消费者,这个方法在不同的时候调用,返回结果是不一样的,可能前1ms还是true,后1ms为false了。int getWaitingConsumerCount():用来统计当前正在等待的消费者的数量,和上面的方法一样,也只是一个瞬时的数据,由于高并发时候在变革。而且这个方法的返回速率实际上可能要比hasWaitingConsumer()慢得多(毕竟要遍历嘛)。此方法一样平常用来调试监控数据的。LinkedTransferQueue便是TransferQueue的详细实现,是基于链脸色势的无界传输壅塞行列步队,符合FIFO特性,除了以上先容的transfer分外属性外,其他特性和其他实现类差不多。
DelayQueue先容DelayQueue:一个用来存放延迟元素的无界壅塞行列步队,这个行列步队不知道大家有没有利用过,它是实现延迟行列步队的主要手段之一。 其里面掩护的元素数据是一个实现Delayed接口的自定义实现类的工具,这个类会实现一个方法【long getDelay(TimeUnit unit)】,如果该方法返回一个小于或即是0的值,就表示里面的元素已经到达设定的延迟韶光,这时就会把元素放入到行列步队当中。详细事理我会在后面专门写一篇文章来阐述。
BlockingQueue先容完毕壅塞行列步队BlockingQueue及其所有实现类的大略先容到此结束,下面我们将从上面的详细实现类选取ArrayBlockingQueue作为本篇的另一个主角,来看看详细的壅塞行列步队是如何实现的。
(PS:很主要的一点,把稳每个行列步队的名字,有界和无界,利用无界行列步队的时候一定要小心OOM)
ArrayBlockingQueue源码剖析大略解释事情事理从上面剖析壅塞行列步队以及ArrayBlockingQueue的大略先容来看,ArrayBlockingQueue最主要的功能便是,能够在操作元素(插入和读取删除)的时候壅塞。其事情事理便是,当ArrayBlockingQueue行列步队为空时,会壅塞消费者读取数据,直到行列步队有数据时唤醒消费者连续;当行列步队满的时候,会壅塞生产者进行插入数据,直到行列步队有剩余空间时,会唤醒生产者连续插入。
主要属性和布局器在之前的篇章中,我们就已经知道,wait/notify是实现生产者/消费者的主要手段之一,是通过线程间的通信来完成的,那在我们J.U.C包里,会不会是通过Condition的等待行列步队机制实现的呢?我们一起来看下,上源码:
/ The queued items / final Object[] items; / items index for next take, poll, peek or remove / int takeIndex; / items index for next put, offer, or add / int putIndex; / Number of elements in the queue / int count; / Concurrency control uses the classic two-condition algorithm found in any textbook. / / Main lock guarding all access / final ReentrantLock lock; / Condition for waiting takes / private final Condition notEmpty; / Condition for waiting puts / private final Condition notFull;
从这段代码实在我们就已经知道了上面问题的答案,确实是通过Condition的等待行列步队机制实现的。
首先,ArrayBlockingQueue之以是叫Array,其内部实在掩护了一个数组,items;其次,两个数组下标属性,存放下一个读取和插入的位置(把稳:他们的默认值为0):takeIndex和putIndex;还有一个用来表示行列步队中元素个数的count;末了,一个紧张用来并发掌握的锁ReentrantLock;以及两个条件等待行列步队:notEmpty(用来壅塞消费者)和notFull(用来壅塞生产者)。我们再来看看布局器,一共重载实现了三个布局器,如下:
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
从布局器可以看出,ArrayBlockingQueue公正/非公正都支持,默认利用非公正模式。初始化的时候,就已经将消费/生产两大条件等待行列步队初始化完成了。
生产消费紧张方法生产方法put public void put(E e) throws InterruptedException { checkNotNull(e); / 为了担保操作的原子性,上来先上锁 / final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { / 当前行列步队元素实际个数count如果即是数组定义的大小的话,则解释行列步队里已经满了 则会调用生产者等待行列步队的壅塞await方法,对当前消费者线程进行壅塞 把稳:这边是个while循环,跳出循环的唯一条件便是,count!=items.length,什么情形才会 不相等呢?当然是行列步队中的元素被读取删除的时候 / while (count == items.length) notFull.await(); / 如果不知足上面的条件或者生产者被唤醒从壅塞状态回归时,将调用enqueue(e)方法进行入队操作。 / enqueue(e); } finally { lock.unlock(); }}private void enqueue(E x) { / 这部分的代码就很大略了 首先,便是将当前须要入队的元素,放入putIndex下标对应的坑中 其次,将putIndex实行+1操作,如果+1后即是数组的长度,解释下标走到尽头了,得重新开始,于是实行=0操作 末了,以上操作都成功的话,将元素个数count实行+1;并且调用消费者等待行列步队,唤醒一个壅塞的消费者进行 消费 / // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();}
流程比较大略,看代码注释即可明白啦~~
消费方法takepublic E take() throws InterruptedException { / 同样的,进入方法先上锁,担保原子性操作,实在也不仅仅是为了担保原子性 我们在之前讲过Condition的事理的时候说过,调用其方法前必须先lock / final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { / 险些和上面一样,首先是个while循环,条件是当前元素的个数是否为0 如果为0,解释行列步队里没数据可以读取消费,则实行notEmpty.await()对消费者壅塞 如果不为0或者从壅塞等待行列步队中被唤醒,则实行出队操作dequeue() / while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }}private E dequeue() { / 首先,取出当前takeIndex下标对应的元素,并且置为空 其次,takeIndex实行+1操作,并判断是否即是数组大小,判断和上面一样哈~ 末了,count进行-1操作,由于出去了一个元素;并且调用生产者连续放入元素 / // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;}
同样的,代码注释里,已经剖析清楚了~~至于其他的一些操作方法小伙伴们可以自己去看哈~也比较大略。
ArrayBlockingQueue大略总结到此,我们想要理解的都理清楚了,大略总结下:
首先,ArrayBlockingQueue内部掩护了一个一维数组,两个下标(删/插)都是从0开始,加到数组大小之后又从零开始,这样担保了前辈先出的特性。(小伙伴们轻微动动笔画个图就知道了)其次,ArrayBlockingQueue准备了一把锁,用来担保各个操作的原子性,同时这把锁既可以公正又可以非公正末了,ArrayBlockingQueue准备了两个壅塞等待行列步队,一个给生产者,一个给消费者,当ArrayBlockingQueue放不下了,就会壅塞生产者,直到消费者消费后;当ArrayBlockingQueue没有数据了,就会阻挡消费者消费,直莅临盆者插入数据。有了之前AQS的根本,理解壅塞行列步队ArrayBlockingQueue的事理是不是轻松加愉快呢~当然我们后面也会选一些其他的实现类来剖析的,有的可能就比ArrayBlockingQueue繁芜多了。
大略用法案例下面我们写个大略的示例,来加深下对壅塞行列步队ArrayBlockingQueue的理解。
场景描述:一个行列步队大小为5的壅塞行列步队,多个生产者往行列步队放入随机号,一个消费者进行读取消费。
public class ArrayBlockingQueueDemo {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true); //先把消费者搞起来Thread consumer = new Thread(new Runnable() { @Override public void run() {try {for (;;) {String value = queue.take();System.out.println("消费者取出了值" + value);}} catch (InterruptedException e) {e.printStackTrace();}} }, "consumer");consumer.start(); //循环启动10个生产者for (int i = 0; i < 10; i++) { Thread producer = new Thread(new Runnable() {@Overridepublic void run() { try {String value = UUID.randomUUID().toString();System.out.println(Thread.currentThread().getName() + "放入值:" + value);queue.put(value); } catch (InterruptedException e) {e.printStackTrace(); }} }, "producer" + i); producer.start();}}}/运行效果producer1放入值:e06ee09f-46c0-4c33-92b3-53b4300fc1afproducer5放入值:03ed5819-6b51-4ef9-bb93-5f048ead0a1eproducer4放入值:9988b80b-7218-4b95-bbe0-e6d8ec00ba38消费者取出了值e06ee09f-46c0-4c33-92b3-53b4300fc1afproducer3放入值:01fc2d1c-6fc5-4968-b5ef-dde07639bde4producer8放入值:410903e5-4e4d-4310-a84d-b87f94f4a797producer0放入值:3323dda4-baf4-4b27-b574-95cda9ce0b14producer7放入值:13fc6650-69ff-433e-bdd3-4e726d41abdfproducer2放入值:e398813a-8b50-4221-8a13-845ae91a453aproducer6放入值:9696211c-f2bf-466a-bebc-b55de4608108producer9放入值:f106cffa-351e-4d4e-ad1a-6986f7136c3a消费者取出了值03ed5819-6b51-4ef9-bb93-5f048ead0a1e消费者取出了值9988b80b-7218-4b95-bbe0-e6d8ec00ba38消费者取出了值01fc2d1c-6fc5-4968-b5ef-dde07639bde4消费者取出了值410903e5-4e4d-4310-a84d-b87f94f4a797消费者取出了值3323dda4-baf4-4b27-b574-95cda9ce0b14消费者取出了值13fc6650-69ff-433e-bdd3-4e726d41abdf消费者取出了值e398813a-8b50-4221-8a13-845ae91a453a消费者取出了值9696211c-f2bf-466a-bebc-b55de4608108消费者取出了值f106cffa-351e-4d4e-ad1a-6986f7136c3a当然效果每次运行都会不一样的,你可能会有疑问,不是生产一个消费一个吗?你错了,我们仿照的是多个生产者,只有一个消费者,当消费者在消费打印的时候,说不定已经生产好几个了。请把稳,这是一个高并发场景。/
由此我们看出,只有生产者生产放入数据后,消费者才会进行读取消费。
到此,本篇到此结束啦~你学会了吗?~~
作者:会飞的鱼2022链接:https://juejin.cn/post/7117889476709318669