Package flumotion :: Package admin :: Module admin
[hide private]

Source Code for Module flumotion.admin.admin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  model abstraction for administration clients supporting different views 
 24  """ 
 25   
 26  from twisted.internet import error, defer, reactor 
 27  from zope.interface import implements 
 28   
 29  from flumotion.common import common, errors, interfaces, log 
 30  from flumotion.common import keycards, planet, medium, package 
 31  from flumotion.common import messages, signals, connection 
 32  from flumotion.configure import configure 
 33  from flumotion.twisted import pb as fpb 
 34   
 35  # these two imports are for their side effects of jelly type 
 36  # registration 
 37  from flumotion.common import planet, worker 
 38   
 39  from flumotion.common.messages import N_ 
 40  T_ = messages.gettexter('flumotion') 
 41   
class AdminClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the reconnecting PB client factory an L{AdminModel} uses to log
    in to a manager.

    Depending on whether I have been connected before and on my
    C{extraTenacious} flag, I either report connection problems to my
    medium or hand them to my base class.
    """
    perspectiveInterface = interfaces.IAdminMedium

    def __init__(self, medium, extraTenacious=False, maxDelay=20):
        """
        @param medium:         the model I report connection events to
        @type  medium:         AdminModel
        @param extraTenacious: if true, do not report connection or login
                               problems to the medium
        @param maxDelay:       stored as my C{maxDelay} attribute;
                               presumably the upper bound used by the
                               base class between reconnection attempts
                               (TODO confirm)
        """
        fpb.ReconnectingFPBClientFactory.__init__(self)

        self.medium = medium
        self.maxDelay = maxDelay
        self.extraTenacious = extraTenacious

        # set to 1 by clientConnectionMade; read when a later connection
        # attempt fails
        self.hasBeenConnected = 0

    def clientConnectionMade(self, broker):
        """
        Remember that a connection has been made at least once, then
        chain up to the base class.
        """
        self.hasBeenConnected = 1

        fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)

    def clientConnectionFailed(self, connector, reason):
        """
        Report the failed connection attempt to the medium, or chain up
        to the base class.

        @param reason: L{twisted.spread.pb.failure.Failure}
        """
        if reason.check(error.DNSLookupError):
            self.debug('DNS lookup error')
            if not self.extraTenacious:
                self.medium.connectionFailed(reason)
            # NOTE(review): DNS failures return unconditionally, so they
            # are never handed to the base class, even when
            # extraTenacious is set -- confirm against upstream that
            # this is intended.
            return

        if reason.check(error.ConnectionRefusedError, error.ConnectError):
            # A failure of the very first connection attempt is a real
            # error that the medium must hear about (it presents a
            # dialog, etc.).  A failure later on (e.g. manager shut down
            # and not yet restarted) only gets logged, and we keep
            # trying to reconnect.
            self.debug("Error connecting to %s: %s",
                       connector.getDestination(),
                       log.getFailureMessage(reason))

            if self.hasBeenConnected:
                self.log("we've been connected before though, so going "
                         "to retry")
            elif self.extraTenacious:
                self.log("trying again due to +100 tenacity")
            else:
                self.log("telling medium about connection failure")
                self.medium.connectionFailed(reason)
                return

        # every path that did not return above chains up
        fpb.ReconnectingFPBClientFactory.clientConnectionFailed(
            self, connector, reason)

    # vmethod implementation
    def gotDeferredLogin(self, d):
        """
        Attach the login result handlers to the login deferred.

        On success the remote reference is handed to the medium; on
        failure a signal matching the kind of failure is emitted on the
        medium, unless I am extra tenacious, in which case I disconnect.

        @param d: the deferred for the login attempt
        @type  d: L{twisted.internet.defer.Deferred}

        @returns: the same deferred, with its errors swallowed
        """
        def loggedIn(remote):
            self.medium.setRemoteReference(remote)

        def loginFailed(failure):
            # every branch returns None, which swallows the error
            if self.extraTenacious:
                self.debug('connection problem: %s',
                           log.getFailureMessage(failure))
                self.debug('we are tenacious, so trying again later')
                self.disconnect()
                return

            if failure.check(errors.ConnectionFailedError):
                self.debug("emitting connection-failed")
                self.medium.emit('connection-failed', "I failed my master")
                self.debug("emitted connection-failed")
                return

            # FIXME: unauthorized login emit !
            # (NotAuthenticatedError is reported as connection-refused)
            if failure.check(errors.ConnectionRefusedError,
                             errors.NotAuthenticatedError):
                self.debug("emitting connection-refused")
                self.medium.emit('connection-refused')
                self.debug("emitted connection-refused")
                return

            self.medium.emit('connection-error', failure)
            self.warning('connection error: %s',
                         log.getFailureMessage(failure))

        d.addCallbacks(loggedIn, loginFailed)
        return d


