A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue or to enqueue into a full one.
A thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue.
A thread trying to enqueue an item in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more items or by clearing the queue completely.
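The blocking behaviour described above can be seen with the JDK's own ArrayBlockingQueue. The following is a minimal sketch, not part of the original excerpt: the class name BlockingQueueDemo and the method transfer are made up for illustration. A producer thread pushes n numbers through a queue smaller than n, so put() has to wait for the consumer and take() has to wait for the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, for illustration only.
class BlockingQueueDemo {

    /** Moves the numbers 0..n-1 through a bounded queue and returns them in arrival order. */
    static List<Integer> transfer(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < n; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer the elements arrive in insertion order, even though the queue never holds more than two of them at once.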
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
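As a quick check of what the constructors above do, here is a small sketch against the public ArrayBlockingQueue API. The class ConstructorDemo and its method names are hypothetical. It assumes only the documented behaviour: a non-positive capacity is rejected, and the second argument selects a fair lock.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, for illustration only.
class ConstructorDemo {

    /** Returns true if a capacity of 0 is rejected with IllegalArgumentException. */
    static boolean rejectsNonPositiveCapacity() {
        try {
            new ArrayBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** Creates a queue backed by a fair lock, adds one element, and reports the free slots. */
    static int freeSlotsAfterOneOffer(int capacity) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, true);
        queue.offer("a"); // non-blocking insert; succeeds because the queue is empty
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNonPositiveCapacity());
        System.out.println(freeSlotsAfterOneOffer(3));
    }
}
```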
Take element from BlockingQueue
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                // Causes the current thread to wait until it is signalled or interrupted.
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}
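extract() relies on a helper, inc, whose body is not shown in this excerpt. A plausible reading, assuming the backing array is used as a circular buffer, is an increment that wraps back to 0 at the end of the array. The sketch below is an assumption, not the quoted source: the class RingIndex is hypothetical, and the array length is passed in as a parameter so the example stands alone.

```java
// Hypothetical helper class, for illustration only.
class RingIndex {

    /** Circularly increments i within an array of the given length. */
    static int inc(int i, int length) {
        return (++i == length) ? 0 : i;
    }

    public static void main(String[] args) {
        int index = 0;
        for (int step = 0; step < 4; step++) {
            index = inc(index, 3);
            System.out.println(index); // prints 1, 2, 0, 1 on successive lines
        }
    }
}
```

Wrapping the index this way lets takeIndex and putIndex chase each other around a fixed array, so no elements ever need to be shifted.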
Put element in BlockingQueue
/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 * @throws InterruptedException
 * @throws NullPointerException
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}
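The excerpts above leave out the fields and helpers they depend on, so they do not compile on their own. The following is a condensed, self-contained sketch of the same idea: one ReentrantLock with two Conditions guarding a circular array. It is not the JDK source. The class name BoundedBuffer and the size() method are made up for illustration, and the catch-and-re-signal step from take() and put() is left out for brevity.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, for illustration only; a simplified sketch, not the JDK implementation.
class BoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here

    BoundedBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting for space
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            ++count;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await(); // releases the lock while waiting for an element
            }
            E x = (E) items[takeIndex];
            items[takeIndex] = null; // drop the reference so it can be collected
            takeIndex = (takeIndex + 1) % items.length;
            --count;
            notFull.signal(); // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```

Both waits sit inside a while loop rather than an if, so a thread rechecks the queue state every time it wakes up and reacquires the lock.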
java.util.concurrent.locks.Condition
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations.
Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
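For contrast, here is a sketch of a bounded buffer written with synchronized and the Object monitor methods instead of a Lock and Conditions. The class MonitorBuffer is hypothetical and only meant to show the limitation: an object has a single wait-set, so producers and consumers wait in the same place and notifyAll() has to wake all of them.

```java
// Hypothetical class, for illustration only.
class MonitorBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;

    MonitorBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    synchronized void put(E e) throws InterruptedException {
        while (count == items.length) {
            wait(); // producers and consumers share the single wait-set of 'this'
        }
        items[putIndex] = e;
        putIndex = (putIndex + 1) % items.length;
        count++;
        notifyAll(); // cannot target only consumers, so wake every waiter
    }

    @SuppressWarnings("unchecked")
    synchronized E take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        takeIndex = (takeIndex + 1) % items.length;
        count--;
        notifyAll(); // cannot target only producers, so wake every waiter
        return x;
    }
}
```

With two Condition objects on one lock, a put can signal only the threads waiting for an element and a take can signal only the threads waiting for space.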
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
newCondition()
Returns: Condition instance for use with this Lock instance.
The returned Condition instance supports the same usages as do the Object monitor methods (wait, notify, and notifyAll) when used with the built-in monitor lock.
If this lock is not held when any of the Condition waiting or signaling methods are called, an IllegalMonitorStateException is thrown.
When the condition waiting methods are called the lock is released and, before they return, the lock is reacquired and the lock hold count restored to what it was when the method was called.
If a thread is interrupted while waiting then the wait will terminate, an InterruptedException will be thrown, and the thread's interrupted status will be cleared.
Waiting threads are signaled in FIFO order.
The ordering of lock reacquisition for threads returning from waiting methods is the same as for threads initially acquiring the lock, which is in the default case not specified, but for fair locks favors those threads that have been waiting the longest.
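Three of the rules quoted above can be checked directly against ReentrantLock. The sketch below is an illustration, with a hypothetical class name ConditionRules and hypothetical method names. It probes signalling without the lock, the hold count after a timed wait, and the interrupted status after await() throws.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, for illustration only.
class ConditionRules {

    /** Returns true if signal() without holding the lock throws IllegalMonitorStateException. */
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Returns the hold count seen after a timed wait that was entered with the lock held twice. */
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        lock.lock(); // reentrant acquisition: hold count is now 2
        try {
            cond.await(50, TimeUnit.MILLISECONDS); // releases the lock fully, then reacquires it
            return lock.getHoldCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    /** Returns the interrupted status observed right after await() throws InterruptedException. */
    static boolean interruptedStatusAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        Thread.currentThread().interrupt(); // set the flag so await() is interrupted on entry
        lock.lock();
        try {
            cond.await();
            throw new IllegalStateException("await() returned without being interrupted");
        } catch (InterruptedException e) {
            return Thread.currentThread().isInterrupted(); // status was cleared by await()
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());     // prints true
        System.out.println(holdCountAfterAwait());         // prints 2
        System.out.println(interruptedStatusAfterAwait()); // prints false
    }
}
```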