001    /**
002     * 
003     * Copyright 2004 Hiram Chirino
004     * Copyright 2004 Protique Ltd
005     * 
006     * Licensed under the Apache License, Version 2.0 (the "License"); 
007     * you may not use this file except in compliance with the License. 
008     * You may obtain a copy of the License at 
009     * 
010     * http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS, 
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
015     * See the License for the specific language governing permissions and 
016     * limitations under the License. 
017     * 
018     **/
019    package org.activemq.store.jdbc;
020    
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    
024    import javax.jms.JMSException;
025    
026    import org.activemq.io.WireFormat;
027    import org.activemq.message.ConsumerInfo;
028    import org.activemq.service.MessageIdentity;
029    import org.activemq.service.SubscriberEntry;
030    import org.activemq.store.RecoveryListener;
031    import org.activemq.store.TopicMessageStore;
032    import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
033    import org.activemq.util.JMSExceptionHelper;
034    
035    /**
036     * @version $Revision: 1.1 $
037     */
038    public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
039    
040        public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
041            super(persistenceAdapter, adapter, wireFormat, destinationName);
042        }
043    
044        public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
045            long seq = getMessageSequenceId(messageIdentity);
046            // Get a connection and insert the message into the DB.
047            Connection c = null;
048            try {
049                c = persistenceAdapter.getConnection();
050                adapter.doSetLastAck(c, destinationName, subscription,  seq);
051            }
052            catch (SQLException e) {
053                throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e);
054            }
055            finally {
056                persistenceAdapter.returnConnection(c);
057            }
058        }
059    
060        /**
061         * @see org.activemq.store.TopicMessageStore#getLastestMessageIdentity()
062         */
063        public MessageIdentity getLastestMessageIdentity() throws JMSException {
064            return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
065        }
066    
067        /**
068         * 
069         */
070        public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, final RecoveryListener listener) throws JMSException {
071    
072            Connection c = null;
073            try {
074                c = persistenceAdapter.getConnection();
075                adapter.doRecoverSubscription(c, destinationName, subscriptionId, new MessageListResultHandler() {
076                    public void onMessage(long seq, String messageID) throws JMSException {
077                        MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
078                        listener.recoverMessage(messageIdentity);
079                    }
080                });
081            }
082            catch (SQLException e) {
083                throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscriptionId + ". Reason: " + e, e);
084            }
085            finally {
086                persistenceAdapter.returnConnection(c);
087            }
088        }
089    
090        /**
091         * @see org.activemq.store.TopicMessageStore#setSubscriberEntry(org.activemq.message.ConsumerInfo, org.activemq.service.SubscriberEntry)
092         */
093        public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
094            String key = info.getConsumerKey();
095            Connection c = null;
096            try {
097                c = persistenceAdapter.getConnection();
098                adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
099            }
100            catch (SQLException e) {
101                throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
102            }
103            finally {
104                persistenceAdapter.returnConnection(c);
105            }
106        }
107    
108        /**
109         * @see org.activemq.store.TopicMessageStore#getSubscriberEntry(org.activemq.message.ConsumerInfo)
110         */
111        public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
112            String key = info.getConsumerKey();
113            Connection c = null;
114            try {
115                c = persistenceAdapter.getConnection();
116                return adapter.doGetSubscriberEntry(c, destinationName, key);
117            }
118            catch (SQLException e) {
119                throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
120            }
121            finally {
122                persistenceAdapter.returnConnection(c);
123            }
124        }
125    
126        public void deleteSubscription(String subscription) throws JMSException {
127            Connection c = null;
128            try {
129                c = persistenceAdapter.getConnection();
130                adapter.doDeleteSubscription(c, destinationName, subscription);
131            }
132            catch (SQLException e) {
133                throw JMSExceptionHelper.newJMSException("Failed to remove subscription for: " + subscription + ". Reason: " + e, e);
134            }
135            finally {
136                persistenceAdapter.returnConnection(c);
137            }
138        }
139    
140        public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
141        }
142    
143        public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity) throws JMSException {
144        }
145    
146    }