Trees | Indices | Help |
---|
|
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.


from twisted.internet import defer

from flumotion.twisted import pb as fpb
from flumotion.common import log, planet, connection, errors, startset
from flumotion.admin import admin


def get_admin_for_object(object):
    '''Return the C{admin} attribute of the root of OBJECT's parent chain.

    OBJECT must offer C{get('parent')}; the chain is followed until
    C{get('parent')} is falsy, and that object's C{admin} is returned.
    '''
    parent = object.get('parent')
    if parent:
        return get_admin_for_object(parent)
    return object.admin


# this is looking for a home.
def _make_watched(type, *mutators):
    '''Build a subclass of TYPE that notifies watchers after mutations.

    @param type:     the container class to derive from (e.g. list, dict)
    @param mutators: names of methods of TYPE that modify the container;
                     each one is wrapped so that every registered watch
                     procedure is called after the method has run
    @returns: the new class
    '''
    class Watched(type):
        def __init__(self):
            type.__init__(self)
            self.watch_id = 0
            self.watch_procs = {}  # id -> proc

        def watch(self, proc):
            '''Register PROC, called as proc(self) after each mutation.

            @returns: an id that can be handed to unwatch()
            '''
            self.watch_id += 1
            self.watch_procs[self.watch_id] = proc
            return self.watch_id

        def unwatch(self, id):
            '''Remove the watch procedure registered under ID.

            Raises KeyError if ID is not registered.
            '''
            del self.watch_procs[id]

        def notify_changed(self):
            '''Call every registered watch procedure with this object.'''
            # Iterate over a snapshot so that a procedure may call
            # watch()/unwatch() while being notified.
            for proc in list(self.watch_procs.values()):
                proc(self)

    def mutate(method):
        def do_mutate(self, *args, **kwargs):
            # Hand the wrapped method's result back to the caller;
            # without this, pop() and popitem() would return None.
            result = method(self, *args, **kwargs)
            self.notify_changed()
            return result
        setattr(Watched, method.__name__, do_mutate)

    for name in mutators:
        mutate(getattr(type, name))

    return Watched


WatchedList = _make_watched(list, 'append', 'insert', 'remove', 'pop',
                            'sort', 'reverse')
WatchedDict = _make_watched(dict, '__setitem__', '__delitem__', 'pop',
                            'popitem', 'update')


