org.apache.qpid.util.concurrent
Class BatchSynchQueueBase<E>

java.lang.Object
  extended by java.util.AbstractCollection<E>
      extended by java.util.AbstractQueue<E>
          extended by org.apache.qpid.util.concurrent.BatchSynchQueueBase<E>
All Implemented Interfaces:
Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>, BatchSynchQueue<E>
Direct Known Subclasses:
SynchBuffer, SynchQueue

public abstract class BatchSynchQueueBase<E>
extends AbstractQueue<E>
implements BatchSynchQueue<E>

Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data. Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch. Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.

Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete implementation of this. This queue is only accessed through the methods insert(E, boolean), extract(boolean, boolean), getBufferCapacity(), peekAtBufferHead(). An implementation can override these methods to implement the buffer other than by a queue, for example, by using an array.

Normal queue methods to work asynchronously.

Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately when their data is taken.

The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the option to keep producers blocked until the consumer decides to release them.

Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io) accross many producers. The only advantage of the single blocking take method is that it did take advantage of the queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.

CRC Card
Responsibilities Collaborations


Nested Class Summary
 class BatchSynchQueueBase.SynchRecordImpl<E>
          A SynchRecordImpl is used by a BatchSynchQueue to pair together a producer with its data.
 class BatchSynchQueueBase.SynchRefImpl
           
 
Field Summary
(package private)  Queue<BatchSynchQueueBase.SynchRecordImpl<E>> buffer
          Holds a reference to the queue implementation that holds the buffer.
 
Constructor Summary
BatchSynchQueueBase()
          Creates a batch synch queue without fair thread scheduling.
BatchSynchQueueBase(boolean fair)
          Ensures that the underlying buffer implementation is created.
 
Method Summary
protected abstract
<T> Queue<T>
createQueue()
          This abstract method should be overriden to return an empty queue.
 int drainTo(Collection<? super E> objects)
          Removes all available elements from this queue and adds them into the given collection.
 int drainTo(Collection<? super E> objects, int maxElements)
          Removes at most the given number of available elements from this queue and adds them into the given collection.
 SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
          Takes all available data items from the queue or blocks until some become available.
 SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
          Takes up to maxElements available data items from the queue or blocks until some become available.
protected  BatchSynchQueueBase.SynchRecordImpl<E> extract(boolean unblock, boolean signal)
          Extract element at current take position, advance, and signal.
protected  int getBufferCapacity()
          Get the capacity of the buffer.
protected  boolean insert(E x, boolean unlockAndBlock)
          Insert element into the queue, then possibly signal that the queue is not empty and block the producer on the element until permission to procede is given.
 Iterator<E> iterator()
          Returns an iterator over the elements contained in this collection.
 boolean offer(E e)
          Inserts the specified element into this queue, if possible.
 boolean offer(E e, long timeout, TimeUnit unit)
          Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to become available.
 E peek()
          Retrieves, but does not remove, the head of this queue, returning null if this queue is empty.
protected  E peekAtBufferHead()
          Return the head element from the buffer.
 E poll()
          Retrieves and removes the head of this queue, or null if this queue is empty.
 E poll(long timeout, TimeUnit unit)
          Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue.
 void put(E e)
          Adds the specified element to this queue, waiting if necessary for space to become available.
 int remainingCapacity()
          Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
 int size()
          Returns the number of elements in this collection.
 E take()
          Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
 void tryPut(E e)
          Tries a synchronous put into the queue.
 
Methods inherited from class java.util.AbstractQueue
add, addAll, clear, element, remove
 
Methods inherited from class java.util.AbstractCollection
contains, containsAll, isEmpty, remove, removeAll, retainAll, toArray, toArray, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface java.util.concurrent.BlockingQueue
add, contains, remove
 
Methods inherited from interface java.util.Queue
element, remove
 
Methods inherited from interface java.util.Collection
addAll, clear, containsAll, equals, hashCode, isEmpty, removeAll, retainAll, toArray, toArray
 

