|
|||||||||||||||||||
Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
JavaGroupsBroadcastingListener.java | 0% | 0% | 0% | 0% |
|
1 |
/*
|
|
2 |
* Copyright (c) 2002-2003 by OpenSymphony
|
|
3 |
* All rights reserved.
|
|
4 |
*/
|
|
5 |
package com.opensymphony.oscache.plugins.clustersupport;
|
|
6 |
|
|
7 |
import com.opensymphony.oscache.base.Cache;
|
|
8 |
import com.opensymphony.oscache.base.Config;
|
|
9 |
import com.opensymphony.oscache.base.FinalizationException;
|
|
10 |
import com.opensymphony.oscache.base.InitializationException;
|
|
11 |
|
|
12 |
import org.apache.commons.logging.Log;
|
|
13 |
import org.apache.commons.logging.LogFactory;
|
|
14 |
|
|
15 |
import org.jgroups.Address;
|
|
16 |
import org.jgroups.Channel;
|
|
17 |
|
|
18 |
import org.jgroups.blocks.NotificationBus;
|
|
19 |
|
|
20 |
import java.io.Serializable;
|
|
21 |
|
|
22 |
/**
|
|
23 |
* <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
|
|
24 |
* the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
|
|
25 |
* messages across a cluster.</p>
|
|
26 |
*
|
|
27 |
* <p>One of the following properties should be configured in <code>oscache.properties</code> for
|
|
28 |
* this listener:
|
|
29 |
* <ul>
|
|
30 |
* <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
|
|
31 |
* <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
|
|
32 |
* control over the behaviour of JavaGroups</li>
|
|
33 |
* </ul>
|
|
34 |
* Please refer to the clustering documentation for further details on the configuration of this listener.</p>
|
|
35 |
*
|
|
36 |
* @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
|
|
37 |
*/
|
|
38 |
public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer { |
|
39 |
private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class); |
|
40 |
private static final String BUS_NAME = "OSCacheBus"; |
|
41 |
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties"; |
|
42 |
private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip"; |
|
43 |
|
|
44 |
/**
|
|
45 |
* The first half of the default channel properties. They default channel properties are:
|
|
46 |
* <pre>
|
|
47 |
* UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
|
|
48 |
* mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
|
|
49 |
* PING(timeout=2000;num_initial_members=3):\
|
|
50 |
* MERGE2(min_interval=5000;max_interval=10000):\
|
|
51 |
* FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
|
|
52 |
* pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
|
|
53 |
* UNICAST(timeout=300,600,1200,2400):\
|
|
54 |
* pbcast.STABLE(desired_avg_gossip=20000):\
|
|
55 |
* FRAG(frag_size=8096;down_thread=false;up_thread=false):\
|
|
56 |
* pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
|
|
57 |
* </pre>
|
|
58 |
*
|
|
59 |
* Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
|
|
60 |
*/
|
|
61 |
private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr="; |
|
62 |
|
|
63 |
/**
|
|
64 |
* The second half of the default channel properties. They default channel properties are:
|
|
65 |
* <pre>
|
|
66 |
* UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
|
|
67 |
* mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
|
|
68 |
* PING(timeout=2000;num_initial_members=3):\
|
|
69 |
* MERGE2(min_interval=5000;max_interval=10000):\
|
|
70 |
* FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
|
|
71 |
* pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
|
|
72 |
* UNICAST(timeout=300,600,1200,2400):\
|
|
73 |
* pbcast.STABLE(desired_avg_gossip=20000):\
|
|
74 |
* FRAG(frag_size=8096;down_thread=false;up_thread=false):\
|
|
75 |
* pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
|
|
76 |
* </pre>
|
|
77 |
*
|
|
78 |
* Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
|
|
79 |
*/
|
|
80 |
private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)"; |
|
81 |
private static final String DEFAULT_MULTICAST_IP = "231.12.21.132"; |
|
82 |
private NotificationBus bus;
|
|
83 |
|
|
84 |
/**
|
|
85 |
* Initializes the broadcasting listener by starting up a JavaGroups notification
|
|
86 |
* bus instance to handle incoming and outgoing messages.
|
|
87 |
*
|
|
88 |
* @param config An OSCache configuration object.
|
|
89 |
* @throws com.opensymphony.oscache.base.InitializationException If this listener has
|
|
90 |
* already been initialized.
|
|
91 |
*/
|
|
92 | 0 |
public synchronized void initialize(Cache cache, Config config) throws InitializationException { |
93 | 0 |
super.initialize(cache, config);
|
94 |
|
|
95 | 0 |
String properties = config.getProperty(CHANNEL_PROPERTIES); |
96 | 0 |
String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY); |
97 |
|
|
98 | 0 |
if ((properties == null) && (multicastIP == null)) { |
99 | 0 |
multicastIP = DEFAULT_MULTICAST_IP; |
100 |
} |
|
101 |
|
|
102 | 0 |
if (properties == null) { |
103 | 0 |
properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST; |
104 |
} else {
|
|
105 | 0 |
properties = properties.trim(); |
106 |
} |
|
107 |
|
|
108 | 0 |
if (log.isInfoEnabled()) {
|
109 | 0 |
log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
|
110 |
} |
|
111 |
|
|
112 | 0 |
try {
|
113 | 0 |
bus = new NotificationBus(BUS_NAME, properties);
|
114 | 0 |
bus.start(); |
115 | 0 |
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false)); |
116 | 0 |
bus.setConsumer(this);
|
117 | 0 |
log.info("JavaGroups clustering support started successfully");
|
118 |
} catch (Exception e) {
|
|
119 | 0 |
throw new InitializationException("Initialization failed: " + e); |
120 |
} |
|
121 |
} |
|
122 |
|
|
123 |
/**
|
|
124 |
* Shuts down the JavaGroups being managed by this listener. This
|
|
125 |
* occurs once the cache is shut down and this listener is no longer
|
|
126 |
* in use.
|
|
127 |
*
|
|
128 |
* @throws com.opensymphony.oscache.base.FinalizationException
|
|
129 |
*/
|
|
130 | 0 |
public synchronized void finialize() throws FinalizationException { |
131 | 0 |
if (log.isInfoEnabled()) {
|
132 | 0 |
log.info("JavaGroups shutting down...");
|
133 |
} |
|
134 |
|
|
135 | 0 |
bus.stop(); |
136 | 0 |
bus = null;
|
137 |
|
|
138 | 0 |
if (log.isInfoEnabled()) {
|
139 | 0 |
log.info("JavaGroups shutdown complete.");
|
140 |
} |
|
141 |
} |
|
142 |
|
|
143 |
/**
|
|
144 |
* Uses JavaGroups to broadcast the supplied notification message across the cluster.
|
|
145 |
*
|
|
146 |
* @param message The cluster nofication message to broadcast.
|
|
147 |
*/
|
|
148 | 0 |
protected void sendNotification(ClusterNotification message) { |
149 | 0 |
bus.sendNotification(message); |
150 |
} |
|
151 |
|
|
152 |
/**
|
|
153 |
* Handles incoming notification messages from JavaGroups. This method should
|
|
154 |
* never be called directly.
|
|
155 |
*
|
|
156 |
* @param serializable The incoming message object. This must be a {@link ClusterNotification}.
|
|
157 |
*/
|
|
158 | 0 |
public void handleNotification(Serializable serializable) { |
159 | 0 |
if (!(serializable instanceof ClusterNotification)) { |
160 | 0 |
log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored."); |
161 |
|
|
162 | 0 |
return;
|
163 |
} |
|
164 |
|
|
165 | 0 |
handleClusterNotification((ClusterNotification) serializable); |
166 |
} |
|
167 |
|
|
168 |
/**
|
|
169 |
* We are not using the caching, so we just return something that identifies
|
|
170 |
* us. This method should never be called directly.
|
|
171 |
*/
|
|
172 | 0 |
public Serializable getCache() {
|
173 | 0 |
return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress(); |
174 |
} |
|
175 |
|
|
176 |
/**
|
|
177 |
* A callback that is fired when a new member joins the cluster. This
|
|
178 |
* method should never be called directly.
|
|
179 |
*
|
|
180 |
* @param address The address of the member who just joined.
|
|
181 |
*/
|
|
182 | 0 |
public void memberJoined(Address address) { |
183 | 0 |
if (log.isInfoEnabled()) {
|
184 | 0 |
log.info("A new member at address '" + address + "' has joined the cluster"); |
185 |
} |
|
186 |
} |
|
187 |
|
|
188 |
/**
|
|
189 |
* A callback that is fired when an existing member leaves the cluster.
|
|
190 |
* This method should never be called directly.
|
|
191 |
*
|
|
192 |
* @param address The address of the member who left.
|
|
193 |
*/
|
|
194 | 0 |
public void memberLeft(Address address) { |
195 | 0 |
if (log.isInfoEnabled()) {
|
196 | 0 |
log.info("Member at address '" + address + "' left the cluster"); |
197 |
} |
|
198 |
} |
|
199 |
} |
|
200 |
|
|