001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import org.apache.activemq.ActiveMQMessageAudit;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQTopic;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageId;
030    import org.apache.activemq.command.SubscriptionInfo;
031    import org.apache.activemq.store.MessageRecoveryListener;
032    import org.apache.activemq.store.TopicMessageStore;
033    import org.apache.activemq.util.ByteSequence;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.activemq.wireformat.WireFormat;
036    
037    /**
038     * @version $Revision: 1.6 $
039     */
040    public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
041    
042        private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
043    
044        public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
045            super(persistenceAdapter, adapter, wireFormat, topic, audit);
046        }
047    
048        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
049            // Get a connection and insert the message into the DB.
050            TransactionContext c = persistenceAdapter.getTransactionContext(context);
051            try {
052                    long seq = adapter.getStoreSequenceId(c, messageId);
053                adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
054            } catch (SQLException e) {
055                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
056                throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
057            } finally {
058                c.close();
059            }
060        }
061    
062        /**
063         * @throws Exception
064         */
065        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
066    
067            TransactionContext c = persistenceAdapter.getTransactionContext();
068            try {
069                adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
070                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
071                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
072                        msg.getMessageId().setBrokerSequenceId(sequenceId);
073                        return listener.recoverMessage(msg);
074                    }
075    
076                    public boolean recoverMessageReference(String reference) throws Exception {
077                        return listener.recoverMessageReference(new MessageId(reference));
078                    }
079    
080                });
081            } catch (SQLException e) {
082                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
083                throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
084            } finally {
085                c.close();
086            }
087        }
088    
089        public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
090            throws Exception {
091            TransactionContext c = persistenceAdapter.getTransactionContext();
092            String subcriberId = getSubscriptionKey(clientId, subscriptionName);
093            AtomicLong last = subscriberLastMessageMap.get(subcriberId);
094            if (last == null) {
095                long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
096                last = new AtomicLong(lastAcked);
097                subscriberLastMessageMap.put(subcriberId, last);
098            }
099            final AtomicLong finalLast = last;
100            try {
101                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() {
102    
103                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
104                        if (listener.hasSpace()) {
105                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
106                            msg.getMessageId().setBrokerSequenceId(sequenceId);
107                            listener.recoverMessage(msg);
108                            finalLast.set(sequenceId);
109                            return true;
110                        }
111                        return false;
112                    }
113    
114                    public boolean recoverMessageReference(String reference) throws Exception {
115                        return listener.recoverMessageReference(new MessageId(reference));
116                    }
117    
118                });
119            } catch (SQLException e) {
120                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
121            } finally {
122                c.close();
123                last.set(finalLast.get());
124            }
125        }
126    
127        public void resetBatching(String clientId, String subscriptionName) {
128            String subcriberId = getSubscriptionKey(clientId, subscriptionName);
129            subscriberLastMessageMap.remove(subcriberId);
130        }
131    
132        /**
133         * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
134         *      boolean)
135         */
136        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
137            TransactionContext c = persistenceAdapter.getTransactionContext();
138            try {
139                c = persistenceAdapter.getTransactionContext();
140                adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
141            } catch (SQLException e) {
142                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
143                throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
144            } finally {
145                c.close();
146            }
147        }
148    
149        /**
150         * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
151         *      String)
152         */
153        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
154            TransactionContext c = persistenceAdapter.getTransactionContext();
155            try {
156                return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
157            } catch (SQLException e) {
158                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
159                throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
160            } finally {
161                c.close();
162            }
163        }
164    
165        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
166            TransactionContext c = persistenceAdapter.getTransactionContext();
167            try {
168                adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
169            } catch (SQLException e) {
170                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
171                throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
172            } finally {
173                c.close();
174                resetBatching(clientId, subscriptionName);
175            }
176        }
177    
178        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
179            TransactionContext c = persistenceAdapter.getTransactionContext();
180            try {
181                return adapter.doGetAllSubscriptions(c, destination);
182            } catch (SQLException e) {
183                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
184                throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
185            } finally {
186                c.close();
187            }
188        }
189    
190        public int getMessageCount(String clientId, String subscriberName) throws IOException {
191            int result = 0;
192            TransactionContext c = persistenceAdapter.getTransactionContext();
193            try {
194                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
195    
196            } catch (SQLException e) {
197                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
198                throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
199            } finally {
200                c.close();
201            }
202            return result;
203        }
204    
205        protected String getSubscriptionKey(String clientId, String subscriberName) {
206            String result = clientId + ":";
207            result += subscriberName != null ? subscriberName : "NOT_SET";
208            return result;
209        }
210    
211    }