Field Detail

buffer

Queue<BatchSynchQueueBase.SynchRecordImpl<E>> buffer
Holds a reference to the queue implementation that holds the buffer.

Constructor Detail

BatchSynchQueueBase

public BatchSynchQueueBase()
Creates a batch synch queue without fair thread scheduling.


BatchSynchQueueBase

public BatchSynchQueueBase(boolean fair)
Ensures that the underlying buffer implementation is created.

Parameters:
fair - true if fairness is to be applied to threads waiting to access the buffer.
Method Detail

iterator

public Iterator<E> iterator()
Returns an iterator over the elements contained in this collection.

Specified by:
iterator in interface Iterable<E>
Specified by:
iterator in interface Collection<E>
Specified by:
iterator in class AbstractCollection<E>
Returns:
An iterator over the elements contained in this collection.

size

public int size()
Returns the number of elements in this collection. If the collection contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.

Specified by:
size in interface Collection<E>
Specified by:
size in class AbstractCollection<E>
Returns:
The number of elements in this collection.

offer

public boolean offer(E e)
Inserts the specified element into this queue, if possible. When using queues that may impose insertion restrictions (for example capacity bounds), method offer is generally preferable to method Collection.add(E), which can fail to insert an element only by throwing an exception.

Specified by:
offer in interface BlockingQueue<E>
Specified by:
offer in interface Queue<E>
Parameters:
e - The element to insert.
Returns:
true if it was possible to add the element to this queue, else false

offer

public boolean offer(E e,
                     long timeout,
                     TimeUnit unit)
              throws InterruptedException
Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to become available.

Specified by:
offer in interface BlockingQueue<E>
Parameters:
e - The element to add.
timeout - How long to wait before giving up, in units of unit
unit - A TimeUnit determining how to interpret the timeout parameter.
Returns:
true if successful, or false if the specified waiting time elapses before space is available.
Throws:
InterruptedException - If interrupted while waiting.
NullPointerException - If the specified element is null.

poll

public E poll()
Retrieves and removes the head of this queue, or null if this queue is empty.

Specified by:
poll in interface Queue<E>
Returns:
The head of this queue, or null if this queue is empty.

poll

public E poll(long timeout,
              TimeUnit unit)
       throws InterruptedException
Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue.

Specified by:
poll in interface BlockingQueue<E>
Parameters:
timeout - How long to wait before giving up, in units of unit.
unit - A TimeUnit determining how to interpret the timeout parameter.
Returns:
The head of this queue, or null if the specified waiting time elapses before an element is present.
Throws:
InterruptedException - If interrupted while waiting.

peek

public E peek()
Retrieves, but does not remove, the head of this queue, returning null if this queue is empty.

Specified by:
peek in interface Queue<E>
Returns:
The head of this queue, or null if this queue is empty.

remainingCapacity

public int remainingCapacity()
Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.

Note that you cannot always tell if an attempt to add an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to put or take an element.

Specified by:
remainingCapacity in interface BlockingQueue<E>
Returns:
The remaining capacity.

put

public void put(E e)
         throws InterruptedException
Adds the specified element to this queue, waiting if necessary for space to become available.

This method delegated to tryPut(E) which can raise SynchExceptions. If any are raised this method silently ignores them. Use the tryPut(E) method directly if you want to catch these exceptions.

Specified by:
put in interface BlockingQueue<E>
Parameters:
e - The element to add.
Throws:
InterruptedException - If interrupted while waiting.

tryPut

public void tryPut(E e)
            throws InterruptedException,
                   SynchException
Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the data that is put, then this is returned to the caller wrapped inside a SynchException.

Specified by:
tryPut in interface BatchSynchQueue<E>
Parameters:
e - The data element to put into the queue. Cannot be null.
Throws:
InterruptedException - If the thread is interrupted whilst waiting to write to the queue or whilst waiting on its entry in the queue being consumed.
SynchException - If a consumer encounters an error whilst processing the data element.

