001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.web;
019    
020    import java.io.Externalizable;
021    import java.io.IOException;
022    import java.io.ObjectInput;
023    import java.io.ObjectOutput;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.concurrent.Semaphore;
030    
031    import javax.jms.Connection;
032    import javax.jms.ConnectionFactory;
033    import javax.jms.DeliveryMode;
034    import javax.jms.Destination;
035    import javax.jms.JMSException;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageProducer;
039    import javax.jms.Session;
040    import javax.servlet.ServletContext;
041    import javax.servlet.http.HttpServletRequest;
042    import javax.servlet.http.HttpSession;
043    import javax.servlet.http.HttpSessionActivationListener;
044    import javax.servlet.http.HttpSessionBindingEvent;
045    import javax.servlet.http.HttpSessionBindingListener;
046    import javax.servlet.http.HttpSessionEvent;
047    
048    import org.apache.activemq.ActiveMQConnectionFactory;
049    import org.apache.activemq.MessageAvailableConsumer;
050    import org.apache.activemq.camel.component.ActiveMQComponent;
051    import org.apache.activemq.camel.component.ActiveMQConfiguration;
052    import org.apache.activemq.pool.PooledConnectionFactory;
053    import org.apache.camel.CamelContext;
054    import org.apache.camel.ProducerTemplate;
055    import org.apache.camel.impl.DefaultCamelContext;
056    import org.apache.commons.logging.Log;
057    import org.apache.commons.logging.LogFactory;
058    
059    /**
060     * Represents a messaging client used from inside a web container typically
061     * stored inside a HttpSession TODO controls to prevent DOS attacks with users
062     * requesting many consumers TODO configure consumers with small prefetch.
063     * 
064     * @version $Revision: 1.1.1.1 $
065     */
066    public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
067    
068        public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient";
069        public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory";
070        public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch";
071        public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck";
072        public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL";
073    
074        private static final Log LOG = LogFactory.getLog(WebClient.class);
075    
076        private static transient ConnectionFactory factory;
077    
078        private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>();
079        private transient Connection connection;
080        private transient Session session;
081        private transient MessageProducer producer;
082        private int deliveryMode = DeliveryMode.NON_PERSISTENT;
083    
084        private final Semaphore semaphore = new Semaphore(1);
085    
086        private CamelContext camelContext;
087        private ProducerTemplate producerTemplate;
088    
089        public WebClient() {
090            if (factory == null) {
091                throw new IllegalStateException("initContext(ServletContext) not called");
092            }
093        }
094    
095        /**
096         * Helper method to get the client for the current session, lazily creating
097         * a client if there is none currently
098         * 
099         * @param request is the current HTTP request
100         * @return the current client or a newly creates
101         */
102        public static WebClient getWebClient(HttpServletRequest request) {
103            HttpSession session = request.getSession(true);
104            WebClient client = getWebClient(session);
105            if (client == null || client.isClosed()) {
106                client = WebClient.createWebClient(request);
107                session.setAttribute(WEB_CLIENT_ATTRIBUTE, client);
108            }
109    
110            return client;
111        }
112    
113        /**
114         * @return the web client for the current HTTP session or null if there is
115         *         not a web client created yet
116         */
117        public static WebClient getWebClient(HttpSession session) {
118            return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE);
119        }
120    
121        public static void initContext(ServletContext context) {
122            initConnectionFactory(context);
123            context.setAttribute("webClients", new HashMap<String, WebClient>());
124        }
125    
126        public int getDeliveryMode() {
127            return deliveryMode;
128        }
129    
130        public void setDeliveryMode(int deliveryMode) {
131            this.deliveryMode = deliveryMode;
132        }
133    
134        public synchronized void closeConsumers() {
135            for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) {
136                MessageConsumer consumer = it.next();
137                it.remove();
138                try {
139                    consumer.setMessageListener(null);
140                    if (consumer instanceof MessageAvailableConsumer) {
141                        ((MessageAvailableConsumer)consumer).setAvailableListener(null);
142                    }
143                    consumer.close();
144                } catch (JMSException e) {
145                    LOG.debug("caught exception closing consumer", e);
146                }
147            }
148        }
149    
150        public synchronized void close() {
151            try {
152                closeConsumers();
153                if (connection != null) {
154                    connection.close();
155                }
156                if (producerTemplate != null) {
157                    producerTemplate.stop();
158                }
159            } catch (Exception e) {
160                LOG.debug("caught exception closing consumer", e);
161            } finally {
162                producer = null;
163                session = null;
164                connection = null;
165                producerTemplate = null;
166                if (consumers != null) {
167                    consumers.clear();
168                }
169                consumers = null;
170            }
171        }
172    
173        public boolean isClosed() {
174            return consumers == null;
175        }
176    
177        public void writeExternal(ObjectOutput out) throws IOException {
178            if (consumers != null) {
179                out.write(consumers.size());
180                Iterator<Destination> i = consumers.keySet().iterator();
181                while (i.hasNext()) {
182                    out.writeObject(i.next().toString());
183                }
184            } else {
185                out.write(-1);
186            }
187    
188        }
189    
190        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
191            int size = in.readInt();
192            if (size >= 0) {
193                consumers = new HashMap<Destination, MessageConsumer>();
194                for (int i = 0; i < size; i++) {
195                    String destinationName = in.readObject().toString();
196    
197                    try {
198                        Destination destination = destinationName.startsWith("topic://") ? (Destination)getSession().createTopic(destinationName) : (Destination)getSession().createQueue(destinationName);
199                        consumers.put(destination, getConsumer(destination, true));
200                    } catch (JMSException e) {
201                        LOG.debug("Caought Exception ", e);
202                        IOException ex = new IOException(e.getMessage());
203                        ex.initCause(e.getCause() != null ? e.getCause() : e);
204                        throw ex;
205    
206                    }
207                }
208            }
209        }
210    
211        public void send(Destination destination, Message message) throws JMSException {
212            getProducer().send(destination, message);
213            if (LOG.isDebugEnabled()) {
214                LOG.debug("Sent! to destination: " + destination + " message: " + message);
215            }
216        }
217    
218        public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException {
219            int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
220            getProducer().send(destination, message, deliveryMode, priority, timeToLive);
221            if (LOG.isDebugEnabled()) {
222                LOG.debug("Sent! to destination: " + destination + " message: " + message);
223            }
224        }
225    
226        public Session getSession() throws JMSException {
227            if (session == null) {
228                session = createSession();
229            }
230            return session;
231        }
232    
233        public Connection getConnection() throws JMSException {
234            if (connection == null) {
235                connection = factory.createConnection();
236                connection.start();
237            }
238            return connection;
239        }
240    
241        protected static synchronized void initConnectionFactory(ServletContext servletContext) {
242            if (factory == null) {
243                factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE);
244            }
245            if (factory == null) {
246                String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM);
247    
248                LOG.debug("Value of: " + BROKER_URL_INIT_PARAM + " is: " + brokerURL);
249    
250                if (brokerURL == null) {
251                    throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param");
252                }
253    
254                ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
255    
256                // Set prefetch policy for factory
257                if (servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM) != null) {
258                    int prefetch = Integer.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM)).intValue();
259                    amqfactory.getPrefetchPolicy().setAll(prefetch);
260                }
261    
262                // Set optimize acknowledge setting
263                if (servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM) != null) {
264                    boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM)).booleanValue();
265                    amqfactory.setOptimizeAcknowledge(optimizeAck);
266                }
267    
268                factory = amqfactory;
269    
270                servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
271            }
272        }
273        
274        public synchronized CamelContext getCamelContext() {
275            if (camelContext == null) {
276                    LOG.debug("Creating camel context");
277                    camelContext = new DefaultCamelContext();
278                    ActiveMQConfiguration conf = new ActiveMQConfiguration();
279                    conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
280                    ActiveMQComponent component = new ActiveMQComponent(conf);
281                    camelContext.addComponent("activemq", component);
282            }
283            return camelContext;
284        }
285        
286        public synchronized ProducerTemplate getProducerTemplate() throws Exception {
287            if (producerTemplate == null) {
288                    LOG.debug("Creating producer template");
289                    producerTemplate = getCamelContext().createProducerTemplate();
290                    producerTemplate.start();
291            }
292            return producerTemplate;
293        }
294    
295        public synchronized MessageProducer getProducer() throws JMSException {
296            if (producer == null) {
297                producer = getSession().createProducer(null);
298                producer.setDeliveryMode(deliveryMode);
299            }
300            return producer;
301        }
302    
303        public void setProducer(MessageProducer producer) {
304            this.producer = producer;
305        }
306    
307        public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
308            return getConsumer(destination, true);
309        }
310    
311        public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException {
312            MessageConsumer consumer = consumers.get(destination);
313            if (create && consumer == null) {
314                consumer = getSession().createConsumer(destination);
315                consumers.put(destination, consumer);
316            }
317            return consumer;
318        }
319    
320        public synchronized void closeConsumer(Destination destination) throws JMSException {
321            MessageConsumer consumer = consumers.get(destination);
322            if (consumer != null) {
323                consumers.remove(destination);
324                consumer.setMessageListener(null);
325                if (consumer instanceof MessageAvailableConsumer) {
326                    ((MessageAvailableConsumer)consumer).setAvailableListener(null);
327                }
328                consumer.close();
329            }
330        }
331    
332        public synchronized List<MessageConsumer> getConsumers() {
333            return new ArrayList<MessageConsumer>(consumers.values());
334        }
335    
336        protected Session createSession() throws JMSException {
337            return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
338        }
339    
340        public Semaphore getSemaphore() {
341            return semaphore;
342        }
343    
344        public void sessionWillPassivate(HttpSessionEvent event) {
345            close();
346        }
347    
348        public void sessionDidActivate(HttpSessionEvent event) {
349        }
350    
351        public void valueBound(HttpSessionBindingEvent event) {
352        }
353    
354        public void valueUnbound(HttpSessionBindingEvent event) {
355            close();
356        }
357    
358        protected static WebClient createWebClient(HttpServletRequest request) {
359            return new WebClient();
360        }
361    
362    }