这篇文章介绍java的数据结构之链表阻塞队列LinkedBlockingQueue
1、LinkedBlockingQueueLinkedBlockingQueue是一个链表形式的阻塞队列,遵循FIFO的原则,是线程安全的。
队列的添加数据和获取数据都比较简单,在LinkedBlockingQueue队列中,最重要的是线程安全性,这篇文章不讨论数据的插入和获取,主要讨论LinkedBlockingQueue是如何保证线程安全的。
读这篇文章的前提要对ReentrantLock锁机制有所了解,可以参考以下文章:
ReentrantLock及Condition加锁解锁机制
2、LinkedBlockingQueue的锁LinkedBlockingQueue中有两个锁及锁的状态,如下:
/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();-- takeLock:在获取数据时进行线程加锁
-- notEmpty:在获取数据时,如果队列为空时,使用notEmpty进行线程的阻塞,直到队列不为空时,被唤起,获取数据。
-- putLock:在添加数据时进行线程加锁
-- notFull:在添加数据时,如果队列数据已满,使用notFull进行线程阻塞,直到队列不满时被唤起,插入数据。
3、线程安全--数据插入有两个数据插入的方法:
put(E e): 如果队列已满,阻塞线程,直到队列不满时被唤起,插入数据。
offer(E e):如果队列已满,直接返回false,不插入数据。
put(E e) public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;//作者注:构造一个包含插入数据的Node节点Node node = new Node(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//作者注:获取putLock的锁putLock.lockInterruptibly();try {//作者注:如果队列已满,调用await进行线程挂起(阻塞)// 直到队列不满时,被唤起(参看take函数中的notFull.signal())while (count.get() == capacity) {notFull.await();}/*** 作者注:如果队列不满或者线程被唤起(线程被唤起时会重新获取putLock锁)* 将node节点插入到链表队列的末尾*/enqueue(node);c = count.getAndIncrement();//作者注:此时notFull的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。if (c + 1 < capacity)notFull.signal();} finally {//作者注:释放putLock锁putLock.unlock();}//作者注:如果此时c == 0,说明之前队列时空的,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列里有数据了。if (c == 0)signalNotEmpty();} offer(E e)offer方法和put方法唯一的区别是,如果队列已满,不进行阻塞等待,直接返回false,不再插入数据。
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;//如果队列已满,直接返回if (count.get() == capacity)return false;int c = -1;Node node = new Node(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;} 4、线程安全--数据获取获取数据的方法比较多,有以下几个:
-- take(): 如果队列为空,阻塞线程,直到队列不空时,获取数据,并删除数据。
-- peek():如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,但不从队列删除数据
-- poll():如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,同时从队列中将数据删除
take(): public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//作者注:获取takeLock的锁,获取成功继续执行,获取失败进行等待// 等待和阻塞的区别:// 等待:有权利获取锁// 阻塞:没有权利获取锁,只有被唤醒,才有权利获取锁takeLock.lockInterruptibly();try {//作者注:如果队列未空,调用await进行线程挂起(阻塞),并释放takeLock锁// 直到队列不为空时,被唤起(参看put函数中的notEmpty.signal())while (count.get() == 0) {notEmpty.await();}/*** 作者注:如果队列不空或者线程被唤起(线程被唤起时会重新获取takeLock锁)* 获取head头部数据,并删除*/x = dequeue();c = count.getAndDecrement();//作者注:此时notEmpty的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。if (c > 1)notEmpty.signal();} finally {//作者注: 释放takeLock锁takeLock.unlock();}//作者注:如果此时c == capacity,说明之前队列满时,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列已经不满了,可以插入数据了。// NotFull.signal()就在此方法中。if (c == capacity)signalNotFull();return x;} peek():与take的不同点:
(1)、take在队列为空时阻塞等待,peek在队列为空时直接返回,不阻塞。
(2)、take在获取数据后,将数据从队列删除;peek只获取数据,不删除数据。
public E peek() {//作者注:如果队列为空,直接返回,不进行阻塞等待。if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node first = head.next;if (first == null)return null;else //作者注:只返回数据,但不移动head指针,代表不删除数据。return first.item;} finally {takeLock.unlock();}} pool():与take的不同点:take在队列为空时阻塞等待,pool在队列为空时直接返回,不阻塞
与take的相同点:如果获取到数据,将数据从队列里删除。
public E poll() {final AtomicInteger count = this.count;//作者注:如果队列为空,直接返回,不进行阻塞等待。if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {//作者注:获取数据,并且将获取的数据从队列中删除。x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}