001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.net.URI;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.atomic.AtomicReference;
023    
024    import org.apache.activemq.broker.region.Destination;
025    import org.apache.activemq.broker.region.MessageReference;
026    import org.apache.activemq.broker.region.Subscription;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.BrokerId;
029    import org.apache.activemq.command.BrokerInfo;
030    import org.apache.activemq.command.ConnectionInfo;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.command.DestinationInfo;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.MessageDispatchNotification;
037    import org.apache.activemq.command.MessagePull;
038    import org.apache.activemq.command.ProducerInfo;
039    import org.apache.activemq.command.RemoveSubscriptionInfo;
040    import org.apache.activemq.command.Response;
041    import org.apache.activemq.command.SessionInfo;
042    import org.apache.activemq.command.TransactionId;
043    import org.apache.activemq.kaha.Store;
044    import org.apache.activemq.usage.Usage;
045    
046    /**
047     * Like a BrokerFilter, but it allows you to switch the next broker. This
048     * has more overhead than a BrokerFilter since access to the next broker
049     * goes through an AtomicReference because it is mutable.
050     * 
051     * @version $Revision: 1.10 $
052     */
053    public class MutableBrokerFilter implements Broker {
054    
055        protected AtomicReference<Broker> next = new AtomicReference<Broker>();
056    
057        public MutableBrokerFilter(Broker next) {
058            this.next.set(next);
059        }
060    
061        public Broker getAdaptor(Class type) {
062            if (type.isInstance(this)) {
063                return this;
064            }
065            return next.get().getAdaptor(type);
066        }
067    
068        public Broker getNext() {
069            return next.get();
070        }
071    
072        public void setNext(Broker next) {
073            this.next.set(next);
074        }
075    
076        public Map<ActiveMQDestination, Destination> getDestinationMap() {
077            return getNext().getDestinationMap();
078        }
079    
080        public Set getDestinations(ActiveMQDestination destination) {
081            return getNext().getDestinations(destination);
082        }
083    
084        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
085            getNext().acknowledge(consumerExchange, ack);
086        }
087    
088        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
089            getNext().addConnection(context, info);
090        }
091    
092        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
093            return getNext().addConsumer(context, info);
094        }
095    
096        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
097            getNext().addProducer(context, info);
098        }
099    
100        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
101            getNext().commitTransaction(context, xid, onePhase);
102        }
103    
104        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
105            getNext().removeSubscription(context, info);
106        }
107    
108        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
109            return getNext().getPreparedTransactions(context);
110        }
111    
112        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
113            return getNext().prepareTransaction(context, xid);
114        }
115    
116        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
117            getNext().removeConnection(context, info, error);
118        }
119    
120        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
121            getNext().removeConsumer(context, info);
122        }
123    
124        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
125            getNext().removeProducer(context, info);
126        }
127    
128        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
129            getNext().rollbackTransaction(context, xid);
130        }
131    
132        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
133            getNext().send(producerExchange, messageSend);
134        }
135    
136        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
137            getNext().beginTransaction(context, xid);
138        }
139    
140        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
141            getNext().forgetTransaction(context, transactionId);
142        }
143    
144        public Connection[] getClients() throws Exception {
145            return getNext().getClients();
146        }
147    
148        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
149            return getNext().addDestination(context, destination);
150        }
151    
152        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
153            getNext().removeDestination(context, destination, timeout);
154        }
155    
156        public ActiveMQDestination[] getDestinations() throws Exception {
157            return getNext().getDestinations();
158        }
159    
160        public void start() throws Exception {
161            getNext().start();
162        }
163    
164        public void stop() throws Exception {
165            getNext().stop();
166        }
167    
168        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
169            getNext().addSession(context, info);
170        }
171    
172        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
173            getNext().removeSession(context, info);
174        }
175    
176        public BrokerId getBrokerId() {
177            return getNext().getBrokerId();
178        }
179    
180        public String getBrokerName() {
181            return getNext().getBrokerName();
182        }
183    
184        public void gc() {
185            getNext().gc();
186        }
187    
188        public void addBroker(Connection connection, BrokerInfo info) {
189            getNext().addBroker(connection, info);
190        }
191    
192        public void removeBroker(Connection connection, BrokerInfo info) {
193            getNext().removeBroker(connection, info);
194        }
195    
196        public BrokerInfo[] getPeerBrokerInfos() {
197            return getNext().getPeerBrokerInfos();
198        }
199    
200        public void preProcessDispatch(MessageDispatch messageDispatch) {
201            getNext().preProcessDispatch(messageDispatch);
202        }
203    
204        public void postProcessDispatch(MessageDispatch messageDispatch) {
205            getNext().postProcessDispatch(messageDispatch);
206        }
207    
208        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
209            getNext().processDispatchNotification(messageDispatchNotification);
210        }
211    
212        public boolean isStopped() {
213            return getNext().isStopped();
214        }
215    
216        public Set<ActiveMQDestination> getDurableDestinations() {
217            return getNext().getDurableDestinations();
218        }
219    
220        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
221            getNext().addDestinationInfo(context, info);
222    
223        }
224    
225        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
226            getNext().removeDestinationInfo(context, info);
227    
228        }
229    
230        public boolean isFaultTolerantConfiguration() {
231            return getNext().isFaultTolerantConfiguration();
232        }
233    
234        public ConnectionContext getAdminConnectionContext() {
235            return getNext().getAdminConnectionContext();
236        }
237    
238        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
239            getNext().setAdminConnectionContext(adminConnectionContext);
240        }
241    
242        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
243            return getNext().messagePull(context, pull);
244        }
245    
246        public Store getTempDataStore() {
247            return getNext().getTempDataStore();
248        }
249    
250        public URI getVmConnectorURI() {
251            return getNext().getVmConnectorURI();
252        }
253    
254        public void brokerServiceStarted() {
255            getNext().brokerServiceStarted();
256        }
257    
258        public BrokerService getBrokerService() {
259            return getNext().getBrokerService();
260        }
261    
262        public boolean isExpired(MessageReference messageReference) {
263            return getNext().isExpired(messageReference);
264        }
265    
266        public void messageExpired(ConnectionContext context, MessageReference message) {
267            getNext().messageExpired(context, message);
268        }
269    
270        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
271            getNext().sendToDeadLetterQueue(context, messageReference);
272        }
273    
274        public Broker getRoot() {
275            return getNext().getRoot();
276        }
277        
278        public long getBrokerSequenceId() {
279            return getNext().getBrokerSequenceId();
280        }
281        
282        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
283            getNext().fastProducer(context, producerInfo);
284        }
285    
286        public void isFull(ConnectionContext context,Destination destination, Usage usage) {
287            getNext().isFull(context,destination, usage);
288        }
289    
290        public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
291            getNext().messageConsumed(context, messageReference);
292        }
293    
294        public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
295            getNext().messageDelivered(context, messageReference);
296        }
297    
298        public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
299            getNext().messageDiscarded(context, messageReference);
300        }
301    
302        public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
303            getNext().slowConsumer(context, dest,subs);
304        }
305        
306        public void nowMasterBroker() {   
307           getNext().nowMasterBroker();
308        }
309    
310    }