用数组实现
参考 JDK 中的实现java.util.concurrent.ArrayBlockingQueue
。
- 考虑使用环形数组。用两个指针
putIndex
(下一个入队位置)、takeIndex
(下一个出队位置),入队/出队到数组末尾时都从零开始,count
统计元素数量,count=0说明队列为空,count=数组容量说明队列已满。
- 考虑用 lock + Condition.await 实现阻塞和保证线程安全。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ArrayBlockingQueue<E> { final Object[] items; int putIndex; int takeIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public ArrayBlockingQueue1(int capacity) { if (capacity <= 0) throw new IllegalArgumentException();
items = new Object[capacity]; lock = new ReentrantLock(); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public void put(E e) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); notEmpty.signal(); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); E e = dequeue(); notFull.signal(); return e; } finally { lock.unlock(); } }
private void enqueue(E e) { Object[] items = this.items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0; count++; }
private E dequeue() { Object[] items = this.items; E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; return e; } }
|
用栈实现
栈的特点先进后出(FILO),阻塞队列的特点先进先出(FIFO),比较别扭不太合适。
硬要做也能做,思路是使用两个栈,一个入栈栈,一个出栈栈。
- 入队操作:将元素入栈栈
- 出队操作:如果出栈栈不空,直接弹出;如果出栈栈为空,将入栈栈中所有元素依次弹出并进出栈栈,然后出栈栈弹出。
- 同样用 lock + Condition.await 实现阻塞和保证线程安全。
import java.util.Stack; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class StackBlockingQueue<E> { private final Stack<E> inStack; private final Stack<E> outStack;
int capacity; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public StackBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException();
this.inStack = new Stack<>(); this.outStack = new Stack<>(); this.capacity = capacity; this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (inStack.size() + outStack.size() >= capacity) notFull.await(); enqueue(e); notEmpty.signal(); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (outStack.isEmpty() && inStack.isEmpty()) notEmpty.await(); E e = dequeue(); notFull.signal(); return e; } finally { lock.unlock(); } }
private void enqueue(E e) { inStack.push(e); }
private E dequeue() { if (outStack.isEmpty()) { while (!inStack.isEmpty()) { outStack.push(inStack.pop()); } } return outStack.pop(); } }
|