001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.io.OutputStream; 022 import java.net.URI; 023 import java.net.URISyntaxException; 024 import java.util.HashMap; 025 import java.util.Iterator; 026 import java.util.Map; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.LinkedBlockingQueue; 031 import java.util.concurrent.ThreadFactory; 032 import java.util.concurrent.ThreadPoolExecutor; 033 import java.util.concurrent.TimeUnit; 034 import java.util.concurrent.atomic.AtomicBoolean; 035 import java.util.concurrent.atomic.AtomicInteger; 036 037 import javax.jms.Connection; 038 import javax.jms.ConnectionConsumer; 039 import javax.jms.ConnectionMetaData; 040 import javax.jms.DeliveryMode; 041 import javax.jms.Destination; 042 import javax.jms.ExceptionListener; 043 import javax.jms.IllegalStateException; 044 import javax.jms.JMSException; 045 import javax.jms.Queue; 046 import javax.jms.QueueConnection; 047 import javax.jms.QueueSession; 048 import javax.jms.ServerSessionPool; 049 import javax.jms.Session; 050 import javax.jms.Topic; 051 import javax.jms.TopicConnection; 052 import javax.jms.TopicSession; 053 import javax.jms.XAConnection; 054 import javax.jms.InvalidDestinationException; 055 056 import org.apache.activemq.blob.BlobTransferPolicy; 057 import org.apache.activemq.command.ActiveMQDestination; 058 import org.apache.activemq.command.ActiveMQMessage; 059 import org.apache.activemq.command.ActiveMQTempDestination; 060 import org.apache.activemq.command.ActiveMQTempQueue; 061 import org.apache.activemq.command.ActiveMQTempTopic; 062 import org.apache.activemq.command.BrokerInfo; 063 import org.apache.activemq.command.Command; 064 import org.apache.activemq.command.CommandTypes; 065 import org.apache.activemq.command.ConnectionControl; 066 import org.apache.activemq.command.ConnectionError; 067 import org.apache.activemq.command.ConnectionId; 068 import org.apache.activemq.command.ConnectionInfo; 069 import org.apache.activemq.command.ConsumerControl; 070 import org.apache.activemq.command.ConsumerId; 071 import org.apache.activemq.command.ConsumerInfo; 072 import org.apache.activemq.command.ControlCommand; 073 import org.apache.activemq.command.DestinationInfo; 074 import org.apache.activemq.command.ExceptionResponse; 075 import org.apache.activemq.command.Message; 076 import org.apache.activemq.command.MessageDispatch; 077 import org.apache.activemq.command.MessageId; 078 import org.apache.activemq.command.ProducerAck; 079 import org.apache.activemq.command.ProducerId; 080 import org.apache.activemq.command.RemoveInfo; 081 import org.apache.activemq.command.RemoveSubscriptionInfo; 082 import org.apache.activemq.command.Response; 083 import org.apache.activemq.command.SessionId; 084 import org.apache.activemq.command.ShutdownInfo; 085 import org.apache.activemq.command.WireFormatInfo; 086 import org.apache.activemq.management.JMSConnectionStatsImpl; 087 import org.apache.activemq.management.JMSStatsImpl; 088 import org.apache.activemq.management.StatsCapable; 089 import org.apache.activemq.management.StatsImpl; 090 import org.apache.activemq.state.CommandVisitorAdapter; 091 import org.apache.activemq.thread.TaskRunnerFactory; 092 import org.apache.activemq.transport.Transport; 093 import org.apache.activemq.transport.TransportListener; 094 import org.apache.activemq.util.IdGenerator; 095 import org.apache.activemq.util.IntrospectionSupport; 096 import org.apache.activemq.util.JMSExceptionSupport; 097 import org.apache.activemq.util.LongSequenceGenerator; 098 import org.apache.activemq.util.ServiceSupport; 099 import org.apache.activemq.advisory.DestinationSource; 100 import org.apache.commons.logging.Log; 101 import org.apache.commons.logging.LogFactory; 102 103 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { 104 105 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 106 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 107 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 108 109 private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class); 110 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 111 112 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 113 114 protected boolean dispatchAsync=true; 115 protected boolean alwaysSessionAsync = true; 116 117 private TaskRunnerFactory sessionTaskRunner; 118 private final ThreadPoolExecutor asyncConnectionThread; 119 120 // Connection state variables 121 private final ConnectionInfo info; 122 private ExceptionListener exceptionListener; 123 private ClientInternalExceptionListener clientInternalExceptionListener; 124 private boolean clientIDSet; 125 private boolean isConnectionInfoSentToBroker; 126 private boolean userSpecifiedClientID; 127 128 // Configuration options variables 129 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 130 private BlobTransferPolicy blobTransferPolicy; 131 private RedeliveryPolicy redeliveryPolicy; 132 private MessageTransformer transformer; 133 134 private boolean disableTimeStampsByDefault; 135 private boolean optimizedMessageDispatch = true; 136 private boolean copyMessageOnSend = true; 137 private boolean useCompression; 138 private boolean objectMessageSerializationDefered; 139 private boolean useAsyncSend; 140 private boolean optimizeAcknowledge; 141 private boolean nestedMapAndListEnabled = true; 142 private boolean useRetroactiveConsumer; 143 private boolean exclusiveConsumer; 144 private boolean alwaysSyncSend; 145 private int closeTimeout = 15000; 146 private boolean watchTopicAdvisories = true; 147 private long warnAboutUnstartedConnectionTimeout = 500L; 148 private int sendTimeout =0; 149 private boolean sendAcksAsync=true; 150 151 private final Transport transport; 152 private final IdGenerator clientIdGenerator; 153 private final JMSStatsImpl factoryStats; 154 private final JMSConnectionStatsImpl stats; 155 156 private final AtomicBoolean started = new AtomicBoolean(false); 157 private final AtomicBoolean closing = new AtomicBoolean(false); 158 private final AtomicBoolean closed = new AtomicBoolean(false); 159 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 160 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 161 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 162 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); 163 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); 164 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 165 166 // Maps ConsumerIds to ActiveMQConsumer objects 167 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 168 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 169 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 170 private final SessionId connectionSessionId; 171 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 172 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 173 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 174 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 175 176 private AdvisoryConsumer advisoryConsumer; 177 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 178 private BrokerInfo brokerInfo; 179 private IOException firstFailureError; 180 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 181 182 // Assume that protocol is the latest. Change to the actual protocol 183 // version when a WireFormatInfo is received. 184 private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 185 private long timeCreated; 186 private ConnectionAudit connectionAudit = new ConnectionAudit(); 187 private DestinationSource destinationSource; 188 private final Object ensureConnectionInfoSentMutex = new Object(); 189 private boolean useDedicatedTaskRunner; 190 protected CountDownLatch transportInterruptionProcessingComplete; 191 private long consumerFailoverRedeliveryWaitPeriod; 192 193 /** 194 * Construct an <code>ActiveMQConnection</code> 195 * 196 * @param transport 197 * @param factoryStats 198 * @throws Exception 199 */ 200 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { 201 202 this.transport = transport; 203 this.clientIdGenerator = clientIdGenerator; 204 this.factoryStats = factoryStats; 205 206 // Configure a single threaded executor who's core thread can timeout if 207 // idle 208 asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 209 public Thread newThread(Runnable r) { 210 Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport); 211 thread.setDaemon(true); 212 return thread; 213 } 214 }); 215 // asyncConnectionThread.allowCoreThreadTimeOut(true); 216 217 this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId())); 218 this.info.setManageable(true); 219 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 220 221 this.transport.setTransportListener(this); 222 223 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 224 this.factoryStats.addConnection(this); 225 this.timeCreated = System.currentTimeMillis(); 226 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 227 } 228 229 protected void setUserName(String userName) { 230 this.info.setUserName(userName); 231 } 232 233 protected void setPassword(String password) { 234 this.info.setPassword(password); 235 } 236 237 /** 238 * A static helper method to create a new connection 239 * 240 * @return an ActiveMQConnection 241 * @throws JMSException 242 */ 243 public static ActiveMQConnection makeConnection() throws JMSException { 244 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 245 return (ActiveMQConnection)factory.createConnection(); 246 } 247 248 /** 249 * A static helper method to create a new connection 250 * 251 * @param uri 252 * @return and ActiveMQConnection 253 * @throws JMSException 254 */ 255 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 256 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 257 return (ActiveMQConnection)factory.createConnection(); 258 } 259 260 /** 261 * A static helper method to create a new connection 262 * 263 * @param user 264 * @param password 265 * @param uri 266 * @return an ActiveMQConnection 267 * @throws JMSException 268 */ 269 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 270 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 271 return (ActiveMQConnection)factory.createConnection(); 272 } 273 274 /** 275 * @return a number unique for this connection 276 */ 277 public JMSConnectionStatsImpl getConnectionStats() { 278 return stats; 279 } 280 281 /** 282 * Creates a <CODE>Session</CODE> object. 283 * 284 * @param transacted indicates whether the session is transacted 285 * @param acknowledgeMode indicates whether the consumer or the client will 286 * acknowledge any messages it receives; ignored if the 287 * session is transacted. Legal values are 288 * <code>Session.AUTO_ACKNOWLEDGE</code>, 289 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 290 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 291 * @return a newly created session 292 * @throws JMSException if the <CODE>Connection</CODE> object fails to 293 * create a session due to some internal error or lack of 294 * support for the specific transaction and acknowledgement 295 * mode. 296 * @see Session#AUTO_ACKNOWLEDGE 297 * @see Session#CLIENT_ACKNOWLEDGE 298 * @see Session#DUPS_OK_ACKNOWLEDGE 299 * @since 1.1 300 */ 301 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 302 checkClosedOrFailed(); 303 ensureConnectionInfoSent(); 304 if(!transacted) { 305 if (acknowledgeMode==Session.SESSION_TRANSACTED) { 306 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 307 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 308 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 309 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 310 } 311 } 312 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED 313 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); 314 } 315 316 /** 317 * @return sessionId 318 */ 319 protected SessionId getNextSessionId() { 320 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 321 } 322 323 /** 324 * Gets the client identifier for this connection. 325 * <P> 326 * This value is specific to the JMS provider. It is either preconfigured by 327 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 328 * dynamically by the application by calling the <code>setClientID</code> 329 * method. 330 * 331 * @return the unique client identifier 332 * @throws JMSException if the JMS provider fails to return the client ID 333 * for this connection due to some internal error. 334 */ 335 public String getClientID() throws JMSException { 336 checkClosedOrFailed(); 337 return this.info.getClientId(); 338 } 339 340 /** 341 * Sets the client identifier for this connection. 342 * <P> 343 * The preferred way to assign a JMS client's client identifier is for it to 344 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 345 * object and transparently assigned to the <CODE>Connection</CODE> object 346 * it creates. 347 * <P> 348 * Alternatively, a client can set a connection's client identifier using a 349 * provider-specific value. The facility to set a connection's client 350 * identifier explicitly is not a mechanism for overriding the identifier 351 * that has been administratively configured. It is provided for the case 352 * where no administratively specified identifier exists. If one does exist, 353 * an attempt to change it by setting it must throw an 354 * <CODE>IllegalStateException</CODE>. If a client sets the client 355 * identifier explicitly, it must do so immediately after it creates the 356 * connection and before any other action on the connection is taken. After 357 * this point, setting the client identifier is a programming error that 358 * should throw an <CODE>IllegalStateException</CODE>. 359 * <P> 360 * The purpose of the client identifier is to associate a connection and its 361 * objects with a state maintained on behalf of the client by a provider. 362 * The only such state identified by the JMS API is that required to support 363 * durable subscriptions. 364 * <P> 365 * If another connection with the same <code>clientID</code> is already 366 * running when this method is called, the JMS provider should detect the 367 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 368 * 369 * @param newClientID the unique client identifier 370 * @throws JMSException if the JMS provider fails to set the client ID for 371 * this connection due to some internal error. 372 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 373 * invalid or duplicate client ID. 374 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 375 * a connection's client ID at the wrong time or when it has 376 * been administratively configured. 377 */ 378 public void setClientID(String newClientID) throws JMSException { 379 checkClosedOrFailed(); 380 381 if (this.clientIDSet) { 382 throw new IllegalStateException("The clientID has already been set"); 383 } 384 385 if (this.isConnectionInfoSentToBroker) { 386 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 387 } 388 389 this.info.setClientId(newClientID); 390 this.userSpecifiedClientID = true; 391 ensureConnectionInfoSent(); 392 } 393 394 /** 395 * Sets the default client id that the connection will use if explicitly not 396 * set with the setClientId() call. 397 */ 398 public void setDefaultClientID(String clientID) throws JMSException { 399 this.info.setClientId(clientID); 400 this.userSpecifiedClientID = true; 401 } 402 403 /** 404 * Gets the metadata for this connection. 405 * 406 * @return the connection metadata 407 * @throws JMSException if the JMS provider fails to get the connection 408 * metadata for this connection. 409 * @see javax.jms.ConnectionMetaData 410 */ 411 public ConnectionMetaData getMetaData() throws JMSException { 412 checkClosedOrFailed(); 413 return ActiveMQConnectionMetaData.INSTANCE; 414 } 415 416 /** 417 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 418 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 419 * associated with it. 420 * 421 * @return the <CODE>ExceptionListener</CODE> for this connection, or 422 * null, if no <CODE>ExceptionListener</CODE> is associated with 423 * this connection. 424 * @throws JMSException if the JMS provider fails to get the 425 * <CODE>ExceptionListener</CODE> for this connection. 426 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 427 */ 428 public ExceptionListener getExceptionListener() throws JMSException { 429 checkClosedOrFailed(); 430 return this.exceptionListener; 431 } 432 433 /** 434 * Sets an exception listener for this connection. 435 * <P> 436 * If a JMS provider detects a serious problem with a connection, it informs 437 * the connection's <CODE> ExceptionListener</CODE>, if one has been 438 * registered. It does this by calling the listener's <CODE>onException 439 * </CODE> 440 * method, passing it a <CODE>JMSException</CODE> object describing the 441 * problem. 442 * <P> 443 * An exception listener allows a client to be notified of a problem 444 * asynchronously. Some connections only consume messages, so they would 445 * have no other way to learn their connection has failed. 446 * <P> 447 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 448 * <P> 449 * A JMS provider should attempt to resolve connection problems itself 450 * before it notifies the client of them. 451 * 452 * @param listener the exception listener 453 * @throws JMSException if the JMS provider fails to set the exception 454 * listener for this connection. 455 */ 456 public void setExceptionListener(ExceptionListener listener) throws JMSException { 457 checkClosedOrFailed(); 458 this.exceptionListener = listener; 459 } 460 461 /** 462 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 463 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 464 * associated with it. 465 * 466 * @return the listener or <code>null</code> if no listener is registered with the connection. 467 */ 468 public ClientInternalExceptionListener getClientInternalExceptionListener() 469 { 470 return clientInternalExceptionListener; 471 } 472 473 /** 474 * Sets a client internal exception listener for this connection. 475 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 476 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 477 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 478 * describing the problem. 479 * 480 * @param listener the exception listener 481 */ 482 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) 483 { 484 this.clientInternalExceptionListener = listener; 485 } 486 487 /** 488 * Starts (or restarts) a connection's delivery of incoming messages. A call 489 * to <CODE>start</CODE> on a connection that has already been started is 490 * ignored. 491 * 492 * @throws JMSException if the JMS provider fails to start message delivery 493 * due to some internal error. 494 * @see javax.jms.Connection#stop() 495 */ 496 public void start() throws JMSException { 497 checkClosedOrFailed(); 498 ensureConnectionInfoSent(); 499 if (started.compareAndSet(false, true)) { 500 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 501 ActiveMQSession session = i.next(); 502 session.start(); 503 } 504 } 505 } 506 507 /** 508 * Temporarily stops a connection's delivery of incoming messages. Delivery 509 * can be restarted using the connection's <CODE>start</CODE> method. When 510 * the connection is stopped, delivery to all the connection's message 511 * consumers is inhibited: synchronous receives block, and messages are not 512 * delivered to message listeners. 513 * <P> 514 * This call blocks until receives and/or message listeners in progress have 515 * completed. 516 * <P> 517 * Stopping a connection has no effect on its ability to send messages. A 518 * call to <CODE>stop</CODE> on a connection that has already been stopped 519 * is ignored. 520 * <P> 521 * A call to <CODE>stop</CODE> must not return until delivery of messages 522 * has paused. This means that a client can rely on the fact that none of 523 * its message listeners will be called and that all threads of control 524 * waiting for <CODE>receive</CODE> calls to return will not return with a 525 * message until the connection is restarted. The receive timers for a 526 * stopped connection continue to advance, so receives may time out while 527 * the connection is stopped. 528 * <P> 529 * If message listeners are running when <CODE>stop</CODE> is invoked, the 530 * <CODE>stop</CODE> call must wait until all of them have returned before 531 * it may return. While these message listeners are completing, they must 532 * have the full services of the connection available to them. 533 * 534 * @throws JMSException if the JMS provider fails to stop message delivery 535 * due to some internal error. 536 * @see javax.jms.Connection#start() 537 */ 538 public void stop() throws JMSException { 539 checkClosedOrFailed(); 540 if (started.compareAndSet(true, false)) { 541 synchronized(sessions) { 542 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 543 ActiveMQSession s = i.next(); 544 s.stop(); 545 } 546 } 547 } 548 } 549 550 /** 551 * Closes the connection. 552 * <P> 553 * Since a provider typically allocates significant resources outside the 554 * JVM on behalf of a connection, clients should close these resources when 555 * they are not needed. Relying on garbage collection to eventually reclaim 556 * these resources may not be timely enough. 557 * <P> 558 * There is no need to close the sessions, producers, and consumers of a 559 * closed connection. 560 * <P> 561 * Closing a connection causes all temporary destinations to be deleted. 562 * <P> 563 * When this method is invoked, it should not return until message 564 * processing has been shut down in an orderly fashion. This means that all 565 * message listeners that may have been running have returned, and that all 566 * pending receives have returned. A close terminates all pending message 567 * receives on the connection's sessions' consumers. The receives may return 568 * with a message or with null, depending on whether there was a message 569 * available at the time of the close. If one or more of the connection's 570 * sessions' message listeners is processing a message at the time when 571 * connection <CODE>close</CODE> is invoked, all the facilities of the 572 * connection and its sessions must remain available to those listeners 573 * until they return control to the JMS provider. 574 * <P> 575 * Closing a connection causes any of its sessions' transactions in progress 576 * to be rolled back. In the case where a session's work is coordinated by 577 * an external transaction manager, a session's <CODE>commit</CODE> and 578 * <CODE> rollback</CODE> methods are not used and the result of a closed 579 * session's work is determined later by the transaction manager. Closing a 580 * connection does NOT force an acknowledgment of client-acknowledged 581 * sessions. 582 * <P> 583 * Invoking the <CODE>acknowledge</CODE> method of a received message from 584 * a closed connection's session must throw an 585 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 586 * NOT throw an exception. 587 * 588 * @throws JMSException if the JMS provider fails to close the connection 589 * due to some internal error. For example, a failure to 590 * release resources or to close a socket connection can 591 * cause this exception to be thrown. 592 */ 593 public void close() throws JMSException { 594 try { 595 // If we were running, lets stop first. 596 if (!closed.get() && !transportFailed.get()) { 597 stop(); 598 } 599 600 synchronized (this) { 601 if (!closed.get()) { 602 closing.set(true); 603 604 if (destinationSource != null) { 605 destinationSource.stop(); 606 destinationSource = null; 607 } 608 if (advisoryConsumer != null) { 609 advisoryConsumer.dispose(); 610 advisoryConsumer = null; 611 } 612 613 long lastDeliveredSequenceId = 0; 614 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 615 ActiveMQSession s = i.next(); 616 s.dispose(); 617 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 618 } 619 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 620 ActiveMQConnectionConsumer c = i.next(); 621 c.dispose(); 622 } 623 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 624 ActiveMQInputStream c = i.next(); 625 c.dispose(); 626 } 627 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 628 ActiveMQOutputStream c = i.next(); 629 c.dispose(); 630 } 631 632 if (isConnectionInfoSentToBroker) { 633 // If we announced ourselfs to the broker.. Try to let 634 // the broker 635 // know that the connection is being shutdown. 636 RemoveInfo removeCommand = info.createRemoveCommand(); 637 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 638 doSyncSendPacket(info.createRemoveCommand(), closeTimeout); 639 doAsyncSendPacket(new ShutdownInfo()); 640 } 641 642 ServiceSupport.dispose(this.transport); 643 644 started.set(false); 645 646 // TODO if we move the TaskRunnerFactory to the connection 647 // factory 648 // then we may need to call 649 // factory.onConnectionClose(this); 650 if (sessionTaskRunner != null) { 651 sessionTaskRunner.shutdown(); 652 } 653 closed.set(true); 654 closing.set(false); 655 } 656 } 657 } finally { 658 try { 659 if (asyncConnectionThread != null){ 660 asyncConnectionThread.shutdown(); 661 } 662 }catch(Throwable e) { 663 LOG.error("Error shutting down thread pool " + e,e); 664 } 665 factoryStats.removeConnection(this); 666 } 667 } 668 669 /** 670 * Tells the broker to terminate its VM. This can be used to cleanly 671 * terminate a broker running in a standalone java process. Server must have 672 * property enable.vm.shutdown=true defined to allow this to work. 673 */ 674 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 675 // implemented. 676 /* 677 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 678 * command = new BrokerAdminCommand(); 679 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 680 * asyncSendPacket(command); } 681 */ 682 683 /** 684 * Create a durable connection consumer for this connection (optional 685 * operation). This is an expert facility not used by regular JMS clients. 686 * 687 * @param topic topic to access 688 * @param subscriptionName durable subscription name 689 * @param messageSelector only messages with properties matching the message 690 * selector expression are delivered. A value of null or an 691 * empty string indicates that there is no message selector 692 * for the message consumer. 693 * @param sessionPool the server session pool to associate with this durable 694 * connection consumer 695 * @param maxMessages the maximum number of messages that can be assigned to 696 * a server session at one time 697 * @return the durable connection consumer 698 * @throws JMSException if the <CODE>Connection</CODE> object fails to 699 * create a connection consumer due to some internal error 700 * or invalid arguments for <CODE>sessionPool</CODE> and 701 * <CODE>messageSelector</CODE>. 702 * @throws javax.jms.InvalidDestinationException if an invalid destination 703 * is specified. 704 * @throws javax.jms.InvalidSelectorException if the message selector is 705 * invalid. 706 * @see javax.jms.ConnectionConsumer 707 * @since 1.1 708 */ 709 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 710 throws JMSException { 711 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 712 } 713 714 /** 715 * Create a durable connection consumer for this connection (optional 716 * operation). This is an expert facility not used by regular JMS clients. 717 * 718 * @param topic topic to access 719 * @param subscriptionName durable subscription name 720 * @param messageSelector only messages with properties matching the message 721 * selector expression are delivered. A value of null or an 722 * empty string indicates that there is no message selector 723 * for the message consumer. 724 * @param sessionPool the server session pool to associate with this durable 725 * connection consumer 726 * @param maxMessages the maximum number of messages that can be assigned to 727 * a server session at one time 728 * @param noLocal set true if you want to filter out messages published 729 * locally 730 * @return the durable connection consumer 731 * @throws JMSException if the <CODE>Connection</CODE> object fails to 732 * create a connection consumer due to some internal error 733 * or invalid arguments for <CODE>sessionPool</CODE> and 734 * <CODE>messageSelector</CODE>. 735 * @throws javax.jms.InvalidDestinationException if an invalid destination 736 * is specified. 737 * @throws javax.jms.InvalidSelectorException if the message selector is 738 * invalid. 739 * @see javax.jms.ConnectionConsumer 740 * @since 1.1 741 */ 742 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 743 boolean noLocal) throws JMSException { 744 checkClosedOrFailed(); 745 ensureConnectionInfoSent(); 746 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 747 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 748 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 749 info.setSubscriptionName(subscriptionName); 750 info.setSelector(messageSelector); 751 info.setPrefetchSize(maxMessages); 752 info.setDispatchAsync(isDispatchAsync()); 753 754 // Allows the options on the destination to configure the consumerInfo 755 if (info.getDestination().getOptions() != null) { 756 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 757 IntrospectionSupport.setProperties(this.info, options, "consumer."); 758 } 759 760 return new ActiveMQConnectionConsumer(this, sessionPool, info); 761 } 762 763 // Properties 764 // ------------------------------------------------------------------------- 765 766 /** 767 * Returns true if this connection has been started 768 * 769 * @return true if this Connection is started 770 */ 771 public boolean isStarted() { 772 return started.get(); 773 } 774 775 /** 776 * Returns true if the connection is closed 777 */ 778 public boolean isClosed() { 779 return closed.get(); 780 } 781 782 /** 783 * Returns true if the connection is in the process of being closed 784 */ 785 public boolean isClosing() { 786 return closing.get(); 787 } 788 789 /** 790 * Returns true if the underlying transport has failed 791 */ 792 public boolean isTransportFailed() { 793 return transportFailed.get(); 794 } 795 796 /** 797 * @return Returns the prefetchPolicy. 798 */ 799 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 800 return prefetchPolicy; 801 } 802 803 /** 804 * Sets the <a 805 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 806 * policy</a> for consumers created by this connection. 807 */ 808 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 809 this.prefetchPolicy = prefetchPolicy; 810 } 811 812 /** 813 */ 814 public Transport getTransportChannel() { 815 return transport; 816 } 817 818 /** 819 * @return Returns the clientID of the connection, forcing one to be 820 * generated if one has not yet been configured. 821 */ 822 public String getInitializedClientID() throws JMSException { 823 ensureConnectionInfoSent(); 824 return info.getClientId(); 825 } 826 827 /** 828 * @return Returns the timeStampsDisableByDefault. 829 */ 830 public boolean isDisableTimeStampsByDefault() { 831 return disableTimeStampsByDefault; 832 } 833 834 /** 835 * Sets whether or not timestamps on messages should be disabled or not. If 836 * you disable them it adds a small performance boost. 837 */ 838 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 839 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 840 } 841 842 /** 843 * @return Returns the dispatchOptimizedMessage. 844 */ 845 public boolean isOptimizedMessageDispatch() { 846 return optimizedMessageDispatch; 847 } 848 849 /** 850 * If this flag is set then an larger prefetch limit is used - only 851 * applicable for durable topic subscribers. 852 */ 853 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 854 this.optimizedMessageDispatch = dispatchOptimizedMessage; 855 } 856 857 /** 858 * @return Returns the closeTimeout. 859 */ 860 public int getCloseTimeout() { 861 return closeTimeout; 862 } 863 864 /** 865 * Sets the timeout before a close is considered complete. Normally a 866 * close() on a connection waits for confirmation from the broker; this 867 * allows that operation to timeout to save the client hanging if there is 868 * no broker 869 */ 870 public void setCloseTimeout(int closeTimeout) { 871 this.closeTimeout = closeTimeout; 872 } 873 874 /** 875 * @return ConnectionInfo 876 */ 877 public ConnectionInfo getConnectionInfo() { 878 return this.info; 879 } 880 881 public boolean isUseRetroactiveConsumer() { 882 return useRetroactiveConsumer; 883 } 884 885 /** 886 * Sets whether or not retroactive consumers are enabled. Retroactive 887 * consumers allow non-durable topic subscribers to receive old messages 888 * that were published before the non-durable subscriber started. 889 */ 890 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 891 this.useRetroactiveConsumer = useRetroactiveConsumer; 892 } 893 894 public boolean isNestedMapAndListEnabled() { 895 return nestedMapAndListEnabled; 896 } 897 898 /** 899 * Enables/disables whether or not Message properties and MapMessage entries 900 * support <a 901 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 902 * Structures</a> of Map and List objects 903 */ 904 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 905 this.nestedMapAndListEnabled = structuredMapsEnabled; 906 } 907 908 public boolean isExclusiveConsumer() { 909 return exclusiveConsumer; 910 } 911 912 /** 913 * Enables or disables whether or not queue consumers should be exclusive or 914 * not for example to preserve ordering when not using <a 915 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 916 * 917 * @param exclusiveConsumer 918 */ 919 public void setExclusiveConsumer(boolean exclusiveConsumer) { 920 this.exclusiveConsumer = exclusiveConsumer; 921 } 922 923 /** 924 * Adds a transport listener so that a client can be notified of events in 925 * the underlying transport 926 */ 927 public void addTransportListener(TransportListener transportListener) { 928 transportListeners.add(transportListener); 929 } 930 931 public void removeTransportListener(TransportListener transportListener) { 932 transportListeners.remove(transportListener); 933 } 934 935 public boolean isUseDedicatedTaskRunner() { 936 return useDedicatedTaskRunner; 937 } 938 939 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 940 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 941 } 942 943 public TaskRunnerFactory getSessionTaskRunner() { 944 synchronized (this) { 945 if (sessionTaskRunner == null) { 946 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); 947 } 948 } 949 return sessionTaskRunner; 950 } 951 952 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 953 this.sessionTaskRunner = sessionTaskRunner; 954 } 955 956 public MessageTransformer getTransformer() { 957 return transformer; 958 } 959 960 /** 961 * Sets the transformer used to transform messages before they are sent on 962 * to the JMS bus or when they are received from the bus but before they are 963 * delivered to the JMS client 964 */ 965 public void setTransformer(MessageTransformer transformer) { 966 this.transformer = transformer; 967 } 968 969 /** 970 * @return the statsEnabled 971 */ 972 public boolean isStatsEnabled() { 973 return this.stats.isEnabled(); 974 } 975 976 /** 977 * @param statsEnabled the statsEnabled to set 978 */ 979 public void setStatsEnabled(boolean statsEnabled) { 980 this.stats.setEnabled(statsEnabled); 981 } 982 983 /** 984 * Returns the {@link DestinationSource} object which can be used to listen to destinations 985 * being created or destroyed or to enquire about the current destinations available on the broker 986 * 987 * @return a lazily created destination source 988 * @throws JMSException 989 */ 990 public DestinationSource getDestinationSource() throws JMSException { 991 if (destinationSource == null) { 992 destinationSource = new DestinationSource(this); 993 destinationSource.start(); 994 } 995 return destinationSource; 996 } 997 998 // Implementation methods 999 // ------------------------------------------------------------------------- 1000 1001 /** 1002 * Used internally for adding Sessions to the Connection 1003 * 1004 * @param session 1005 * @throws JMSException 1006 * @throws JMSException 1007 */ 1008 protected void addSession(ActiveMQSession session) throws JMSException { 1009 this.sessions.add(session); 1010 if (sessions.size() > 1 || session.isTransacted()) { 1011 optimizedMessageDispatch = false; 1012 } 1013 } 1014 1015 /** 1016 * Used interanlly for removing Sessions from a Connection 1017 * 1018 * @param session 1019 */ 1020 protected void removeSession(ActiveMQSession session) { 1021 this.sessions.remove(session); 1022 this.removeDispatcher(session); 1023 } 1024 1025 /** 1026 * Add a ConnectionConsumer 1027 * 1028 * @param connectionConsumer 1029 * @throws JMSException 1030 */ 1031 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1032 this.connectionConsumers.add(connectionConsumer); 1033 } 1034 1035 /** 1036 * Remove a ConnectionConsumer 1037 * 1038 * @param connectionConsumer 1039 */ 1040 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1041 this.connectionConsumers.remove(connectionConsumer); 1042 this.removeDispatcher(connectionConsumer); 1043 } 1044 1045 /** 1046 * Creates a <CODE>TopicSession</CODE> object. 1047 * 1048 * @param transacted indicates whether the session is transacted 1049 * @param acknowledgeMode indicates whether the consumer or the client will 1050 * acknowledge any messages it receives; ignored if the 1051 * session is transacted. Legal values are 1052 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1053 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1054 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1055 * @return a newly created topic session 1056 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1057 * to create a session due to some internal error or lack of 1058 * support for the specific transaction and acknowledgement 1059 * mode. 1060 * @see Session#AUTO_ACKNOWLEDGE 1061 * @see Session#CLIENT_ACKNOWLEDGE 1062 * @see Session#DUPS_OK_ACKNOWLEDGE 1063 */ 1064 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1065 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1066 } 1067 1068 /** 1069 * Creates a connection consumer for this connection (optional operation). 1070 * This is an expert facility not used by regular JMS clients. 1071 * 1072 * @param topic the topic to access 1073 * @param messageSelector only messages with properties matching the message 1074 * selector expression are delivered. A value of null or an 1075 * empty string indicates that there is no message selector 1076 * for the message consumer. 1077 * @param sessionPool the server session pool to associate with this 1078 * connection consumer 1079 * @param maxMessages the maximum number of messages that can be assigned to 1080 * a server session at one time 1081 * @return the connection consumer 1082 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1083 * to create a connection consumer due to some internal 1084 * error or invalid arguments for <CODE>sessionPool</CODE> 1085 * and <CODE>messageSelector</CODE>. 1086 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1087 * specified. 1088 * @throws javax.jms.InvalidSelectorException if the message selector is 1089 * invalid. 1090 * @see javax.jms.ConnectionConsumer 1091 */ 1092 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1093 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1094 } 1095 1096 /** 1097 * Creates a connection consumer for this connection (optional operation). 1098 * This is an expert facility not used by regular JMS clients. 1099 * 1100 * @param queue the queue to access 1101 * @param messageSelector only messages with properties matching the message 1102 * selector expression are delivered. A value of null or an 1103 * empty string indicates that there is no message selector 1104 * for the message consumer. 1105 * @param sessionPool the server session pool to associate with this 1106 * connection consumer 1107 * @param maxMessages the maximum number of messages that can be assigned to 1108 * a server session at one time 1109 * @return the connection consumer 1110 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1111 * to create a connection consumer due to some internal 1112 * error or invalid arguments for <CODE>sessionPool</CODE> 1113 * and <CODE>messageSelector</CODE>. 1114 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1115 * specified. 1116 * @throws javax.jms.InvalidSelectorException if the message selector is 1117 * invalid. 1118 * @see javax.jms.ConnectionConsumer 1119 */ 1120 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1121 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1122 } 1123 1124 /** 1125 * Creates a connection consumer for this connection (optional operation). 1126 * This is an expert facility not used by regular JMS clients. 1127 * 1128 * @param destination the destination to access 1129 * @param messageSelector only messages with properties matching the message 1130 * selector expression are delivered. A value of null or an 1131 * empty string indicates that there is no message selector 1132 * for the message consumer. 1133 * @param sessionPool the server session pool to associate with this 1134 * connection consumer 1135 * @param maxMessages the maximum number of messages that can be assigned to 1136 * a server session at one time 1137 * @return the connection consumer 1138 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1139 * create a connection consumer due to some internal error 1140 * or invalid arguments for <CODE>sessionPool</CODE> and 1141 * <CODE>messageSelector</CODE>. 1142 * @throws javax.jms.InvalidDestinationException if an invalid destination 1143 * is specified. 1144 * @throws javax.jms.InvalidSelectorException if the message selector is 1145 * invalid. 1146 * @see javax.jms.ConnectionConsumer 1147 * @since 1.1 1148 */ 1149 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1150 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1151 } 1152 1153 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1154 throws JMSException { 1155 1156 checkClosedOrFailed(); 1157 ensureConnectionInfoSent(); 1158 1159 ConsumerId consumerId = createConsumerId(); 1160 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1161 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1162 consumerInfo.setSelector(messageSelector); 1163 consumerInfo.setPrefetchSize(maxMessages); 1164 consumerInfo.setNoLocal(noLocal); 1165 consumerInfo.setDispatchAsync(isDispatchAsync()); 1166 1167 // Allows the options on the destination to configure the consumerInfo 1168 if (consumerInfo.getDestination().getOptions() != null) { 1169 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1170 IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1171 } 1172 1173 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1174 } 1175 1176 /** 1177 * @return 1178 */ 1179 private ConsumerId createConsumerId() { 1180 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1181 } 1182 1183 /** 1184 * @return 1185 */ 1186 private ProducerId createProducerId() { 1187 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1188 } 1189 1190 /** 1191 * Creates a <CODE>QueueSession</CODE> object. 1192 * 1193 * @param transacted indicates whether the session is transacted 1194 * @param acknowledgeMode indicates whether the consumer or the client will 1195 * acknowledge any messages it receives; ignored if the 1196 * session is transacted. Legal values are 1197 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1198 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1199 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1200 * @return a newly created queue session 1201 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1202 * to create a session due to some internal error or lack of 1203 * support for the specific transaction and acknowledgement 1204 * mode. 1205 * @see Session#AUTO_ACKNOWLEDGE 1206 * @see Session#CLIENT_ACKNOWLEDGE 1207 * @see Session#DUPS_OK_ACKNOWLEDGE 1208 */ 1209 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1210 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1211 } 1212 1213 /** 1214 * Ensures that the clientID was manually specified and not auto-generated. 1215 * If the clientID was not specified this method will throw an exception. 1216 * This method is used to ensure that the clientID + durableSubscriber name 1217 * are used correctly. 1218 * 1219 * @throws JMSException 1220 */ 1221 public void checkClientIDWasManuallySpecified() throws JMSException { 1222 if (!userSpecifiedClientID) { 1223 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1224 } 1225 } 1226 1227 /** 1228 * send a Packet through the Connection - for internal use only 1229 * 1230 * @param command 1231 * @throws JMSException 1232 */ 1233 public void asyncSendPacket(Command command) throws JMSException { 1234 if (isClosed()) { 1235 throw new ConnectionClosedException(); 1236 } else { 1237 doAsyncSendPacket(command); 1238 } 1239 } 1240 1241 private void doAsyncSendPacket(Command command) throws JMSException { 1242 try { 1243 this.transport.oneway(command); 1244 } catch (IOException e) { 1245 throw JMSExceptionSupport.create(e); 1246 } 1247 } 1248 1249 /** 1250 * Send a packet through a Connection - for internal use only 1251 * 1252 * @param command 1253 * @return 1254 * @throws JMSException 1255 */ 1256 public Response syncSendPacket(Command command) throws JMSException { 1257 if (isClosed()) { 1258 throw new ConnectionClosedException(); 1259 } else { 1260 1261 try { 1262 Response response = (Response)this.transport.request(command); 1263 if (response.isException()) { 1264 ExceptionResponse er = (ExceptionResponse)response; 1265 if (er.getException() instanceof JMSException) { 1266 throw (JMSException)er.getException(); 1267 } else { 1268 if (isClosed()||closing.get()) { 1269 LOG.debug("Received an exception but connection is closing"); 1270 } 1271 JMSException jmsEx = null; 1272 try { 1273 jmsEx = JMSExceptionSupport.create(er.getException()); 1274 }catch(Throwable e) { 1275 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1276 } 1277 if(jmsEx !=null) { 1278 throw jmsEx; 1279 } 1280 } 1281 } 1282 return response; 1283 } catch (IOException e) { 1284 throw JMSExceptionSupport.create(e); 1285 } 1286 } 1287 } 1288 1289 /** 1290 * Send a packet through a Connection - for internal use only 1291 * 1292 * @param command 1293 * @return 1294 * @throws JMSException 1295 */ 1296 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1297 if (isClosed() || closing.get()) { 1298 throw new ConnectionClosedException(); 1299 } else { 1300 return doSyncSendPacket(command, timeout); 1301 } 1302 } 1303 1304 private Response doSyncSendPacket(Command command, int timeout) 1305 throws JMSException { 1306 try { 1307 Response response = (Response)this.transport.request(command, timeout); 1308 if (response != null && response.isException()) { 1309 ExceptionResponse er = (ExceptionResponse)response; 1310 if (er.getException() instanceof JMSException) { 1311 throw (JMSException)er.getException(); 1312 } else { 1313 throw JMSExceptionSupport.create(er.getException()); 1314 } 1315 } 1316 return response; 1317 } catch (IOException e) { 1318 throw JMSExceptionSupport.create(e); 1319 } 1320 } 1321 1322 /** 1323 * @return statistics for this Connection 1324 */ 1325 public StatsImpl getStats() { 1326 return stats; 1327 } 1328 1329 /** 1330 * simply throws an exception if the Connection is already closed or the 1331 * Transport has failed 1332 * 1333 * @throws JMSException 1334 */ 1335 protected synchronized void checkClosedOrFailed() throws JMSException { 1336 checkClosed(); 1337 if (transportFailed.get()) { 1338 throw new ConnectionFailedException(firstFailureError); 1339 } 1340 } 1341 1342 /** 1343 * simply throws an exception if the Connection is already closed 1344 * 1345 * @throws JMSException 1346 */ 1347 protected synchronized void checkClosed() throws JMSException { 1348 if (closed.get()) { 1349 throw new ConnectionClosedException(); 1350 } 1351 } 1352 1353 /** 1354 * Send the ConnectionInfo to the Broker 1355 * 1356 * @throws JMSException 1357 */ 1358 protected void ensureConnectionInfoSent() throws JMSException { 1359 synchronized(this.ensureConnectionInfoSentMutex) { 1360 // Can we skip sending the ConnectionInfo packet?? 1361 if (isConnectionInfoSentToBroker || closed.get()) { 1362 return; 1363 } 1364 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1365 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1366 info.setClientId(clientIdGenerator.generateId()); 1367 } 1368 syncSendPacket(info.copy()); 1369 1370 this.isConnectionInfoSentToBroker = true; 1371 // Add a temp destination advisory consumer so that 1372 // We know what the valid temporary destinations are on the 1373 // broker without having to do an RPC to the broker. 1374 1375 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1376 if (watchTopicAdvisories) { 1377 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1378 } 1379 } 1380 } 1381 1382 public synchronized boolean isWatchTopicAdvisories() { 1383 return watchTopicAdvisories; 1384 } 1385 1386 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1387 this.watchTopicAdvisories = watchTopicAdvisories; 1388 } 1389 1390 /** 1391 * @return Returns the useAsyncSend. 1392 */ 1393 public boolean isUseAsyncSend() { 1394 return useAsyncSend; 1395 } 1396 1397 /** 1398 * Forces the use of <a 1399 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1400 * adds a massive performance boost; but means that the send() method will 1401 * return immediately whether the message has been sent or not which could 1402 * lead to message loss. 1403 */ 1404 public void setUseAsyncSend(boolean useAsyncSend) { 1405 this.useAsyncSend = useAsyncSend; 1406 } 1407 1408 /** 1409 * @return true if always sync send messages 1410 */ 1411 public boolean isAlwaysSyncSend() { 1412 return this.alwaysSyncSend; 1413 } 1414 1415 /** 1416 * Set true if always require messages to be sync sent 1417 * 1418 * @param alwaysSyncSend 1419 */ 1420 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1421 this.alwaysSyncSend = alwaysSyncSend; 1422 } 1423 1424 /** 1425 * Cleans up this connection so that it's state is as if the connection was 1426 * just created. This allows the Resource Adapter to clean up a connection 1427 * so that it can be reused without having to close and recreate the 1428 * connection. 1429 */ 1430 public void cleanup() throws JMSException { 1431 1432 if (advisoryConsumer != null && !isTransportFailed()) { 1433 advisoryConsumer.dispose(); 1434 advisoryConsumer = null; 1435 } 1436 1437 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1438 ActiveMQSession s = i.next(); 1439 s.dispose(); 1440 } 1441 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1442 ActiveMQConnectionConsumer c = i.next(); 1443 c.dispose(); 1444 } 1445 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 1446 ActiveMQInputStream c = i.next(); 1447 c.dispose(); 1448 } 1449 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 1450 ActiveMQOutputStream c = i.next(); 1451 c.dispose(); 1452 } 1453 1454 if (isConnectionInfoSentToBroker) { 1455 if (!transportFailed.get() && !closing.get()) { 1456 syncSendPacket(info.createRemoveCommand()); 1457 } 1458 isConnectionInfoSentToBroker = false; 1459 } 1460 if (userSpecifiedClientID) { 1461 info.setClientId(null); 1462 userSpecifiedClientID = false; 1463 } 1464 clientIDSet = false; 1465 1466 started.set(false); 1467 } 1468 1469 /** 1470 * Changes the associated username/password that is associated with this 1471 * connection. If the connection has been used, you must called cleanup() 1472 * before calling this method. 1473 * 1474 * @throws IllegalStateException if the connection is in used. 1475 */ 1476 public void changeUserInfo(String userName, String password) throws JMSException { 1477 if (isConnectionInfoSentToBroker) { 1478 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1479 } 1480 this.info.setUserName(userName); 1481 this.info.setPassword(password); 1482 } 1483 1484 /** 1485 * @return Returns the resourceManagerId. 1486 * @throws JMSException 1487 */ 1488 public String getResourceManagerId() throws JMSException { 1489 waitForBrokerInfo(); 1490 if (brokerInfo == null) { 1491 throw new JMSException("Connection failed before Broker info was received."); 1492 } 1493 return brokerInfo.getBrokerId().getValue(); 1494 } 1495 1496 /** 1497 * Returns the broker name if one is available or null if one is not 1498 * available yet. 1499 */ 1500 public String getBrokerName() { 1501 try { 1502 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1503 if (brokerInfo == null) { 1504 return null; 1505 } 1506 return brokerInfo.getBrokerName(); 1507 } catch (InterruptedException e) { 1508 Thread.currentThread().interrupt(); 1509 return null; 1510 } 1511 } 1512 1513 /** 1514 * Returns the broker information if it is available or null if it is not 1515 * available yet. 1516 */ 1517 public BrokerInfo getBrokerInfo() { 1518 return brokerInfo; 1519 } 1520 1521 /** 1522 * @return Returns the RedeliveryPolicy. 1523 * @throws JMSException 1524 */ 1525 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1526 return redeliveryPolicy; 1527 } 1528 1529 /** 1530 * Sets the redelivery policy to be used when messages are rolled back 1531 */ 1532 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1533 this.redeliveryPolicy = redeliveryPolicy; 1534 } 1535 1536 public BlobTransferPolicy getBlobTransferPolicy() { 1537 if (blobTransferPolicy == null) { 1538 blobTransferPolicy = createBlobTransferPolicy(); 1539 } 1540 return blobTransferPolicy; 1541 } 1542 1543 /** 1544 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1545 * OBjects) are transferred from producers to brokers to consumers 1546 */ 1547 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1548 this.blobTransferPolicy = blobTransferPolicy; 1549 } 1550 1551 /** 1552 * @return Returns the alwaysSessionAsync. 1553 */ 1554 public boolean isAlwaysSessionAsync() { 1555 return alwaysSessionAsync; 1556 } 1557 1558 /** 1559 * If this flag is set then a separate thread is not used for dispatching 1560 * messages for each Session in the Connection. However, a separate thread 1561 * is always used if there is more than one session, or the session isn't in 1562 * auto acknowledge or duplicates ok mode 1563 */ 1564 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1565 this.alwaysSessionAsync = alwaysSessionAsync; 1566 } 1567 1568 /** 1569 * @return Returns the optimizeAcknowledge. 1570 */ 1571 public boolean isOptimizeAcknowledge() { 1572 return optimizeAcknowledge; 1573 } 1574 1575 /** 1576 * Enables an optimised acknowledgement mode where messages are acknowledged 1577 * in batches rather than individually 1578 * 1579 * @param optimizeAcknowledge The optimizeAcknowledge to set. 1580 */ 1581 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1582 this.optimizeAcknowledge = optimizeAcknowledge; 1583 } 1584 1585 public long getWarnAboutUnstartedConnectionTimeout() { 1586 return warnAboutUnstartedConnectionTimeout; 1587 } 1588 1589 /** 1590 * Enables the timeout from a connection creation to when a warning is 1591 * generated if the connection is not properly started via {@link #start()} 1592 * and a message is received by a consumer. It is a very common gotcha to 1593 * forget to <a 1594 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1595 * the connection</a> so this option makes the default case to create a 1596 * warning if the user forgets. To disable the warning just set the value to < 1597 * 0 (say -1). 1598 */ 1599 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1600 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1601 } 1602 1603 /** 1604 * @return the sendTimeout 1605 */ 1606 public int getSendTimeout() { 1607 return sendTimeout; 1608 } 1609 1610 /** 1611 * @param sendTimeout the sendTimeout to set 1612 */ 1613 public void setSendTimeout(int sendTimeout) { 1614 this.sendTimeout = sendTimeout; 1615 } 1616 1617 /** 1618 * @return the sendAcksAsync 1619 */ 1620 public boolean isSendAcksAsync() { 1621 return sendAcksAsync; 1622 } 1623 1624 /** 1625 * @param sendAcksAsync the sendAcksAsync to set 1626 */ 1627 public void setSendAcksAsync(boolean sendAcksAsync) { 1628 this.sendAcksAsync = sendAcksAsync; 1629 } 1630 1631 1632 /** 1633 * Returns the time this connection was created 1634 */ 1635 public long getTimeCreated() { 1636 return timeCreated; 1637 } 1638 1639 private void waitForBrokerInfo() throws JMSException { 1640 try { 1641 brokerInfoReceived.await(); 1642 } catch (InterruptedException e) { 1643 Thread.currentThread().interrupt(); 1644 throw JMSExceptionSupport.create(e); 1645 } 1646 } 1647 1648 // Package protected so that it can be used in unit tests 1649 public Transport getTransport() { 1650 return transport; 1651 } 1652 1653 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1654 producers.put(producerId, producer); 1655 } 1656 1657 public void removeProducer(ProducerId producerId) { 1658 producers.remove(producerId); 1659 } 1660 1661 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1662 dispatchers.put(consumerId, dispatcher); 1663 } 1664 1665 public void removeDispatcher(ConsumerId consumerId) { 1666 dispatchers.remove(consumerId); 1667 } 1668 1669 /** 1670 * @param o - the command to consume 1671 */ 1672 public void onCommand(final Object o) { 1673 final Command command = (Command)o; 1674 if (!closed.get() && command != null) { 1675 try { 1676 command.visit(new CommandVisitorAdapter() { 1677 @Override 1678 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1679 waitForTransportInterruptionProcessing(); 1680 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1681 if (dispatcher != null) { 1682 // Copy in case a embedded broker is dispatching via 1683 // vm:// 1684 // md.getMessage() == null to signal end of queue 1685 // browse. 1686 Message msg = md.getMessage(); 1687 if (msg != null) { 1688 msg = msg.copy(); 1689 msg.setReadOnlyBody(true); 1690 msg.setReadOnlyProperties(true); 1691 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1692 msg.setConnection(ActiveMQConnection.this); 1693 md.setMessage(msg); 1694 } 1695 dispatcher.dispatch(md); 1696 } 1697 return null; 1698 } 1699 1700 @Override 1701 public Response processProducerAck(ProducerAck pa) throws Exception { 1702 if (pa != null && pa.getProducerId() != null) { 1703 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1704 if (producer != null) { 1705 producer.onProducerAck(pa); 1706 } 1707 } 1708 return null; 1709 } 1710 1711 @Override 1712 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1713 brokerInfo = info; 1714 brokerInfoReceived.countDown(); 1715 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1716 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1717 return null; 1718 } 1719 1720 @Override 1721 public Response processConnectionError(final ConnectionError error) throws Exception { 1722 asyncConnectionThread.execute(new Runnable() { 1723 public void run() { 1724 onAsyncException(error.getException()); 1725 } 1726 }); 1727 return null; 1728 } 1729 1730 @Override 1731 public Response processControlCommand(ControlCommand command) throws Exception { 1732 onControlCommand(command); 1733 return null; 1734 } 1735 1736 @Override 1737 public Response processConnectionControl(ConnectionControl control) throws Exception { 1738 onConnectionControl((ConnectionControl)command); 1739 return null; 1740 } 1741 1742 @Override 1743 public Response processConsumerControl(ConsumerControl control) throws Exception { 1744 onConsumerControl((ConsumerControl)command); 1745 return null; 1746 } 1747 1748 @Override 1749 public Response processWireFormat(WireFormatInfo info) throws Exception { 1750 onWireFormatInfo((WireFormatInfo)command); 1751 return null; 1752 } 1753 }); 1754 } catch (Exception e) { 1755 onClientInternalException(e); 1756 } 1757 1758 } 1759 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1760 TransportListener listener = iter.next(); 1761 listener.onCommand(command); 1762 } 1763 } 1764 1765 protected void onWireFormatInfo(WireFormatInfo info) { 1766 protocolVersion.set(info.getVersion()); 1767 } 1768 1769 /** 1770 * Handles async client internal exceptions. 1771 * A client internal exception is usually one that has been thrown 1772 * by a container runtime component during asynchronous processing of a 1773 * message that does not affect the connection itself. 1774 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1775 * its <code>onException</code> method, if one has been registered with this connection. 1776 * 1777 * @param error the exception that the problem 1778 */ 1779 public void onClientInternalException(final Throwable error) { 1780 if ( !closed.get() && !closing.get() ) { 1781 if ( this.clientInternalExceptionListener != null ) { 1782 asyncConnectionThread.execute(new Runnable() { 1783 public void run() { 1784 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1785 } 1786 }); 1787 } else { 1788 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1789 + error, error); 1790 } 1791 } 1792 } 1793 /** 1794 * Used for handling async exceptions 1795 * 1796 * @param error 1797 */ 1798 public void onAsyncException(Throwable error) { 1799 if (!closed.get() && !closing.get()) { 1800 if (this.exceptionListener != null) { 1801 1802 if (!(error instanceof JMSException)) { 1803 error = JMSExceptionSupport.create(error); 1804 } 1805 final JMSException e = (JMSException)error; 1806 1807 asyncConnectionThread.execute(new Runnable() { 1808 public void run() { 1809 ActiveMQConnection.this.exceptionListener.onException(e); 1810 } 1811 }); 1812 1813 } else { 1814 LOG.debug("Async exception with no exception listener: " + error, error); 1815 } 1816 } 1817 } 1818 1819 public void onException(final IOException error) { 1820 onAsyncException(error); 1821 if (!closing.get() && !closed.get()) { 1822 asyncConnectionThread.execute(new Runnable() { 1823 public void run() { 1824 transportFailed(error); 1825 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1826 brokerInfoReceived.countDown(); 1827 try { 1828 cleanup(); 1829 } catch (JMSException e) { 1830 LOG.warn("Exception during connection cleanup, " + e, e); 1831 } 1832 for (Iterator<TransportListener> iter = transportListeners 1833 .iterator(); iter.hasNext();) { 1834 TransportListener listener = iter.next(); 1835 listener.onException(error); 1836 } 1837 } 1838 }); 1839 } 1840 } 1841 1842 public void transportInterupted() { 1843 transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); 1844 if (LOG.isDebugEnabled()) { 1845 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); 1846 } 1847 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1848 ActiveMQSession s = i.next(); 1849 s.clearMessagesInProgress(); 1850 } 1851 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1852 TransportListener listener = iter.next(); 1853 listener.transportInterupted(); 1854 } 1855 } 1856 1857 public void transportResumed() { 1858 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1859 TransportListener listener = iter.next(); 1860 listener.transportResumed(); 1861 } 1862 } 1863 1864 /** 1865 * Create the DestinationInfo object for the temporary destination. 1866 * 1867 * @param topic - if its true topic, else queue. 1868 * @return DestinationInfo 1869 * @throws JMSException 1870 */ 1871 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 1872 1873 // Check if Destination info is of temporary type. 1874 ActiveMQTempDestination dest; 1875 if (topic) { 1876 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1877 } else { 1878 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1879 } 1880 1881 DestinationInfo info = new DestinationInfo(); 1882 info.setConnectionId(this.info.getConnectionId()); 1883 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 1884 info.setDestination(dest); 1885 syncSendPacket(info); 1886 1887 dest.setConnection(this); 1888 activeTempDestinations.put(dest, dest); 1889 return dest; 1890 } 1891 1892 /** 1893 * @param destination 1894 * @throws JMSException 1895 */ 1896 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 1897 1898 checkClosedOrFailed(); 1899 1900 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1901 ActiveMQSession s = i.next(); 1902 if (s.isInUse(destination)) { 1903 throw new JMSException("A consumer is consuming from the temporary destination"); 1904 } 1905 } 1906 1907 activeTempDestinations.remove(destination); 1908 1909 DestinationInfo info = new DestinationInfo(); 1910 info.setConnectionId(this.info.getConnectionId()); 1911 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1912 info.setDestination(destination); 1913 info.setTimeout(0); 1914 syncSendPacket(info); 1915 } 1916 1917 public boolean isDeleted(ActiveMQDestination dest) { 1918 1919 // If we are not watching the advisories.. then 1920 // we will assume that the temp destination does exist. 1921 if (advisoryConsumer == null) { 1922 return false; 1923 } 1924 1925 return !activeTempDestinations.contains(dest); 1926 } 1927 1928 public boolean isCopyMessageOnSend() { 1929 return copyMessageOnSend; 1930 } 1931 1932 public LongSequenceGenerator getLocalTransactionIdGenerator() { 1933 return localTransactionIdGenerator; 1934 } 1935 1936 public boolean isUseCompression() { 1937 return useCompression; 1938 } 1939 1940 /** 1941 * Enables the use of compression of the message bodies 1942 */ 1943 public void setUseCompression(boolean useCompression) { 1944 this.useCompression = useCompression; 1945 } 1946 1947 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 1948 1949 checkClosedOrFailed(); 1950 ensureConnectionInfoSent(); 1951 1952 DestinationInfo info = new DestinationInfo(); 1953 info.setConnectionId(this.info.getConnectionId()); 1954 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1955 info.setDestination(destination); 1956 info.setTimeout(0); 1957 syncSendPacket(info); 1958 1959 } 1960 1961 public boolean isDispatchAsync() { 1962 return dispatchAsync; 1963 } 1964 1965 /** 1966 * Enables or disables the default setting of whether or not consumers have 1967 * their messages <a 1968 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 1969 * synchronously or asynchronously by the broker</a>. For non-durable 1970 * topics for example we typically dispatch synchronously by default to 1971 * minimize context switches which boost performance. However sometimes its 1972 * better to go slower to ensure that a single blocked consumer socket does 1973 * not block delivery to other consumers. 1974 * 1975 * @param asyncDispatch If true then consumers created on this connection 1976 * will default to having their messages dispatched 1977 * asynchronously. The default value is false. 1978 */ 1979 public void setDispatchAsync(boolean asyncDispatch) { 1980 this.dispatchAsync = asyncDispatch; 1981 } 1982 1983 public boolean isObjectMessageSerializationDefered() { 1984 return objectMessageSerializationDefered; 1985 } 1986 1987 /** 1988 * When an object is set on an ObjectMessage, the JMS spec requires the 1989 * object to be serialized by that set method. Enabling this flag causes the 1990 * object to not get serialized. The object may subsequently get serialized 1991 * if the message needs to be sent over a socket or stored to disk. 1992 */ 1993 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 1994 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 1995 } 1996 1997 public InputStream createInputStream(Destination dest) throws JMSException { 1998 return createInputStream(dest, null); 1999 } 2000 2001 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 2002 return createInputStream(dest, messageSelector, false); 2003 } 2004 2005 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 2006 return doCreateInputStream(dest, messageSelector, noLocal, null); 2007 } 2008 2009 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 2010 return createInputStream(dest, null, false); 2011 } 2012 2013 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 2014 return createDurableInputStream(dest, name, messageSelector, false); 2015 } 2016 2017 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { 2018 return doCreateInputStream(dest, messageSelector, noLocal, name); 2019 } 2020 2021 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException { 2022 checkClosedOrFailed(); 2023 ensureConnectionInfoSent(); 2024 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch()); 2025 } 2026 2027 /** 2028 * Creates a persistent output stream; individual messages will be written 2029 * to disk/database by the broker 2030 */ 2031 public OutputStream createOutputStream(Destination dest) throws JMSException { 2032 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2033 } 2034 2035 /** 2036 * Creates a non persistent output stream; messages will not be written to 2037 * disk 2038 */ 2039 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 2040 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2041 } 2042 2043 /** 2044 * Creates an output stream allowing full control over the delivery mode, 2045 * the priority and time to live of the messages and the properties added to 2046 * messages on the stream. 2047 * 2048 * @param streamProperties defines a map of key-value pairs where the keys 2049 * are strings and the values are primitive values (numbers 2050 * and strings) which are appended to the messages similarly 2051 * to using the 2052 * {@link javax.jms.Message#setObjectProperty(String, Object)} 2053 * method 2054 */ 2055 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 2056 checkClosedOrFailed(); 2057 ensureConnectionInfoSent(); 2058 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 2059 } 2060 2061 /** 2062 * Unsubscribes a durable subscription that has been created by a client. 2063 * <P> 2064 * This method deletes the state being maintained on behalf of the 2065 * subscriber by its provider. 2066 * <P> 2067 * It is erroneous for a client to delete a durable subscription while there 2068 * is an active <CODE>MessageConsumer </CODE> or 2069 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2070 * message is part of a pending transaction or has not been acknowledged in 2071 * the session. 2072 * 2073 * @param name the name used to identify this subscription 2074 * @throws JMSException if the session fails to unsubscribe to the durable 2075 * subscription due to some internal error. 2076 * @throws InvalidDestinationException if an invalid subscription name is 2077 * specified. 2078 * @since 1.1 2079 */ 2080 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2081 checkClosedOrFailed(); 2082 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2083 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2084 rsi.setSubscriptionName(name); 2085 rsi.setClientId(getConnectionInfo().getClientId()); 2086 syncSendPacket(rsi); 2087 } 2088 2089 /** 2090 * Internal send method optimized: - It does not copy the message - It can 2091 * only handle ActiveMQ messages. - You can specify if the send is async or 2092 * sync - Does not allow you to send /w a transaction. 2093 */ 2094 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2095 checkClosedOrFailed(); 2096 2097 if (destination.isTemporary() && isDeleted(destination)) { 2098 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2099 } 2100 2101 msg.setJMSDestination(destination); 2102 msg.setJMSDeliveryMode(deliveryMode); 2103 long expiration = 0L; 2104 2105 if (!isDisableTimeStampsByDefault()) { 2106 long timeStamp = System.currentTimeMillis(); 2107 msg.setJMSTimestamp(timeStamp); 2108 if (timeToLive > 0) { 2109 expiration = timeToLive + timeStamp; 2110 } 2111 } 2112 2113 msg.setJMSExpiration(expiration); 2114 msg.setJMSPriority(priority); 2115 2116 msg.setJMSRedelivered(false); 2117 msg.setMessageId(messageId); 2118 2119 msg.onSend(); 2120 2121 msg.setProducerId(msg.getMessageId().getProducerId()); 2122 2123 if (LOG.isDebugEnabled()) { 2124 LOG.debug("Sending message: " + msg); 2125 } 2126 2127 if (async) { 2128 asyncSendPacket(msg); 2129 } else { 2130 syncSendPacket(msg); 2131 } 2132 2133 } 2134 2135 public void addOutputStream(ActiveMQOutputStream stream) { 2136 outputStreams.add(stream); 2137 } 2138 2139 public void removeOutputStream(ActiveMQOutputStream stream) { 2140 outputStreams.remove(stream); 2141 } 2142 2143 public void addInputStream(ActiveMQInputStream stream) { 2144 inputStreams.add(stream); 2145 } 2146 2147 public void removeInputStream(ActiveMQInputStream stream) { 2148 inputStreams.remove(stream); 2149 } 2150 2151 protected void onControlCommand(ControlCommand command) { 2152 String text = command.getCommand(); 2153 if (text != null) { 2154 if (text.equals("shutdown")) { 2155 LOG.info("JVM told to shutdown"); 2156 System.exit(0); 2157 } 2158 } 2159 } 2160 2161 protected void onConnectionControl(ConnectionControl command) { 2162 if (command.isFaultTolerant()) { 2163 this.optimizeAcknowledge = false; 2164 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2165 ActiveMQSession s = i.next(); 2166 s.setOptimizeAcknowledge(false); 2167 } 2168 } 2169 } 2170 2171 protected void onConsumerControl(ConsumerControl command) { 2172 if (command.isClose()) { 2173 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2174 ActiveMQSession s = i.next(); 2175 s.close(command.getConsumerId()); 2176 } 2177 } else { 2178 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2179 ActiveMQSession s = i.next(); 2180 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2181 } 2182 } 2183 } 2184 2185 protected void transportFailed(IOException error) { 2186 transportFailed.set(true); 2187 if (firstFailureError == null) { 2188 firstFailureError = error; 2189 } 2190 } 2191 2192 /** 2193 * Should a JMS message be copied to a new JMS Message object as part of the 2194 * send() method in JMS. This is enabled by default to be compliant with the 2195 * JMS specification. You can disable it if you do not mutate JMS messages 2196 * after they are sent for a performance boost 2197 */ 2198 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2199 this.copyMessageOnSend = copyMessageOnSend; 2200 } 2201 2202 public String toString() { 2203 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2204 } 2205 2206 protected BlobTransferPolicy createBlobTransferPolicy() { 2207 return new BlobTransferPolicy(); 2208 } 2209 2210 public int getProtocolVersion() { 2211 return protocolVersion.get(); 2212 } 2213 2214 public int getProducerWindowSize() { 2215 return producerWindowSize; 2216 } 2217 2218 public void setProducerWindowSize(int producerWindowSize) { 2219 this.producerWindowSize = producerWindowSize; 2220 } 2221 2222 public void setAuditDepth(int auditDepth) { 2223 connectionAudit.setAuditDepth(auditDepth); 2224 } 2225 2226 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2227 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2228 } 2229 2230 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2231 connectionAudit.removeDispatcher(dispatcher); 2232 } 2233 2234 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2235 return connectionAudit.isDuplicate(dispatcher, message); 2236 } 2237 2238 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2239 connectionAudit.rollbackDuplicate(dispatcher, message); 2240 } 2241 2242 public IOException getFirstFailureError() { 2243 return firstFailureError; 2244 } 2245 2246 protected void waitForTransportInterruptionProcessing() throws InterruptedException { 2247 if (transportInterruptionProcessingComplete != null) { 2248 while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) { 2249 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete.."); 2250 } 2251 synchronized (this) { 2252 transportInterruptionProcessingComplete = null; 2253 } 2254 } 2255 } 2256 2257 protected synchronized void transportInterruptionProcessingComplete() { 2258 if (transportInterruptionProcessingComplete != null) { 2259 transportInterruptionProcessingComplete.countDown(); 2260 } 2261 } 2262 2263 /* 2264 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2265 * will wait to receive re dispatched messages. 2266 * default value is 0 so there is no wait by default. 2267 */ 2268 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2269 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2270 } 2271 2272 public long getConsumerFailoverRedeliveryWaitPeriod() { 2273 return consumerFailoverRedeliveryWaitPeriod; 2274 } 2275 }