001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.camel; 018 019 import javax.jms.JMSException; 020 import javax.jms.MessageConsumer; 021 import javax.jms.MessageProducer; 022 import javax.jms.QueueReceiver; 023 import javax.jms.QueueSender; 024 import javax.jms.TopicPublisher; 025 import javax.jms.TopicSubscriber; 026 027 import org.apache.activemq.ActiveMQConnection; 028 import org.apache.activemq.ActiveMQSession; 029 import org.apache.activemq.CustomDestination; 030 import org.apache.camel.CamelContext; 031 import org.apache.camel.CamelContextAware; 032 import org.apache.camel.Endpoint; 033 import org.apache.camel.component.jms.JmsBinding; 034 import org.apache.camel.component.jms.JmsEndpoint; 035 036 /** 037 * @version $Revision: $ 038 */ 039 public class CamelDestination implements CustomDestination, CamelContextAware { 040 private String uri; 041 private Endpoint endpoint; 042 private CamelContext camelContext; 043 // add in dummy endpoint pending camel release with 044 // https://issues.apache.org/activemq/browse/CAMEL-1982 045 private JmsBinding binding = new JmsBinding(new JmsEndpoint()); 046 047 public CamelDestination() { 048 } 049 050 public CamelDestination(String uri) { 051 this.uri = uri; 052 } 053 054 public String toString() { 055 return uri.toString(); 056 } 057 058 // CustomDestination interface 059 //----------------------------------------------------------------------- 060 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) { 061 return createConsumer(session, messageSelector, false); 062 } 063 064 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) { 065 return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal); 066 } 067 068 public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) { 069 return createDurableSubscriber(session, null, messageSelector, noLocal); 070 } 071 072 public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) { 073 throw new UnsupportedOperationException("This destination is not a Topic: " + this); 074 } 075 076 public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) { 077 throw new UnsupportedOperationException("This destination is not a Queue: " + this); 078 } 079 080 // Producers 081 //----------------------------------------------------------------------- 082 public MessageProducer createProducer(ActiveMQSession session) throws JMSException { 083 return new CamelMessageProducer(this, resolveEndpoint(session), session); 084 } 085 086 public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException { 087 throw new UnsupportedOperationException("This destination is not a Topic: " + this); 088 } 089 090 public QueueSender createSender(ActiveMQSession session) throws JMSException { 091 throw new UnsupportedOperationException("This destination is not a Queue: " + this); 092 } 093 094 // Properties 095 //----------------------------------------------------------------------- 096 097 public String getUri() { 098 return uri; 099 } 100 101 public void setUri(String uri) { 102 this.uri = uri; 103 } 104 105 public Endpoint getEndpoint() { 106 return endpoint; 107 } 108 109 public void setEndpoint(Endpoint endpoint) { 110 this.endpoint = endpoint; 111 } 112 113 public CamelContext getCamelContext() { 114 return camelContext; 115 } 116 117 public void setCamelContext(CamelContext camelContext) { 118 this.camelContext = camelContext; 119 } 120 121 public JmsBinding getBinding() { 122 return binding; 123 } 124 125 public void setBinding(JmsBinding binding) { 126 this.binding = binding; 127 } 128 129 // Implementation methods 130 //----------------------------------------------------------------------- 131 132 /** 133 * Resolves the Camel Endpoint for this destination 134 * 135 * @return 136 */ 137 protected Endpoint resolveEndpoint(ActiveMQSession session) { 138 Endpoint answer = getEndpoint(); 139 if (answer == null) { 140 answer = resolveCamelContext(session).getEndpoint(getUri()); 141 if (answer == null) { 142 throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri()); 143 } 144 } 145 return answer; 146 } 147 148 protected CamelContext resolveCamelContext(ActiveMQSession session) { 149 CamelContext answer = getCamelContext(); 150 if (answer == null) { 151 ActiveMQConnection connection = session.getConnection(); 152 if (connection instanceof CamelConnection) { 153 CamelConnection camelConnection = (CamelConnection) connection; 154 answer = camelConnection.getCamelContext(); 155 } 156 } 157 if (answer == null) { 158 throw new IllegalArgumentException("No CamelContext has been configured"); 159 } 160 return answer; 161 } 162 }