001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.Map.Entry;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQQueue;
030    import org.apache.activemq.command.ActiveMQTempQueue;
031    import org.apache.activemq.command.ActiveMQTempTopic;
032    import org.apache.activemq.command.ActiveMQTopic;
033    import org.apache.activemq.command.LocalTransactionId;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.command.TransactionId;
039    import org.apache.activemq.command.XATransactionId;
040    import org.apache.activemq.openwire.OpenWireFormat;
041    import org.apache.activemq.protobuf.Buffer;
042    import org.apache.activemq.store.AbstractMessageStore;
043    import org.apache.activemq.store.MessageRecoveryListener;
044    import org.apache.activemq.store.MessageStore;
045    import org.apache.activemq.store.PersistenceAdapter;
046    import org.apache.activemq.store.TopicMessageStore;
047    import org.apache.activemq.store.TransactionRecoveryListener;
048    import org.apache.activemq.store.TransactionStore;
049    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
050    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
051    import org.apache.activemq.store.kahadb.data.KahaDestination;
052    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
053    import org.apache.activemq.store.kahadb.data.KahaLocation;
054    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
055    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
056    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
057    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
058    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
059    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
060    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
061    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
062    import org.apache.activemq.usage.MemoryUsage;
063    import org.apache.activemq.usage.SystemUsage;
064    import org.apache.activemq.wireformat.WireFormat;
065    import org.apache.kahadb.journal.Location;
066    import org.apache.kahadb.page.Transaction;
067    
068    
069    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
070    
071        private final WireFormat wireFormat = new OpenWireFormat();
072    
073        public void setBrokerName(String brokerName) {
074        }
075        public void setUsageManager(SystemUsage usageManager) {
076        }
077    
078        public TransactionStore createTransactionStore() throws IOException {
079            return new TransactionStore(){
080                
081                public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
082                    store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
083                }
084                public void prepare(TransactionId txid) throws IOException {
085                    store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
086                }
087                public void rollback(TransactionId txid) throws IOException {
088                    store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
089                }
090                public void recover(TransactionRecoveryListener listener) throws IOException {
091                    for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
092                        XATransactionId xid = (XATransactionId)entry.getKey();
093                        ArrayList<Message> messageList = new ArrayList<Message>();
094                        ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
095                        
096                        for (Operation op : entry.getValue()) {
097                            if( op.getClass() == AddOpperation.class ) {
098                                AddOpperation addOp = (AddOpperation)op;
099                                Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
100                                messageList.add(msg);
101                            } else {
102                                RemoveOpperation rmOp = (RemoveOpperation)op;
103                                MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
104                                ackList.add(ack);
105                            }
106                        }
107                        
108                        Message[] addedMessages = new Message[messageList.size()];
109                        MessageAck[] acks = new MessageAck[ackList.size()];
110                        messageList.toArray(addedMessages);
111                        ackList.toArray(acks);
112                        listener.recover(xid, addedMessages, acks);
113                    }
114                }
115                public void start() throws Exception {
116                }
117                public void stop() throws Exception {
118                }
119            };
120        }
121    
122        public class KahaDBMessageStore extends AbstractMessageStore {
123            protected KahaDestination dest;
124    
125            public KahaDBMessageStore(ActiveMQDestination destination) {
126                super(destination);
127                this.dest = convert( destination );
128            }
129    
130            @Override
131            public ActiveMQDestination getDestination() {
132                return destination;
133            }
134    
135            public void addMessage(ConnectionContext context, Message message) throws IOException {
136                KahaAddMessageCommand command = new KahaAddMessageCommand();
137                command.setDestination(dest);
138                command.setMessageId(message.getMessageId().toString());
139                command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
140    
141                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
142                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
143    
144                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
145                
146            }
147            
148            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
149                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
150                command.setDestination(dest);
151                command.setMessageId(ack.getLastMessageId().toString());
152                command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
153                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
154            }
155    
156            public void removeAllMessages(ConnectionContext context) throws IOException {
157                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
158                command.setDestination(dest);
159                store(command, true);
160            }
161    
162            public Message getMessage(MessageId identity) throws IOException {
163                final String key = identity.toString();
164                
165                // Hopefully one day the page file supports concurrent read operations... but for now we must
166                // externally synchronize...
167                Location location;
168                synchronized(indexMutex) {
169                    location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
170                        public Location execute(Transaction tx) throws IOException {
171                            StoredDestination sd = getStoredDestination(dest, tx);
172                            Long sequence = sd.messageIdIndex.get(tx, key);
173                            if( sequence ==null ) {
174                                return null;
175                            }
176                            return sd.orderIndex.get(tx, sequence).location;
177                        }
178                    });
179                }
180                if( location == null ) {
181                    return null;
182                }
183                
184                return loadMessage(location);
185            }
186            
187            public int getMessageCount() throws IOException {
188                synchronized(indexMutex) {
189                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
190                        public Integer execute(Transaction tx) throws IOException {
191                            // Iterate through all index entries to get a count of messages in the destination.
192                            StoredDestination sd = getStoredDestination(dest, tx);
193                            int rc=0;
194                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
195                                iterator.next();
196                                rc++;
197                            }
198                            return rc;
199                        }
200                    });
201                }
202            }
203            
204            public boolean isEmpty() throws IOException {
205                synchronized(indexMutex) {
206                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
207                        public Boolean execute(Transaction tx) throws IOException {
208                            // Iterate through all index entries to get a count of messages in the destination.
209                            StoredDestination sd = getStoredDestination(dest, tx);
210                            return sd.locationIndex.isEmpty(tx);
211                        }
212                    });
213                }
214            }
215    
216    
217            public void recover(final MessageRecoveryListener listener) throws Exception {
218                synchronized(indexMutex) {
219                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
220                        public void execute(Transaction tx) throws Exception {
221                            StoredDestination sd = getStoredDestination(dest, tx);
222                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
223                                Entry<Long, MessageKeys> entry = iterator.next();
224                                listener.recoverMessage( loadMessage(entry.getValue().location) );
225                            }
226                        }
227                    });
228                }
229            }
230    
231            long cursorPos=0;
232            
233            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
234                synchronized(indexMutex) {
235                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
236                        public void execute(Transaction tx) throws Exception {
237                            StoredDestination sd = getStoredDestination(dest, tx);
238                            Entry<Long, MessageKeys> entry=null;
239                            int counter = 0;
240                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
241                                entry = iterator.next();
242                                listener.recoverMessage( loadMessage(entry.getValue().location ) );
243                                counter++;
244                                if( counter >= maxReturned ) {
245                                    break;
246                                }
247                            }
248                            if( entry!=null ) {
249                                cursorPos = entry.getKey()+1;
250                            }
251                        }
252                    });
253                }
254            }
255    
256            public void resetBatching() {
257                cursorPos=0;
258            }
259    
260            
261            @Override
262            public void setBatch(MessageId identity) throws IOException {
263                final String key = identity.toString();
264                
265                // Hopefully one day the page file supports concurrent read operations... but for now we must
266                // externally synchronize...
267                Long location;
268                synchronized(indexMutex) {
269                    location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
270                        public Long execute(Transaction tx) throws IOException {
271                            StoredDestination sd = getStoredDestination(dest, tx);
272                            return sd.messageIdIndex.get(tx, key);
273                        }
274                    });
275                }
276                if( location!=null ) {
277                    cursorPos=location+1;
278                }
279                
280            }
281    
282            @Override
283            public void setMemoryUsage(MemoryUsage memoeyUSage) {
284            }
285            @Override
286            public void start() throws Exception {
287            }
288            @Override
289            public void stop() throws Exception {
290            }
291            
292        }
293            
294        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
295            public KahaDBTopicMessageStore(ActiveMQTopic destination) {
296                super(destination);
297            }
298            
299            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
300                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
301                command.setDestination(dest);
302                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
303                command.setMessageId(messageId.toString());
304                // We are not passed a transaction info.. so we can't participate in a transaction.
305                // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
306                // to pass back to the XA recover method.
307                // command.setTransactionInfo();
308                store(command, false);
309            }
310    
311            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
312                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
313                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
314                command.setDestination(dest);
315                command.setSubscriptionKey(subscriptionKey);
316                command.setRetroactive(retroactive);
317                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
318                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
319                store(command, isEnableJournalDiskSyncs() && true);
320            }
321    
322            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
323                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
324                command.setDestination(dest);
325                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
326                store(command, isEnableJournalDiskSyncs() && true);
327            }
328    
329            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
330                
331                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
332                synchronized(indexMutex) {
333                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
334                        public void execute(Transaction tx) throws IOException {
335                            StoredDestination sd = getStoredDestination(dest, tx);
336                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
337                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
338                                SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
339                                subscriptions.add(info);
340    
341                            }
342                        }
343                    });
344                }
345                
346                SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
347                subscriptions.toArray(rc);
348                return rc;
349            }
350    
351            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
352                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
353                synchronized(indexMutex) {
354                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
355                        public SubscriptionInfo execute(Transaction tx) throws IOException {
356                            StoredDestination sd = getStoredDestination(dest, tx);
357                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
358                            if( command ==null ) {
359                                return null;
360                            }
361                            return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
362                        }
363                    });
364                }
365            }
366           
367            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
368                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
369                synchronized(indexMutex) {
370                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
371                        public Integer execute(Transaction tx) throws IOException {
372                            StoredDestination sd = getStoredDestination(dest, tx);
373                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
374                            if ( cursorPos==null ) {
375                                // The subscription might not exist.
376                                return 0;
377                            }
378                            cursorPos += 1;
379                            
380                            int counter = 0;
381                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
382                                iterator.next();
383                                counter++;
384                            }
385                            return counter;
386                        }
387                    });
388                }        
389            }
390    
391            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
392                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
393                synchronized(indexMutex) {
394                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
395                        public void execute(Transaction tx) throws Exception {
396                            StoredDestination sd = getStoredDestination(dest, tx);
397                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
398                            cursorPos += 1;
399                            
400                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
401                                Entry<Long, MessageKeys> entry = iterator.next();
402                                listener.recoverMessage( loadMessage(entry.getValue().location ) );
403                            }
404                        }
405                    });
406                }
407            }
408    
409            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
410                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
411                synchronized(indexMutex) {
412                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
413                        public void execute(Transaction tx) throws Exception {
414                            StoredDestination sd = getStoredDestination(dest, tx);
415                            Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
416                            if( cursorPos == null ) {
417                                cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
418                                cursorPos += 1;
419                            }
420                            
421                            Entry<Long, MessageKeys> entry=null;
422                            int counter = 0;
423                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
424                                entry = iterator.next();
425                                listener.recoverMessage( loadMessage(entry.getValue().location ) );
426                                counter++;
427                                if( counter >= maxReturned ) {
428                                    break;
429                                }
430                            }
431                            if( entry!=null ) {
432                                sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
433                            }
434                        }
435                    });
436                }
437            }
438    
439            public void resetBatching(String clientId, String subscriptionName) {
440                try {
441                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
442                    synchronized(indexMutex) {
443                        pageFile.tx().execute(new Transaction.Closure<IOException>(){
444                            public void execute(Transaction tx) throws IOException {
445                                StoredDestination sd = getStoredDestination(dest, tx);
446                                sd.subscriptionCursors.remove(subscriptionKey);
447                            }
448                        });
449                    }
450                } catch (IOException e) {
451                    throw new RuntimeException(e);
452                }
453            }
454        }
455    
456        String subscriptionKey(String clientId, String subscriptionName){
457            return clientId+":"+subscriptionName;
458        }
459        
460        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
461            return new KahaDBMessageStore(destination);
462        }
463    
464        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
465            return new KahaDBTopicMessageStore(destination);
466        }
467    
468        /**
469         * Cleanup method to remove any state associated with the given destination.
470         * This method does not stop the message store (it might not be cached).
471         *
472         * @param destination Destination to forget
473         */
474        public void removeQueueMessageStore(ActiveMQQueue destination) {
475        }
476    
477        /**
478         * Cleanup method to remove any state associated with the given destination
479         * This method does not stop the message store (it might not be cached).
480         *
481         * @param destination Destination to forget
482         */
483        public void removeTopicMessageStore(ActiveMQTopic destination) {
484        }
485    
486        public void deleteAllMessages() throws IOException {
487            deleteAllMessages=true;
488        }
489        
490        
491        public Set<ActiveMQDestination> getDestinations() {
492            try {
493                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
494                synchronized(indexMutex) {
495                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
496                        public void execute(Transaction tx) throws IOException {
497                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
498                                Entry<String, StoredDestination> entry = iterator.next();
499                                rc.add(convert(entry.getKey()));
500                            }
501                        }
502                    });
503                }
504                return rc;
505            } catch (IOException e) {
506                throw new RuntimeException(e);
507            }
508        }
509        
510        public long getLastMessageBrokerSequenceId() throws IOException {
511            return 0;
512        }
513        
514        public long size() {
515            if ( !started.get() ) {
516                return 0;
517            }
518            try {
519                return journal.getDiskSize() + pageFile.getDiskSize();
520            } catch (IOException e) {
521                throw new RuntimeException(e);
522            }
523        }
524    
525        public void beginTransaction(ConnectionContext context) throws IOException {
526            throw new IOException("Not yet implemented.");
527        }
528        public void commitTransaction(ConnectionContext context) throws IOException {
529            throw new IOException("Not yet implemented.");
530        }
531        public void rollbackTransaction(ConnectionContext context) throws IOException {
532            throw new IOException("Not yet implemented.");
533        }
534        
535        public void checkpoint(boolean sync) throws IOException {
536            super.checkpointCleanup(false);
537        }
538        
539        
540        ///////////////////////////////////////////////////////////////////
541        // Internal helper methods.
542        ///////////////////////////////////////////////////////////////////
543    
544        /**
545         * @param location
546         * @return
547         * @throws IOException
548         */
549        Message loadMessage(Location location) throws IOException {
550            KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
551            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
552            return msg;
553        }
554    
555        ///////////////////////////////////////////////////////////////////
556        // Internal conversion methods.
557        ///////////////////////////////////////////////////////////////////
558        
559        KahaTransactionInfo createTransactionInfo(TransactionId txid) {
560            if( txid ==null ) {
561                return null;
562            }
563            KahaTransactionInfo rc = new KahaTransactionInfo();
564            
565            // Link it up to the previous record that was part of the transaction.
566            ArrayList<Operation> tx = inflightTransactions.get(txid);
567            if( tx!=null ) {
568                rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
569            }
570            
571            if( txid.isLocalTransaction() ) {
572                LocalTransactionId t = (LocalTransactionId)txid;
573                KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
574                kahaTxId.setConnectionId(t.getConnectionId().getValue());
575                kahaTxId.setTransacitonId(t.getValue());
576                rc.setLocalTransacitonId(kahaTxId);
577            } else {
578                XATransactionId t = (XATransactionId)txid;
579                KahaXATransactionId kahaTxId = new KahaXATransactionId();
580                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
581                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
582                kahaTxId.setFormatId(t.getFormatId());
583                rc.setXaTransacitonId(kahaTxId);
584            }
585            return rc;
586        }
587        
588        KahaLocation convert(Location location) {
589            KahaLocation rc = new KahaLocation();
590            rc.setLogId(location.getDataFileId());
591            rc.setOffset(location.getOffset());
592            return rc;
593        }
594        
595        KahaDestination convert(ActiveMQDestination dest) {
596            KahaDestination rc = new KahaDestination();
597            rc.setName(dest.getPhysicalName());
598            switch( dest.getDestinationType() ) {
599            case ActiveMQDestination.QUEUE_TYPE:
600                rc.setType(DestinationType.QUEUE);
601                return rc;
602            case ActiveMQDestination.TOPIC_TYPE:
603                rc.setType(DestinationType.TOPIC);
604                return rc;
605            case ActiveMQDestination.TEMP_QUEUE_TYPE:
606                rc.setType(DestinationType.TEMP_QUEUE);
607                return rc;
608            case ActiveMQDestination.TEMP_TOPIC_TYPE:
609                rc.setType(DestinationType.TEMP_TOPIC);
610                return rc;
611            default:
612                return null;
613            }
614        }
615    
616        ActiveMQDestination convert(String dest) {
617            int p = dest.indexOf(":");
618            if( p<0 ) {
619                throw new IllegalArgumentException("Not in the valid destination format");
620            }
621            int type = Integer.parseInt(dest.substring(0, p));
622            String name = dest.substring(p+1);
623            
624            switch( KahaDestination.DestinationType.valueOf(type) ) {
625            case QUEUE:
626                return new ActiveMQQueue(name);
627            case TOPIC:
628                return new ActiveMQTopic(name);
629            case TEMP_QUEUE:
630                return new ActiveMQTempQueue(name);
631            case TEMP_TOPIC:
632                return new ActiveMQTempTopic(name);
633            default:    
634                throw new IllegalArgumentException("Not in the valid destination format");
635            }
636        }
637            
638    }