001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.virtual; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.Set; 023 024 import org.apache.activemq.broker.ProducerBrokerExchange; 025 import org.apache.activemq.broker.region.Destination; 026 import org.apache.activemq.broker.region.DestinationFilter; 027 import org.apache.activemq.broker.region.DestinationInterceptor; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.filter.DestinationMap; 030 031 /** 032 * Implements <a 033 * href="http://activemq.apache.org/virtual-destinations.html">Virtual Topics</a>. 034 * 035 * @org.apache.xbean.XBean 036 * @version $Revision: 650143 $ 037 */ 038 public class VirtualDestinationInterceptor implements DestinationInterceptor { 039 040 private DestinationMap destinationMap = new DestinationMap(); 041 private VirtualDestination[] virtualDestinations; 042 043 public synchronized Destination intercept(Destination destination) { 044 Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination()); 045 List<Destination> destinations = new ArrayList<Destination>(); 046 for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) { 047 VirtualDestination virtualDestination = (VirtualDestination)iter.next(); 048 Destination newNestination = virtualDestination.intercept(destination); 049 destinations.add(newNestination); 050 } 051 if (!destinations.isEmpty()) { 052 if (destinations.size() == 1) { 053 return destinations.get(0); 054 } else { 055 // should rarely be used but here just in case 056 return createCompositeDestination(destination, destinations); 057 } 058 } 059 return destination; 060 } 061 062 063 public synchronized void remove(Destination destination) { 064 } 065 066 public VirtualDestination[] getVirtualDestinations() { 067 return virtualDestinations; 068 } 069 070 public void setVirtualDestinations(VirtualDestination[] virtualDestinations) { 071 destinationMap = new DestinationMap(); 072 this.virtualDestinations = virtualDestinations; 073 for (int i = 0; i < virtualDestinations.length; i++) { 074 VirtualDestination virtualDestination = virtualDestinations[i]; 075 destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination); 076 } 077 } 078 079 protected Destination createCompositeDestination(Destination destination, final List<Destination> destinations) { 080 return new DestinationFilter(destination) { 081 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { 082 for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();) { 083 Destination destination = iter.next(); 084 destination.send(context, messageSend); 085 } 086 } 087 }; 088 } 089 090 }