001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.activemq.camel.converter; 019 020 import java.io.Serializable; 021 022 import javax.jms.JMSException; 023 import javax.jms.MessageListener; 024 import javax.jms.Message; 025 026 import org.apache.activemq.command.ActiveMQMessage; 027 import org.apache.activemq.command.ActiveMQObjectMessage; 028 import org.apache.activemq.command.ActiveMQTextMessage; 029 import org.apache.camel.Converter; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Processor; 032 import org.apache.camel.component.jms.JmsBinding; 033 import org.apache.camel.component.jms.JmsEndpoint; 034 035 /** 036 * @version $Revision: 758789 $ 037 */ 038 @Converter 039 public class ActiveMQMessageConverter { 040 private JmsBinding binding = new JmsBinding(); 041 042 /** 043 * Converts the inbound message exchange to an ActiveMQ JMS message 044 * 045 * @return the ActiveMQ message 046 */ 047 @Converter 048 public ActiveMQMessage toMessage(Exchange exchange) throws JMSException { 049 ActiveMQMessage message = createActiveMQMessage(exchange); 050 getBinding().appendJmsProperties(message, exchange); 051 return message; 052 } 053 054 /** 055 * Allows a JMS {@link MessageListener} to be converted to a Camel {@link Processor} 056 * so that we can provide better 057 * <a href="">Bean Integration</a> so that we can use any JMS MessageListener in 058 * in Camel as a bean 059 * @param listener the JMS message listener 060 * @return a newly created Camel Processor which when invoked will invoke 061 * {@link MessageListener#onMessage(Message)} 062 */ 063 @Converter 064 public Processor toProcessor(final MessageListener listener) { 065 return new Processor() { 066 public void process(Exchange exchange) throws Exception { 067 Message message = toMessage(exchange); 068 listener.onMessage(message); 069 } 070 071 @Override 072 public String toString() { 073 return "Processor of MessageListener: " + listener; 074 } 075 }; 076 } 077 078 private static ActiveMQMessage createActiveMQMessage(Exchange exchange) throws JMSException { 079 Object body = exchange.getIn().getBody(); 080 if (body instanceof String) { 081 ActiveMQTextMessage answer = new ActiveMQTextMessage(); 082 answer.setText((String) body); 083 return answer; 084 } else if (body instanceof Serializable) { 085 ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); 086 answer.setObject((Serializable) body); 087 return answer; 088 } else { 089 return new ActiveMQMessage(); 090 } 091 092 } 093 094 // Properties 095 //------------------------------------------------------------------------- 096 public JmsBinding getBinding() { 097 return binding; 098 } 099 100 public void setBinding(JmsBinding binding) { 101 this.binding = binding; 102 } 103 }