用数组实现

参考 JDK 中的实现java.util.concurrent.ArrayBlockingQueue

  1. 考虑使用环形数组。用两个指针 putIndex(下一个入队位置)、takeIndex(下一个出队位置),入队/出队到数组末尾时都从零开始,count统计元素数量,count=0说明队列为空,count=数组容量说明队列已满。
  2. 考虑用 lock + Condition.await 实现阻塞和保证线程安全。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class ArrayBlockingQueue<E> {
final Object[] items; // 存放元素的数组
int putIndex; // 下一个入队位置
int takeIndex; // 下一个出队位置
int count; // 元素数量
final ReentrantLock lock;
private final Condition notEmpty; // 不空,可以出
private final Condition notFull; // 不满,可以进

public ArrayBlockingQueue1(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();

items = new Object[capacity];
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

// 往队列放元素
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 加锁
try {
// 如果已满,释放锁并阻塞
while (count == items.length)
notFull.await();
// 如果未满,入队
enqueue(e);
// 唤醒关注未空的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 从队列取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果已空,阻塞释放锁
while (count == 0)
notEmpty.await();
// 如果未空,出队
E e = dequeue();
// 唤醒关注未满的线程
notFull.signal();
return e;
} finally {
lock.unlock();
}
}

// 入队
private void enqueue(E e) {
Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length)
putIndex = 0;
count++;
}

// 出队
private E dequeue() {
Object[] items = this.items;
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
return e;
}
}

用栈实现

栈的特点先进后出(FILO),阻塞队列的特点先进先出(FIFO),比较别扭不太合适。

硬要做也能做,思路是使用两个栈,一个入栈栈,一个出栈栈。

  • 入队操作:将元素入栈栈
  • 出队操作:如果出栈栈不空,直接弹出;如果出栈栈为空,将入栈栈中所有元素依次弹出并进出栈栈,然后出栈栈弹出。
  • 同样用 lock + Condition.await 实现阻塞和保证线程安全。
import java.util.Stack;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class StackBlockingQueue<E> {
private final Stack<E> inStack;
private final Stack<E> outStack;

int capacity;
final ReentrantLock lock;
private final Condition notEmpty;// 不空,可以出
private final Condition notFull;// 不满,可以进

public StackBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();

this.inStack = new Stack<>();
this.outStack = new Stack<>();
this.capacity = capacity;
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}

// 往队列放元素
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();

final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 加锁
try {
// 如果已满,阻塞并释放锁
while (inStack.size() + outStack.size() >= capacity)
notFull.await();
// 如果未满,入队
enqueue(e);
// 唤醒挂在未空条件上的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 从队列取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果已空,阻塞并释放锁
while (outStack.isEmpty() && inStack.isEmpty())
notEmpty.await();
// 如果未空,出队
E e = dequeue();
// 唤醒挂在未空条件上的线程
notFull.signal();
return e;
} finally {
lock.unlock();
}
}

// 入队
private void enqueue(E e) {
inStack.push(e);
}

// 出队
private E dequeue() {
if (outStack.isEmpty()) {
while (!inStack.isEmpty()) {
outStack.push(inStack.pop());
}
}
return outStack.pop();
}
}