001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import java.io.DataInputStream; 020 import java.io.DataOutputStream; 021 import java.io.IOException; 022 import java.util.Collections; 023 import java.util.HashMap; 024 import java.util.Map; 025 import javax.jms.JMSException; 026 027 import org.apache.activemq.ActiveMQConnection; 028 import org.apache.activemq.advisory.AdvisorySupport; 029 import org.apache.activemq.broker.region.Destination; 030 import org.apache.activemq.broker.region.MessageReference; 031 import org.apache.activemq.usage.MemoryUsage; 032 import org.apache.activemq.util.ByteArrayInputStream; 033 import org.apache.activemq.util.ByteArrayOutputStream; 034 import org.apache.activemq.util.ByteSequence; 035 import org.apache.activemq.util.MarshallingSupport; 036 import org.apache.activemq.wireformat.WireFormat; 037 038 /** 039 * Represents an ActiveMQ message 040 * 041 * @openwire:marshaller 042 * @version $Revision$ 043 */ 044 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 045 046 /** 047 * The default minimum amount of memory a message is assumed to use 048 */ 049 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 050 051 protected MessageId messageId; 052 protected ActiveMQDestination originalDestination; 053 protected TransactionId originalTransactionId; 054 055 protected ProducerId producerId; 056 protected ActiveMQDestination destination; 057 protected TransactionId transactionId; 058 059 protected long expiration; 060 protected long timestamp; 061 protected long arrival; 062 protected long brokerInTime; 063 protected long brokerOutTime; 064 protected String correlationId; 065 protected ActiveMQDestination replyTo; 066 protected boolean persistent; 067 protected String type; 068 protected byte priority; 069 protected String groupID; 070 protected int groupSequence; 071 protected ConsumerId targetConsumerId; 072 protected boolean compressed; 073 protected String userID; 074 075 protected ByteSequence content; 076 protected ByteSequence marshalledProperties; 077 protected DataStructure dataStructure; 078 protected int redeliveryCounter; 079 080 protected int size; 081 protected Map<String, Object> properties; 082 protected boolean readOnlyProperties; 083 protected boolean readOnlyBody; 084 protected transient boolean recievedByDFBridge; 085 protected boolean droppable; 086 087 private transient short referenceCount; 088 private transient ActiveMQConnection connection; 089 private transient org.apache.activemq.broker.region.Destination regionDestination; 090 private transient MemoryUsage memoryUsage; 091 092 private BrokerId[] brokerPath; 093 private BrokerId[] cluster; 094 095 public abstract Message copy(); 096 public abstract void clearBody() throws JMSException; 097 098 protected void copy(Message copy) { 099 super.copy(copy); 100 copy.producerId = producerId; 101 copy.transactionId = transactionId; 102 copy.destination = destination; 103 copy.messageId = messageId != null ? messageId.copy() : null; 104 copy.originalDestination = originalDestination; 105 copy.originalTransactionId = originalTransactionId; 106 copy.expiration = expiration; 107 copy.timestamp = timestamp; 108 copy.correlationId = correlationId; 109 copy.replyTo = replyTo; 110 copy.persistent = persistent; 111 copy.redeliveryCounter = redeliveryCounter; 112 copy.type = type; 113 copy.priority = priority; 114 copy.size = size; 115 copy.groupID = groupID; 116 copy.userID = userID; 117 copy.groupSequence = groupSequence; 118 119 if (properties != null) { 120 copy.properties = new HashMap<String, Object>(properties); 121 } else { 122 copy.properties = properties; 123 } 124 125 copy.content = content; 126 copy.marshalledProperties = marshalledProperties; 127 copy.dataStructure = dataStructure; 128 copy.readOnlyProperties = readOnlyProperties; 129 copy.readOnlyBody = readOnlyBody; 130 copy.compressed = compressed; 131 copy.recievedByDFBridge = recievedByDFBridge; 132 133 copy.arrival = arrival; 134 copy.connection = connection; 135 copy.regionDestination = regionDestination; 136 copy.brokerInTime = brokerInTime; 137 copy.brokerOutTime = brokerOutTime; 138 copy.memoryUsage=this.memoryUsage; 139 copy.brokerPath = brokerPath; 140 141 // lets not copy the following fields 142 // copy.targetConsumerId = targetConsumerId; 143 // copy.referenceCount = referenceCount; 144 } 145 146 public Object getProperty(String name) throws IOException { 147 if (properties == null) { 148 if (marshalledProperties == null) { 149 return null; 150 } 151 properties = unmarsallProperties(marshalledProperties); 152 } 153 return properties.get(name); 154 } 155 156 @SuppressWarnings("unchecked") 157 public Map<String, Object> getProperties() throws IOException { 158 if (properties == null) { 159 if (marshalledProperties == null) { 160 return Collections.EMPTY_MAP; 161 } 162 properties = unmarsallProperties(marshalledProperties); 163 } 164 return Collections.unmodifiableMap(properties); 165 } 166 167 public void clearProperties() { 168 marshalledProperties = null; 169 properties = null; 170 } 171 172 public void setProperty(String name, Object value) throws IOException { 173 lazyCreateProperties(); 174 properties.put(name, value); 175 } 176 177 protected void lazyCreateProperties() throws IOException { 178 if (properties == null) { 179 if (marshalledProperties == null) { 180 properties = new HashMap<String, Object>(); 181 } else { 182 properties = unmarsallProperties(marshalledProperties); 183 marshalledProperties = null; 184 } 185 } 186 } 187 188 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 189 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 190 } 191 192 public void beforeMarshall(WireFormat wireFormat) throws IOException { 193 // Need to marshal the properties. 194 if (marshalledProperties == null && properties != null) { 195 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 196 DataOutputStream os = new DataOutputStream(baos); 197 MarshallingSupport.marshalPrimitiveMap(properties, os); 198 os.close(); 199 marshalledProperties = baos.toByteSequence(); 200 } 201 } 202 203 public void afterMarshall(WireFormat wireFormat) throws IOException { 204 } 205 206 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 207 } 208 209 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 210 } 211 212 // ///////////////////////////////////////////////////////////////// 213 // 214 // Simple Field accessors 215 // 216 // ///////////////////////////////////////////////////////////////// 217 218 /** 219 * @openwire:property version=1 cache=true 220 */ 221 public ProducerId getProducerId() { 222 return producerId; 223 } 224 225 public void setProducerId(ProducerId producerId) { 226 this.producerId = producerId; 227 } 228 229 /** 230 * @openwire:property version=1 cache=true 231 */ 232 public ActiveMQDestination getDestination() { 233 return destination; 234 } 235 236 public void setDestination(ActiveMQDestination destination) { 237 this.destination = destination; 238 } 239 240 /** 241 * @openwire:property version=1 cache=true 242 */ 243 public TransactionId getTransactionId() { 244 return transactionId; 245 } 246 247 public void setTransactionId(TransactionId transactionId) { 248 this.transactionId = transactionId; 249 } 250 251 public boolean isInTransaction() { 252 return transactionId != null; 253 } 254 255 /** 256 * @openwire:property version=1 cache=true 257 */ 258 public ActiveMQDestination getOriginalDestination() { 259 return originalDestination; 260 } 261 262 public void setOriginalDestination(ActiveMQDestination destination) { 263 this.originalDestination = destination; 264 } 265 266 /** 267 * @openwire:property version=1 268 */ 269 public MessageId getMessageId() { 270 return messageId; 271 } 272 273 public void setMessageId(MessageId messageId) { 274 this.messageId = messageId; 275 } 276 277 /** 278 * @openwire:property version=1 cache=true 279 */ 280 public TransactionId getOriginalTransactionId() { 281 return originalTransactionId; 282 } 283 284 public void setOriginalTransactionId(TransactionId transactionId) { 285 this.originalTransactionId = transactionId; 286 } 287 288 /** 289 * @openwire:property version=1 290 */ 291 public String getGroupID() { 292 return groupID; 293 } 294 295 public void setGroupID(String groupID) { 296 this.groupID = groupID; 297 } 298 299 /** 300 * @openwire:property version=1 301 */ 302 public int getGroupSequence() { 303 return groupSequence; 304 } 305 306 public void setGroupSequence(int groupSequence) { 307 this.groupSequence = groupSequence; 308 } 309 310 /** 311 * @openwire:property version=1 312 */ 313 public String getCorrelationId() { 314 return correlationId; 315 } 316 317 public void setCorrelationId(String correlationId) { 318 this.correlationId = correlationId; 319 } 320 321 /** 322 * @openwire:property version=1 323 */ 324 public boolean isPersistent() { 325 return persistent; 326 } 327 328 public void setPersistent(boolean deliveryMode) { 329 this.persistent = deliveryMode; 330 } 331 332 /** 333 * @openwire:property version=1 334 */ 335 public long getExpiration() { 336 return expiration; 337 } 338 339 public void setExpiration(long expiration) { 340 this.expiration = expiration; 341 } 342 343 /** 344 * @openwire:property version=1 345 */ 346 public byte getPriority() { 347 return priority; 348 } 349 350 public void setPriority(byte priority) { 351 this.priority = priority; 352 } 353 354 /** 355 * @openwire:property version=1 356 */ 357 public ActiveMQDestination getReplyTo() { 358 return replyTo; 359 } 360 361 public void setReplyTo(ActiveMQDestination replyTo) { 362 this.replyTo = replyTo; 363 } 364 365 /** 366 * @openwire:property version=1 367 */ 368 public long getTimestamp() { 369 return timestamp; 370 } 371 372 public void setTimestamp(long timestamp) { 373 this.timestamp = timestamp; 374 } 375 376 /** 377 * @openwire:property version=1 378 */ 379 public String getType() { 380 return type; 381 } 382 383 public void setType(String type) { 384 this.type = type; 385 } 386 387 /** 388 * @openwire:property version=1 389 */ 390 public ByteSequence getContent() { 391 return content; 392 } 393 394 public void setContent(ByteSequence content) { 395 this.content = content; 396 } 397 398 /** 399 * @openwire:property version=1 400 */ 401 public ByteSequence getMarshalledProperties() { 402 return marshalledProperties; 403 } 404 405 public void setMarshalledProperties(ByteSequence marshalledProperties) { 406 this.marshalledProperties = marshalledProperties; 407 } 408 409 /** 410 * @openwire:property version=1 411 */ 412 public DataStructure getDataStructure() { 413 return dataStructure; 414 } 415 416 public void setDataStructure(DataStructure data) { 417 this.dataStructure = data; 418 } 419 420 /** 421 * Can be used to route the message to a specific consumer. Should be null 422 * to allow the broker use normal JMS routing semantics. If the target 423 * consumer id is an active consumer on the broker, the message is dropped. 424 * Used by the AdvisoryBroker to replay advisory messages to a specific 425 * consumer. 426 * 427 * @openwire:property version=1 cache=true 428 */ 429 public ConsumerId getTargetConsumerId() { 430 return targetConsumerId; 431 } 432 433 public void setTargetConsumerId(ConsumerId targetConsumerId) { 434 this.targetConsumerId = targetConsumerId; 435 } 436 437 public boolean isExpired() { 438 long expireTime = getExpiration(); 439 return expireTime > 0 && System.currentTimeMillis() > expireTime; 440 } 441 442 public boolean isAdvisory() { 443 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 444 } 445 446 /** 447 * @openwire:property version=1 448 */ 449 public boolean isCompressed() { 450 return compressed; 451 } 452 453 public void setCompressed(boolean compressed) { 454 this.compressed = compressed; 455 } 456 457 public boolean isRedelivered() { 458 return redeliveryCounter > 0; 459 } 460 461 public void setRedelivered(boolean redelivered) { 462 if (redelivered) { 463 if (!isRedelivered()) { 464 setRedeliveryCounter(1); 465 } 466 } else { 467 if (isRedelivered()) { 468 setRedeliveryCounter(0); 469 } 470 } 471 } 472 473 public void incrementRedeliveryCounter() { 474 redeliveryCounter++; 475 } 476 477 /** 478 * @openwire:property version=1 479 */ 480 public int getRedeliveryCounter() { 481 return redeliveryCounter; 482 } 483 484 public void setRedeliveryCounter(int deliveryCounter) { 485 this.redeliveryCounter = deliveryCounter; 486 } 487 488 /** 489 * The route of brokers the command has moved through. 490 * 491 * @openwire:property version=1 cache=true 492 */ 493 public BrokerId[] getBrokerPath() { 494 return brokerPath; 495 } 496 497 public void setBrokerPath(BrokerId[] brokerPath) { 498 this.brokerPath = brokerPath; 499 } 500 501 public boolean isReadOnlyProperties() { 502 return readOnlyProperties; 503 } 504 505 public void setReadOnlyProperties(boolean readOnlyProperties) { 506 this.readOnlyProperties = readOnlyProperties; 507 } 508 509 public boolean isReadOnlyBody() { 510 return readOnlyBody; 511 } 512 513 public void setReadOnlyBody(boolean readOnlyBody) { 514 this.readOnlyBody = readOnlyBody; 515 } 516 517 public ActiveMQConnection getConnection() { 518 return this.connection; 519 } 520 521 public void setConnection(ActiveMQConnection connection) { 522 this.connection = connection; 523 } 524 525 /** 526 * Used to schedule the arrival time of a message to a broker. The broker 527 * will not dispatch a message to a consumer until it's arrival time has 528 * elapsed. 529 * 530 * @openwire:property version=1 531 */ 532 public long getArrival() { 533 return arrival; 534 } 535 536 public void setArrival(long arrival) { 537 this.arrival = arrival; 538 } 539 540 /** 541 * Only set by the broker and defines the userID of the producer connection 542 * who sent this message. This is an optional field, it needs to be enabled 543 * on the broker to have this field populated. 544 * 545 * @openwire:property version=1 546 */ 547 public String getUserID() { 548 return userID; 549 } 550 551 public void setUserID(String jmsxUserID) { 552 this.userID = jmsxUserID; 553 } 554 555 public int getReferenceCount() { 556 return referenceCount; 557 } 558 559 public Message getMessageHardRef() { 560 return this; 561 } 562 563 public Message getMessage() throws IOException { 564 return this; 565 } 566 567 public org.apache.activemq.broker.region.Destination getRegionDestination() { 568 return regionDestination; 569 } 570 571 public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) { 572 this.regionDestination = destination; 573 if(this.memoryUsage==null) { 574 this.memoryUsage=regionDestination.getMemoryUsage(); 575 } 576 } 577 578 public MemoryUsage getMemoryUsage() { 579 return this.memoryUsage; 580 } 581 582 public void setMemoryUsage(MemoryUsage usage) { 583 this.memoryUsage=usage; 584 } 585 586 public boolean isMarshallAware() { 587 return true; 588 } 589 590 public int incrementReferenceCount() { 591 int rc; 592 int size; 593 synchronized (this) { 594 rc = ++referenceCount; 595 size = getSize(); 596 } 597 598 if (rc == 1 && getMemoryUsage() != null) { 599 getMemoryUsage().increaseUsage(size); 600 } 601 602 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 603 return rc; 604 } 605 606 public int decrementReferenceCount() { 607 int rc; 608 int size; 609 synchronized (this) { 610 rc = --referenceCount; 611 size = getSize(); 612 } 613 614 if (rc == 0 && getMemoryUsage() != null) { 615 getMemoryUsage().decreaseUsage(size); 616 } 617 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 618 619 return rc; 620 } 621 622 public int getSize() { 623 int minimumMessageSize = getMinimumMessageSize(); 624 if (size < minimumMessageSize || size == 0) { 625 size = minimumMessageSize; 626 if (marshalledProperties != null) { 627 size += marshalledProperties.getLength(); 628 } 629 if (content != null) { 630 size += content.getLength(); 631 } 632 } 633 return size; 634 } 635 636 protected int getMinimumMessageSize() { 637 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 638 //let destination override 639 Destination dest = regionDestination; 640 if (dest != null) { 641 result=dest.getMinimumMessageSize(); 642 } 643 return result; 644 } 645 646 /** 647 * @openwire:property version=1 648 * @return Returns the recievedByDFBridge. 649 */ 650 public boolean isRecievedByDFBridge() { 651 return recievedByDFBridge; 652 } 653 654 /** 655 * @param recievedByDFBridge The recievedByDFBridge to set. 656 */ 657 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 658 this.recievedByDFBridge = recievedByDFBridge; 659 } 660 661 public void onMessageRolledBack() { 662 incrementRedeliveryCounter(); 663 } 664 665 /** 666 * @openwire:property version=2 cache=true 667 */ 668 public boolean isDroppable() { 669 return droppable; 670 } 671 672 public void setDroppable(boolean droppable) { 673 this.droppable = droppable; 674 } 675 676 /** 677 * If a message is stored in multiple nodes on a cluster, all the cluster 678 * members will be listed here. Otherwise, it will be null. 679 * 680 * @openwire:property version=3 cache=true 681 */ 682 public BrokerId[] getCluster() { 683 return cluster; 684 } 685 686 public void setCluster(BrokerId[] cluster) { 687 this.cluster = cluster; 688 } 689 690 public boolean isMessage() { 691 return true; 692 } 693 694 /** 695 * @openwire:property version=3 696 */ 697 public long getBrokerInTime() { 698 return this.brokerInTime; 699 } 700 701 public void setBrokerInTime(long brokerInTime) { 702 this.brokerInTime = brokerInTime; 703 } 704 705 /** 706 * @openwire:property version=3 707 */ 708 public long getBrokerOutTime() { 709 return this.brokerOutTime; 710 } 711 712 public void setBrokerOutTime(long brokerOutTime) { 713 this.brokerOutTime = brokerOutTime; 714 } 715 716 public boolean isDropped() { 717 return false; 718 } 719 720 public String toString() { 721 return toString(null); 722 } 723 724 public String toString(Map<String, Object>overrideFields) { 725 try { 726 getProperties(); 727 } catch (IOException e) { 728 } 729 return super.toString(overrideFields); 730 } 731 }