001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport; 020 import java.net.URI; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.jms.ExceptionListener; 026 import javax.jms.JMSException; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.activemq.UnsupportedWireFormatException; 031 import org.activemq.broker.BrokerConnector; 032 import org.activemq.io.WireFormat; 033 import org.activemq.message.Packet; 034 import org.activemq.message.PacketListener; 035 import org.activemq.message.Receipt; 036 import org.activemq.message.ReceiptHolder; 037 import org.activemq.message.WireFormatInfo; 038 import org.activemq.util.ExecutorHelper; 039 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 040 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList; 041 import EDU.oswego.cs.dl.util.concurrent.Executor; 042 043 /** 044 * Some basic functionality, common across most transport implementations of channels 045 * 046 * @version $Revision: 1.1.1.1 $ 047 */ 048 public abstract class TransportChannelSupport implements TransportChannel { 049 private static final Log log = LogFactory.getLog(TransportChannelSupport.class); 050 private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); 051 private ConcurrentHashMap requestMap = new ConcurrentHashMap(); 052 private PacketListener packetListener; 053 private ExceptionListener exceptionListener; 054 private String clientID; 055 private TransportChannelListener transportChannelListener; 056 private long lastReceiptTimstamp = 0; 057 private boolean serverSide; 058 protected boolean pendingStop = false; 059 protected boolean transportConnected = true; 060 protected WireFormat currentWireFormat; 061 protected boolean cachingEnabled = false; 062 protected boolean noDelay = false; 063 protected boolean usedInternally = false; //denotes if transport is used by an internal Connection 064 065 066 067 protected TransportChannelSupport(){ 068 } 069 070 protected TransportChannelSupport(WireFormat wf){ 071 this.currentWireFormat = wf; 072 } 073 074 /** 075 * Give the TransportChannel a hint it's about to stop 076 * 077 * @param pendingStop 078 */ 079 public void setPendingStop(boolean pendingStop) { 080 this.pendingStop = pendingStop; 081 } 082 083 /** 084 * @return true if the channel is about to stop 085 */ 086 public boolean isPendingStop() { 087 return pendingStop; 088 } 089 090 /** 091 * set the wire format to be used by this channel 092 * @param wireformat 093 */ 094 public void setWireFormat(WireFormat wireformat){ 095 currentWireFormat = wireformat; 096 } 097 098 /** 099 * Get the current wireformat used by this channel 100 * @return the current wire format - or null if not set 101 */ 102 public WireFormat getWireFormat(){ 103 return currentWireFormat; 104 } 105 106 /** 107 * close the channel 108 */ 109 public void stop() { 110 transportConnected = false; 111 Map map = new HashMap(this.requestMap); 112 for (Iterator i = map.values().iterator();i.hasNext();) { 113 ReceiptHolder rh = (ReceiptHolder) i.next(); 114 rh.close(); 115 } 116 map.clear(); 117 requestMap.clear(); 118 if (transportChannelListener != null) { 119 transportChannelListener.removeClient(this); 120 } 121 exceptionListener = null; 122 packetListener = null; 123 } 124 125 /** 126 * synchronously send a Packet 127 * 128 * @param packet 129 * @return a Receipt 130 * @throws JMSException 131 */ 132 public Receipt send(Packet packet) throws JMSException { 133 return send(packet, 0); 134 } 135 136 /** 137 * Synchronously send a Packet 138 * 139 * @param packet packet to send 140 * @param timeout amount of time to wait for a receipt 141 * @return the Receipt 142 * @throws JMSException 143 */ 144 public Receipt send(Packet packet, int timeout) throws JMSException { 145 ReceiptHolder rh = asyncSendWithReceipt(packet); 146 Receipt result = rh.getReceipt(timeout); 147 return result; 148 } 149 150 /** 151 * Asynchronously send a Packet with receipt. 152 * 153 * @param packet the packet to send 154 * @return a ReceiptHolder for the packet 155 * @throws JMSException 156 */ 157 public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException { 158 ReceiptHolder rh = new ReceiptHolder(); 159 requestMap.put(new Short(packet.getId()), rh); 160 Packet response = doAsyncSend(packet); 161 if (response != null && response instanceof Receipt){ 162 rh.setReceipt((Receipt)response); 163 } 164 return rh; 165 } 166 167 // Properties 168 //------------------------------------------------------------------------- 169 /** 170 * @return the transportChannelListener 171 */ 172 public TransportChannelListener getTransportChannelListener() { 173 return transportChannelListener; 174 } 175 176 /** 177 * @param transportChannelListener 178 */ 179 public void setTransportChannelListener(TransportChannelListener transportChannelListener) { 180 this.transportChannelListener = transportChannelListener; 181 } 182 183 /** 184 * Add a listener for changes in a channels status 185 * 186 * @param listener 187 */ 188 public void addTransportStatusEventListener(TransportStatusEventListener listener) { 189 listeners.add(listener); 190 } 191 192 /** 193 * Remove a listener for changes in a channels status 194 * 195 * @param listener 196 */ 197 public void removeTransportStatusEventListener(TransportStatusEventListener listener) { 198 listeners.remove(listener); 199 } 200 201 /** 202 * @return the clientID 203 */ 204 public String getClientID() { 205 return clientID; 206 } 207 208 /** 209 * @param clientID set the clientID 210 */ 211 public void setClientID(String clientID) { 212 this.clientID = clientID; 213 } 214 215 /** 216 * @return the exception listener 217 */ 218 public ExceptionListener getExceptionListener() { 219 return exceptionListener; 220 } 221 222 /** 223 * @return the packet listener 224 */ 225 public PacketListener getPacketListener() { 226 return packetListener; 227 } 228 229 /** 230 * Set a listener for Packets 231 * 232 * @param l 233 */ 234 public void setPacketListener(PacketListener l) { 235 this.packetListener = l; 236 } 237 238 /** 239 * Set an exception listener to listen for asynchronously generated exceptions 240 * 241 * @param listener 242 */ 243 public void setExceptionListener(ExceptionListener listener) { 244 this.exceptionListener = listener; 245 } 246 247 /** 248 * @return true if server side 249 */ 250 public boolean isServerSide() { 251 return serverSide; 252 } 253 254 /** 255 * @param serverSide 256 */ 257 public void setServerSide(boolean serverSide) { 258 this.serverSide = serverSide; 259 } 260 261 /** 262 * @return true if the transport channel is active, 263 * this value will be false through reconnecting 264 */ 265 public boolean isTransportConnected(){ 266 return transportConnected; 267 } 268 269 protected void setTransportConnected(boolean value){ 270 transportConnected = value; 271 } 272 273 /** 274 * Some transports rely on an embedded broker (beer based protocols) 275 * @return true if an embedded broker required 276 */ 277 public boolean requiresEmbeddedBroker(){ 278 return false; 279 } 280 281 /** 282 * Some transports that rely on an embedded broker need to 283 * create the connector used by the broker 284 * @return the BrokerConnector or null if not applicable 285 * @throws JMSException 286 */ 287 public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{ 288 return null; 289 } 290 291 292 /** 293 * @return true if this transport is multicast based (i.e. broadcasts to multiple nodes) 294 */ 295 public boolean isMulticast(){ 296 return false; 297 } 298 299 /** 300 * Can this wireformat process packets of this version 301 * @param version the version number to test 302 * @return true if can accept the version 303 */ 304 public boolean canProcessWireFormatVersion(int version){ 305 return true; 306 } 307 308 public long getLastReceiptTimestamp() { 309 return lastReceiptTimstamp; 310 } 311 312 /** 313 * @return Returns the usedInternally. 314 */ 315 public boolean isUsedInternally() { 316 return usedInternally; 317 } 318 /** 319 * @param usedInternally The usedInternally to set. 320 */ 321 public void setUsedInternally(boolean usedInternally) { 322 this.usedInternally = usedInternally; 323 } 324 325 /** 326 * Does the transport support wire format version info 327 * @return 328 */ 329 public boolean doesSupportWireFormatVersioning(){ 330 return true; 331 } 332 333 /** 334 * @return the current version of this wire format 335 */ 336 public int getCurrentWireFormatVersion(){ 337 return -1; 338 } 339 340 341 342 /** 343 * some transports/wire formats will implement their own fragementation 344 * @return true unless a transport/wire format supports it's own fragmentation 345 */ 346 public boolean doesSupportMessageFragmentation(){ 347 return getWireFormat() != null && getWireFormat().doesSupportMessageFragmentation(); 348 } 349 350 351 /** 352 * Some transports/wireformats will not be able to understand compressed messages 353 * @return true unless a transport/wire format cannot understand compression 354 */ 355 public boolean doesSupportMessageCompression(){ 356 return getWireFormat() != null && getWireFormat().doesSupportMessageCompression(); 357 } 358 359 // Implementation methods 360 //------------------------------------------------------------------------- 361 /** 362 * consume a packet from the channel 363 * 364 * @param packet 365 * @throws UnsupportedWireFormatException 366 */ 367 protected void doConsumePacket(Packet packet) { 368 doConsumePacket(packet, packetListener); 369 } 370 371 protected void doConsumePacket(Packet packet, PacketListener listener) { 372 if (!doHandleReceipt(packet) && !doHandleWireFormat(packet)) { 373 if (listener != null) { 374 listener.consume(packet); 375 } 376 else { 377 log.warn("No packet listener set to receive packets"); 378 } 379 } 380 } 381 382 protected boolean doHandleReceipt(Packet packet) { 383 boolean result = false; 384 if (packet != null) { 385 if (packet.isReceipt()) { 386 lastReceiptTimstamp = System.currentTimeMillis(); 387 result = true; 388 Receipt receipt = (Receipt) packet; 389 ReceiptHolder rh = (ReceiptHolder) requestMap.remove(new Short(receipt.getCorrelationId())); 390 if (rh != null) { 391 rh.setReceipt(receipt); 392 } 393 else { 394 log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId()); 395 } 396 } 397 } 398 return result; 399 } 400 401 protected boolean doHandleWireFormat(Packet packet) { 402 boolean handled = false; 403 if (packet.getPacketType() == Packet.WIRE_FORMAT_INFO) { 404 handled = true; 405 WireFormatInfo info = (WireFormatInfo) packet; 406 if (!canProcessWireFormatVersion(info.getVersion())) { 407 setPendingStop(true); 408 String errorStr = "Cannot process wire format of version: " + info.getVersion(); 409 TransportStatusEvent event = new TransportStatusEvent(); 410 event.setChannelStatus(TransportStatusEvent.FAILED); 411 fireStatusEvent(event); 412 onAsyncException(new UnsupportedWireFormatException(errorStr)); 413 stop(); 414 } 415 else { 416 if (log.isDebugEnabled()) { 417 log.debug(this + " using wire format version: " + info.getVersion()); 418 } 419 } 420 } 421 return handled; 422 } 423 424 /** 425 * send a Packet to the raw underlying transport This method is here to allow specific implementations to override 426 * this method 427 * 428 * @param packet 429 * @return a response or null 430 * @throws JMSException 431 */ 432 protected Packet doAsyncSend(Packet packet) throws JMSException { 433 asyncSend(packet); 434 return null; 435 } 436 437 /** 438 * Handles an exception thrown while performing async dispatch of messages 439 * 440 * @param e 441 */ 442 protected void onAsyncException(JMSException e) { 443 if (exceptionListener != null) { 444 transportConnected = false; 445 exceptionListener.onException(e); 446 } 447 else { 448 log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e); 449 } 450 } 451 452 /** 453 * Fire status event to any status event listeners 454 * 455 * @param remoteURI 456 * @param status 457 */ 458 protected void fireStatusEvent(URI remoteURI, int status) { 459 TransportStatusEvent event = new TransportStatusEvent(); 460 event.setChannelStatus(status); 461 event.setRemoteURI(remoteURI); 462 fireStatusEvent(event); 463 } 464 465 /** 466 * Fire status event to any status event listeners 467 * 468 * @param event 469 */ 470 protected void fireStatusEvent(TransportStatusEvent event) { 471 if (event != null) { 472 for (Iterator i = listeners.iterator();i.hasNext();) { 473 TransportStatusEventListener l = (TransportStatusEventListener) i.next(); 474 l.statusChanged(event); 475 } 476 } 477 } 478 479 /** 480 * A helper method to stop the execution of an executor 481 * 482 * @param executor the executor or null if one is not created yet 483 * @throws InterruptedException 484 * @throws JMSException 485 */ 486 protected void stopExecutor(Executor executor) throws InterruptedException, JMSException { 487 ExecutorHelper.stopExecutor(executor); 488 } 489 /** 490 * @return Returns the cachingEnabled. 491 */ 492 public boolean isCachingEnabled() { 493 return cachingEnabled; 494 } 495 /** 496 * @param cachingEnabled The cachingEnabled to set. 497 */ 498 public void setCachingEnabled(boolean cachingEnabled) { 499 this.cachingEnabled = cachingEnabled; 500 } 501 502 /** 503 * Inform Transport to send messages as quickly 504 * as possible - for Tcp - this means disabling Nagles, 505 * which on OSX may provide better performance for sync 506 * sends 507 * @return Returns the noDelay. 508 */ 509 public boolean isNoDelay() { 510 return noDelay; 511 } 512 /** 513 * @param noDelay The noDelay to set. 514 */ 515 public void setNoDelay(boolean noDelay) { 516 this.noDelay = noDelay; 517 } 518 }