001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.camel; 018 019 import javax.jms.IllegalStateException; 020 import javax.jms.JMSException; 021 import javax.jms.Message; 022 import javax.jms.MessageConsumer; 023 import javax.jms.MessageListener; 024 025 import org.apache.activemq.ActiveMQSession; 026 import org.apache.activemq.util.JMSExceptionSupport; 027 import org.apache.camel.Consumer; 028 import org.apache.camel.Endpoint; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.PollingConsumer; 031 import org.apache.camel.Processor; 032 033 /** 034 * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from 035 * a Camel {@link Endpoint} 036 * 037 * @version $Revision: $ 038 */ 039 public class CamelMessageConsumer implements MessageConsumer { 040 private final CamelDestination destination; 041 private final Endpoint endpoint; 042 private final ActiveMQSession session; 043 private final String messageSelector; 044 private final boolean noLocal; 045 private MessageListener messageListener; 046 private Consumer consumer; 047 private PollingConsumer pollingConsumer; 048 private boolean closed; 049 050 public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) { 051 this.destination = destination; 052 this.endpoint = endpoint; 053 this.session = session; 054 this.messageSelector = messageSelector; 055 this.noLocal = noLocal; 056 } 057 058 public void close() throws JMSException { 059 if (!closed) { 060 closed = true; 061 try { 062 if (consumer != null) { 063 consumer.stop(); 064 } 065 if (pollingConsumer != null) { 066 pollingConsumer.stop(); 067 } 068 } catch (JMSException e) { 069 throw e; 070 } catch (Exception e) { 071 throw JMSExceptionSupport.create(e); 072 } 073 } 074 } 075 076 public MessageListener getMessageListener() throws JMSException { 077 return messageListener; 078 } 079 080 public void setMessageListener(MessageListener messageListener) throws JMSException { 081 this.messageListener = messageListener; 082 if (messageListener != null && consumer == null) { 083 consumer = createConsumer(); 084 } 085 } 086 087 public Message receive() throws JMSException { 088 Exchange exchange = getPollingConsumer().receive(); 089 return createMessage(exchange); 090 } 091 092 public Message receive(long timeoutMillis) throws JMSException { 093 Exchange exchange = getPollingConsumer().receive(timeoutMillis); 094 return createMessage(exchange); 095 } 096 097 public Message receiveNoWait() throws JMSException { 098 Exchange exchange = getPollingConsumer().receiveNoWait(); 099 return createMessage(exchange); 100 } 101 102 // Properties 103 // ----------------------------------------------------------------------- 104 105 public CamelDestination getDestination() { 106 return destination; 107 } 108 109 public Endpoint getEndpoint() { 110 return endpoint; 111 } 112 113 public String getMessageSelector() { 114 return messageSelector; 115 } 116 117 public boolean isNoLocal() { 118 return noLocal; 119 } 120 121 public ActiveMQSession getSession() { 122 return session; 123 } 124 125 // Implementation methods 126 // ----------------------------------------------------------------------- 127 128 protected PollingConsumer getPollingConsumer() throws JMSException { 129 try { 130 if (pollingConsumer == null) { 131 pollingConsumer = endpoint.createPollingConsumer(); 132 pollingConsumer.start(); 133 } 134 return pollingConsumer; 135 } catch (JMSException e) { 136 throw e; 137 } catch (Exception e) { 138 throw JMSExceptionSupport.create(e); 139 } 140 } 141 142 protected Message createMessage(Exchange exchange) throws JMSException { 143 if (exchange != null) { 144 Message message = destination.getBinding().makeJmsMessage(exchange, session); 145 return message; 146 } else { 147 return null; 148 } 149 } 150 151 protected Consumer createConsumer() throws JMSException { 152 try { 153 Consumer answer = endpoint.createConsumer(new Processor() { 154 public void process(Exchange exchange) throws Exception { 155 Message message = createMessage(exchange); 156 getMessageListener().onMessage(message); 157 } 158 }); 159 answer.start(); 160 return answer; 161 } catch (JMSException e) { 162 throw e; 163 } catch (Exception e) { 164 throw JMSExceptionSupport.create(e); 165 } 166 } 167 168 protected void checkClosed() throws javax.jms.IllegalStateException { 169 if (closed) { 170 throw new IllegalStateException("The producer is closed"); 171 } 172 } 173 }