Clover coverage report -
Coverage timestamp: Sat Apr 30 2005 21:58:28 PDT
file stats: LOC: 200   Methods: 7
NCLOC: 79   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
JavaGroupsBroadcastingListener.java 0% 0% 0% 0%
coverage
 1   
 /*
 2   
  * Copyright (c) 2002-2003 by OpenSymphony
 3   
  * All rights reserved.
 4   
  */
 5   
 package com.opensymphony.oscache.plugins.clustersupport;
 6   
 
 7   
 import com.opensymphony.oscache.base.Cache;
 8   
 import com.opensymphony.oscache.base.Config;
 9   
 import com.opensymphony.oscache.base.FinalizationException;
 10   
 import com.opensymphony.oscache.base.InitializationException;
 11   
 
 12   
 import org.apache.commons.logging.Log;
 13   
 import org.apache.commons.logging.LogFactory;
 14   
 
 15   
 import org.jgroups.Address;
 16   
 import org.jgroups.Channel;
 17   
 
 18   
 import org.jgroups.blocks.NotificationBus;
 19   
 
 20   
 import java.io.Serializable;
 21   
 
 22   
 /**
 23   
  * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
 24   
  * the JavaGroups library. This class uses JavaGroups to broadcast cache flush
 25   
  * messages across a cluster.</p>
 26   
  *
 27   
  * <p>One of the following properties should be configured in <code>oscache.properties</code> for
 28   
  * this listener:
 29   
  * <ul>
 30   
  * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
 31   
  * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
 32   
  * control over the behaviour of JavaGroups</li>
 33   
  * </ul>
 34   
  * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
 35   
  *
 36   
  * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
 37   
  */
 38   
 public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
 39   
     private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
 40   
     private static final String BUS_NAME = "OSCacheBus";
 41   
     private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
 42   
     private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
 43   
 
 44   
     /**
 45   
     * The first half of the default channel properties. They default channel properties are:
 46   
     * <pre>
 47   
     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
 48   
     * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
 49   
     * PING(timeout=2000;num_initial_members=3):\
 50   
     * MERGE2(min_interval=5000;max_interval=10000):\
 51   
     * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
 52   
     * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
 53   
     * UNICAST(timeout=300,600,1200,2400):\
 54   
     * pbcast.STABLE(desired_avg_gossip=20000):\
 55   
     * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
 56   
     * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
 57   
     * </pre>
 58   
     *
 59   
     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
 60   
     */
 61   
     private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
 62   
 
 63   
     /**
 64   
     * The second half of the default channel properties. They default channel properties are:
 65   
     * <pre>
 66   
     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
 67   
     * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
 68   
     * PING(timeout=2000;num_initial_members=3):\
 69   
     * MERGE2(min_interval=5000;max_interval=10000):\
 70   
     * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
 71   
     * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
 72   
     * UNICAST(timeout=300,600,1200,2400):\
 73   
     * pbcast.STABLE(desired_avg_gossip=20000):\
 74   
     * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
 75   
     * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
 76   
     * </pre>
 77   
     *
 78   
     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
 79   
     */
 80   
     private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
 81   
     private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
 82   
     private NotificationBus bus;
 83   
 
 84   
     /**
 85   
     * Initializes the broadcasting listener by starting up a JavaGroups notification
 86   
     * bus instance to handle incoming and outgoing messages.
 87   
     *
 88   
     * @param config An OSCache configuration object.
 89   
     * @throws com.opensymphony.oscache.base.InitializationException If this listener has
 90   
     * already been initialized.
 91   
     */
 92  0
     public synchronized void initialize(Cache cache, Config config) throws InitializationException {
 93  0
         super.initialize(cache, config);
 94   
 
 95  0
         String properties = config.getProperty(CHANNEL_PROPERTIES);
 96  0
         String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
 97   
 
 98  0
         if ((properties == null) && (multicastIP == null)) {
 99  0
             multicastIP = DEFAULT_MULTICAST_IP;
 100   
         }
 101   
 
 102  0
         if (properties == null) {
 103  0
             properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
 104   
         } else {
 105  0
             properties = properties.trim();
 106   
         }
 107   
 
 108  0
         if (log.isInfoEnabled()) {
 109  0
             log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
 110   
         }
 111   
 
 112  0
         try {
 113  0
             bus = new NotificationBus(BUS_NAME, properties);
 114  0
             bus.start();
 115  0
             bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
 116  0
             bus.setConsumer(this);
 117  0
             log.info("JavaGroups clustering support started successfully");
 118   
         } catch (Exception e) {
 119  0
             throw new InitializationException("Initialization failed: " + e);
 120   
         }
 121   
     }
 122   
 
 123   
     /**
 124   
     * Shuts down the JavaGroups being managed by this listener. This
 125   
     * occurs once the cache is shut down and this listener is no longer
 126   
     * in use.
 127   
     *
 128   
     * @throws com.opensymphony.oscache.base.FinalizationException
 129   
     */
 130  0
     public synchronized void finialize() throws FinalizationException {
 131  0
         if (log.isInfoEnabled()) {
 132  0
             log.info("JavaGroups shutting down...");
 133   
         }
 134   
 
 135  0
         bus.stop();
 136  0
         bus = null;
 137   
 
 138  0
         if (log.isInfoEnabled()) {
 139  0
             log.info("JavaGroups shutdown complete.");
 140   
         }
 141   
     }
 142   
 
 143   
     /**
 144   
     * Uses JavaGroups to broadcast the supplied notification message across the cluster.
 145   
     *
 146   
     * @param message The cluster nofication message to broadcast.
 147   
     */
 148  0
     protected void sendNotification(ClusterNotification message) {
 149  0
         bus.sendNotification(message);
 150   
     }
 151   
 
 152   
     /**
 153   
     * Handles incoming notification messages from JavaGroups. This method should
 154   
     * never be called directly.
 155   
     *
 156   
     * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
 157   
     */
 158  0
     public void handleNotification(Serializable serializable) {
 159  0
         if (!(serializable instanceof ClusterNotification)) {
 160  0
             log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
 161   
 
 162  0
             return;
 163   
         }
 164   
 
 165  0
         handleClusterNotification((ClusterNotification) serializable);
 166   
     }
 167   
 
 168   
     /**
 169   
     * We are not using the caching, so we just return something that identifies
 170   
     * us. This method should never be called directly.
 171   
     */
 172  0
     public Serializable getCache() {
 173  0
         return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
 174   
     }
 175   
 
 176   
     /**
 177   
     * A callback that is fired when a new member joins the cluster. This
 178   
     * method should never be called directly.
 179   
     *
 180   
     * @param address The address of the member who just joined.
 181   
     */
 182  0
     public void memberJoined(Address address) {
 183  0
         if (log.isInfoEnabled()) {
 184  0
             log.info("A new member at address '" + address + "' has joined the cluster");
 185   
         }
 186   
     }
 187   
 
 188   
     /**
 189   
     * A callback that is fired when an existing member leaves the cluster.
 190   
     * This method should never be called directly.
 191   
     *
 192   
     * @param address The address of the member who left.
 193   
     */
 194  0
     public void memberLeft(Address address) {
 195  0
         if (log.isInfoEnabled()) {
 196  0
             log.info("Member at address '" + address + "' left the cluster");
 197   
         }
 198   
     }
 199   
 }
 200