001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.message; 020 021 import java.io.DataInput; 022 import java.io.DataOutput; 023 import java.io.IOException; 024 import java.io.Serializable; 025 import java.util.ArrayList; 026 import java.util.List; 027 import java.util.Properties; 028 import java.util.StringTokenizer; 029 import javax.jms.Destination; 030 import javax.jms.JMSException; 031 import javax.jms.Queue; 032 import javax.jms.TemporaryQueue; 033 import javax.jms.TemporaryTopic; 034 import javax.jms.Topic; 035 import org.activemq.ActiveMQSession; 036 import org.activemq.filter.DestinationFilter; 037 import org.activemq.filter.DestinationPath; 038 import org.activemq.jndi.JNDIBaseStorable; 039 import org.activemq.management.JMSDestinationStats; 040 041 /** 042 * A <CODE>Destination</CODE> object encapsulates a provider-specific 043 * address. 044 * The JMS API does not define a standard address syntax. Although a standard 045 * address syntax was considered, it was decided that the differences in 046 * address semantics between existing message-oriented middleware (MOM) 047 * products were too wide to bridge with a single syntax. 048 * <p/> 049 * <P>Since <CODE>Destination</CODE> is an administered object, it may 050 * contain 051 * provider-specific configuration information in addition to its address. 052 * <p/> 053 * <P>The JMS API also supports a client's use of provider-specific address 054 * names. 055 * <p/> 056 * <P><CODE>Destination</CODE> objects support concurrent use. 057 * <p/> 058 * <P>A <CODE>Destination</CODE> object is a JMS administered object. 059 * <p/> 060 * <P>JMS administered objects are objects containing configuration 061 * information that are created by an administrator and later used by 062 * JMS clients. They make it practical to administer the JMS API in the 063 * enterprise. 064 * <p/> 065 * <P>Although the interfaces for administered objects do not explicitly 066 * depend on the Java Naming and Directory Interface (JNDI) API, the JMS API 067 * establishes the convention that JMS clients find administered objects by 068 * looking them up in a JNDI namespace. 069 * <p/> 070 * <P>An administrator can place an administered object anywhere in a 071 * namespace. The JMS API does not define a naming policy. 072 * <p/> 073 * <P>It is expected that JMS providers will provide the tools an 074 * administrator needs to create and configure administered objects in a 075 * JNDI namespace. JMS provider implementations of administered objects 076 * should implement the <CODE>javax.naming.Referenceable</CODE> and 077 * <CODE>java.io.Serializable</CODE> interfaces so that they can be stored in 078 * all JNDI naming contexts. In addition, it is recommended that these 079 * implementations follow the JavaBeans<SUP><FONT SIZE="-2">TM</FONT></SUP> 080 * design patterns. 081 * <p/> 082 * <P>This strategy provides several benefits: 083 * <p/> 084 * <UL> 085 * <LI>It hides provider-specific details from JMS clients. 086 * <LI>It abstracts JMS administrative information into objects in the Java 087 * programming language ("Java objects") 088 * that are easily organized and administered from a common 089 * management console. 090 * <LI>Since there will be JNDI providers for all popular naming 091 * services, JMS providers can deliver one implementation 092 * of administered objects that will run everywhere. 093 * </UL> 094 * <p/> 095 * <P>An administered object should not hold on to any remote resources. 096 * Its lookup should not use remote resources other than those used by the 097 * JNDI API itself. 098 * <p/> 099 * <P>Clients should think of administered objects as local Java objects. 100 * Looking them up should not have any hidden side effects or use surprising 101 * amounts of local resources. 102 */ 103 104 public abstract class ActiveMQDestination extends JNDIBaseStorable implements Destination, Comparable, Serializable { 105 106 static final long serialVersionUID = -3300456112096957638L; 107 108 /** 109 * Topic Destination object 110 */ 111 public static final int ACTIVEMQ_TOPIC = 1; 112 /** 113 * Temporary Topic Destination object 114 */ 115 public static final int ACTIVEMQ_TEMPORARY_TOPIC = 2; 116 117 /** 118 * Queue Destination object 119 */ 120 public static final int ACTIVEMQ_QUEUE = 3; 121 /** 122 * Temporary Queue Destination object 123 */ 124 public static final int ACTIVEMQ_TEMPORARY_QUEUE = 4; 125 126 /** 127 * prefix for Advisory message destinations 128 */ 129 public static final String ADVISORY_PREFIX = "ActiveMQ.Advisory."; 130 131 /** 132 * prefix for connection advisory destinations 133 */ 134 public static final String CONNECTION_ADVISORY_PREFIX = ADVISORY_PREFIX + "Connections."; 135 136 /** 137 * prefix for consumer advisory destinations 138 * @deprecated Use {@see #getDestinationForConsumerAdvisory()} instead. 139 */ 140 public static final String CONSUMER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Consumers."; 141 142 /** 143 * prefix for producer advisory destinations 144 * @deprecated Use {@see #getDestinationForProducerAdvisory()} instead. 145 */ 146 public static final String PRODUCER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Producers."; 147 148 /** 149 * prefix for connection advisory destinations 150 * @deprecated Use {@see #getDestinationForTempAdvisory()} instead. 151 */ 152 public static final String TEMP_DESTINATION_ADVISORY_PREFIX = ADVISORY_PREFIX + "TempDestinations."; 153 154 /** 155 * The default target for ordered destinations 156 */ 157 public static final String DEFAULT_ORDERED_TARGET = "coordinator"; 158 159 private static final int NULL_DESTINATION = 10; 160 161 private static final String TEMP_PREFIX = "{TD{"; 162 private static final String TEMP_POSTFIX = "}TD}"; 163 private static final String COMPOSITE_SEPARATOR = ","; 164 165 private static final String QUEUE_PREFIX = "queue://"; 166 private static final String TOPIC_PREFIX = "topic://"; 167 168 private String physicalName = ""; 169 170 // Cached transient data 171 private transient DestinationFilter filter; 172 private transient JMSDestinationStats stats; 173 private transient String[] paths; 174 // Used track consumers of temporary topics. 175 private transient int consumerCounter; 176 private transient boolean deleted; 177 //additional for defining 'exotic' behaviour 178 private boolean exclusive; 179 private boolean ordered; 180 private boolean advisory; 181 private boolean wildcard; 182 private boolean composite; 183 private String orderedTarget = DEFAULT_ORDERED_TARGET; 184 //used client-side only 185 private transient ActiveMQSession sessionCreatedBy; 186 187 /** 188 * @return Returns the orginatingSession. 189 */ 190 public ActiveMQSession getSessionCreatedBy() { 191 return sessionCreatedBy; 192 } 193 /** 194 * @param orginatingSession The orginatingSession to set. 195 */ 196 public void setSessionCreatedBy(ActiveMQSession orginatingSession) { 197 this.sessionCreatedBy = orginatingSession; 198 } 199 /** 200 * The Default Constructor 201 */ 202 protected ActiveMQDestination() { 203 } 204 205 /** 206 * Construct the ActiveMQDestination with a defined physical name; 207 * 208 * @param name 209 */ 210 211 protected ActiveMQDestination(String name) { 212 setPhysicalName(name); 213 } 214 215 /** 216 * @return Returns the advisory. 217 */ 218 public final boolean isAdvisory() { 219 return advisory; 220 } 221 /** 222 * @param advisory The advisory to set. 223 */ 224 public final void setAdvisory(boolean advisory) { 225 this.advisory = advisory; 226 } 227 228 /** 229 * @return true if this is a destination for Consumer advisories 230 */ 231 public boolean isConsumerAdvisory(){ 232 return isAdvisory() && physicalName.startsWith(ActiveMQDestination.CONSUMER_ADVISORY_PREFIX); 233 } 234 235 /** 236 * @return true if this is a destination for Producer advisories 237 */ 238 public boolean isProducerAdvisory(){ 239 return isAdvisory() && physicalName.startsWith(ActiveMQDestination.PRODUCER_ADVISORY_PREFIX); 240 } 241 242 /** 243 * @return true if this is a destination for Connection advisories 244 */ 245 public boolean isConnectionAdvisory(){ 246 return isAdvisory() && physicalName.startsWith(ActiveMQDestination.CONNECTION_ADVISORY_PREFIX); 247 } 248 249 /** 250 * @return true if this a destination for Tempoary Destination advisories 251 */ 252 public final boolean isTempDestinationAdvisory(){ 253 return advisory && physicalName.startsWith(ActiveMQDestination.TEMP_DESTINATION_ADVISORY_PREFIX); 254 } 255 256 /** 257 * @return Returns the exclusive. 258 */ 259 public final boolean isExclusive() { 260 return exclusive; 261 } 262 /** 263 * @param exclusive The exclusive to set. 264 */ 265 public final void setExclusive(boolean exclusive) { 266 this.exclusive = exclusive; 267 } 268 /** 269 * @return Returns the ordered. 270 */ 271 public final boolean isOrdered() { 272 return ordered; 273 } 274 /** 275 * @param ordered The ordered to set. 276 */ 277 public final void setOrdered(boolean ordered) { 278 this.ordered = ordered; 279 } 280 /** 281 * @return Returns the orderedTarget. 282 */ 283 public String getOrderedTarget() { 284 return orderedTarget; 285 } 286 /** 287 * @param orderedTarget The orderedTarget to set. 288 */ 289 public void setOrderedTarget(String orderedTarget) { 290 this.orderedTarget = orderedTarget; 291 } 292 /** 293 * A helper method to return a descriptive string for the topic or queue 294 * @param destination 295 * 296 * @return a descriptive string for this queue or topic 297 */ 298 public static String inspect(Destination destination) { 299 if (destination instanceof Topic) { 300 return "Topic(" + destination.toString() + ")"; 301 } 302 else { 303 return "Queue(" + destination.toString() + ")"; 304 } 305 } 306 307 /** 308 * @param destination 309 * @return @throws JMSException 310 * @throws javax.jms.JMSException 311 */ 312 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException { 313 ActiveMQDestination result = null; 314 if (destination != null) { 315 if (destination instanceof ActiveMQDestination) { 316 result = (ActiveMQDestination) destination; 317 } 318 else { 319 if (destination instanceof TemporaryQueue) { 320 result = new ActiveMQTemporaryQueue(((Queue) destination).getQueueName()); 321 } 322 else if (destination instanceof TemporaryTopic) { 323 result = new ActiveMQTemporaryTopic(((Topic) destination).getTopicName()); 324 } 325 else if (destination instanceof Queue) { 326 result = new ActiveMQTemporaryQueue(((Queue) destination).getQueueName()); 327 } 328 else if (destination instanceof Topic) { 329 result = new ActiveMQTemporaryTopic(((Topic) destination).getTopicName()); 330 } 331 } 332 } 333 return result; 334 } 335 336 /** 337 * Write an ActiveMQDestination to a Stream 338 * 339 * @param destination 340 * @param dataOut 341 * @throws IOException 342 */ 343 344 public static void writeToStream(ActiveMQDestination destination, DataOutput dataOut) throws IOException { 345 if (destination != null) { 346 dataOut.writeByte(destination.getDestinationType()); 347 String physicalName = destination.getPhysicalName(); 348 boolean writeOrderedTarget = destination.orderedTarget != null && !destination.orderedTarget.equals(DEFAULT_ORDERED_TARGET); 349 byte byte1 = 0; 350 if (physicalName != null && physicalName.length() > 0) byte1 |= 1; 351 if (destination.ordered) byte1 |= 2; 352 if(destination.exclusive) byte1 |= 4; 353 if (writeOrderedTarget) byte1 |= 8; 354 if (destination.advisory) byte1 |= 16; 355 if (destination.deleted) byte1 |= 32; 356 if (destination.composite) byte1 |= 64; 357 if (destination.wildcard) byte1 |= 128; 358 dataOut.writeByte(byte1); 359 if (physicalName != null && physicalName.length() > 0){ 360 dataOut.writeUTF(physicalName); 361 } 362 if (writeOrderedTarget){ 363 dataOut.writeUTF(destination.orderedTarget); 364 } 365 } 366 else { 367 dataOut.write(NULL_DESTINATION); 368 } 369 } 370 371 /** 372 * Read an ActiveMQDestination from a Stream 373 * 374 * @param dataIn 375 * @return the ActiveMQDestination 376 * @throws IOException 377 */ 378 379 public static ActiveMQDestination readFromStream(DataInput dataIn) throws IOException { 380 381 int type = dataIn.readByte(); 382 if (type == NULL_DESTINATION) { 383 return null; 384 } 385 ActiveMQDestination result = null; 386 if (type == ACTIVEMQ_TOPIC) { 387 result = new ActiveMQTopic(); 388 } 389 else if (type == ACTIVEMQ_TEMPORARY_TOPIC) { 390 result = new ActiveMQTemporaryTopic(); 391 } 392 else if (type == ACTIVEMQ_QUEUE) { 393 result = new ActiveMQQueue(); 394 } 395 else { 396 result = new ActiveMQTemporaryQueue(); 397 } 398 byte byte1 = dataIn.readByte(); 399 if ((byte1 & 1) == 1){ 400 result.physicalName = dataIn.readUTF(); 401 } 402 result.setOrdered((byte1 & 2) == 2); 403 result.setExclusive((byte1 & 4) == 4); 404 if ((byte1 & 8) == 8){ 405 result.setOrderedTarget(dataIn.readUTF()); 406 } 407 result.advisory = (byte1 & 16)==16; 408 result.deleted = (byte1 & 32)==32; 409 result.composite = (byte1 & 64)==64; 410 result.wildcard = (byte1 & 128)==128; 411 return result; 412 } 413 414 /** 415 * Create a Destination 416 * @param type 417 * @param pyhsicalName 418 * @return 419 */ 420 public static ActiveMQDestination createDestination(int type,String pyhsicalName){ 421 ActiveMQDestination result = null; 422 if (type == ACTIVEMQ_TOPIC) { 423 result = new ActiveMQTopic(pyhsicalName); 424 } 425 else if (type == ACTIVEMQ_TEMPORARY_TOPIC) { 426 result = new ActiveMQTemporaryTopic(pyhsicalName); 427 } 428 else if (type == ACTIVEMQ_QUEUE) { 429 result = new ActiveMQQueue(pyhsicalName); 430 } 431 else { 432 result = new ActiveMQTemporaryQueue(pyhsicalName); 433 } 434 return result; 435 } 436 437 /** 438 * Create a temporary name from the clientId 439 * 440 * @param clientId 441 * @return 442 */ 443 public static String createTemporaryName(String clientId) { 444 return TEMP_PREFIX + clientId + TEMP_POSTFIX; 445 } 446 447 /** 448 * From a temporary destination find the clientId of the Connection that created it 449 * 450 * @param destination 451 * @return the clientId or null if not a temporary destination 452 */ 453 public static String getClientId(ActiveMQDestination destination) { 454 String answer = null; 455 if (destination != null && destination.isTemporary()) { 456 String name = destination.getPhysicalName(); 457 int start = name.indexOf(TEMP_PREFIX); 458 if (start >= 0) { 459 start += TEMP_PREFIX.length(); 460 int stop = name.lastIndexOf(TEMP_POSTFIX); 461 if (stop > start && stop < name.length()) { 462 answer = name.substring(start, stop); 463 } 464 } 465 } 466 return answer; 467 } 468 469 470 /** 471 * @param o object to compare 472 * @return 1 if this > o else 0 if they are equal or -1 if this < o 473 */ 474 public int compareTo(Object o) { 475 if (o instanceof ActiveMQDestination) { 476 return compareTo((ActiveMQDestination) o); 477 } 478 return -1; 479 } 480 481 /** 482 * Lets sort by name first then lets sort topics greater than queues 483 * 484 * @param that another destination to compare against 485 * @return 1 if this > that else 0 if they are equal or -1 if this < that 486 */ 487 public int compareTo(ActiveMQDestination that) { 488 int answer = 0; 489 if (physicalName != that.physicalName) { 490 if (physicalName == null) { 491 return -1; 492 } 493 else if (that.physicalName == null) { 494 return 1; 495 } 496 answer = physicalName.compareTo(that.physicalName); 497 } 498 if (answer == 0) { 499 if (isTopic()) { 500 if (that.isQueue()) { 501 return 1; 502 } 503 } 504 else { 505 if (that.isTopic()) { 506 return -1; 507 } 508 } 509 } 510 return answer; 511 } 512 513 514 /** 515 * @return Returns the Destination type 516 */ 517 518 public abstract int getDestinationType(); 519 520 521 /** 522 * @return Returns the physicalName. 523 */ 524 public String getPhysicalName() { 525 return this.physicalName; 526 } 527 528 /** 529 * @param name The physicalName to set. 530 */ 531 public void setPhysicalName(String name) { 532 this.physicalName = name; 533 this.advisory = name != null && name.startsWith(ADVISORY_PREFIX); 534 this.composite = name != null && name.indexOf(COMPOSITE_SEPARATOR) > 0; 535 this.wildcard = name != null 536 && (name.indexOf(DestinationFilter.ANY_CHILD) >= 0 || name.indexOf(DestinationFilter.ANY_DESCENDENT) >= 0); 537 } 538 539 /** 540 * Returns true if a temporary Destination 541 * 542 * @return true/false 543 */ 544 545 public boolean isTemporary() { 546 return false; 547 } 548 549 /** 550 * Returns true if a Topic Destination 551 * 552 * @return true/false 553 */ 554 555 public boolean isTopic() { 556 return true; 557 } 558 559 /** 560 * Returns true if a Queue Destination 561 * 562 * @return true/false 563 */ 564 public boolean isQueue() { 565 return false; 566 } 567 568 /** 569 * Returns true if this destination represents a collection of 570 * destinations; allowing a set of destinations to be published to or subscribed 571 * from in one JMS operation. 572 * <p/> 573 * If this destination is a composite then you can call {@link #getChildDestinations()} 574 * to return the list of child destinations. 575 * 576 * @return true if this destination represents a collection of child destinations. 577 */ 578 public final boolean isComposite() { 579 return composite; 580 } 581 582 /** 583 * Returns a list of child destinations if this destination represents a composite 584 * destination. 585 * 586 * @return 587 */ 588 public List getChildDestinations() { 589 List answer = new ArrayList(); 590 StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR); 591 while (iter.hasMoreTokens()) { 592 String name = iter.nextToken(); 593 Destination child = null; 594 if (name.startsWith(QUEUE_PREFIX)) { 595 child = new ActiveMQQueue(name.substring(QUEUE_PREFIX.length())); 596 } 597 else if (name.startsWith(TOPIC_PREFIX)) { 598 child = new ActiveMQTopic(name.substring(TOPIC_PREFIX.length())); 599 } 600 else { 601 child = createDestination(name); 602 } 603 answer.add(child); 604 } 605 if (answer.size() == 1) { 606 // lets put ourselves inside the collection 607 // as we are not really a composite destination 608 answer.set(0, this); 609 } 610 return answer; 611 } 612 613 public void setChildDestinations(ActiveMQDestination[] children) { 614 if( children==null ) 615 throw new IllegalArgumentException("children array cannot be null."); 616 if( children.length == 0 ) 617 throw new IllegalArgumentException("children array size must be 1 or greater."); 618 619 StringBuffer rc = new StringBuffer(); 620 for (int i = 0; i < children.length; i++) { 621 if(i!=0) 622 rc.append(COMPOSITE_SEPARATOR); 623 if( children[i].isTopic() ) { 624 rc.append(TOPIC_PREFIX); 625 } else if( children[i].isQueue() ) { 626 rc.append(QUEUE_PREFIX); 627 } 628 rc.append(children[i].getPhysicalName()); 629 } 630 631 setPhysicalName(rc.toString()); 632 } 633 634 /** 635 * @return string representation of this instance 636 */ 637 638 public String toString() { 639 return this.physicalName; 640 } 641 642 /** 643 * @return hashCode for this instance 644 */ 645 646 public int hashCode() { 647 int answer = 0xcafebabe; 648 649 if (this.physicalName != null) { 650 answer = physicalName.hashCode(); 651 } 652 if (isTopic()) { 653 answer ^= 0xfabfab; 654 } 655 return answer; 656 } 657 658 /** 659 * if the object passed in is equivalent, return true 660 * 661 * @param obj the object to compare 662 * @return true if this instance and obj are equivalent 663 */ 664 665 public boolean equals(Object obj) { 666 boolean result = this == obj; 667 if (!result && obj != null && obj instanceof ActiveMQDestination) { 668 ActiveMQDestination other = (ActiveMQDestination) obj; 669 result = this.getDestinationType() == other.getDestinationType() && 670 this.physicalName.equals(other.physicalName); 671 } 672 return result; 673 } 674 675 676 /** 677 * @return true if the destination matches multiple possible destinations 678 */ 679 public boolean isWildcard() { 680 return wildcard; 681 } 682 683 /** 684 * @param destination 685 * @return true if the given destination matches this destination; including wildcards 686 */ 687 public boolean matches(ActiveMQDestination destination) { 688 if (isWildcard()) { 689 return getDestinationFilter().matches(destination); 690 } 691 else { 692 return equals(destination); 693 } 694 } 695 696 697 /** 698 * @return the DestinationFilter 699 */ 700 public DestinationFilter getDestinationFilter() { 701 if (filter == null) { 702 filter = DestinationFilter.parseFilter(this); 703 } 704 return filter; 705 } 706 707 /** 708 * @return the associated paths associated with this Destination 709 */ 710 public String[] getDestinationPaths() { 711 if (paths == null) { 712 paths = DestinationPath.getDestinationPaths(physicalName); 713 } 714 return paths; 715 } 716 717 /** 718 * @return stats for this destination 719 */ 720 public JMSDestinationStats getStats() { 721 if (stats == null) { 722 stats = createDestinationStats(); 723 } 724 return stats; 725 } 726 727 728 /** 729 * @param stats 730 */ 731 public void setStats(JMSDestinationStats stats) { 732 this.stats = stats; 733 } 734 735 /** 736 * increment counter for number of interested consumers 737 */ 738 synchronized public void incrementConsumerCounter() { 739 consumerCounter++; 740 } 741 742 /** 743 * descrement counter for number interested consumers 744 */ 745 synchronized public void decrementConsumerCounter() { 746 consumerCounter--; 747 } 748 749 /** 750 * @return true if this destination is deleted 751 */ 752 synchronized public boolean isDeleted() { 753 return deleted; 754 } 755 756 /** 757 * det the deleted flag to the new value 758 * @param value 759 */ 760 synchronized public void setDeleted(boolean value){ 761 deleted = value; 762 } 763 764 /** 765 * Used to Deletes a temporary destination. If there are existing consumers 766 * still using it, a <CODE>JMSException</CODE> will be thrown. 767 * 768 * @throws JMSException if the JMS provider fails to delete the 769 * temporary queue due to some internal error. 770 */ 771 synchronized public void delete() throws JMSException { 772 if (consumerCounter != 0) { 773 throw new JMSException("A consumer is still using this temporary queue."); 774 } 775 if (sessionCreatedBy != null) { 776 sessionCreatedBy.removeTemporaryDestination(this); 777 } 778 deleted = true; 779 } 780 781 782 // Implementation methods 783 //------------------------------------------------------------------------- 784 785 786 /** 787 * Factory method to create a child destination if this destination is a composite 788 * @param name 789 * @return the created Destination 790 */ 791 protected abstract Destination createDestination(String name); 792 793 /** 794 * Factory method to create a statistics counter object 795 * 796 * @return 797 */ 798 protected abstract JMSDestinationStats createDestinationStats(); 799 800 /** 801 * Set the properties that will represent the instance in JNDI 802 * 803 * @param props 804 */ 805 protected void buildFromProperties(Properties props) { 806 this.physicalName = props.getProperty("physicalName", this.physicalName); 807 808 } 809 810 /** 811 * Initialize the instance from properties stored in JNDI 812 * 813 * @param props 814 */ 815 816 protected void populateProperties(Properties props) { 817 props.put("physicalName", this.physicalName); 818 } 819 820 /** 821 * @return the topic that is used for this destination's temporary destination advisories. 822 */ 823 public ActiveMQTopic getTopicForTempAdvisory() { 824 String destName = ActiveMQDestination.TEMP_DESTINATION_ADVISORY_PREFIX+getAdvisoryDestinationTypePrefix()+getPhysicalName(); 825 return new ActiveMQTopic(destName); 826 } 827 828 /** 829 * @return the topic that is used for this destination's consumer advisories. 830 */ 831 public ActiveMQTopic getTopicForConsumerAdvisory() { 832 String destName = ActiveMQDestination.CONSUMER_ADVISORY_PREFIX+getAdvisoryDestinationTypePrefix()+getPhysicalName(); 833 return new ActiveMQTopic(destName); 834 } 835 836 /** 837 * @return the topic that is used for this destination's producer advisories. 838 */ 839 public ActiveMQTopic getTopicForProducerAdvisory() { 840 String destName = ActiveMQDestination.PRODUCER_ADVISORY_PREFIX+getAdvisoryDestinationTypePrefix()+getPhysicalName(); 841 return new ActiveMQTopic(destName); 842 } 843 844 /** 845 * @return null if this destination is not an advisory topic, else it returns the 846 * destination that the advisories are related. 847 */ 848 public ActiveMQDestination getDestinationBeingAdvised() { 849 if( isConnectionAdvisory() ) { 850 return null; 851 } else if( isConsumerAdvisory() ) { 852 String matchName = physicalName.substring(ActiveMQDestination.CONSUMER_ADVISORY_PREFIX.length()); 853 return createDestinationFromAdvisoryName(matchName); 854 } else if ( isProducerAdvisory() ) { 855 String matchName = physicalName.substring(ActiveMQDestination.PRODUCER_ADVISORY_PREFIX.length()); 856 return createDestinationFromAdvisoryName(matchName); 857 } else if ( isTempDestinationAdvisory() ) { 858 String matchName = physicalName.substring(ActiveMQDestination.TEMP_DESTINATION_ADVISORY_PREFIX.length()); 859 return createDestinationFromAdvisoryName(matchName); 860 } 861 return null; 862 } 863 864 private String getAdvisoryDestinationTypePrefix() { 865 switch( getDestinationType() ) { 866 case ACTIVEMQ_TOPIC: 867 return "topic."; 868 case ACTIVEMQ_QUEUE: 869 return "queue."; 870 case ACTIVEMQ_TEMPORARY_TOPIC: 871 return "temp-topic."; 872 case ACTIVEMQ_TEMPORARY_QUEUE: 873 return "temp-queue."; 874 } 875 throw new RuntimeException("Invlaid destiantion type: "+getDestinationType()); 876 } 877 878 /** 879 * @param advisoryName 880 * @return 881 */ 882 private ActiveMQDestination createDestinationFromAdvisoryName(String advisoryName) { 883 if( advisoryName.startsWith("topic.") ) { 884 String name = advisoryName.substring("topic.".length()); 885 return createDestination(ACTIVEMQ_TOPIC, name); 886 } else if( advisoryName.startsWith("queue.") ) { 887 String name = advisoryName.substring("queue.".length()); 888 return createDestination(ACTIVEMQ_QUEUE, name); 889 } else if( advisoryName.startsWith("temp-topic.") ) { 890 String name = advisoryName.substring("temp-topic.".length()); 891 return createDestination(ACTIVEMQ_TEMPORARY_TOPIC, name); 892 } else if( advisoryName.startsWith("temp-queue.") ) { 893 String name = advisoryName.substring("temp-queue.".length()); 894 return createDestination(ACTIVEMQ_TEMPORARY_QUEUE, name); 895 } 896 return null; 897 } 898 }