001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.store.AbstractMessageStore;
030    import org.apache.activemq.store.MessageRecoveryListener;
031    import org.apache.activemq.util.ByteSequence;
032    import org.apache.activemq.util.ByteSequenceData;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    
038    /**
039     * @version $Revision: 1.10 $
040     */
041    public class JDBCMessageStore extends AbstractMessageStore {
042    
043        private static final Log LOG = LogFactory.getLog(JDBCMessageStore.class);
044        protected final WireFormat wireFormat;
045        protected final JDBCAdapter adapter;
046        protected final JDBCPersistenceAdapter persistenceAdapter;
047        protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
048    
049        protected ActiveMQMessageAudit audit;
050        
051        public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
052            super(destination);
053            this.persistenceAdapter = persistenceAdapter;
054            this.adapter = adapter;
055            this.wireFormat = wireFormat;
056            this.audit = audit;
057        }
058        
059        public void addMessage(ConnectionContext context, Message message) throws IOException {
060    
061            MessageId messageId = message.getMessageId();
062            if (audit != null && audit.isDuplicate(message)) {
063                if (LOG.isDebugEnabled()) {
064                    LOG.debug(destination.getPhysicalName()
065                        + " ignoring duplicated (add) message, already stored: "
066                        + messageId);
067                }
068                return;
069            }
070            
071            long sequenceId = persistenceAdapter.getNextSequenceId();
072            
073            // Serialize the Message..
074            byte data[];
075            try {
076                ByteSequence packet = wireFormat.marshal(message);
077                data = ByteSequenceData.toByteArray(packet);
078            } catch (IOException e) {
079                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
080            }
081    
082            // Get a connection and insert the message into the DB.
083            TransactionContext c = persistenceAdapter.getTransactionContext(context);
084            try {      
085                adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
086            } catch (SQLException e) {
087                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
088                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
089            } finally {
090                c.close();
091            }
092        }
093    
094        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
095            // Get a connection and insert the message into the DB.
096            TransactionContext c = persistenceAdapter.getTransactionContext(context);
097            try {
098                adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
099            } catch (SQLException e) {
100                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
101                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
102            } finally {
103                c.close();
104            }
105        }
106    
107        public Message getMessage(MessageId messageId) throws IOException {
108            // Get a connection and pull the message out of the DB
109            TransactionContext c = persistenceAdapter.getTransactionContext();
110            try {
111                byte data[] = adapter.doGetMessage(c, messageId);
112                if (data == null) {
113                    return null;
114                }
115    
116                Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
117                return answer;
118            } catch (IOException e) {
119                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
120            } catch (SQLException e) {
121                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
122                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
123            } finally {
124                c.close();
125            }
126        }
127    
128        public String getMessageReference(MessageId messageId) throws IOException {
129            long id = messageId.getBrokerSequenceId();
130    
131            // Get a connection and pull the message out of the DB
132            TransactionContext c = persistenceAdapter.getTransactionContext();
133            try {
134                return adapter.doGetMessageReference(c, id);
135            } catch (IOException e) {
136                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
137            } catch (SQLException e) {
138                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
139                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
140            } finally {
141                c.close();
142            }
143        }
144    
145        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
146            
147            long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
148    
149            // Get a connection and remove the message from the DB
150            TransactionContext c = persistenceAdapter.getTransactionContext(context);
151            try {
152                adapter.doRemoveMessage(c, seq);
153            } catch (SQLException e) {
154                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
155                throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
156            } finally {
157                c.close();
158            }
159        }
160    
161        public void recover(final MessageRecoveryListener listener) throws Exception {
162    
163            // Get all the Message ids out of the database.
164            TransactionContext c = persistenceAdapter.getTransactionContext();
165            try {
166                c = persistenceAdapter.getTransactionContext();
167                adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
168                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
169                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
170                        msg.getMessageId().setBrokerSequenceId(sequenceId);
171                        return listener.recoverMessage(msg);
172                    }
173    
174                    public boolean recoverMessageReference(String reference) throws Exception {
175                        return listener.recoverMessageReference(new MessageId(reference));
176                    }
177                });
178            } catch (SQLException e) {
179                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
180                throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
181            } finally {
182                c.close();
183            }
184        }
185    
186        /**
187         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
188         */
189        public void removeAllMessages(ConnectionContext context) throws IOException {
190            // Get a connection and remove the message from the DB
191            TransactionContext c = persistenceAdapter.getTransactionContext(context);
192            try {
193                adapter.doRemoveAllMessages(c, destination);
194            } catch (SQLException e) {
195                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
196                throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
197            } finally {
198                c.close();
199            }
200        }
201    
202        public int getMessageCount() throws IOException {
203            int result = 0;
204            TransactionContext c = persistenceAdapter.getTransactionContext();
205            try {
206    
207                result = adapter.doGetMessageCount(c, destination);
208    
209            } catch (SQLException e) {
210                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
211                throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
212            } finally {
213                c.close();
214            }
215            return result;
216        }
217    
218        /**
219         * @param maxReturned
220         * @param listener
221         * @throws Exception
222         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
223         *      org.apache.activemq.store.MessageRecoveryListener)
224         */
225        public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
226            TransactionContext c = persistenceAdapter.getTransactionContext();
227    
228            try {
229                adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
230    
231                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
232                        if (listener.hasSpace()) {
233                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
234                            msg.getMessageId().setBrokerSequenceId(sequenceId);
235                            listener.recoverMessage(msg);
236                            lastStoreSequenceId.set(sequenceId);
237                            return true;
238                        }
239                        return false;
240                    }
241    
242                    public boolean recoverMessageReference(String reference) throws Exception {
243                        if (listener.hasSpace()) {
244                            listener.recoverMessageReference(new MessageId(reference));
245                            return true;
246                        }
247                        return false;
248                    }
249    
250                });
251            } catch (SQLException e) {
252                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
253            } finally {
254                c.close();
255            }
256    
257        }
258    
259        /**
260         * @see org.apache.activemq.store.MessageStore#resetBatching()
261         */
262        public void resetBatching() {
263            if (LOG.isDebugEnabled()) {
264                LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
265            }
266            lastStoreSequenceId.set(-1);
267    
268        }
269    
270        @Override
271        public void setBatch(MessageId messageId) {
272            long storeSequenceId = -1;
273            try {
274                storeSequenceId = getStoreSequenceIdForMessageId(messageId);
275            } catch (IOException ignoredAsAlreadyLogged) {
276                // reset batch in effect with default -1 value
277            }
278            if (LOG.isDebugEnabled()) {
279                LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
280            }
281            lastStoreSequenceId.set(storeSequenceId);
282        }
283    
284        private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
285            long result = -1;
286            TransactionContext c = persistenceAdapter.getTransactionContext();
287            try {
288                result = adapter.getStoreSequenceId(c, messageId);
289            } catch (SQLException e) {
290                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
291                throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
292            } finally {
293                c.close();
294            }
295            return result;
296        }
297    }