001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.state;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.Vector;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import javax.jms.TransactionRolledBackException;
027    
028    import org.apache.activemq.command.Command;
029    import org.apache.activemq.command.ConnectionId;
030    import org.apache.activemq.command.ConnectionInfo;
031    import org.apache.activemq.command.ConsumerId;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.DestinationInfo;
034    import org.apache.activemq.command.ExceptionResponse;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageId;
037    import org.apache.activemq.command.ProducerId;
038    import org.apache.activemq.command.ProducerInfo;
039    import org.apache.activemq.command.Response;
040    import org.apache.activemq.command.SessionId;
041    import org.apache.activemq.command.SessionInfo;
042    import org.apache.activemq.command.TransactionInfo;
043    import org.apache.activemq.transport.Transport;
044    import org.apache.activemq.util.IOExceptionSupport;
045    import org.apache.commons.logging.Log;
046    import org.apache.commons.logging.LogFactory;
047    
048    /**
049     * Tracks the state of a connection so a newly established transport can be
050     * re-initialized to the state that was tracked.
051     * 
052     * @version $Revision$
053     */
054    public class ConnectionStateTracker extends CommandVisitorAdapter {
055        private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
056    
057        private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
058    
059        protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
060         
061        private boolean trackTransactions;
062        private boolean restoreSessions = true;
063        private boolean restoreConsumers = true;
064        private boolean restoreProducers = true;
065        private boolean restoreTransaction = true;
066        private boolean trackMessages = true;
067        private boolean trackTransactionProducers = true;
068        private int maxCacheSize = 128 * 1024;
069        private int currentCacheSize;
070        private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
071            protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
072                boolean result = currentCacheSize > maxCacheSize;
073                if (result) {
074                    currentCacheSize -= eldest.getValue().getSize();
075                }
076                return result;
077            }
078        };
079        
080        
081        private class RemoveTransactionAction implements Runnable {
082            private final TransactionInfo info;
083    
084            public RemoveTransactionAction(TransactionInfo info) {
085                this.info = info;
086            }
087    
088            public void run() {
089                ConnectionId connectionId = info.getConnectionId();
090                ConnectionState cs = connectionStates.get(connectionId);
091                cs.removeTransactionState(info.getTransactionId());
092            }
093        }
094    
095        /**
096         * 
097         * 
098         * @param command
099         * @return null if the command is not state tracked.
100         * @throws IOException
101         */
102        public Tracked track(Command command) throws IOException {
103            try {
104                return (Tracked)command.visit(this);
105            } catch (IOException e) {
106                throw e;
107            } catch (Throwable e) {
108                throw IOExceptionSupport.create(e);
109            }
110        }
111        
112        public void trackBack(Command command) {
113            if (trackMessages && command != null && command.isMessage()) {
114                Message message = (Message) command;
115                if (message.getTransactionId()==null) {
116                    currentCacheSize = currentCacheSize +  message.getSize();
117                }
118            }
119        }
120    
121        public void restore(Transport transport) throws IOException {
122            // Restore the connections.
123            for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
124                ConnectionState connectionState = iter.next();
125                if (LOG.isDebugEnabled()) {
126                    LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
127                }
128                transport.oneway(connectionState.getInfo());
129                restoreTempDestinations(transport, connectionState);
130    
131                if (restoreSessions) {
132                    restoreSessions(transport, connectionState);
133                }
134    
135                if (restoreTransaction) {
136                    restoreTransactions(transport, connectionState);
137                }
138            }
139            //now flush messages
140            for (Message msg:messageCache.values()) {
141                transport.oneway(msg);
142            }
143        }
144    
145        private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
146            Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
147            for (TransactionState transactionState : connectionState.getTransactionStates()) {
148                if (LOG.isDebugEnabled()) {
149                    LOG.debug("tx: " + transactionState.getId());
150                }
151                
152                // rollback any completed transactions - no way to know if commit got there
153                // or if reply went missing
154                //
155                if (!transactionState.getCommands().isEmpty()) {
156                    Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
157                    if (lastCommand instanceof TransactionInfo) {
158                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
159                        if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
160                            if (LOG.isDebugEnabled()) {
161                                LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
162                            }
163                            toRollback.add(transactionInfo);
164                            continue;
165                        }
166                    }
167                }
168                
169                // replay short lived producers that may have been involved in the transaction
170                for (ProducerState producerState : transactionState.getProducerStates().values()) {
171                    if (LOG.isDebugEnabled()) {
172                        LOG.debug("tx replay producer :" + producerState.getInfo());
173                    }
174                    transport.oneway(producerState.getInfo());
175                }
176                
177                for (Command command : transactionState.getCommands()) {
178                    if (LOG.isDebugEnabled()) {
179                        LOG.debug("tx replay: " + command);
180                    }
181                    transport.oneway(command);
182                }
183                
184                for (ProducerState producerState : transactionState.getProducerStates().values()) {
185                    if (LOG.isDebugEnabled()) {
186                        LOG.debug("tx remove replayed producer :" + producerState.getInfo());
187                    }
188                    transport.oneway(producerState.getInfo().createRemoveCommand());
189                }
190            }
191            
192            for (TransactionInfo command: toRollback) {
193                // respond to the outstanding commit
194                ExceptionResponse response = new ExceptionResponse();
195                response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
196                response.setCorrelationId(command.getCommandId());
197                transport.getTransportListener().onCommand(response);
198            }
199        }
200    
201        /**
202         * @param transport
203         * @param connectionState
204         * @throws IOException
205         */
206        protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
207            // Restore the connection's sessions
208            for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
209                SessionState sessionState = (SessionState)iter2.next();
210                if (LOG.isDebugEnabled()) {
211                    LOG.debug("session: " + sessionState.getInfo().getSessionId());
212                }
213                transport.oneway(sessionState.getInfo());
214    
215                if (restoreProducers) {
216                    restoreProducers(transport, sessionState);
217                }
218    
219                if (restoreConsumers) {
220                    restoreConsumers(transport, sessionState);
221                }
222            }
223        }
224    
225        /**
226         * @param transport
227         * @param sessionState
228         * @throws IOException
229         */
230        protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
231            // Restore the session's consumers
232            for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
233                ConsumerState consumerState = (ConsumerState)iter3.next();
234                if (LOG.isDebugEnabled()) {
235                    LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId());
236                }
237                transport.oneway(consumerState.getInfo());
238            }
239        }
240    
241        /**
242         * @param transport
243         * @param sessionState
244         * @throws IOException
245         */
246        protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
247            // Restore the session's producers
248            for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
249                ProducerState producerState = (ProducerState)iter3.next();
250                if (LOG.isDebugEnabled()) {
251                    LOG.debug("producer: " + producerState.getInfo().getProducerId());
252                }
253                transport.oneway(producerState.getInfo());
254            }
255        }
256    
257        /**
258         * @param transport
259         * @param connectionState
260         * @throws IOException
261         */
262        protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
263            throws IOException {
264            // Restore the connection's temp destinations.
265            for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
266                transport.oneway((DestinationInfo)iter2.next());
267            }
268        }
269    
270        public Response processAddDestination(DestinationInfo info) {
271            if (info != null) {
272                ConnectionState cs = connectionStates.get(info.getConnectionId());
273                if (cs != null && info.getDestination().isTemporary()) {
274                    cs.addTempDestination(info);
275                }
276            }
277            return TRACKED_RESPONSE_MARKER;
278        }
279    
280        public Response processRemoveDestination(DestinationInfo info) {
281            if (info != null) {
282                ConnectionState cs = connectionStates.get(info.getConnectionId());
283                if (cs != null && info.getDestination().isTemporary()) {
284                    cs.removeTempDestination(info.getDestination());
285                }
286            }
287            return TRACKED_RESPONSE_MARKER;
288        }
289    
290        public Response processAddProducer(ProducerInfo info) {
291            if (info != null && info.getProducerId() != null) {
292                SessionId sessionId = info.getProducerId().getParentId();
293                if (sessionId != null) {
294                    ConnectionId connectionId = sessionId.getParentId();
295                    if (connectionId != null) {
296                        ConnectionState cs = connectionStates.get(connectionId);
297                        if (cs != null) {
298                            SessionState ss = cs.getSessionState(sessionId);
299                            if (ss != null) {
300                                ss.addProducer(info);
301                            }
302                        }
303                    }
304                }
305            }
306            return TRACKED_RESPONSE_MARKER;
307        }
308    
309        public Response processRemoveProducer(ProducerId id) {
310            if (id != null) {
311                SessionId sessionId = id.getParentId();
312                if (sessionId != null) {
313                    ConnectionId connectionId = sessionId.getParentId();
314                    if (connectionId != null) {
315                        ConnectionState cs = connectionStates.get(connectionId);
316                        if (cs != null) {
317                            SessionState ss = cs.getSessionState(sessionId);
318                            if (ss != null) {
319                                ss.removeProducer(id);
320                            }
321                        }
322                    }
323                }
324            }
325            return TRACKED_RESPONSE_MARKER;
326        }
327    
328        public Response processAddConsumer(ConsumerInfo info) {
329            if (info != null) {
330                SessionId sessionId = info.getConsumerId().getParentId();
331                if (sessionId != null) {
332                    ConnectionId connectionId = sessionId.getParentId();
333                    if (connectionId != null) {
334                        ConnectionState cs = connectionStates.get(connectionId);
335                        if (cs != null) {
336                            SessionState ss = cs.getSessionState(sessionId);
337                            if (ss != null) {
338                                ss.addConsumer(info);
339                            }
340                        }
341                    }
342                }
343            }
344            return TRACKED_RESPONSE_MARKER;
345        }
346    
347        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
348            if (id != null) {
349                SessionId sessionId = id.getParentId();
350                if (sessionId != null) {
351                    ConnectionId connectionId = sessionId.getParentId();
352                    if (connectionId != null) {
353                        ConnectionState cs = connectionStates.get(connectionId);
354                        if (cs != null) {
355                            SessionState ss = cs.getSessionState(sessionId);
356                            if (ss != null) {
357                                ss.removeConsumer(id);
358                            }
359                        }
360                    }
361                }
362            }
363            return TRACKED_RESPONSE_MARKER;
364        }
365    
366        public Response processAddSession(SessionInfo info) {
367            if (info != null) {
368                ConnectionId connectionId = info.getSessionId().getParentId();
369                if (connectionId != null) {
370                    ConnectionState cs = connectionStates.get(connectionId);
371                    if (cs != null) {
372                        cs.addSession(info);
373                    }
374                }
375            }
376            return TRACKED_RESPONSE_MARKER;
377        }
378    
379        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
380            if (id != null) {
381                ConnectionId connectionId = id.getParentId();
382                if (connectionId != null) {
383                    ConnectionState cs = connectionStates.get(connectionId);
384                    if (cs != null) {
385                        cs.removeSession(id);
386                    }
387                }
388            }
389            return TRACKED_RESPONSE_MARKER;
390        }
391    
392        public Response processAddConnection(ConnectionInfo info) {
393            if (info != null) {
394                connectionStates.put(info.getConnectionId(), new ConnectionState(info));
395            }
396            return TRACKED_RESPONSE_MARKER;
397        }
398    
399        public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
400            if (id != null) {
401                connectionStates.remove(id);
402            }
403            return TRACKED_RESPONSE_MARKER;
404        }
405    
406        public Response processMessage(Message send) throws Exception {
407            if (send != null) {
408                if (trackTransactions && send.getTransactionId() != null) {
409                    ProducerId producerId = send.getProducerId();
410                    ConnectionId connectionId = producerId.getParentId().getParentId();
411                    if (connectionId != null) {
412                        ConnectionState cs = connectionStates.get(connectionId);
413                        if (cs != null) {
414                            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
415                            if (transactionState != null) {
416                                transactionState.addCommand(send);
417                                
418                                if (trackTransactionProducers) {
419                                    // for jmstemplate, track the producer in case it is closed before commit
420                                    // and needs to be replayed
421                                    SessionState ss = cs.getSessionState(producerId.getParentId());
422                                    ProducerState producerState = ss.getProducerState(producerId);
423                                    producerState.setTransactionState(transactionState);            
424                                }
425                            }
426                        }
427                    }
428                    return TRACKED_RESPONSE_MARKER;
429                }else if (trackMessages) {
430                    messageCache.put(send.getMessageId(), send.copy());
431                }
432            }
433            return null;
434        }
435    
436        public Response processBeginTransaction(TransactionInfo info) {
437            if (trackTransactions && info != null && info.getTransactionId() != null) {
438                ConnectionId connectionId = info.getConnectionId();
439                if (connectionId != null) {
440                    ConnectionState cs = connectionStates.get(connectionId);
441                    if (cs != null) {
442                        cs.addTransactionState(info.getTransactionId());
443                        TransactionState state = cs.getTransactionState(info.getTransactionId());
444                        state.addCommand(info);
445                    }
446                }
447                return TRACKED_RESPONSE_MARKER;
448            }
449            return null;
450        }
451    
452        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
453            if (trackTransactions && info != null) {
454                ConnectionId connectionId = info.getConnectionId();
455                if (connectionId != null) {
456                    ConnectionState cs = connectionStates.get(connectionId);
457                    if (cs != null) {
458                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
459                        if (transactionState != null) {
460                            transactionState.addCommand(info);
461                        }
462                    }
463                }
464                return TRACKED_RESPONSE_MARKER;
465            }
466            return null;
467        }
468    
469        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
470            if (trackTransactions && info != null) {
471                ConnectionId connectionId = info.getConnectionId();
472                if (connectionId != null) {
473                    ConnectionState cs = connectionStates.get(connectionId);
474                    if (cs != null) {
475                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
476                        if (transactionState != null) {
477                            transactionState.addCommand(info);
478                            return new Tracked(new RemoveTransactionAction(info));
479                        }
480                    }
481                }
482            }
483            return null;
484        }
485    
486        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
487            if (trackTransactions && info != null) {
488                ConnectionId connectionId = info.getConnectionId();
489                if (connectionId != null) {
490                    ConnectionState cs = connectionStates.get(connectionId);
491                    if (cs != null) {
492                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
493                        if (transactionState != null) {
494                            transactionState.addCommand(info);
495                            return new Tracked(new RemoveTransactionAction(info));
496                        }
497                    }
498                }
499            }
500            return null;
501        }
502    
503        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
504            if (trackTransactions && info != null) {
505                ConnectionId connectionId = info.getConnectionId();
506                if (connectionId != null) {
507                    ConnectionState cs = connectionStates.get(connectionId);
508                    if (cs != null) {
509                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
510                        if (transactionState != null) {
511                            transactionState.addCommand(info);
512                            return new Tracked(new RemoveTransactionAction(info));
513                        }
514                    }
515                }
516            }
517            return null;
518        }
519    
520        public Response processEndTransaction(TransactionInfo info) throws Exception {
521            if (trackTransactions && info != null) {
522                ConnectionId connectionId = info.getConnectionId();
523                if (connectionId != null) {
524                    ConnectionState cs = connectionStates.get(connectionId);
525                    if (cs != null) {
526                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
527                        if (transactionState != null) {
528                            transactionState.addCommand(info);
529                        }
530                    }
531                }
532                return TRACKED_RESPONSE_MARKER;
533            }
534            return null;
535        }
536    
537        public boolean isRestoreConsumers() {
538            return restoreConsumers;
539        }
540    
541        public void setRestoreConsumers(boolean restoreConsumers) {
542            this.restoreConsumers = restoreConsumers;
543        }
544    
545        public boolean isRestoreProducers() {
546            return restoreProducers;
547        }
548    
549        public void setRestoreProducers(boolean restoreProducers) {
550            this.restoreProducers = restoreProducers;
551        }
552    
553        public boolean isRestoreSessions() {
554            return restoreSessions;
555        }
556    
557        public void setRestoreSessions(boolean restoreSessions) {
558            this.restoreSessions = restoreSessions;
559        }
560    
561        public boolean isTrackTransactions() {
562            return trackTransactions;
563        }
564    
565        public void setTrackTransactions(boolean trackTransactions) {
566            this.trackTransactions = trackTransactions;
567        }
568        
569        public boolean isTrackTransactionProducers() {
570            return this.trackTransactionProducers;
571        }
572    
573        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
574            this.trackTransactionProducers = trackTransactionProducers;
575        }
576        
577        public boolean isRestoreTransaction() {
578            return restoreTransaction;
579        }
580    
581        public void setRestoreTransaction(boolean restoreTransaction) {
582            this.restoreTransaction = restoreTransaction;
583        }
584    
585        public boolean isTrackMessages() {
586            return trackMessages;
587        }
588    
589        public void setTrackMessages(boolean trackMessages) {
590            this.trackMessages = trackMessages;
591        }
592    
593        public int getMaxCacheSize() {
594            return maxCacheSize;
595        }
596    
597        public void setMaxCacheSize(int maxCacheSize) {
598            this.maxCacheSize = maxCacheSize;
599        }
600    
601    }