# FIXME: stop using signals, we can provide a richer interface with actual
# objects and real interfaces for the views a model communicates with
class AdminModel(medium.PingingMedium, signals.SignalMixin):
    """
    I live in the admin client.
    I am a data model for any admin view implementing a UI to
    communicate with one manager.
    I send signals when things happen.

    Manager calls on us through L{flumotion.manager.admin.AdminAvatar}

    Signals I emit from this class, with their extra arguments:
     - connected
     - disconnected
     - connection-refused
     - connection-failed (message)
     - connection-error (failure)

    @ivar planet: the planet state retrieved from the manager after
                  connecting, or None while not yet retrieved
    """
    __signals__ = ('connected', 'disconnected', 'connection-refused',
                   'connection-failed', 'connection-error', 'reloading',
                   'message', 'update')

    logCategory = 'adminmodel'

    implements(interfaces.IAdminMedium)

    # Public instance variables (read-only)
    planet = None

    def __init__(self):
        # All of these instance variables are private.  Be careful!
        self.connectionInfo = None
        self.keepTrying = None
        self._writeConnection = True

        self.managerId = '<uninitialized>'

        self.connected = False
        self.clientFactory = None

        # deferred returned by connectToManager while it has not fired
        self._deferredConnect = None

        self._components = {} # dict of components
        self.planet = None
        self._workerHeavenState = None

    def connectToManager(self, connectionInfo, keepTrying=False,
                         writeConnection=True):
        """
        Connect to a host.

        @param connectionInfo:  where and how to connect; its C{host},
                                C{port}, C{use_ssl} and C{authenticator}
                                attributes are used
        @param keepTrying:      if true, the client factory is made extra
                                tenacious, and losing or failing the
                                connection does not fire the returned
                                deferred
        @param writeConnection: whether to cache the connection
                                information on disk once connected

        @rtype:   L{twisted.internet.defer.Deferred}
        @returns: a deferred firing this model once connected; unless
                  keepTrying is set, it errbacks when the connection is
                  lost, refused or failed before that.
        """
        assert self.clientFactory is None

        self.connectionInfo = connectionInfo
        self._writeConnection = writeConnection

        # give the admin an id unique to the manager -- if a program is
        # adminning multiple managers, this id should tell them apart
        # (and identify duplicates)
        self.managerId = str(connectionInfo)
        self.logName = self.managerId

        if connectionInfo.use_ssl:
            transport = 'SSL'
        else:
            transport = 'TCP'
        self.info('Connecting to manager %s with %s',
                  self.managerId, transport)

        self.clientFactory = AdminClientFactory(self,
                                                extraTenacious=keepTrying,
                                                maxDelay=20)
        self.clientFactory.startLogin(connectionInfo.authenticator)

        if connectionInfo.use_ssl:
            common.assertSSLAvailable()
            # imported here so that SSL support stays optional
            from twisted.internet import ssl
            reactor.connectSSL(connectionInfo.host, connectionInfo.port,
                               self.clientFactory,
                               ssl.ClientContextFactory())
        else:
            reactor.connectTCP(connectionInfo.host, connectionInfo.port,
                               self.clientFactory)

        # Signal handlers translating our own signals into the result of
        # the deferred.  With keepTrying set, only 'connected' fires it.
        def connected(model, d):
            # model is really "self". yay gobject?
            d.callback(model)

        def disconnected(model, d):
            # can happen after setRemoteReference but before
            # getPlanetState or getWorkerHeavenState returns
            if not keepTrying:
                d.errback(errors.ConnectionFailedError('Lost connection'))

        def connection_refused(model, d):
            if not keepTrying:
                d.errback(errors.ConnectionRefusedError())

        def connection_failed(model, reason, d):
            if not keepTrying:
                d.errback(errors.ConnectionFailedError(reason))

        def connection_error(model, failure, d):
            if not keepTrying:
                d.errback(failure)

        d = defer.Deferred()
        handlers = (
            ('connected', connected),
            ('disconnected', disconnected),
            ('connection-refused', connection_refused),
            ('connection-failed', connection_failed),
            ('connection-error', connection_error),
        )
        ids = [self.connect(name, handler, d) for name, handler in handlers]

        def finished(result):
            # Runs for success and failure alike: drop the temporary
            # signal handlers and pass the result through unchanged.
            # An explicit loop is used because the calls are made only
            # for their side effect.
            for handlerId in ids:
                self.disconnect(handlerId)
            self._deferredConnect = None
            return result

        d.addBoth(finished)
        self._deferredConnect = d
        return d

    def shutdown(self):
        """
        Stop the client factory, if any, and cancel a connection attempt
        that is still pending by errbacking its deferred with
        L{errors.ConnectionCancelledError}.
        """
        self.debug('shutting down')
        if self.clientFactory is not None:
            # order not semantically important, but this way we avoid a
            # "reconnecting in X seconds" in the log
            self.clientFactory.stopTrying()
            self.clientFactory.disconnect()
            self.clientFactory = None

        if self._deferredConnect is not None:
            # this can happen with keepTrying=True
            self.debug('cancelling connection attempt')
            self._deferredConnect.errback(errors.ConnectionCancelledError())

    def reconnect(self, keepTrying=False):
        """
        Close any existing connection to the manager and reconnect,
        using the connection information of the previous connection.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('asked to log in again')
        self.shutdown()
        return self.connectToManager(self.connectionInfo, keepTrying)

    # FIXME: give these three sensible names
    def adminInfoStr(self):
        """
        @returns: the id of the manager, as set by L{connectToManager}
        """
        return self.managerId

    def connectionInfoStr(self):
        """
        @returns: a 'host:port (scheme)' description of the connection,
                  where scheme is 'https' with SSL and 'http' without
        """
        info = self.connectionInfo
        if info.use_ssl:
            scheme = 'https'
        else:
            scheme = 'http'
        return '%s:%s (%s)' % (info.host, info.port, scheme)

    # used in fgc
    def managerInfoStr(self):
        """
        Only valid once the planet state has been retrieved.

        @returns: a 'planet name (manager id)' description
        """
        assert self.planet
        return '%s (%s)' % (self.planet.get('name'), self.managerId)

    def connectionFailed(self, failure):
        """
        Turn a failed connection attempt into a 'connection-failed'
        signal carrying a human-readable message.

        Called by the client factory.

        @param failure: why the connection attempt failed
        @type  failure: L{twisted.python.failure.Failure}
        """
        if failure.check(error.DNSLookupError):
            message = ("Could not look up host '%s'."
                       % self.connectionInfo.host)
        elif (failure.check(error.ConnectionRefusedError)
              or failure.check(error.ConnectError)):
            # same pair of errors the client factory treats as a plain
            # connect problem
            message = ("Could not connect to host '%s' on port %d."
                       % (self.connectionInfo.host,
                          self.connectionInfo.port))
        else:
            message = ("Unexpected failure.\nDebug information: %s"
                       % log.getFailureMessage(failure))
        self.debug('emitting connection-failed')
        self.emit('connection-failed', message)
        self.debug('emitted connection-failed')

    def setRemoteReference(self, remoteReference):
        """
        Store the reference to the manager and retrieve the planet state
        and the worker heaven state through it.  Once both have arrived,
        'connected' is emitted and, if requested in L{connectToManager},
        the connection information is cached on disk.

        'disconnected' is emitted when the remote reference goes away.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug("setRemoteReference %r", remoteReference)

        def gotPlanetState(planet):
            self.planet = planet
            # monkey, Monkey, MONKEYPATCH!!!!!
            self.planet.admin = self
            self.debug('got planet state')
            return self.callRemote('getWorkerHeavenState')

        def gotWorkerHeavenState(whs):
            self._workerHeavenState = whs
            self.debug('got worker state')

            self.debug('Connected to manager and retrieved all state')
            self.connected = True
            self.emit('connected')

        def writeConnection():
            # Best effort: failing to write the cache file is only
            # logged.
            info = self.connectionInfo
            auth = info.authenticator
            if not (auth.username and auth.password):
                self.log('not caching connection information')
                return

            # NOTE(review): the values are interpolated without XML
            # escaping and the password is stored in plain text.
            s = ''.join(['<connection>',
                         '<host>%s</host>' % info.host,
                         '<manager>%s</manager>' % self.planet.get('name'),
                         '<port>%d</port>' % info.port,
                         '<use_insecure>%d</use_insecure>'
                             % int(not info.use_ssl),
                         '<user>%s</user>' % auth.username,
                         '<passwd>%s</passwd>' % auth.password,
                         '</connection>'])

            import os
            import sys
            try:
                from hashlib import md5 as newMd5
            except ImportError:
                # hashlib is missing on old Pythons; use the md5 module
                from md5 import new as newMd5

            # the file is named after the digest of its contents
            digest = newMd5(s).hexdigest()
            f = os.path.join(configure.registrydir,
                             '%s.connection' % digest)
            try:
                handle = open(f, 'w')
                try:
                    handle.write(s)
                finally:
                    # close even when the write fails
                    handle.close()
            except Exception:
                # spelled this way to be valid syntax on every Python
                e = sys.exc_info()[1]
                self.info('failed to write connection cache file %s: %s',
                          f, log.getExceptionMessage(e))

        # chain up
        medium.PingingMedium.setRemoteReference(self, remoteReference)

        # fixme: push the disconnect notification upstream
        def remoteDisconnected(remoteReference):
            self.debug("emitting disconnected")
            self.connected = False
            self.emit('disconnected')
            self.debug("emitted disconnected")
        self.remote.notifyOnDisconnect(remoteDisconnected)

        d = self.callRemote('getPlanetState')
        d.addCallback(gotPlanetState)
        d.addCallback(gotWorkerHeavenState)
        if self._writeConnection:
            d.addCallback(lambda _: writeConnection())
        return d

    ### model functions; called by UI's to send requests to manager or comp

    ## view management functions
    def isConnected(self):
        """
        @returns: whether all state has been retrieved from the manager
                  and the connection has not been lost since
        """
        return self.connected

    ## generic remote call methods
    def componentCallRemote(self, componentState, methodName,
                            *args, **kwargs):
        """
        Call the given method on the given component with the given args.

        If the call fails, a warning message is appended to the
        component state and the failure is passed on.

        @param componentState: component to call the method on
        @type  componentState: L{flumotion.common.planet.AdminComponentState}
        @param methodName:     name of method to call; serialized to a
                               remote_methodName on the worker's medium

        @rtype: L{twisted.internet.defer.Deferred}
        """
        d = self.callRemote('componentCallRemote',
                            componentState, methodName,
                            *args, **kwargs)

        def errback(failure):
            if failure.check(errors.NoMethodError):
                msg = "Remote method '%s' does not exist." % methodName
                # failure.value may be an exception instance rather than
                # a string, so format it instead of concatenating
                msg += "\n%s" % (failure.value, )
            else:
                msg = log.getFailureMessage(failure)

            # FIXME: we probably need a nicer way of getting component
            # messages shown from the admin model, but this allows us to
            # make sure every type of admin has these messages
            self.warning(msg)
            m = messages.Warning(T_(N_("Internal error in component.")),
                                 debug=msg)
            componentState.observe_append('messages', m)
            return failure

        d.addErrback(errback)
        # FIXME: dialog for other errors ?
        return d

    def workerCallRemote(self, workerName, methodName, *args, **kwargs):
        """
        Call the given method on the given worker with the given args.

        @param workerName: name of the worker to call the method on
        @param methodName: name of method to call; serialized to a
                           remote_methodName on the worker's medium

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('workerCallRemote', workerName,
                               methodName, *args, **kwargs)

    ## manager remote methods
    def loadConfiguration(self, xml_string):
        """
        Call the manager's 'loadConfiguration' with the given string.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('loadConfiguration', xml_string)

    def getConfiguration(self):
        """
        Call the manager's 'getConfiguration'.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('getConfiguration')

    def cleanComponents(self):
        """
        Call the manager's 'cleanComponents'.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('cleanComponents')

    ## worker remote methods
    def checkElements(self, workerName, elements):
        """
        Call 'checkElements' on the given worker with the given elements.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.workerCallRemote(workerName, 'checkElements', elements)

    def checkImport(self, workerName, moduleName):
        """
        Call 'checkImport' on the given worker with the given module name.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.workerCallRemote(workerName, 'checkImport', moduleName)

    def workerRun(self, workerName, moduleName, functionName,
                  *args, **kwargs):
        """
        Run the given function and args on the given worker. If the
        worker does not already have the module, or it is out of date,
        it will be retrieved from the manager.

        @rtype: L{twisted.internet.defer.Deferred} firing an
                L{flumotion.common.messages.Result}
        """
        return self.workerCallRemote(workerName, 'runFunction', moduleName,
                                     functionName, *args, **kwargs)

    def getWorkerHeavenState(self):
        """
        @returns: the worker heaven state retrieved when connecting, or
                  None if it has not been retrieved yet
        """
        return self._workerHeavenState