java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<E>
org.apache.qpid.util.concurrent.BatchSynchQueueBase<E>
public abstract class BatchSynchQueueBase<E>
Supports synchronous and asynchronous puts. An asynchronous put is the simpler case: wait until the queue can be written to, then deposit the data. A synchronous put is harder: deposit the data, then wait until the deposited element or elements have been taken before unblocking and continuing. Consumers have options too. A consumer can take the data from the buffer and let any producers unblocked as a result continue immediately, or it can take the data but keep those producers blocked while the data is processed, and only then send the message that unblocks them. Synchronous or asynchronous mode is controlled by a switch. Whether producers are unblocked during consumer processing is controlled by the consumer's calls.
Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete implementation of this class. This queue is only accessed through the methods insert(E, boolean), extract(boolean, boolean), getBufferCapacity() and peekAtBufferHead(). An implementation can override these methods to implement the buffer other than by a queue, for example, by using an array.
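As a rough illustration of this extension point, the sketch below uses a hypothetical, heavily simplified stand-in (`MiniQueueBase`) rather than the real BatchSynchQueueBase. Only the `createQueue()` factory method mirrors the documented signature; `LinkedMiniQueue`, `ArrayMiniQueue`, `add` and `next` are names assumed for the example.

```java
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical stand-in for the base class: it owns the buffer but lets
// sub-classes decide which Queue implementation backs it.
abstract class MiniQueueBase<E> {
    // Created once, in the constructor, from the sub-class's factory method.
    final Queue<E> buffer;

    MiniQueueBase() {
        buffer = createQueue();
    }

    // Mirrors the documented factory method: return a new, empty queue.
    protected abstract <T> Queue<T> createQueue();

    // Simplified accessors, assumed for this sketch only.
    boolean add(E e) { return buffer.offer(e); }
    E next() { return buffer.poll(); }
}

// One possible concrete sub-class, backed by a linked list.
class LinkedMiniQueue<E> extends MiniQueueBase<E> {
    @Override
    protected <T> Queue<T> createQueue() { return new LinkedList<T>(); }
}

// Another, backed by a resizable array.
class ArrayMiniQueue<E> extends MiniQueueBase<E> {
    @Override
    protected <T> Queue<T> createQueue() { return new ArrayDeque<T>(); }
}
```

The point of the design is that the choice of backing store is the only decision a sub-class has to make; everything else is inherited.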
Normal queue methods work asynchronously.
Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
when their data is taken.
The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
option to keep producers blocked until the consumer decides to release them.
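The three modes described above can be sketched with standard java.util.concurrent classes. This is not the Qpid implementation: `HandOffRecord`, `HandOffQueue`, `putAsync`, `putSync` and `drain` are hypothetical names, and the use of a CountDownLatch per record is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical record pairing a data item with a latch its producer waits on.
class HandOffRecord<E> {
    final E data;
    final CountDownLatch released = new CountDownLatch(1);

    HandOffRecord(E data) { this.data = data; }
}

class HandOffQueue<E> {
    private final BlockingQueue<HandOffRecord<E>> buffer = new LinkedBlockingQueue<>();

    // Asynchronous put: deposit the item and return immediately.
    void putAsync(E e) throws InterruptedException {
        buffer.put(new HandOffRecord<>(e));
    }

    // Synchronous put: deposit the item, then block until a consumer releases it.
    void putSync(E e) throws InterruptedException {
        HandOffRecord<E> record = new HandOffRecord<>(e);
        buffer.put(record);
        record.released.await();
    }

    // Blocks for at least one record, then takes whatever else is available.
    // With unblock == true the producers are released straight away; otherwise
    // the caller must count down each record's latch once processing is done.
    List<HandOffRecord<E>> drain(boolean unblock) throws InterruptedException {
        List<HandOffRecord<E>> batch = new ArrayList<>();
        batch.add(buffer.take());
        buffer.drainTo(batch);
        if (unblock) {
            for (HandOffRecord<E> record : batch) {
                record.released.countDown();
            }
        }
        return batch;
    }
}
```

A consumer that calls `drain(false)` keeps every producer in the batch waiting until it has finished its own processing, which is the behaviour the additional BatchSynchQueue methods are described as offering.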
The take method that keeps producers blocked has been removed because it is pointless. Essentially it reduces this class to synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency gain of the hand-off design comes from being able to consume requests in batches, amortizing latency (such as that caused by I/O) across many producers. The only advantage of the single blocking take method was that it took advantage of the queue ordering, which may be useful, for example to apply a priority ordering amongst producers. This is also an advantage over java.util.concurrent.SynchronousQueue, which has no backing queue that can be used to apply orderings. If a single-item take is really needed, use the drainTo method with a maximum of one item.
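The batching idea can be illustrated with a plain java.util.concurrent.BlockingQueue. The helper below is a sketch, not part of this class: `BatchDrain` and `nextBatch` are hypothetical names, and because the standard BlockingQueue.drainTo does not block, the sketch calls take() first to wait for data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class BatchDrain {
    // Waits for one element, then collects up to maxElements in total without
    // waiting again. With maxElements == 1 this behaves like a single take.
    static <E> List<E> nextBatch(BlockingQueue<E> queue, int maxElements) throws InterruptedException {
        List<E> batch = new ArrayList<>();
        batch.add(queue.take());                // blocks until data is available
        queue.drainTo(batch, maxElements - 1);  // takes only what is already queued
        return batch;
    }
}
```

A consumer that pays a fixed cost per round (a disk flush, a network round trip) pays it once per batch rather than once per item, which is the latency amortization the description refers to.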
Nested Class Summary | | |
---|---|---|
class | BatchSynchQueueBase.SynchRecordImpl<E> | A SynchRecordImpl is used by a BatchSynchQueue to pair together a producer with its data. |
class | BatchSynchQueueBase.SynchRefImpl | |
Field Summary | | |
---|---|---|
(package private) Queue<BatchSynchQueueBase.SynchRecordImpl<E>> | buffer | Holds a reference to the queue implementation that holds the buffer. |
Constructor Summary | |
---|---|
BatchSynchQueueBase() | Creates a batch synch queue without fair thread scheduling. |
BatchSynchQueueBase(boolean fair) | Ensures that the underlying buffer implementation is created. |
Method Summary | | |
---|---|---|
protected abstract <T> Queue<T> | createQueue() | This abstract method should be overridden to return an empty queue. |
int | drainTo(Collection<? super E> objects) | Removes all available elements from this queue and adds them into the given collection. |
int | drainTo(Collection<? super E> objects, int maxElements) | Removes at most the given number of available elements from this queue and adds them into the given collection. |
SynchRef | drainTo(Collection<SynchRecord<E>> c, boolean unblock) | Takes all available data items from the queue or blocks until some become available. |
SynchRef | drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock) | Takes up to maxElements available data items from the queue or blocks until some become available. |
protected BatchSynchQueueBase.SynchRecordImpl<E> | extract(boolean unblock, boolean signal) | Extract element at current take position, advance, and signal. |
protected int | getBufferCapacity() | Get the capacity of the buffer. |
protected boolean | insert(E x, boolean unlockAndBlock) | Insert element into the queue, then possibly signal that the queue is not empty and block the producer on the element until permission to proceed is given. |
Iterator<E> | iterator() | Returns an iterator over the elements contained in this collection. |
boolean | offer(E e) | Inserts the specified element into this queue, if possible. |
boolean | offer(E e, long timeout, TimeUnit unit) | Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to become available. |
E | peek() | Retrieves, but does not remove, the head of this queue, returning null if this queue is empty. |
protected E | peekAtBufferHead() | Return the head element from the buffer. |
E | poll() | Retrieves and removes the head of this queue, or null if this queue is empty. |
E | poll(long timeout, TimeUnit unit) | Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue. |
void | put(E e) | Adds the specified element to this queue, waiting if necessary for space to become available. |
int | remainingCapacity() | Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit. |
int | size() | Returns the number of elements in this collection. |
E | take() | Retrieves and removes the head of this queue, waiting if no elements are present on this queue. |
void | tryPut(E e) | Tries a synchronous put into the queue. |
Methods inherited from class java.util.AbstractQueue |
---|
add, addAll, clear, element, remove |
Methods inherited from class java.util.AbstractCollection |
---|
contains, containsAll, isEmpty, remove, removeAll, retainAll, toArray, toArray, toString |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
Methods inherited from interface java.util.concurrent.BlockingQueue |
---|
add, contains, remove |
Methods inherited from interface java.util.Queue |
---|
element, remove |
Methods inherited from interface java.util.Collection |
---|
addAll, clear, containsAll, equals, hashCode, isEmpty, removeAll, retainAll, toArray, toArray |
Field Detail |
---|
Queue<BatchSynchQueueBase.SynchRecordImpl<E>> buffer
Constructor Detail |
---|
public BatchSynchQueueBase()
public BatchSynchQueueBase(boolean fair)
fair
- true if fairness is to be applied to threads waiting to access the buffer.
Method Detail |
---|
public Iterator<E> iterator()
iterator
in interface Iterable<E>
iterator
in interface Collection<E>
iterator
in class AbstractCollection<E>
public int size()
size
in interface Collection<E>
size
in class AbstractCollection<E>
public boolean offer(E e)
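Inserts the specified element into this queue, if possible. When using queues that may impose insertion restrictions (for example capacity bounds), offer is generally preferable to Collection.add(E), which can fail to insert an element only by throwing an exception.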
offer
in interface BlockingQueue<E>
offer
in interface Queue<E>
e
- The element to insert.
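The difference between offer and add on a bounded queue can be seen in a small sketch. Because BatchSynchQueueBase is abstract, the example assumes a standard java.util.concurrent.ArrayBlockingQueue with a capacity of one; `OfferVersusAdd` and `demo` are hypothetical names, and the behaviour of a concrete BatchSynchQueueBase sub-class may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferVersusAdd {
    // Returns a short description of what happened on each insertion attempt.
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer("a");   // true: there is room
        boolean second = queue.offer("b");  // false: the queue is full, no exception
        String addResult;
        try {
            queue.add("c");                 // add signals a full queue by throwing
            addResult = "added";
        } catch (IllegalStateException full) {
            addResult = "IllegalStateException";
        }
        return first + "," + second + "," + addResult;
    }
}
```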
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
offer
in interface BlockingQueue<E>
e
- The element to add.
timeout
- How long to wait before giving up, in units of unit.
unit
- A TimeUnit determining how to interpret the timeout parameter.
InterruptedException
- If interrupted while waiting.
NullPointerException
- If the specified element is null.
public E poll()
poll
in interface Queue<E>
public E poll(long timeout, TimeUnit unit) throws InterruptedException
poll
in interface BlockingQueue<E>
timeout
- How long to wait before giving up, in units of unit.
unit
- A TimeUnit determining how to interpret the timeout parameter.
InterruptedException
- If interrupted while waiting.
public E peek()
peek
in interface Queue<E>
public int remainingCapacity()
Note that you cannot always tell if an attempt to add an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to put or take an element.
remainingCapacity
in interface BlockingQueue<E>
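The caveat about remainingCapacity suggests letting the queue make the decision atomically instead of checking first. The sketch below contrasts the two approaches against the standard BlockingQueue interface; `CapacityCheck`, `checkThenAdd` and `tryAdd` are hypothetical names used only for illustration.

```java
import java.util.concurrent.BlockingQueue;

class CapacityCheck {
    // Racy: another thread may fill the queue between the check and the add,
    // in which case add can still throw IllegalStateException.
    static <E> boolean checkThenAdd(BlockingQueue<E> queue, E item) {
        if (queue.remainingCapacity() > 0) {
            return queue.add(item);
        }
        return false;
    }

    // Preferred: a single call whose return value reports the outcome.
    static <E> boolean tryAdd(BlockingQueue<E> queue, E item) {
        return queue.offer(item);
    }
}
```

In single-threaded use both methods behave the same; the difference only shows when producers or consumers run concurrently.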
public void put(E e) throws InterruptedException
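Adds the specified element to this queue, waiting if necessary for space to become available. This method delegates to tryPut(E), which can raise SynchExceptions. If any are raised, this method silently ignores them. Use the tryPut(E) method directly if you want to catch these exceptions.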
put
in interface BlockingQueue<E>
e
- The element to add.
InterruptedException
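- If interrupted while waiting.
public void tryPut(E e) throws InterruptedException, SynchException
Tries a synchronous put into the queue. If a consumer encounters an error whilst processing the data element, the error is raised to the producer as a SynchException.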
tryPut
in interface BatchSynchQueue<E>
e
- The data element to put into the queue. Cannot be null.
InterruptedException
- If the thread is interrupted whilst waiting to write to the queue or whilst waiting
on its entry in the queue being consumed.
SynchException
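- If a consumer encounters an error whilst processing the data element.
public E take() throws InterruptedException
Retrieves and removes the head of this queue, waiting if no elements are present on this queue. The producer of the taken element is unblocked immediately. To keep the producer blocked whilst taking a single item, use the drainTo(Collection<SynchRecord<E>>, int, boolean) method with a maximum of one element. There is no take method to do that because there is not usually any advantage in a synchronous hand-off design that consumes data one item at a time. It is normal to consume data in chunks to amortize consumption latencies across many producers where possible.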
take
in interface BlockingQueue<E>
InterruptedException
- If interrupted while waiting.
public int drainTo(Collection<? super E> objects)
drainTo
in interface BlockingQueue<E>
objects
- The collection to transfer elements into.
NullPointerException
- If objects is null.
IllegalArgumentException
- If objects is this queue.
public int drainTo(Collection<? super E> objects, int maxElements)
drainTo
in interface BlockingQueue<E>
objects
- The collection to transfer elements into.
maxElements
- The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning all elements.
NullPointerException
- If objects is null.
IllegalArgumentException
- If objects is this queue.
public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
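Takes all available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord, which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.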
drainTo
in interface BatchSynchQueue<E>
c
- The collection to drain the data items into.
unblock
- If set to true the producers for the taken items will be immediately unblocked.
public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
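Takes up to maxElements available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord, which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.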
drainTo
in interface BatchSynchQueue<E>
coll
- The collection to drain the data items into.
maxElements
- The maximum number of elements to drain.
unblock
- If set to true the producers for the taken items will be immediately unblocked.
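How a consumer might send an error back to a producer that is still blocked can be sketched with standard concurrency classes. None of the names below come from the Qpid API: `DemoSynchException`, `DemoRecord`, `DemoBatchQueue`, `release`, `fail` and `drain` are hypothetical stand-ins, and the real SynchRecord interface may look quite different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical checked exception standing in for SynchException.
class DemoSynchException extends Exception {
    DemoSynchException(String message) { super(message); }
}

// Hypothetical record: lets the consumer release its producer or pass it an error.
class DemoRecord<E> {
    final E data;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile String error;  // written before countDown, read after await

    DemoRecord(E data) { this.data = data; }

    // Consumer side: processing succeeded, let the producer continue.
    void release() { done.countDown(); }

    // Consumer side: processing failed, wake the producer with an error.
    void fail(String reason) { error = reason; done.countDown(); }

    // Producer side: waits for the consumer's verdict.
    void awaitOutcome() throws InterruptedException, DemoSynchException {
        done.await();
        if (error != null) throw new DemoSynchException(error);
    }
}

class DemoBatchQueue<E> {
    private final BlockingQueue<DemoRecord<E>> buffer = new LinkedBlockingQueue<>();

    // Synchronous put that surfaces consumer-side errors to the producer.
    void tryPut(E e) throws InterruptedException, DemoSynchException {
        DemoRecord<E> record = new DemoRecord<>(e);
        buffer.put(record);
        record.awaitOutcome();
    }

    // Blocks for at least one record, then drains up to maxElements in total.
    // Producers stay blocked until the caller releases or fails each record.
    List<DemoRecord<E>> drain(int maxElements) throws InterruptedException {
        List<DemoRecord<E>> batch = new ArrayList<>();
        batch.add(buffer.take());
        buffer.drainTo(batch, maxElements - 1);
        return batch;
    }
}
```

A consumer would typically process the whole batch, then call `release()` on the records that succeeded and `fail(...)` on those that did not, so each producer learns the fate of its own item.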
protected abstract <T> Queue<T> createQueue()
protected boolean insert(E x, boolean unlockAndBlock)
unlockAndBlock
- true if the global queue lock should be released and the producer should be blocked.
protected BatchSynchQueueBase.SynchRecordImpl<E> extract(boolean unblock, boolean signal)
protected int getBufferCapacity()
protected E peekAtBufferHead()