Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Twisted-like flavors 
 24   
 25  Inspired by L{twisted.spread.flavors} 
 26  """ 
 27   
 28  from twisted.internet import defer 
 29  from twisted.spread import pb 
 30  from zope.interface import Interface 
 31  from flumotion.common import log 
 32   
 33   
 34  ### Generice Cacheable/RemoteCache for state objects 
35 -class IStateListener(Interface):
36 """ 37 I am an interface for objects that want to listen to changes on 38 cached states. 39 """
40 - def stateSet(self, object, key, value):
41 """ 42 @type object: L{StateRemoteCache} 43 @param object: the state object having changed 44 @type key: string 45 @param key: the key being set 46 @param value: the value the key is being set to 47 48 The given key on the given object has been set to the given value. 49 """
50
51 - def stateAppend(self, object, key, value):
52 """ 53 @type object: L{StateRemoteCache} 54 @param object: the state object having changed 55 @type key: string 56 @param key: the key being appended to 57 @param value: the value being appended to the list given by key 58 59 The given value has been added to the list given by the key. 60 """
61
62 - def stateRemove(self, object, key, value):
63 """ 64 @type object: L{StateRemoteCache} 65 @param object: the state object having changed 66 @type key: string 67 @param key: the key being removed from 68 @param value: the value being removed from the list given by key 69 70 The given value has been removed from the list given by the key. 71 """
72
73 -class StateCacheable(pb.Cacheable):
74 """ 75 I am a cacheable state object. 76 77 I cache key-value pairs, where values can be either single objects 78 or list of objects. 79 """
80 - def __init__(self):
81 self._observers = [] 82 self._dict = {}
83 84 # our methods
85 - def addKey(self, key, value=None):
86 """ 87 Add a key to the state cache so it can be used with set. 88 """ 89 self._dict[key] = value
90 91 # don't use [] as the default value, it creates only one reference and 92 # reuses it
93 - def addListKey(self, key, value=None):
94 """ 95 Add a key for a list of objects to the state cache. 96 """ 97 if value is None: 98 value = [] 99 self._dict[key] = value
100 101 # don't use {} as the default value, it creates only one reference and 102 # reuses it
103 - def addDictKey(self, key, value=None):
104 """ 105 Add a key for a dict value to the state cache. 106 """ 107 if value is None: 108 value = {} 109 self._dict[key] = value
110
111 - def hasKey(self, key):
112 return key in self._dict.keys()
113
114 - def keys(self):
115 return self._dict.keys()
116
117 - def get(self, key, otherwise=None):
118 """ 119 Get the state cache value for the given key. 120 121 Return otherwise in case where key is present but value None. 122 """ 123 if not key in self._dict.keys(): 124 raise KeyError('%s in %r' % (key, self)) 125 126 v = self._dict[key] 127 # not v would also trigger empty lists 128 if v == None: 129 return otherwise 130 131 return v
132
133 - def set(self, key, value):
134 """ 135 Set a given state key to the given value. 136 Notifies observers of this Cacheable through observe_set. 137 """ 138 if not key in self._dict.keys(): 139 raise KeyError('%s in %r' % (key, self)) 140 141 self._dict[key] = value 142 list = [o.callRemote('set', key, value) for o in self._observers] 143 return defer.DeferredList(list)
144
145 - def append(self, key, value):
146 """ 147 Append the given object to the given list. 148 Notifies observers of this Cacheable through observe_append. 149 """ 150 if not key in self._dict.keys(): 151 raise KeyError('%s in %r' % (key, self)) 152 153 self._dict[key].append(value) 154 list = [o.callRemote('append', key, value) for o in self._observers] 155 return defer.DeferredList(list)
156
157 - def remove(self, key, value):
158 """ 159 Remove the given object from the given list. 160 Notifies observers of this Cacheable through observe_remove. 161 """ 162 if not key in self._dict.keys(): 163 raise KeyError('%s in %r' % (key, self)) 164 165 try: 166 self._dict[key].remove(value) 167 except ValueError: 168 raise ValueError('value %r not in list %r for key %r' % ( 169 value, self._dict[key], key)) 170 list = [o.callRemote('remove', key, value) for o in self._observers] 171 dl = defer.DeferredList(list) 172 return dl
173
174 - def setitem(self, key, subkey, value):
175 """ 176 Set a value in the given dict. 177 Notifies observers of this Cacheable through observe_setitem. 178 """ 179 if not key in self._dict.keys(): 180 raise KeyError('%s in %r' % (key, self)) 181 182 self._dict[key][subkey] = value 183 list = [o.callRemote('setitem', key, subkey, value) 184 for o in self._observers] 185 return defer.DeferredList(list)
186
187 - def delitem(self, key, subkey):
188 """ 189 Removes an element from the given dict. Note that the key refers 190 to the dict; it is the subkey (and its value) that will be removed. 191 Notifies observers of this Cacheable through observe_delitem. 192 """ 193 if not key in self._dict.keys(): 194 raise KeyError('%s in %r' % (key, self)) 195 196 try: 197 value = self._dict[key].pop(subkey) 198 except KeyError: 199 raise KeyError('key %r not in dict %r for key %r' % ( 200 subkey, self._dict[key], key)) 201 list = [o.callRemote('delitem', key, subkey, value) for o in 202 self._observers] 203 dl = defer.DeferredList(list) 204 return dl
205 206 # pb.Cacheable methods
207 - def getStateToCacheAndObserveFor(self, perspective, observer):
208 self._observers.append(observer) 209 return self._dict
210
211 - def stoppedObserving(self, perspective, observer):
212 self._observers.remove(observer)
213 214 # At some point, a StateRemoteCache will become invalid. The normal way 215 # would be losing the connection to the RemoteCacheable, although 216 # particular kinds of RemoteCache objects might have other ways 217 # (e.g. component removed from flow). 218 # 219 # We support listening for invalidation events. However, in order to 220 # ensure predictable program behavior, we can't do a notifyOnDisconnect 221 # directly on the broker. If we did that, program semantics would be 222 # dependent on the call order of the notifyOnDisconnect methods, which 223 # would likely lead to heisenbugs. 224 # 225 # Instead, invalidation will only be performed by the application, if at 226 # all, via an explicit call to invalidate(). 227
228 -class StateRemoteCache(pb.RemoteCache):
229 """ 230 I am a remote cache of a state object. 231 """
232 - def __init__(self):
233 self._listeners = {}
234 # no constructor 235 # pb.RemoteCache.__init__(self) 236 237 # our methods
238 - def hasKey(self, key):
239 return key in self._dict.keys()
240
241 - def keys(self):
242 return self._dict.keys()
243
244 - def get(self, key, otherwise=None):
245 """ 246 Get the state cache value for the given key. 247 248 Return otherwise in case where key is present but value None. 249 """ 250 if not key in self._dict.keys(): 251 raise KeyError('%s in %r' % (key, self)) 252 253 v = self._dict[key] 254 # compare to actual None, otherwise we also get zero-like values 255 if v == None: 256 return otherwise 257 258 return v
259
260 - def _ensureListeners(self):
261 # when this is created through serialization from a JobCS, 262 # __init__ does not seem to get called, so create self._listeners 263 if not hasattr(self, '_listeners'): 264 # fixme: this means that callbacks will be fired in 265 # arbitrary order; should be fired in order of connecting. 266 self._listeners = {}
267
268 - def addListener(self, listener, set=None, append=None, remove=None, 269 setitem=None, delitem=None, invalidate=None):
270 """ 271 Adds a listener to the remote cache. 272 273 The caller will be notified of state events via the functions 274 given as the 'set', 'append', and 'remove', 'setitem', and 275 'delitem' keyword arguments. 276 277 Setting one of the event handlers to None will ignore that 278 event. It is an error for all event handlers to be None. 279 280 @param listener: A new listener object that wants to receive 281 cache state change notifications. 282 @type listener: object implementing 283 L{flumotion.twisted.flavors.IStateListener} 284 @param set: A procedure to call when a value is set 285 @type set: procedure(object, key, value) -> None 286 @param append: A procedure to call when a value is appended to a 287 list 288 @type append: procedure(object, key, value) -> None 289 @param remove: A procedure to call when a value is removed from 290 a list 291 @type remove: procedure(object, key, value) -> None 292 @param setitem: A procedure to call when a value is set in a 293 dict. 294 @type setitem: procedure(object, key, subkey, value) -> None 295 @param delitem: A procedure to call when a value is removed 296 from a dict. 297 @type delitem: procedure(object, key, subkey, value) -> None 298 @param invalidate: A procedure to call when this cache has been 299 invalidated. 300 @type invalidate: procedure(object) -> None 301 """ 302 if not (set or append or remove or setitem or delitem or invalidate): 303 # FIXME: remove this behavior in 0.6 304 import sys 305 log.safeprintf(sys.stderr, 306 "Warning: Use of deprecated %r.addListener(%r)" 307 " without explicit event handlers\n", self, 308 listener) 309 set = listener.stateSet 310 append = listener.stateAppend 311 remove = listener.stateRemove 312 self._ensureListeners() 313 if listener in self._listeners: 314 raise KeyError, listener 315 self._listeners[listener] = [set, append, remove, setitem, 316 delitem, invalidate] 317 if invalidate and hasattr(self, '_cache_invalid'): 318 invalidate(self)
319
320 - def removeListener(self, listener):
321 self._ensureListeners() 322 if listener not in self._listeners: 323 raise KeyError, listener 324 del self._listeners[listener]
325 326 # pb.RemoteCache methods
327 - def setCopyableState(self, dict):
328 self._dict = dict
329
330 - def _notifyListeners(self, index, *args):
331 # notify our local listeners; compute set of procs first, so as 332 # to allow the listeners set to change during the calls 333 self._ensureListeners() 334 for proc in [tup[index] for tup in self._listeners.values()]: 335 if proc: 336 try: 337 proc(self, *args) 338 except Exception, e: 339 # These are all programming errors 340 log.warning("stateremotecache", 341 'Exception in StateCache handler: %s', 342 log.getExceptionMessage(e))
343
344 - def observe_set(self, key, value):
345 self._dict[key] = value 346 # if we also subclass from Cacheable, then we're a proxy, so proxy 347 if hasattr(self, 'set'): 348 StateCacheable.set(self, key, value) 349 350 self._notifyListeners(0, key, value)
351
352 - def observe_append(self, key, value):
353 # if we also subclass from Cacheable, then we're a proxy, so proxy 354 if hasattr(self, 'append'): 355 StateCacheable.append(self, key, value) 356 else: 357 self._dict[key].append(value) 358 359 self._notifyListeners(1, key, value)
360
361 - def observe_remove(self, key, value):
362 # if we also subclass from Cacheable, then we're a proxy, so proxy 363 if hasattr(self, 'remove'): 364 StateCacheable.remove(self, key, value) 365 else: 366 try: 367 self._dict[key].remove(value) 368 except ValueError: 369 raise ValueError("value %r not under key %r with values %r" % 370 (value, key, self._dict[key])) 371 372 self._notifyListeners(2, key, value)
373
374 - def observe_setitem(self, key, subkey, value):
375 # if we also subclass from Cacheable, then we're a proxy, so proxy 376 if hasattr(self, 'setitem'): 377 StateCacheable.setitem(self, key, subkey, value) 378 else: 379 self._dict[key][subkey] = value 380 381 self._notifyListeners(3, key, subkey, value)
382
383 - def observe_delitem(self, key, subkey, value):
384 # if we also subclass from Cacheable, then we're a proxy, so proxy 385 if hasattr(self, 'delitem'): 386 StateCacheable.delitem(self, key, subkey) 387 else: 388 try: 389 del self._dict[key][subkey] 390 except KeyError: 391 raise KeyError("key %r not in dict %r for state dict %r" % 392 (subkey, self._dict[key], self._dict)) 393 394 self._notifyListeners(4, key, subkey, value)
395
396 - def invalidate(self):
397 """Invalidate this StateRemoteCache. 398 399 Calling this method will result in the invalidate callback being 400 called for all listeners that passed an invalidate handler to 401 addListener. This method is not called automatically; it is 402 provided as a convenience to applications. 403 """ 404 assert not hasattr(self, '_cache_invalid'), \ 405 'object has already been invalidated' 406 # if we also subclass from Cacheable, there is currently no way 407 # to remotely invalidate the cache. that's ok though, because 408 # double-caches are currently only used by the manager, which 409 # does not call invalidate() on its caches. 410 self._cache_invalid = True 411 412 self._notifyListeners(5)
413