001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    
019    package org.activemq.message;
020    
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.ArrayList;
024    import java.util.StringTokenizer;
025    
026    import org.activemq.util.BitArray;
027    import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
028    
029    /**
030     * Abstract class for a transportable Packet
031     *
032     * @version $Revision: 1.1.1.1 $
033     */
034    public abstract class AbstractPacket implements Packet {
035        
036        /**
037         * Message flag indexes (used for writing/reading to/from a Stream
038         */
039        public static final int RECEIPT_REQUIRED_INDEX = 0;
040        public static final int BROKERS_VISITED_INDEX =1;
041        private short id = 0;
042        protected BitArray bitArray;
043        protected transient int cachedHashCode = -1;
044        private  boolean receiptRequired;
045        private transient int memoryUsage = 2048;
046        private transient int memoryUsageReferenceCount;
047    
048        private CopyOnWriteArraySet brokersVisited;
049       
050        protected AbstractPacket(){
051            this.bitArray = new BitArray();
052        }
053    
054        /**
055         * @return the unique id for this Packet
056         */
057        public short getId() {
058            return this.id;
059        }
060    
061        /**
062         * Set the unique id for this Packet
063         *
064         * @param newId
065         */
066        public void setId(short newId) {
067            this.id = newId;
068        }
069    
070        /**
071         * @return true if a Recipt is required
072         */
073        public boolean isReceiptRequired() {
074            return this.receiptRequired;
075        }
076    
077        /**
078         * @return false since most packets are not receipt packets
079         */
080        public boolean isReceipt() {
081            return false;
082        }
083    
084        /**
085         * Set if a Recipt if required on receiving this Packet
086         *
087         * @param value
088         */
089        public void setReceiptRequired(boolean value) {
090            this.receiptRequired = value;
091        }
092    
093        /**
094         * Retrieve if a JMS Message type or not
095         *
096         * @return true if it is a JMS Message
097         */
098        public boolean isJMSMessage() {
099            return false;
100        }
101    
102        /**
103         * Tests equality with another instance
104         *
105         * @param obj - the other instance to test equality with
106         * @return Returns true if the objects are equilvant
107         */
108        public boolean equals(Object obj) {
109            boolean result = this == obj;
110            if (!result && obj != null && obj instanceof AbstractPacket) {
111                AbstractPacket other = (AbstractPacket) obj;
112                result = other.id == this.id;
113            }
114            return result;
115        }
116    
117        /**
118         * @return Returns hash code for this instance
119         */
120        public int hashCode() {
121            return this.id;
122        }
123    
124        /**
125         * Get a hint about how much memory this Packet is consuming
126         *
127         * @return an aproximation of the current memory used by this instance
128         */
129        public int getMemoryUsage() {
130            return memoryUsage;
131        }
132    
133        /**
134         * Set a hint about how mujch memory this packet is consuming
135         *
136         * @param newMemoryUsage
137         */
138        public void setMemoryUsage(int newMemoryUsage) {
139            this.memoryUsage = newMemoryUsage;
140        }
141    
142        /**
143         * Increment reference count for bounded memory collections
144         *
145         * @return the incremented reference value
146         * @see org.activemq.io.util.MemoryBoundedQueue
147         */
148        public synchronized int incrementMemoryReferenceCount() {
149            return ++memoryUsageReferenceCount;
150        }
151    
152        /**
153         * Decrement reference count for bounded memory collections
154         *
155         * @return the decremented reference value
156         * @see org.activemq.io.util.MemoryBoundedQueue
157         */
158        public synchronized int decrementMemoryReferenceCount() {
159            return --memoryUsageReferenceCount;
160        }
161    
162        /**
163         * @return the current reference count for bounded memory collections
164         * @see org.activemq.io.util.MemoryBoundedQueue
165         */
166        public synchronized int getMemoryUsageReferenceCount() {
167            return memoryUsageReferenceCount;
168        }
169    
170        /**
171         * As the packet passes through the broker add the broker to the visited list
172         *
173         * @param brokerName the name of the broker
174         */
175        public void addBrokerVisited(String brokerName) {
176            if (brokerName == null || brokerName.trim().equals("")) {
177                throw new IllegalArgumentException("Broker name cannot be empty or null");
178            }
179            initializeBrokersVisited();
180            brokersVisited.add(brokerName);
181        }
182        
183        /**
184         * clear list of brokers visited
185         */
186        public void clearBrokersVisited(){
187            brokersVisited = null;
188        }
189    
190        /**
191         * test to see if the named broker has already seen this packet
192         *
193         * @param brokerName the name of the broker
194         * @return true if the packet has visited the broker
195         */
196        public boolean hasVisited(String brokerName) {
197            if (brokersVisited == null){
198                return false;
199            }
200            return brokersVisited.contains(brokerName);
201        }
202    
203        /**
204         * @return Returns the brokersVisited.
205         */
206        public String getBrokersVisitedAsString() {
207            String result = "";
208            if (brokersVisited != null && !brokersVisited.isEmpty()){
209                for (Iterator i = brokersVisited.iterator(); i.hasNext();){
210                    result += i.next().toString() + ",";
211                }
212            }
213            return result;
214        }
215    
216        public void setBrokersVisitedAsString(String value) {
217            initializeBrokersVisited();
218            StringTokenizer enm = new StringTokenizer(value, ",");
219            while (enm.hasMoreElements()) {
220                brokersVisited.add(enm.nextToken());
221            }
222        }
223    
224        /**
225         * @return pretty print of this Packet
226         */
227        public String toString() {
228            return getPacketTypeAsString(getPacketType()) + ": id = " + getId();
229        }
230    
231    
232        public static String getPacketTypeAsString(int type) {
233            String packetTypeStr = "";
234            switch (type) {
235                case ACTIVEMQ_MESSAGE:
236                    packetTypeStr = "ACTIVEMQ_MESSAGE";
237                    break;
238                case ACTIVEMQ_TEXT_MESSAGE:
239                    packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
240                    break;
241                case ACTIVEMQ_OBJECT_MESSAGE:
242                    packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
243                    break;
244                case ACTIVEMQ_BYTES_MESSAGE:
245                    packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
246                    break;
247                case ACTIVEMQ_STREAM_MESSAGE:
248                    packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
249                    break;
250                case ACTIVEMQ_MAP_MESSAGE:
251                    packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
252                    break;
253                case ACTIVEMQ_MSG_ACK:
254                    packetTypeStr = "ACTIVEMQ_MSG_ACK";
255                    break;
256                case RECEIPT_INFO:
257                    packetTypeStr = "RECEIPT_INFO";
258                    break;
259                case CONSUMER_INFO:
260                    packetTypeStr = "CONSUMER_INFO";
261                    break;
262                case PRODUCER_INFO:
263                    packetTypeStr = "PRODUCER_INFO";
264                    break;
265                case TRANSACTION_INFO:
266                    packetTypeStr = "TRANSACTION_INFO";
267                    break;
268                case XA_TRANSACTION_INFO:
269                    packetTypeStr = "XA_TRANSACTION_INFO";
270                    break;
271                case ACTIVEMQ_BROKER_INFO:
272                    packetTypeStr = "ACTIVEMQ_BROKER_INFO";
273                    break;
274                case ACTIVEMQ_CONNECTION_INFO:
275                    packetTypeStr = "ACTIVEMQ_CONNECTION_INFO";
276                    break;
277                case SESSION_INFO:
278                    packetTypeStr = "SESSION_INFO";
279                    break;
280                case DURABLE_UNSUBSCRIBE:
281                    packetTypeStr = "DURABLE_UNSUBSCRIBE";
282                    break;
283                case RESPONSE_RECEIPT_INFO:
284                    packetTypeStr = "RESPONSE_RECEIPT_INFO";
285                    break;
286                case INT_RESPONSE_RECEIPT_INFO:
287                    packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
288                    break;
289                case CAPACITY_INFO:
290                    packetTypeStr = "CAPACITY_INFO";
291                    break;
292                case CAPACITY_INFO_REQUEST:
293                    packetTypeStr = "CAPACITY_INFO_REQUEST";
294                    break;
295                case WIRE_FORMAT_INFO:
296                    packetTypeStr = "WIRE_FORMAT_INFO";
297                    break;
298                case KEEP_ALIVE:
299                    packetTypeStr = "KEEP_ALIVE";
300                    break;
301                case CACHED_VALUE_COMMAND:
302                    packetTypeStr = "CachedValue";
303                    break;
304                default :
305                    packetTypeStr = "UNKNOWN PACKET TYPE: " + type;
306            }
307            return packetTypeStr;
308        }
309    
310        /**
311         * A helper method used when implementing equals() which returns true if the objects are identical or equal handling
312         * nulls properly
313         * @param left
314         * @param right
315         *
316         * @return true if the objects are the same or equal or both null
317         */
318        protected boolean equals(Object left, Object right) {
319            return left == right || (left != null && left.equals(right));
320        }
321        
322        /**
323         * Initializes another message with current values from this instance
324         *
325         * @param other the other ActiveMQMessage to initialize
326         */
327        protected void initializeOther(AbstractPacket other) {
328            initializeBrokersVisited();
329            other.id = this.id;
330            other.receiptRequired = this.receiptRequired;
331            other.memoryUsage = this.memoryUsage;
332            CopyOnWriteArraySet set = this.brokersVisited;
333            if (set != null && !set.isEmpty()){
334                other.brokersVisited = new CopyOnWriteArraySet(set);
335            }
336        }
337        
338        synchronized void initializeBrokersVisited(){
339            if (this.brokersVisited == null){
340                this.brokersVisited = new CopyOnWriteArraySet();
341            }
342        }
343    
344        /**
345         * @return Returns the brokersVisited.
346         */
347        public Object[] getBrokersVisited() {
348            if (brokersVisited == null || brokersVisited.isEmpty()){
349                return null;
350            }
351            return brokersVisited.toArray();
352        }
353    
354        /**
355         * @return Returns the bitArray.
356         */
357        public BitArray getBitArray() {
358            return bitArray;
359        }
360        /**
361         * @param bitArray The bitArray to set.
362         */
363        public void setBitArray(BitArray bitArray) {
364            this.bitArray = bitArray;
365        }
366    }