Project JXTA

net.jxta.endpoint
Class AbstractMessenger

java.lang.Object
  extended by net.jxta.util.AbstractSimpleSelectable
      extended by net.jxta.endpoint.AbstractMessenger
All Implemented Interfaces:
Messenger, SimpleSelectable
Direct Known Subclasses:
ChannelMessenger, ThreadedMessenger

public abstract class AbstractMessenger
extends AbstractSimpleSelectable
implements Messenger

An AbstractMessenger is used to implement messengers (for example, by transport modules). It supplies the convenience, backward-compatible, obvious, or otherwise rarely changed methods. Many methods are final and cannot be overridden, in order to ensure standard behaviour. The rest is left to implementations.

See Also:
EndpointService, EndpointAddress, Message

Nested Class Summary
 
Nested classes inherited from class net.jxta.util.SimpleSelectable
SimpleSelectable.IdentityReference
 
Field Summary
protected static long DEFAULT_MTU
          The default Maximum Transmission Unit.
protected  EndpointAddress dstAddress
          The destination address of messages sent on this messenger.
 
Fields inherited from class net.jxta.util.AbstractSimpleSelectable
identityReference
 
Fields inherited from interface net.jxta.endpoint.Messenger
ANYSTATE, BREAKING, BROKEN, CLOSED, CLOSING, CONNECTED, DISCONNECTED, DISCONNECTING, IDLE, RECONCLOSING, RECONNECTING, RECONSATURATED, RESOLCLOSING, RESOLPENDING, RESOLSATURATED, RESOLVED, RESOLVING, SATURATED, SENDING, SENDINGSATURATED, TERMINAL, UNRESOLVABLE, UNRESOLVED, UNRESOLVING, USABLE
 
Constructor Summary
AbstractMessenger(EndpointAddress dest)
          Create a new abstract messenger.
 
Method Summary
 void flush()
          Makes sure that all outstanding messages have been processed, successfully or not. This method waits unless and until the state of the messenger is a Messenger.IDLE state. If the reached state is neither Messenger.CLOSED nor any Messenger.USABLE state, then it throws an IOException. Otherwise it returns silently.

If another thread keeps sending messages, this method may never return.

This method is deliberately simple. If a timeout needs to be provided, or if more detailed conditions are required, the Messenger.waitState(int, long) and Messenger.getState() methods should be used. For example:

 // TOOLONG, FLUSHED, CLOSEDANDFLUSHED and BROKEN are result codes defined by the caller.
 int myFlush(long notTooLong) throws InterruptedException {
   // Wait for an idle state, but for no longer than notTooLong.
   messenger.waitState(IDLE, notTooLong);
   int state = messenger.getState();  // read once so every test sees the same state
   if ((state & IDLE) == 0) return TOOLONG;
   if ((state & USABLE) != 0) return FLUSHED;
   if (state == CLOSED) return CLOSEDANDFLUSHED;
   return BROKEN;
 }
 

Note: Because Messenger.close() is asynchronous, it is valid to invoke flush() after close() as a synchronous variant of close(). If this messenger is not shared with any other thread, then invoking flush() before close() is a more familiar means of achieving the same effect.

 EndpointAddress getDestinationAddress()
          Returns the destination of this messenger. The returned EndpointAddress is a clone and can be freely used by the caller.
 EndpointAddress getDestinationAddressObject()
          Returns the internal EndpointAddress object of this messenger's destination. Changing the content of the object may have unpredictable consequences on the behavior of the EndpointMessenger. This method is intended for applications that need to hold a weak or soft reference to an EndpointMessenger: the returned EndpointAddress object becomes unreferenced when this messenger is finalized.
 long getMTU()
          Returns the maximum message size that this messenger can handle. That limit refers to the cumulative size of application-level elements. The various sendMessage methods will refuse to send messages that exceed this limit.
 boolean isClosed()
          Returns true if this messenger is closed and no longer accepting messages to be sent. The messenger should be discarded.

This is a minimal implementation.

 boolean isIdle()
          Tells whether this messenger may be worth closing.

This is here for backward compatibility reasons.

 boolean isSynchronous()
          Returns true if the sendMessage methods of this messenger are fully synchronous.
 void itemChanged(SimpleSelectable changedObject)
          Implements a default for all AbstractMessengers: mirror the event to our selectors.
 boolean sendMessage(Message msg)
          Sends a message to the destination specified at construction as if by invoking sendMessage(msg, null, null)

This is a legacy method. Modern code should prefer the other methods and select messages. If a listener API is preferred, it is possible to use a ListenerAdaptor object explicitly to have a listener called.

 boolean sendMessage(Message msg, String rService, String rServiceParam)
          Sends a message to the destination specified at construction. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

Error Handling:

  • An IOException means that this message is invalid or that this messenger is now in one of the non Messenger.USABLE states and may no longer send new messages, and that the message was not sent. The exact state of the messenger may be obtained from the Messenger.getState() method.
  • A return result of false indicates that the message was not accepted to be sent. Usually this is due to local resource limits being reached. If needed, another attempt at sending the message may be made after waiting for the congestion to clear (for example by using Messenger.waitState(int, long)).
  • A return result of true indicates that the message was accepted for sending. It does not imply that the message will be sent or that the destination will receive the message. There will be no immediate indication of any errors in sending the message. If this messenger subsequently reaches a Messenger.IDLE state that is either Messenger.CLOSED or a Messenger.USABLE state, then it may be inferred that all outstanding messages have been processed without this messenger detecting an error.
  • The invoker may have confirmation of completion by observing the message's properties. When the message has been fully processed, Message.getMessageProperty(Messenger.class) will return an object of class OutgoingMessageEvent. Changes in a message's set of properties may be tracked by selecting the message.

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to sendMessage:

     messenger.sendMessage( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

Limitation: using this method along with SimpleSelector.select() on the same message may occasionally cause some errors to not be thrown. Prefer Messenger.sendMessageN(net.jxta.endpoint.Message, java.lang.String, java.lang.String) when using SimpleSelector.select().

This is a legacy method. Modern code should prefer the other methods and select messages. If a listener API is preferred, it is possible to use a ListenerAdaptor object explicitly to have a listener called.

 void sendMessage(Message msg, String service, String serviceParam, OutgoingMessageEventListener listener)
          Deprecated. Implements deprecated method. Actual support only provided by channels.
protected  void setStateLock(Object stateLock)
          Specifies the object on which waitState must synchronize.
 int waitState(int wantedStates, long timeout)
          Blocks unless and until the current state is or has become one of the desired values. The observable states are guaranteed to be represented by a single bit. Multiple desired values may be specified by passing them ORed together.

This class defines the list of constants that may be used and how these may be combined.

Note that the state can change as soon as this method returns, so any observation is only an indication of the past. Synchronizing on the object itself has no other effect than interfering with other threads doing the same. Obviously, certain transitions cannot happen unless a new message is submitted. So unless another thread is using a messenger, it is safe to assume that a non-saturated messenger will not become saturated spontaneously. Note that messengers returned by different endpoint service interface objects (what PeerGroup.getEndpointService() returns) are different. However, a given endpoint interface object will return an existing messenger to the same exact destination if there is a Messenger.USABLE one. With an unshared messenger, one can wait for any change with waitState(~getState(), 0).

Note that it is advisable to always OR the desired states with Messenger.TERMINAL, unless being blocked past the messenger's termination is an acceptable behaviour.

Examples (a timeout of 0 means no time limit):

Ensure that the messenger can take more messages (or is no longer USABLE): waitState(~SATURATED, 0)

Ensure that all submitted messages have been sent: waitState(TERMINAL | IDLE, 0)

Ensure that the messenger is already resolved and can take more messages: waitState(TERMINAL | (RESOLVED & ~SATURATED), 0)

This method synchronizes on the lock object supplied through setStateLock(java.lang.Object) during construction.

 
Methods inherited from class net.jxta.util.AbstractSimpleSelectable
getIdentityReference, haveListeners, notifyChange, register, registerListener, unregister, unregisterListener
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface net.jxta.endpoint.Messenger
close, getChannelMessenger, getLogicalDestinationAddress, getState, resolve, sendMessageB, sendMessageN
 
Methods inherited from interface net.jxta.util.SimpleSelectable
getIdentityReference, register, unregister
 

Field Detail

DEFAULT_MTU

protected static final long DEFAULT_MTU
The default Maximum Transmission Unit. Currently it is not enforced, but at least that much can always be sent. The getMTU method of every messenger returns this value.

See Also:
Constant Field Values

dstAddress

protected final EndpointAddress dstAddress
The destination address of messages sent on this messenger. This member is protected because some transport messengers believe in reading it. FIXME - jice@jxta.org 20040413: fix transports.

Constructor Detail

AbstractMessenger

public AbstractMessenger(EndpointAddress dest)
Create a new abstract messenger. Warning: This class needs to know the object on which waitState must synchronize. It is generally impossible to pass it at construction because it is not yet constructed. Instead, implementations MUST call setStateLock(java.lang.Object) from their constructor.

Parameters:
dest - who messages should be addressed to
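
The constructor warning can be illustrated with a small, self-contained sketch. SketchMessengerBase and SketchTransportMessenger are hypothetical stand-ins, not JXTA classes; they only mimic the pattern in which a subclass creates its state object and hands it to the base class from its own constructor.

```java
// Hypothetical stand-in for AbstractMessenger: it needs a lock object
// that does not exist yet when the base-class constructor runs.
abstract class SketchMessengerBase {
    private Object stateLock; // supplied later by the subclass constructor

    protected void setStateLock(Object stateLock) {
        this.stateLock = stateLock;
    }

    // True once a subclass has supplied the lock.
    boolean hasStateLock() {
        return stateLock != null;
    }

    // The object a waitState-style method would synchronize on.
    Object lockForWaiting() {
        if (stateLock == null) {
            throw new IllegalStateException("setStateLock was never called");
        }
        return stateLock;
    }
}

// Hypothetical transport messenger that owns its own state object.
class SketchTransportMessenger extends SketchMessengerBase {
    // The object that would be notified on every state change.
    private final Object state = new Object();

    SketchTransportMessenger() {
        // The state object exists only now, so pass it to the base class here.
        setStateLock(state);
    }

    Object stateObject() {
        return state;
    }
}
```

Forgetting the setStateLock call in this sketch surfaces as an IllegalStateException on first use; how the real class reacts to a missing lock is not stated on this page.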
Method Detail

setStateLock

protected void setStateLock(Object stateLock)
Specifies the object on which waitState must synchronize.

Parameters:
stateLock - The object on which waitState must synchronize. This has to be the object that gets notified when the implementation changes its state. Changing state is defined as "any operation that causes the result of the getState method to change". Implementations that use the MessengerState state machine should typically use the MessengerState object as their state lock, but it is not assumed.

isIdle

public final boolean isIdle()
Tells whether this messenger may be worth closing.

This is here for backward compatibility reasons. The notion of long term unemployment still exists, but is no longer part of the API. Self closing for unemployment is now a built-in feature of messengers.

Specified by:
isIdle in interface Messenger
Returns:
true if the messenger is idle otherwise false.

isSynchronous

public final boolean isSynchronous()
Returns true if the sendMessage methods of this messenger are fully synchronous.

Specified by:
isSynchronous in interface Messenger

getDestinationAddress

public EndpointAddress getDestinationAddress()
Returns the destination of this messenger. The returned EndpointAddress is a clone and can be freely used by the caller.

Specified by:
getDestinationAddress in interface Messenger
Returns:
EndpointAddress the destination address of this messenger
See Also:
Messenger.getLogicalDestinationAddress()

getDestinationAddressObject

public EndpointAddress getDestinationAddressObject()
Returns the internal EndpointAddress object of this messenger's destination. Changing the content of the object may have unpredictable consequences on the behavior of the EndpointMessenger. This method is intended for applications that need to hold a weak or soft reference to an EndpointMessenger: the returned EndpointAddress object becomes unreferenced when this messenger is finalized.

Specified by:
getDestinationAddressObject in interface Messenger
Returns:
EndpointAddress the destination address of this messenger
See Also:
Messenger.getDestinationAddress()
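
The difference between the two accessors can be sketched as follows. SketchAddress and SketchAddressHolder are hypothetical stand-ins (the real EndpointAddress API is not reproduced here); the sketch only shows why a cloned result is safe to modify while the internal object is not.

```java
// Hypothetical mutable address type standing in for EndpointAddress.
final class SketchAddress implements Cloneable {
    private String serviceName;

    SketchAddress(String serviceName) {
        this.serviceName = serviceName;
    }

    String getServiceName() {
        return serviceName;
    }

    void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    @Override
    public SketchAddress clone() {
        try {
            return (SketchAddress) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class is Cloneable
        }
    }
}

// Hypothetical holder exposing both accessor styles.
class SketchAddressHolder {
    private final SketchAddress dstAddress;

    SketchAddressHolder(SketchAddress dest) {
        this.dstAddress = dest;
    }

    // Callers receive their own copy and may change it freely.
    SketchAddress getDestinationAddress() {
        return dstAddress.clone();
    }

    // Callers receive the shared internal object and must treat it as read-only.
    SketchAddress getDestinationAddressObject() {
        return dstAddress;
    }
}
```

In this sketch, changing the copy leaves the holder untouched, whereas changing the object returned by getDestinationAddressObject() would silently change the holder's destination.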

getMTU

public long getMTU()
Returns the maximum message size that this messenger can handle. That limit refers to the cumulative size of application-level elements. The various sendMessage methods will refuse to send messages that exceed this limit.

Specified by:
getMTU in interface Messenger
Returns:
the limit.
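
As an illustration of what a cumulative size check might look like, the sketch below sums element sizes and compares the total with a limit. MtuCheckSketch is hypothetical, and representing elements as byte arrays is an assumption made for the example; the real Message API is not used.

```java
import java.util.List;

// Hypothetical helper: elements are modelled as plain byte arrays.
final class MtuCheckSketch {

    // Cumulative size of the application-level elements, in bytes.
    static long cumulativeSize(List<byte[]> elements) {
        long total = 0;
        for (byte[] element : elements) {
            total += element.length;
        }
        return total;
    }

    // A message fits if its cumulative size does not exceed the limit.
    static boolean fitsWithin(List<byte[]> elements, long mtu) {
        return cumulativeSize(elements) <= mtu;
    }
}
```

A caller could run such a check against the value returned by getMTU() before building a large message, rather than discovering the refusal at send time.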

isClosed

public boolean isClosed()
Returns true if this messenger is closed and no longer accepting messages to be sent. The messenger should be discarded.

This is a minimal implementation. It may not detect closure initiated by the other side unless the messenger was actually used since. A more accurate (but not mandatory) implementation would actually go and check the underlying connection, if relevant, unless breakage initiated by the other side is actually reported asynchronously when it happens. Breakage detection from the other side need not be reported atomically with its occurrence. This is not very important since we canonicalize transport messengers and so do not need to aggressively collect closed ones. When not used, messengers die by themselves.

Specified by:
isClosed in interface Messenger
Returns:
true if this messenger is closed, otherwise false.

flush

public final void flush()
                 throws IOException
Makes sure that all outstanding messages have been processed, successfully or not. This method waits unless and until the state of the messenger is a Messenger.IDLE state. If the reached state is neither Messenger.CLOSED nor any Messenger.USABLE state, then it throws an IOException. Otherwise it returns silently.

If another thread keeps sending messages, this method may never return.

This method is deliberately simple. If a timeout needs to be provided, or if more detailed conditions are required, the Messenger.waitState(int, long) and Messenger.getState() methods should be used. For example:

 // TOOLONG, FLUSHED, CLOSEDANDFLUSHED and BROKEN are result codes defined by the caller.
 int myFlush(long notTooLong) throws InterruptedException {
   // Wait for an idle state, but for no longer than notTooLong.
   messenger.waitState(IDLE, notTooLong);
   int state = messenger.getState();  // read once so every test sees the same state
   if ((state & IDLE) == 0) return TOOLONG;
   if ((state & USABLE) != 0) return FLUSHED;
   if (state == CLOSED) return CLOSEDANDFLUSHED;
   return BROKEN;
 }
 

Note: Because Messenger.close() is asynchronous, it is valid to invoke flush() after close() as a synchronous variant of close(). If this messenger is not shared with any other thread, then invoking flush() before close() is a more familiar means of achieving the same effect.

Specified by:
flush in interface Messenger
Throws:
IOException - This messenger failed before processing all outstanding messages successfully.

sendMessage

public final boolean sendMessage(Message msg)
                          throws IOException
Sends a message to the destination specified at construction as if by invoking sendMessage(msg, null, null)

This is a legacy method. Modern code should prefer the other methods and select messages. If a listener API is preferred, it is possible to use a ListenerAdaptor object explicitly to have a listener called.

Specified by:
sendMessage in interface Messenger
Parameters:
msg - The message to send.
Returns:
boolean true if the message has been accepted for sending, otherwise false.
Throws:
IOException - Thrown if the message cannot be sent.

sendMessage

public void sendMessage(Message msg,
                        String service,
                        String serviceParam,
                        OutgoingMessageEventListener listener)
Deprecated. Implements deprecated method. Actual support only provided by channels.

Sends a message to the destination specified at construction. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

WARNING: The Message object should not be reused or modified until the message has been fully processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If a listener is provided, it is invoked after the message is considered fully processed. However, it is recommended not to reuse or otherwise modify a message after sending it. If necessary, a clone of the message may be provided to sendMessage:

     messenger.sendMessage( (Message) myMessage.clone() );
 

Error Handling:

As with all sendMessage methods, success is not a guarantee that the message will actually be received.

This is a legacy method. Modern code should prefer the other methods and select messages. If a listener API is preferred, it is possible to use a ListenerAdaptor object explicitly to have a listener called.

Specified by:
sendMessage in interface Messenger
Parameters:
msg - The message to send.
service - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
serviceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
listener - listener for events about this message or null if no notification is desired.

sendMessage

public final boolean sendMessage(Message msg,
                                 String rService,
                                 String rServiceParam)
                          throws IOException
Sends a message to the destination specified at construction. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

Error Handling:

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to sendMessage:

     messenger.sendMessage( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

Limitation: using this method along with SimpleSelector.select() on the same message may occasionally cause some errors to not be thrown. Prefer Messenger.sendMessageN(net.jxta.endpoint.Message, java.lang.String, java.lang.String) when using SimpleSelector.select().

This is a legacy method. Modern code should prefer the other methods and select messages. If a listener API is preferred, it is possible to use a ListenerAdaptor object explicitly to have a listener called.

Specified by:
sendMessage in interface Messenger
Parameters:
msg - The message to send.
rService - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
rServiceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
Returns:
boolean true if the message has been accepted for sending, otherwise false.
Throws:
IOException - Thrown if the message cannot be sent.
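
The three outcomes of this method (true, false, or an IOException) suggest a retry loop around refused attempts. The sketch below shows one way to structure it. SketchSender, SendRetrySketch and FlakySender are hypothetical stand-ins, and messages are modelled as plain strings; none of this is the JXTA API.

```java
import java.io.IOException;

// Hypothetical minimal view of a messenger; not the JXTA Messenger interface.
interface SketchSender {
    boolean sendMessage(String msg) throws IOException;

    void waitUntilNotSaturated(long timeoutMillis) throws InterruptedException;
}

final class SendRetrySketch {
    // Offers the message up to maxAttempts times, waiting for congestion
    // to clear after each refusal. Returns true if it was accepted.
    // An IOException is not retried: the sender is no longer usable.
    static boolean sendWithRetry(SketchSender sender, String msg,
                                 int maxAttempts, long waitMillis)
            throws IOException, InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (sender.sendMessage(msg)) {
                return true; // accepted for sending; delivery is still not guaranteed
            }
            sender.waitUntilNotSaturated(waitMillis);
        }
        return false;
    }
}

// Fake sender that refuses a fixed number of attempts, for demonstration only.
final class FlakySender implements SketchSender {
    private int refusalsLeft;
    int accepted = 0;

    FlakySender(int refusals) {
        this.refusalsLeft = refusals;
    }

    @Override
    public boolean sendMessage(String msg) {
        if (refusalsLeft > 0) {
            refusalsLeft--;
            return false; // simulates a local resource limit
        }
        accepted++;
        return true;
    }

    @Override
    public void waitUntilNotSaturated(long timeoutMillis) {
        // Nothing to wait for in the fake.
    }
}
```

With the real API, the waiting step would presumably be a waitState call on the non-saturated states, and the message passed in would be a clone if the caller intends to reuse the original.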

waitState

public final int waitState(int wantedStates,
                           long timeout)
                    throws InterruptedException
Blocks unless and until the current state is or has become one of the desired values. The observable states are guaranteed to be represented by a single bit. Multiple desired values may be specified by passing them ORed together.

This class defines the list of constants that may be used and how these may be combined.

Note that the state can change as soon as this method returns, so any observation is only an indication of the past. Synchronizing on the object itself has no other effect than interfering with other threads doing the same. Obviously, certain transitions cannot happen unless a new message is submitted. So unless another thread is using a messenger, it is safe to assume that a non-saturated messenger will not become saturated spontaneously. Note that messengers returned by different endpoint service interface objects (what PeerGroup.getEndpointService() returns) are different. However, a given endpoint interface object will return an existing messenger to the same exact destination if there is a Messenger.USABLE one. With an unshared messenger, one can wait for any change with waitState(~getState(), 0).

Note that it is advisable to always OR the desired states with Messenger.TERMINAL, unless being blocked past the messenger's termination is an acceptable behaviour.

Examples (a timeout of 0 means no time limit):

Ensure that the messenger can take more messages (or is no longer USABLE): waitState(~SATURATED, 0)

Ensure that all submitted messages have been sent: waitState(TERMINAL | IDLE, 0)

Ensure that the messenger is already resolved and can take more messages: waitState(TERMINAL | (RESOLVED & ~SATURATED), 0)

This method synchronizes on the lock object supplied through setStateLock(java.lang.Object) during construction.

Specified by:
waitState in interface Messenger
Parameters:
wantedStates - The binary OR of the desired states.
timeout - How long to wait. A timeout of 0 means no time limit.
Returns:
The desired state that was reached or the current state when time ran out.
Throws:
InterruptedException - If the invoking thread was interrupted before the condition was realized.
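
The bit-mask convention and the blocking behaviour can be sketched with a toy state holder. StateMaskSketch is hypothetical: its constant values are invented for the example (the real ones are defined by Messenger), and its timeout is assumed to be in milliseconds.

```java
// Hypothetical toy state machine; constant values are NOT the JXTA ones.
final class StateMaskSketch {
    // Each observable state is a single bit.
    static final int IDLE_OPEN = 0x1;
    static final int SENDING = 0x2;
    static final int SATURATED = 0x4;
    static final int CLOSED = 0x8;

    // Composite masks are built by ORing single-bit states.
    static final int IDLE = IDLE_OPEN | CLOSED;
    static final int TERMINAL = CLOSED;

    private final Object lock = new Object();
    private int state = IDLE_OPEN;

    void setState(int newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll(); // wake every thread blocked in waitState
        }
    }

    // Blocks until the current state matches one of the wanted bits,
    // or until the timeout elapses. A timeout of 0 means no time limit.
    // Returns the state observed when the wait ended.
    int waitState(int wanted, long timeoutMillis) throws InterruptedException {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while ((state & wanted) == 0) {
                if (timeoutMillis == 0) {
                    lock.wait();
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        break; // time ran out: report the current state
                    }
                    lock.wait(remaining);
                }
            }
            return state;
        }
    }
}
```

In this sketch, waiting on `IDLE | TERMINAL` returns at once for a fresh instance, while waiting on `~SATURATED` with a timeout returns the SATURATED state itself if nothing changes in time, which is how a caller can tell a timeout from success.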

itemChanged

public void itemChanged(SimpleSelectable changedObject)
Implements a default for all AbstractMessengers: mirror the event to our selectors. This is what is needed by all the known AbstractMessengers that register themselves somewhere (that is, ChannelMessengers). FIXME - jice@jxta.org 20040413: Not sure that this is the best default.

Specified by:
itemChanged in interface SimpleSelectable
Parameters:
changedObject - Ignored.
See Also:
AbstractSimpleSelectable
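
Mirroring an event to one's own selectors can be pictured with the sketch below. SketchChangeListener and MirroringSelectable are hypothetical and only loosely modelled on SimpleSelectable; in particular, reporting the mirroring object itself as the changed object is an assumption based on the parameter being ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract, loosely modelled on itemChanged.
interface SketchChangeListener {
    void itemChanged(Object changedObject);
}

// Hypothetical selectable that forwards any change it hears about
// to its own registered listeners.
class MirroringSelectable implements SketchChangeListener {
    private final List<SketchChangeListener> listeners = new ArrayList<>();

    void register(SketchChangeListener listener) {
        listeners.add(listener);
    }

    @Override
    public void itemChanged(Object changedObject) {
        // The argument is ignored: listeners are told that this object changed.
        for (SketchChangeListener listener : listeners) {
            listener.itemChanged(this);
        }
    }
}
```

A channel-like object registered with an underlying messenger could use this pattern so that anything selecting the channel wakes up whenever the underlying messenger changes.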

JXTA J2SE