take

public E take()
       throws InterruptedException
Retrieves and removes the head of this queue, waiting if no elements are present on this queue. Any producer that has its data element taken by this call will be immediately unblocked. To keep the producer blocked whilst taking just a single item, use the #drainTo(java.util.Collection>, int, boolean) method. There is no take method to do that because there is not usually any advantage in a synchronous hand off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption latencies accross many producers where possible.

Specified by:
take in interface BlockingQueue<E>
Returns:
The head of this queue.
Throws:
InterruptedException - if interrupted while waiting.

drainTo

public int drainTo(Collection<? super E> objects)
Removes all available elements from this queue and adds them into the given collection. This operation may be more efficient than repeatedly polling this queue. A failure encountered while attempting to add elements to collection c may result in elements being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if the specified collection is modified while the operation is in progress.

Specified by:
drainTo in interface BlockingQueue<E>
Parameters:
objects - The collection to transfer elements into.
Returns:
The number of elements transferred.
Throws:
NullPointerException - If objects is null.
IllegalArgumentException - If objects is this queue.

drainTo

public int drainTo(Collection<? super E> objects,
                   int maxElements)
Removes at most the given number of available elements from this queue and adds them into the given collection. A failure encountered while attempting to add elements to collection c may result in elements being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if the specified collection is modified while the operation is in progress.

Specified by:
drainTo in interface BlockingQueue<E>
Parameters:
objects - The collection to transfer elements into.
maxElements - The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning all elements.
Returns:
The number of elements transferred.
Throws:
NullPointerException - If c is null.
IllegalArgumentException - If c is this queue.

drainTo

public SynchRef drainTo(Collection<SynchRecord<E>> c,
                        boolean unblock)
Takes all available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.

Specified by:
drainTo in interface BatchSynchQueue<E>
Parameters:
c - The collection to drain the data items into.
unblock - If set to true the producers for the taken items will be immediately unblocked.
Returns:
A count of the number of elements that were drained from the queue.

drainTo

public SynchRef drainTo(Collection<SynchRecord<E>> coll,
                        int maxElements,
                        boolean unblock)
Takes up to maxElements available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.

Specified by:
drainTo in interface BatchSynchQueue<E>
Parameters:
coll - The collection to drain the data items into.
maxElements - The maximum number of elements to drain.
unblock - If set to true the producers for the taken items will be immediately unblocked.
Returns:
A count of the number of elements that were drained from the queue.

createQueue

protected abstract <T> Queue<T> createQueue()
This abstract method should be overriden to return an empty queue. Different implementations of producer consumer buffers can control the order in which data is accessed using different queue implementations. This method allows the type of queue to be abstracted out of this class and to be supplied by concrete implementations.

Returns:
An empty queue.

insert

protected boolean insert(E x,
                         boolean unlockAndBlock)
Insert element into the queue, then possibly signal that the queue is not empty and block the producer on the element until permission to procede is given.

If the producer is to be blocked then the lock must be released first, otherwise no other process will be able to get access to the queue. Hence, unlock and block are always set together.

Call only when holding the global lock.

Parameters:
unlockAndBlock - trueIf the global queue lock should be released and the producer should be blocked.
Returns:
true if the operation succeeded, false otherwise. If the result is true this method may not return straight away, but only after the producer is unblocked by having its data consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no matter what value the unlockAndBlock flag has, leaving the global lock on.

extract

protected BatchSynchQueueBase.SynchRecordImpl<E> extract(boolean unblock,
                                                         boolean signal)
Extract element at current take position, advance, and signal.

Call only when holding lock.


getBufferCapacity

protected int getBufferCapacity()
Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.

Call only when holding lock.

Returns:
The maximum capacity of the buffer.

peekAtBufferHead

protected E peekAtBufferHead()
Return the head element from the buffer.

Call only when holding lock.

Returns:
The head element from the buffer.


Licensed to the Apache Software Foundation