本文共 2914 字,大约阅读时间需要 9 分钟。
import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue{ /** * 对象池 */ private Object[] objs; /** * 容量 */ private int capacity = DEFAULT_CAPACITY; /** * 存元素的索引 */ private int putIndex = 0; /** * 取元素的索引 */ private int takeIndex = 0; /** * 元素个数 */ private int curLen = 0; /** * 并发锁 */ private Lock lock = new ReentrantLock(); /** * 队列有元素 */ private Condition notNullCondition = lock.newCondition(); /** * 队列有空间 */ private Condition notFullCondition = lock.newCondition(); /** * 默认容量大小 */ private static final int DEFAULT_CAPACITY = 100; public BlockingQueue(int capacity) { if (capacity > 0) { this.capacity = capacity; } objs = new Object[this.capacity]; } public void push(T obj) throws InterruptedException { try { lock.lock(); // 容量已满,阻塞等待 while (curLen == capacity) { System.out.println("Queue is full..."); notFullCondition.await(); } // 放置元素 objs[putIndex] = obj; curLen++; // 索引更新 putIndex++; if (putIndex >= capacity) { putIndex = 0; } // 通知消费者线程 notNullCondition.signalAll(); } finally { lock.unlock(); } } public T poll() throws InterruptedException { Object obj; try { lock.lock(); // 队列未空,阻塞等待 while (curLen == 0) { System.out.println("Queue is empty..."); notNullCondition.await(); } // 获取元素 obj = objs[takeIndex]; curLen--; // 索引更新 takeIndex++; if (takeIndex >= capacity) { takeIndex = 0; } // 通知生产者线程 notFullCondition.signalAll(); } finally { lock.unlock(); } return (T) obj; } public static void main(String args[]) throws InterruptedException { BlockingQueue queue = new BlockingQueue(5); // 正常的入队、出队 queue.push(1); System.out.println(queue.poll()); queue.push(2); System.out.println(queue.poll()); queue.push(3); System.out.println(queue.poll()); // 持续入队 queue.push(1); queue.push(2); queue.push(3); queue.push(4); queue.push(5); // 这里会阻塞 // queue.push(6); // 持续出队 System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); // 这里会阻塞 // System.out.println(queue.poll()); }}
转载地址:http://cnaii.baihongyu.com/