001    /** 
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.jdbc;
019    
020    import java.io.IOException;
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    
024    import javax.jms.JMSException;
025    
026    import org.activemq.io.WireFormat;
027    import org.activemq.message.ActiveMQMessage;
028    import org.activemq.message.MessageAck;
029    import org.activemq.service.MessageIdentity;
030    import org.activemq.store.MessageStore;
031    import org.activemq.store.RecoveryListener;
032    import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
033    import org.activemq.util.JMSExceptionHelper;
034    import org.activemq.util.LongSequenceGenerator;
035    
036    /**
037     * @version $Revision: 1.1 $
038     */
039    public class JDBCMessageStore implements MessageStore {
040        
041        protected final WireFormat wireFormat;
042        protected final String destinationName;
043        protected final LongSequenceGenerator sequenceGenerator;
044        protected final JDBCAdapter adapter;
045            protected final JDBCPersistenceAdapter persistenceAdapter;
046    
047        public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
048            this.persistenceAdapter = persistenceAdapter;
049                    this.adapter = adapter;
050            this.sequenceGenerator = adapter.getSequenceGenerator();
051            this.wireFormat = wireFormat;
052            this.destinationName = destinationName;
053        }
054    
055        public void addMessage(ActiveMQMessage message) throws JMSException {
056            
057            // Serialize the Message..
058            String messageID = message.getJMSMessageID();
059            byte data[];
060            try {
061                data = wireFormat.toBytes(message);
062            } catch (IOException e) {
063                throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
064            }
065            
066            long seq=sequenceGenerator.getNextSequenceId();
067    
068            // Get a connection and insert the message into the DB.
069            Connection c = null;
070            try {
071                c = persistenceAdapter.getConnection();            
072                adapter.doAddMessage(c, seq, messageID, destinationName, data, message.getJMSExpiration());
073            } catch (SQLException e) {
074                throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
075            } finally {
076                    persistenceAdapter.returnConnection(c);
077            }
078    
079            MessageIdentity answer = message.getJMSMessageIdentity();
080            answer.setSequenceNumber(new Long(seq));
081        }
082    
083    
084        public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
085    
086            long id;
087            try {
088                id = getMessageSequenceId(identity);
089            } catch (JMSException e1) {
090                return null;
091            }
092            
093            // Get a connection and pull the message out of the DB
094            Connection c = null;
095            try {
096                c = persistenceAdapter.getConnection();            
097                byte data[] = adapter.doGetMessage(c, id);
098                if( data==null )
099                    return null;
100                
101                ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);;
102                answer.setJMSMessageID(identity.getMessageID());
103                answer.setJMSMessageIdentity(identity);
104                return answer;            
105            } catch (IOException e) {
106                throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
107            } catch (SQLException e) {
108                throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
109            } finally {
110                    persistenceAdapter.returnConnection(c);
111            }
112        }
113    
114        /**
115         * @param identity
116         * @return
117         * @throws JMSException
118         */
119        protected long getMessageSequenceId(MessageIdentity identity) throws JMSException {
120            Object sequenceNumber = identity.getSequenceNumber();
121            if (sequenceNumber != null && sequenceNumber.getClass() == Long.class) {
122                return ((Long) sequenceNumber).longValue();
123            } else {
124                // Get a connection and pull the message out of the DB
125                Connection c = null;
126                try {
127                    c = persistenceAdapter.getConnection();
128                    Long rc = adapter.getMessageSequenceId(c, identity.getMessageID());
129                    if (rc == null)
130                        throw new JMSException("Could not locate message in database with message id: "
131                                + identity.getMessageID());
132                    return rc.longValue();
133                } catch (SQLException e) {
134                    throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID()
135                            + " in container: " + e, e);
136                } finally {
137                    persistenceAdapter.returnConnection(c);
138                }
139            }
140        }
141    
142            public void removeMessage(MessageAck ack) throws JMSException {
143            long seq = getMessageSequenceId(ack.getMessageIdentity());
144    
145            // Get a connection and remove the message from the DB
146            Connection c = null;
147            try {
148                c = persistenceAdapter.getConnection();            
149                adapter.doRemoveMessage(c, seq);
150            } catch (SQLException e) {
151                throw JMSExceptionHelper.newJMSException("Failed to broker message: " + ack.getMessageID() + " in container: " + e, e);
152            } finally {
153                    persistenceAdapter.returnConnection(c);
154            }
155        }
156    
157    
158        public void recover(final RecoveryListener listener) throws JMSException {
159            
160            // Get all the Message ids out of the database.
161            Connection c = null;
162            try {
163                c = persistenceAdapter.getConnection();            
164                adapter.doRecover(c, destinationName, new MessageListResultHandler() {
165                    public void onMessage(long seq, String messageID) throws JMSException {
166                        listener.recoverMessage(new MessageIdentity(messageID, new Long(seq)));                
167                    }
168                });     
169                
170            } catch (SQLException e) {
171                throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
172            } finally {
173                    persistenceAdapter.returnConnection(c);
174            } 
175        }
176    
177        public void start() throws JMSException {
178        }
179    
180        public void stop() throws JMSException {
181        }
182    
183        /**
184         * @see org.activemq.store.MessageStore#removeAllMessages()
185         */
186        public void removeAllMessages() throws JMSException {
187            // Get a connection and remove the message from the DB
188            Connection c = null;
189            try {
190                c = persistenceAdapter.getConnection();            
191                adapter.doRemoveAllMessages(c, destinationName);
192            } catch (SQLException e) {
193                throw JMSExceptionHelper.newJMSException("Failed to broker remove all messages: " + e, e);
194            } finally {
195                    persistenceAdapter.returnConnection(c);
196            }
197        }
198    }