001 /** 002 * 003 * Copyright 2005 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.advisories; 019 020 import javax.jms.Connection; 021 import javax.jms.Destination; 022 import javax.jms.JMSException; 023 import javax.jms.Message; 024 import javax.jms.MessageConsumer; 025 import javax.jms.MessageListener; 026 import javax.jms.ObjectMessage; 027 import javax.jms.Session; 028 029 import org.activemq.message.ActiveMQDestination; 030 import org.activemq.message.ConsumerInfo; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 035 036 /** 037 * A ProducerDemandAdvisor is used to know when a destination is in demand. 038 * 039 * Sometimes generating messages to send to a destination is very expensive 040 * and the application would like to avoid producing messages if there are no 041 * active consumers for the destination. There is a "demand" for messages 042 * when a consumer does come active. 043 * 044 * This object uses Advisory messages to know when consumer go active and 045 * inactive. 046 */ 047 public class ProducerDemandAdvisor { 048 049 private static final Log log = LogFactory.getLog(ProducerDemandAdvisor.class); 050 051 private final ActiveMQDestination destination; 052 private Connection connection; 053 private Session session; 054 private SynchronizedBoolean started = new SynchronizedBoolean(false); 055 private int consumerCount; 056 private ProducerDemandListener demandListener; 057 058 public ProducerDemandAdvisor( Connection connection, final Destination destination ) throws JMSException { 059 this.connection = connection; 060 this.destination = ActiveMQDestination.transformDestination(destination); 061 } 062 063 /** 064 * @param destination 065 */ 066 private void fireDemandEvent() { 067 demandListener.onEvent( new ProducerDemandEvent(destination, isInDemand())); 068 } 069 070 public boolean isInDemand() { 071 return consumerCount>0; 072 } 073 074 public ProducerDemandListener getDemandListener() { 075 return demandListener; 076 } 077 078 synchronized public void setDemandListener(ProducerDemandListener demandListener) { 079 this.demandListener = demandListener; 080 fireDemandEvent(); 081 } 082 083 public void start() throws JMSException { 084 if (started.commit(false, true)) { 085 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 086 MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory()); 087 consumer.setMessageListener(new MessageListener(){ 088 public void onMessage(Message msg) { 089 process(msg); 090 } 091 }); 092 } 093 } 094 095 public void stop() throws JMSException { 096 if (started.commit(true, false)) { 097 if (session != null) { 098 session.close(); 099 } 100 } 101 } 102 103 protected void process(Message msg) { 104 if (msg instanceof ObjectMessage) { 105 try { 106 ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject(); 107 ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info); 108 109 110 boolean inDemand = isInDemand(); 111 112 if ( info.isStarted() ) { 113 consumerCount++; 114 } else { 115 consumerCount--; 116 } 117 118 // Notify listener if there was a change in demand. 119 if (inDemand ^ isInDemand() && demandListener != null) { 120 fireDemandEvent(); 121 } 122 } catch (JMSException e) { 123 log.error("Failed to process message: " + msg); 124 } 125 } 126 } 127 128 }