# NOTE(review): the class header was lost when this listing was extracted.
# The name and the log.Loggable base are reconstructed (the methods use
# self.debug/self.info/self.warning and a logCategory) -- confirm against
# the upstream file.
class MultiAdminModel(log.Loggable):
    '''Keeps track of the admin models of several managers at once and
    tells registered listeners when a planet is added or removed.'''

    logCategory = 'multiadmin'

    def __init__(self):
        # public
        self.admins = WatchedDict()  # {managerId: AdminModel}
        # private
        self.listeners = []
        self._reconnectHandlerIds = {}  # managerId => [disconnect, id..]
        # __contains__ is the membership test formerly spelled has_key
        self._startSet = startset.StartSet(self.admins.__contains__,
                                           errors.AlreadyConnectingError,
                                           errors.AlreadyConnectedError)

    # Listener implementation
    def emit(self, signal_name, *args, **kwargs):
        '''Deliver SIGNAL_NAME to every registered listener.

        A listener with a truthy C{model_handler} attribute is called as
        model_handler(listener, signal_name, *args, **kwargs); otherwise
        its C{model_<signal_name>} attribute is called with *args and
        **kwargs.

        Raises NotImplementedError for a listener that has neither.
        '''
        self.debug('emit %r %r %r' % (signal_name, args, kwargs))
        assert signal_name != 'handler'
        for c in self.listeners:
            if getattr(c, 'model_handler', None):
                c.model_handler(c, signal_name, *args, **kwargs)
                continue
            # The None default matters: without it a missing attribute
            # raises AttributeError and the error below is never reached.
            handler = getattr(c, 'model_%s' % signal_name, None)
            if handler:
                handler(*args, **kwargs)
            else:
                s = 'No model_%s in %r and no model_handler' % (signal_name, c)
                raise NotImplementedError(s)

    # NOTE(review): this whole method (listing lines 98-100) is absent from
    # the extracted listing; it is reconstructed as the counterpart of
    # removeListener -- confirm name and body against the upstream file.
    def addListener(self, obj):
        '''Register OBJ to receive the signals sent by emit().'''
        assert obj not in self.listeners
        self.listeners.append(obj)

    # NOTE(review): header lost in extraction; method name reconstructed.
    def removeListener(self, obj):
        '''Stop delivering signals to OBJ.

        Raises ValueError if OBJ is not a registered listener.
        '''
        self.listeners.remove(obj)

    def _managerConnected(self, admin):
        '''Record ADMIN as connected and emit 'addPlanet' for its planet.'''
        if admin.managerId not in self._reconnectHandlerIds:
            # the first time a manager is connected to, start listening
            # for reconnections; intertwingled with removeManager()
            ids = [
                admin.connect('connected', self._managerConnected),
                admin.connect('disconnected', self._managerDisconnected),
            ]
            self._reconnectHandlerIds[admin.managerId] = admin, ids

        planetState = admin.planet
        self.info('Connected to manager %s (planet %s)',
                  admin.managerId, planetState.get('name'))
        assert admin.managerId not in self.admins
        self.admins[admin.managerId] = admin
        self.emit('addPlanet', admin, planetState)

    def _managerDisconnected(self, admin):
        '''Emit 'removePlanet' for ADMIN and drop it from self.admins;
        only log a warning when ADMIN is not in self.admins.'''
        if admin.managerId in self.admins:
            self.emit('removePlanet', admin, admin.planet)
            del self.admins[admin.managerId]
        else:
            self.warning('Could not find admin model %r', admin)

    # NOTE(review): header lost in extraction; the method name and the
    # default values of tenacious/writeConnection are reconstructed --
    # confirm against the upstream file.
    def addManager(self, connectionInfo, tenacious=False,
                   writeConnection=True):
        '''Start connecting to the manager described by CONNECTIONINFO.

        The manager id is str(connectionInfo).

        @returns: the deferred created by the start set for this manager
                  id, or an already-failed deferred when creating that
                  start raised (the comment below names
                  errors.AlreadyConnectingError and
                  errors.AlreadyConnectedError as the expected causes)
        '''
        i = connectionInfo
        managerId = str(i)

        # This dance of deferreds is here so as to make sure that
        # removeManager can cancel a pending connection.

        # can raise errors.AlreadyConnectingError or
        # errors.AlreadyConnectedError
        try:
            startD = self._startSet.createStart(managerId)
        except Exception as e:
            return defer.fail(e)

        a = admin.AdminModel()
        connectD = a.connectToManager(i, tenacious,
                                      writeConnection=writeConnection)
        assert a.managerId == managerId

        def connect_callback(_):
            self._startSet.avatarStarted(managerId)

        def connect_errback(failure):
            self._startSet.avatarStopped(managerId, lambda _: failure)

        connectD.addCallbacks(connect_callback, connect_errback)

        def start_callback(_):
            self._managerConnected(a)

        def start_errback(failure):
            # the start did not succeed, so drop the half-built model
            a.shutdown()
            return failure

        startD.addCallbacks(start_callback, start_errback)

        return startD

    # NOTE(review): header lost in extraction; the name is taken from the
    # comments in this file that refer to removeManager().
    def removeManager(self, managerId):
        '''Disconnect from MANAGERID and stop trying to reconnect to it.

        @returns: a deferred that has already succeeded with MANAGERID
        '''
        self.info('disconnecting from %s', managerId)

        # Four cases:
        # (1) We have no idea about this managerId, the caller is
        # confused -- do nothing
        # (2) We started connecting to this managerId, but never
        # succeeded -- cancel pending connections
        # (3) We connected at least once, and are connected now -- we
        # have entries in the _reconnectHandlerIds and in self.admins --
        # disconnect from the signals, disconnect from the remote
        # manager, and don't try to reconnect
        # (4) We connected at least once, but are disconnected now -- we
        # have an entry in _reconnectHandlerIds but not self.admins --
        # disconnect from the signals, and stop trying to reconnect

        # stop listening to admin's signals, if the manager had actually
        # connected at some point
        if managerId in self._reconnectHandlerIds:
            adminModel, handlerIds = self._reconnectHandlerIds.pop(managerId)
            # An explicit loop: map() used only for its side effects does
            # nothing on interpreters where map() is lazy.
            for handlerId in handlerIds:  # (3) and (4)
                adminModel.disconnect(handlerId)
            if managerId not in self.admins:
                adminModel.shutdown()  # (4)

        if managerId in self.admins:  # (3)
            adminModel = self.admins[managerId]
            adminModel.shutdown()
            self._managerDisconnected(adminModel)

        # Firing this has the side effect of errbacking on any pending
        # start, calling start_errback above if appropriate. (2)
        self._startSet.avatarStopped(
            managerId, lambda _: errors.ConnectionCancelledError())

        # always succeed, see (1)
        return defer.succeed(managerId)

    def for_each_component(self, object, proc):
        '''Call a procedure on each component that is a child of OBJECT'''
        # ah, for multimethods...
        if isinstance(object, planet.AdminPlanetState):
            self.for_each_component(object.get('atmosphere'), proc)
            for f in object.get('flows'):
                self.for_each_component(f, proc)
        elif isinstance(object, (planet.AdminAtmosphereState,
                                 planet.AdminFlowState)):
            for c in object.get('components'):
                self.for_each_component(c, proc)
        elif isinstance(object, planet.AdminComponentState):
            proc(object)

    # NOTE(review): header lost in extraction; method name reconstructed.
    def do_component_op(self, object, op):
        '''Call a method on the remote component object associated with
        a component state'''
        adminModel = get_admin_for_object(object)

        def do_op(object):
            adminModel.callRemote('component'+op, object)
        self.for_each_component(object, do_op)
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Aug 7 15:02:42 2008 | http://epydoc.sourceforge.net |