org.apache.qpid.util.concurrent
Interface BatchSynchQueue<E>

All Superinterfaces:
BlockingQueue<E>, Collection<E>, Iterable<E>, Queue<E>
All Known Implementing Classes:
BatchSynchQueueBase, SynchBuffer, SynchQueue

public interface BatchSynchQueue<E>
extends BlockingQueue<E>

BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform useful work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is consumed, or until a consumer chooses to release the producer some time after consuming the data from the queue.

There are a number of possible advantages to using this technique when compared with having the producers processing their own data:

The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional take methods that can be used to take data from a queue without releasing producers, so that consumers have an opportunity to confirm correct processing of the data before producers are released. It also adds a put method with exceptions so that consumers can signal exception cases back to producers where there are errors in the data.

This type of queue is useful in situations where consumers can obtain an efficiency gain by batching data from many threads, but where synchronous handling of that data is necessary because producers need to know that their data has been processed before they continue. For example, sending a bundle of messages together, or writing many records to disk at once, may result in improved performance, but the originators of the messages or disk records need confirmation that their data has really been sent or saved to disk.
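The batching scenario described above can be illustrated with standard java.util.concurrent classes alone. The following is a minimal sketch, not the BatchSynchQueue implementation: the class BatchHandOffSketch, the Pending record and the methods synchronousPut and consumeBatch are hypothetical names invented for this example, and an in-memory list stands in for the disk or network write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; none of these names come from the Qpid API.
final class BatchHandOffSketch {
    /** Pairs a data item with the latch its producer waits on. */
    static final class Pending {
        final String element;
        final CountDownLatch done = new CountDownLatch(1);
        Pending(String element) { this.element = element; }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    final List<String> written = new ArrayList<>(); // stands in for a batched disk write

    /** Producer side: enqueue the item, then block until a consumer releases it. */
    void synchronousPut(String item) throws InterruptedException {
        Pending p = new Pending(item);
        queue.put(p);
        p.done.await();
    }

    /** Consumer side: wait for one item, grab the rest, process, then release. */
    int consumeBatch() throws InterruptedException {
        List<Pending> batch = new ArrayList<>();
        batch.add(queue.take());   // blocks until at least one item is available
        queue.drainTo(batch);      // takes whatever else is queued, without blocking
        for (Pending p : batch) {
            written.add(p.element);    // the whole batch is "written" together
        }
        for (Pending p : batch) {
            p.done.countDown();        // only now are the producers unblocked
        }
        return batch.size();
    }

    static int demo() {
        BatchHandOffSketch buffer = new BatchHandOffSketch();
        List<Thread> producers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            String item = "record-" + i;
            Thread t = new Thread(() -> {
                try {
                    buffer.synchronousPut(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producers.add(t);
            t.start();
        }
        try {
            int consumed = 0;
            while (consumed < 3) {
                consumed += buffer.consumeBatch();
            }
            for (Thread t : producers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return buffer.written.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

How many records land in each batch depends on thread timing, but every producer returns from synchronousPut only after its record has been processed.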

The consumer can put an element back onto the queue or send an error message to the element's producer using the SynchRecord interface.

The BlockingQueue.take(), BlockingQueue.drainTo(java.util.Collection) and BlockingQueue.drainTo(java.util.Collection, int) methods from BlockingQueue should behave as if they have been called with unblock set to false. That is, they take elements from the queue but leave the producers blocked. These methods do not return collections of SynchRecords, so they do not supply an interface through which errors or re-queuings can be applied. If these methods are used then the consumer must successfully process all the records it takes.

The BlockingQueue.put(E) method should silently swallow any exceptions that consumers attempt to return to the caller. In order to handle exceptions the tryPut(E) method must be used.
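The difference between the two put styles can be sketched with a per-entry CompletableFuture. This is an illustration under stated assumptions rather than the actual implementation: HandOffException is a hypothetical stand-in for SynchException, and the Entry class, the consumeOne method and the rule that empty strings are rejected are all invented for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not the Qpid implementation.
final class PutVersusTryPutSketch {
    /** Stand-in for SynchException: wraps the consumer's error. */
    static final class HandOffException extends Exception {
        HandOffException(Throwable cause) { super(cause); }
    }

    /** A queued element plus the outcome its producer waits for. */
    static final class Entry {
        final String element;
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        Entry(String element) { this.element = element; }
    }

    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();

    /** Blocks until consumed; a consumer-side error is rethrown to the caller. */
    void tryPut(String e) throws InterruptedException, HandOffException {
        Entry entry = new Entry(e);
        queue.put(entry);
        try {
            entry.outcome.get();
        } catch (ExecutionException ex) {
            throw new HandOffException(ex.getCause());
        }
    }

    /** Blocks until consumed; a consumer-side error is silently dropped. */
    void put(String e) throws InterruptedException {
        try {
            tryPut(e);
        } catch (HandOffException ignored) {
            // swallowed: the caller never learns about the failure
        }
    }

    /** Consumer: takes one entry and rejects empty strings (assumed rule). */
    void consumeOne() throws InterruptedException {
        Entry entry = queue.take();
        if (entry.element.isEmpty()) {
            entry.outcome.completeExceptionally(new IllegalArgumentException("empty element"));
        } else {
            entry.outcome.complete(null);
        }
    }

    static String demo() {
        PutVersusTryPutSketch q = new PutVersusTryPutSketch();
        Thread consumer = new Thread(() -> {
            try {
                q.consumeOne();
                q.consumeOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        StringBuilder result = new StringBuilder();
        try {
            q.put("");                       // bad data, but the error is swallowed
            result.append("put:ok");
            q.tryPut("");                    // same bad data, error comes back
            result.append(",tryPut:ok");
        } catch (HandOffException e) {
            result.append(",tryPut:").append(e.getCause().getClass().getSimpleName());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints put:ok,tryPut:IllegalArgumentException
    }
}
```

Producers that need to react to rejected data would therefore call the tryPut-style method; the put-style method only tells them that the element was taken and handled in some way.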

CRC Card
Responsibilities Collaborations
Handle synchronous puts, with possible exceptions.
Allow consumers to take many records from a queue in a batch.
Allow consumers to decide when to unblock synchronous producers.


Method Summary
 SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
          Takes all available data items from the queue or blocks until some become available.
 SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock)
          Takes up to maxElements available data items from the queue or blocks until some become available.
 void tryPut(E e)
          Tries a synchronous put into the queue.
 
Methods inherited from interface java.util.concurrent.BlockingQueue
add, contains, drainTo, drainTo, offer, offer, poll, put, remainingCapacity, remove, take
 
Methods inherited from interface java.util.Queue
element, peek, poll, remove
 
Methods inherited from interface java.util.Collection
addAll, clear, containsAll, equals, hashCode, isEmpty, iterator, removeAll, retainAll, size, toArray, toArray
 

Method Detail

tryPut

void tryPut(E e)
            throws InterruptedException,
                   SynchException
Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the data that is put, then this is returned to the caller wrapped inside a SynchException.

Parameters:
e - The data element to put into the queue.
Throws:
InterruptedException - If the thread is interrupted whilst waiting to write to the queue or whilst waiting on its entry in the queue being consumed.
SynchException - If a consumer encounters an error whilst processing the data element.

drainTo

SynchRef drainTo(Collection<SynchRecord<E>> c,
                 boolean unblock)
Takes all available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.

Parameters:
c - The collection to drain the data items into.
unblock - If set to true the producers for the taken items will be immediately unblocked.
Returns:
A SynchRef that reports the number of elements that were drained from the queue.

drainTo

SynchRef drainTo(Collection<SynchRecord<E>> c,
                 int maxElements,
                 boolean unblock)
Takes up to maxElements available data items from the queue or blocks until some become available. The returned items are wrapped in a SynchRecord which provides an interface to requeue them or send errors to their producers, where the producers are still blocked.

Parameters:
c - The collection to drain the data items into.
maxElements - The maximum number of elements to drain.
unblock - If set to true the producers for the taken items will be immediately unblocked.
Returns:
A SynchRef that reports the number of elements that were drained from the queue.
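The drain semantics described above (block until at least one item is available, take at most maxElements, and release producers either immediately or later) might be approximated as follows. This is a sketch built on java.util.concurrent, not the library's code: Entry and Handle are hypothetical stand-ins for SynchRecord and SynchRef, their method names are assumptions, and unlike the signatures documented here the sketch declares InterruptedException.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not the Qpid implementation.
final class DrainSketch {
    /** A data item plus the latch its producer would be waiting on. */
    static final class Entry {
        final String element;
        final CountDownLatch released = new CountDownLatch(1);
        Entry(String element) { this.element = element; }
        boolean isReleased() { return released.getCount() == 0; }
    }

    /** Result of a drain: a count plus a way to release the producers later. */
    static final class Handle {
        private final List<Entry> taken;
        Handle(List<Entry> taken) { this.taken = taken; }
        int count() { return taken.size(); }
        void release() {
            for (Entry e : taken) {
                e.released.countDown();
            }
        }
    }

    static Handle drain(BlockingQueue<Entry> queue, Collection<Entry> c,
                        int maxElements, boolean unblock) throws InterruptedException {
        if (maxElements < 1) {
            throw new IllegalArgumentException("maxElements must be at least 1");
        }
        List<Entry> taken = new ArrayList<>();
        taken.add(queue.take());                // block until one item is available
        queue.drainTo(taken, maxElements - 1);  // then take the rest without blocking
        c.addAll(taken);
        Handle handle = new Handle(taken);
        if (unblock) {
            handle.release();                   // producers are released straight away
        }
        return handle;
    }

    static String demo() {
        BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add(new Entry("item-" + i));
        }
        try {
            List<Entry> first = new ArrayList<>();
            Handle deferred = drain(queue, first, 3, false);
            boolean before = first.get(0).isReleased();  // still blocked
            deferred.release();
            boolean after = first.get(0).isReleased();   // released on request

            List<Entry> second = new ArrayList<>();
            Handle immediate = drain(queue, second, 10, true);
            return deferred.count() + "," + before + "," + after + ","
                    + immediate.count() + "," + second.get(0).isReleased();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3,false,true,2,true
    }
}
```

With unblock set to false the consumer keeps control over when producers continue, which is what allows it to confirm a batch before releasing anyone.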


Licensed to the Apache Software Foundation