Package flumotion :: Package component :: Module component
[hide private]

Source Code for Module flumotion.component.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects for components 
 24  """ 
 25   
 26  import os 
 27  import time 
 28  import socket 
 29   
 30  from twisted.internet import reactor, error, defer 
 31  from twisted.spread import pb 
 32  from twisted.python import reflect 
 33  from zope.interface import implements 
 34   
 35  from flumotion.common import interfaces, errors, log, planet, medium 
 36  from flumotion.common import componentui, common, registry, messages 
 37  from flumotion.common import interfaces, reflectcall 
 38   
 39  from flumotion.common.planet import moods 
 40  from flumotion.configure import configure 
 41  from flumotion.twisted import credentials 
 42  from flumotion.twisted import pb as fpb 
 43   
 44  from flumotion.common.messages import N_ 
 45  T_ = messages.gettexter('flumotion') 
 46   
class ComponentClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the client factory a component uses to log in to the manager.

    When created, I instantiate the component's medium (using the
    component's componentMediumClass) and hand it to the component.
    """
    logCategory = 'component'
    perspectiveInterface = interfaces.IComponentMedium

    def __init__(self, component):
        """
        @param component: the component logging in through this factory
        @type  component: L{flumotion.component.component.BaseComponent}
        """
        # doing this as a class method triggers a doc error
        fpb.ReconnectingFPBClientFactory.__init__(self)

        self.component = component

        # make a medium to interface with the manager
        mediumClass = component.componentMediumClass
        self.medium = mediumClass(component)
        component.setMedium(self.medium)

        self.maxDelay = 10
        # get the interfaces implemented by the component medium class
        #FIXME: interface
        #self.interfaces = self.medium.__class__.__implements__

        self.logName = component.name

    def clientConnectionMade(self, broker):
        """
        Remember the broker on the medium, then chain up to the parent.
        """
        self.medium.broker = broker
        fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)

    # vmethod implementation
    def gotDeferredLogin(self, d):
        """
        Attach the login handlers to the given login deferred.

        On success the remote reference is given to the medium and a
        disconnect notification is registered.  Each of the first three
        errbacks traps one specific error and logs a warning; the last
        one logs any remaining failure.

        @param d: the deferred for the login in progress
        """
        def onDisconnected(remoteReference):
            if not reactor.killed:
                self.warning('Lost connection to manager, '
                             'will attempt to reconnect')
                return
            self.log('Connection to manager lost due to shutdown')

        def onLoggedIn(reference):
            self.info("Logged in to manager")
            self.debug("remote reference %r" % reference)
            self._previously_connected = True

            self.medium.setRemoteReference(reference)
            reference.notifyOnDisconnect(onDisconnected)

        def onAccessDenied(failure):
            failure.trap(errors.NotAuthenticatedError)
            self.warning('Access denied.')

        def onConnectionRefused(failure):
            failure.trap(error.ConnectionRefusedError)
            self.warning('Connection to manager refused.')

        def onAlreadyLoggedIn(failure):
            failure.trap(errors.AlreadyConnectedError)
            avatarId = self.medium.authenticator.avatarId
            self.warning('Component with id %s is already logged in.',
                         avatarId)

        def onLoginFailed(failure):
            self.warning('Login failed, reason: %s' % failure)

        d.addCallback(onLoggedIn)
        # order matters: the catch-all handler has to come last
        for handler in (onAccessDenied, onConnectionRefused,
                        onAlreadyLoggedIn, onLoginFailed):
            d.addErrback(handler)

    # we want to save the authenticator
    def startLogin(self, authenticator):
        """
        Store the authenticator on the medium before starting the login.

        @returns: whatever the parent class' startLogin returns
        """
        self.medium.setAuthenticator(authenticator)
        parent = fpb.ReconnectingFPBClientFactory
        return parent.startLogin(self, authenticator)

def maybe_deferred_chain(procs, *args, **kwargs):
    """
    Call the given callables one after the other, each with the same
    arguments.

    A callable may return a deferred; the next callable is only invoked
    once that deferred has fired.  The result of each callable is
    discarded before the next one is called.  Since the remaining
    callables are attached as callbacks only, a failure or exception in
    one of them means the later ones are not called.

    @param procs: the callables to invoke, in order; may be empty
    @type  procs: sequence of callable

    @rtype:   L{twisted.internet.defer.Deferred}
    @returns: a deferred firing with the result of the last callable,
              or with None if procs is empty
    """
    def call_proc(_, p):
        # the first argument is the previous result, which is ignored
        log.debug('', 'calling %r', p)
        return p(*args, **kwargs)

    # nothing to chain: hand back an already-fired deferred instead of
    # failing with an IndexError on procs[0]
    if not procs:
        return defer.succeed(None)

    d = defer.maybeDeferred(call_proc, None, procs[0])
    for p in procs[1:]:
        d.addCallback(call_proc, p)
    return d


# needs to be before BaseComponent because BaseComponent references it
class BaseComponentMedium(medium.PingingMedium):
    """
    I am a medium interfacing with a manager-side avatar.
    I implement a Referenceable for the manager's avatar to call on me.
    I have a remote reference to the manager's avatar to call upon.
    I am created by the L{ComponentClientFactory}.

    @ivar comp:          the component I am the medium for
    @type comp:          L{flumotion.component.component.BaseComponent}
    @ivar authenticator: the authenticator used to log in to manager
    @type authenticator: L{flumotion.twisted.pb.Authenticator}
    @ivar broker:        the broker of the connection to the manager; set
                         by the client factory when the connection is
                         made and cleared again once the remote
                         reference is set
    """

    implements(interfaces.IComponentMedium)
    logCategory = 'basecompmed'

    def __init__(self, component):
        """
        @param component: L{flumotion.component.component.BaseComponent}
        """
        self.comp = component
        self.authenticator = None
        self.broker = None

    def setRemoteReference(self, reference):
        """
        Drop the stored broker and chain up to the parent implementation.
        """
        self.broker = None # We no longer need that reference
        medium.PingingMedium.setRemoteReference(self, reference)

    ### our methods
    def setup(self, config):
        """
        Hook taking the component configuration; does nothing here.
        """
        pass

    def getManagerIP(self):
        """
        Return the manager IP as seen by us.

        Uses the stored broker if there is one, and the broker of the
        remote reference otherwise.

        @returns: the result of resolving the peer's host
        """
        assert self.remote or self.broker
        broker = self.broker or self.remote.broker
        peer = broker.transport.getPeer()
        try:
            host = peer.host
        except AttributeError:
            # presumably an old-style address tuple -- TODO confirm
            host = peer[1]

        res = socket.gethostbyname(host)
        self.debug("getManagerIP(): we think the manager's IP is %r", res)
        return res

    def getIP(self):
        """
        Return the IP of this component based on connection to the manager.

        Note: this is insufficient in general, and should be replaced by
        network mapping stuff later.
        """
        assert self.remote
        host = self.remote.broker.transport.getHost()
        self.debug("getIP(): using %r as our IP", host.host)
        return host.host

    def setAuthenticator(self, authenticator):
        """
        Set the authenticator the client factory has used to log in to the
        manager.  Can be reused by the component's medium to make
        feed connections which also get authenticated by the manager's
        bouncer.

        @type authenticator: L{flumotion.twisted.pb.Authenticator}
        """
        self.authenticator = authenticator

    ### pb.Referenceable remote methods
    ### called from manager by our avatar
    def remote_getState(self):
        """
        Return the state of the component, which will be serialized to a
        L{flumotion.common.planet.ManagerJobState} object.

        The 'manager-ip' key of the state is updated as a side effect.

        @rtype:   L{flumotion.common.planet.WorkerJobState}
        @returns: state of component
        """
        # we can only get the IP after we have a remote reference, so add it
        # here
        self.comp.state.set('manager-ip', self.getManagerIP())
        return self.comp.state

    def remote_getConfig(self):
        """
        Return the configuration of the component.

        @rtype:   dict
        @returns: component's current configuration
        """
        return self.comp.config

    def remote_stop(self):
        """
        Stop the component.

        @returns: whatever the component's stop() returns
        """
        self.info('Stopping component')
        return self.comp.stop()

    def remote_reloadComponent(self):
        """Reload modules in the component."""
        from flumotion.common.reload import reload as freload
        freload()

    def remote_getUIState(self):
        """Get a WorkerComponentUIState containing details needed to
        present an admin-side UI state
        """
        return self.comp.uiState

    # NOTE(review): the def line of this method was missing from the
    # listing; the name is reconstructed from the docstring and from the
    # remote_ prefix used by the other methods in this section -- confirm.
    def remote_getMasterClockInfo(self):
        """
        Base implementation of getMasterClockInfo, can be overridden by
        subclasses. By default, just returns None.
        """
        return None

class BaseComponent(common.InitMixin, log.Loggable):
    """
    I am the base class for all Flumotion components.

    @ivar name: the name of the component
    @type name: string
    @ivar medium: the component's medium
    @type medium: L{BaseComponentMedium}

    @cvar componentMediumClass: the medium class to use for this component
    @type componentMediumClass: child class of L{BaseComponentMedium}
    """

    logCategory = 'basecomp'
    componentMediumClass = BaseComponentMedium

    def __init__(self, config, haveError=None):
        """
        Subclasses should not override __init__ at all.

        Instead, they should implement init(), which will be called
        by this implementation automatically.

        See L{flumotion.common.common.InitMixin} for more details.

        @param config:    the component configuration; the 'name',
                          'properties' and 'plugs' keys are used here
        @type  config:    dict
        @param haveError: if given, called with the message every time an
                          error message is added to the component
        @type  haveError: callable taking a
                          L{flumotion.common.messages.Message}, or None
        """
        self.debug("initializing %r with config %r", type(self), config)
        self.config = config
        self._haveError = haveError

        # this will call self.init() for all implementors of init()
        common.InitMixin.__init__(self)

        self.setup()

    # BaseComponent interface for subclasses related to component protocol
    def init(self):
        """
        A subclass should do as little as possible in its init method.
        In particular, it should not try to access resources.

        Failures during init are marshalled back to the manager through
        the worker's remote_create method, since there is no component state
        proxied to the manager yet at the time of init.
        """
        self.state = planet.WorkerJobState()

        self.name = self.config['name']

        self.state.set('pid', os.getpid())
        self.setMood(moods.waking)

        self.medium = None # the medium connecting us to the manager's avatar

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('cpu-percent')

        self.plugs = {}

        # deferreds handed out by waitForHappy(), fired from setMood()
        self._happyWaits = []

        # Start the cpu-usage updating.
        self._lastTime = time.time()
        self._lastClock = time.clock()
        self._cpuPoller = common.Poller(self._pollCPU, 5)

        self._shutdownHook = None

    def do_check(self):
        """
        Subclasses can implement me to run any checks before the component
        performs setup.

        Messages can be added to the component state's 'messages' list key.
        Any error messages added will trigger the component going to sad
        and an L{flumotion.common.errors.ComponentSetupError} being raised;
        do_setup() will not be called.

        In the event of a fatal problem that can't be expressed through an
        error message, this method should raise an exception or return a
        failure.

        It is not necessary to chain up in this function. The return
        value may be a deferred.
        """
        return defer.maybeDeferred(self.check_properties,
                                   self.config['properties'],
                                   self.addMessage)

    def check_properties(self, properties, addMessage):
        """
        BaseComponent convenience vmethod for running checks.

        A component implementation can override this method to run any
        checks that it needs to. Typically, a check_properties
        implementation will call the provided addMessage() callback to
        note warnings or errors. For errors, addMessage() will abort the
        check process, setting the mood to sad.

        @param properties: The component's properties
        @type  properties: dict of string => object
        @param addMessage: Thunk to add a message to the component
                           state. Will raise an exception if the
                           message is of level ERROR.
        @type  addMessage: L{flumotion.common.messages.Message} -> None
        """
        pass

    def do_setup(self):
        """
        Subclasses can implement me to set up the component before it is
        started.  It should set up the component, possibly opening files
        and resources.
        Non-programming errors should not be raised, but returned as a
        failing deferred.

        This base implementation instantiates and starts the configured
        plugs, then runs all do_check methods.

        The return value may be a deferred.
        """
        # named socketName so the imported socket module is not shadowed
        for socketName, plugs in self.config['plugs'].items():
            self.plugs[socketName] = []
            for plug in plugs:
                instance = reflectcall.reflectCall(plug['module-name'],
                                                   plug['function-name'],
                                                   plug)
                self.plugs[socketName].append(instance)
                self.debug('Starting plug %r on socket %s',
                           instance, socketName)
                instance.start(self)

        # Call check methods, starting from the base class and working down to
        # subclasses.
        checks = common.get_all_methods(self, 'do_check', False)
        return maybe_deferred_chain(checks, self)

    def do_stop(self):
        """
        BaseComponent vmethod for stopping.
        The component should do any cleanup it needs, but must not set the
        component's mood to sleeping.

        This base implementation stops the plugs, removes all messages
        from the state, stops the CPU poller and fires the shutdown hook
        if one is set.

        @returns: None in this base implementation; stop() runs the
                  do_stop methods through maybe_deferred_chain, so an
                  override may return a
                  L{twisted.internet.defer.Deferred}
        """
        for socketName, plugs in self.plugs.items():
            for plug in plugs:
                self.debug('Stopping plug %r on socket %s', plug,
                           socketName)
                plug.stop(self)

        # iterate over a copy: state.remove() may change the very list
        # returned by state.get(), and removing from a list while looping
        # over it skips entries
        for message in list(self.state.get('messages')):
            # FIXME: not necessary
            self.state.remove('messages', message)

        if self._cpuPoller:
            self._cpuPoller.stop()
            self._cpuPoller = None

        if self._shutdownHook:
            self.debug('_stoppedCallback: firing shutdown hook')
            self._shutdownHook()

    ### BaseComponent implementation related to component protocol
    def setup(self):
        """
        Sets up the component.  Called during __init__, so be sure not
        to raise exceptions, instead adding messages to the component
        state.

        Runs all do_setup methods; the mood is set to happy when they
        succeed.  On failure an error message is added to the component,
        unless the failure is a
        L{flumotion.common.errors.ComponentSetupHandledError}.
        """
        def run_setups():
            setups = common.get_all_methods(self, 'do_setup', False)
            return maybe_deferred_chain(setups, self)

        def go_happy(_):
            self.debug('setup complete, going happy')
            self.setMood(moods.happy)

        def got_error(failure):
            if not failure.check(errors.ComponentSetupHandledError):
                txt = log.getFailureMessage(failure)
                self.warning('Setup failed: %s', txt)
                m = messages.Error(T_(N_("Could not setup component.")),
                                   debug=txt,
                                   id="component-setup-%s" % self.name)
                # will call setMood(moods.sad)
                self.addMessage(m)

            # swallow
            return None

        self.setMood(moods.waking)

        d = run_setups()
        d.addCallbacks(go_happy, got_error)
        # all status info via messages and the mood

    def setShutdownHook(self, shutdownHook):
        """
        Set the shutdown hook for this component (replacing any previous hook).
        When a component is stopped, then this hook will be fired.

        @param shutdownHook: called without arguments from do_stop()
        @type  shutdownHook: callable
        """
        self._shutdownHook = shutdownHook

    def stop(self):
        """
        Tell the component to stop.
        The connection to the manager will be closed.
        The job process will also finish.

        @returns: the deferred for the chain of do_stop methods
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        self.debug('BaseComponent.stop')

        # Set ourselves to waking while we're shutting down.
        self.setMood(moods.waking)

        # Run stop methods, starting from the subclass, up to this base class.
        stops = common.get_all_methods(self, 'do_stop', True)
        return maybe_deferred_chain(stops, self)

    ### BaseComponent public methods
    def getName(self):
        """
        @returns: the name of the component, as taken from its config
        """
        return self.name

    def setWorkerName(self, workerName):
        """
        Store the given worker name in the component state.
        """
        self.state.set('workerName', workerName)

    def getWorkerName(self):
        """
        @returns: the worker name stored in the component state
        """
        return self.state.get('workerName')

    def setMedium(self, medium):
        """
        Set the medium for this component and give it our name as its
        log name.

        @type medium: L{BaseComponentMedium}
        """
        assert isinstance(medium, BaseComponentMedium)
        self.medium = medium
        self.medium.logName = self.getName()

    def setMood(self, mood):
        """
        Set the given mood on the component if it's different from the current
        one.

        Once the component is sad, the mood can not be changed anymore.
        Going happy or sad fires the deferreds handed out by
        waitForHappy().

        @param mood: one of the L{flumotion.common.planet.moods}
        """
        current = self.state.get('mood')

        if current == mood.value:
            self.log('already in mood %r' % mood)
            return
        elif current == moods.sad.value:
            self.info('tried to set mood to %r, but already sad :-(' % mood)
            return

        self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
        self.state.set('mood', mood.value)

        if mood == moods.happy:
            while self._happyWaits:
                self._happyWaits.pop(0).callback(None)
        elif mood == moods.sad:
            while self._happyWaits:
                self._happyWaits.pop(0).errback(errors.ComponentStartError())

    def getMood(self):
        """
        Gets the mood on the component.

        @rtype: int
        """
        return self.state.get('mood')

    def waitForHappy(self):
        """
        Get a deferred that tells when the component has become happy.

        @rtype:   L{twisted.internet.defer.Deferred}
        @returns: a deferred that fires with None when the component is or
                  becomes happy, and fails with a
                  L{flumotion.common.errors.ComponentStartError} when it
                  is or becomes sad
        """
        mood = self.getMood()
        if mood == moods.happy.value:
            return defer.succeed(None)
        elif mood == moods.sad.value:
            return defer.fail(errors.ComponentStartError())
        else:
            d = defer.Deferred()
            self._happyWaits.append(d)
            return d

    def addMessage(self, message):
        """
        Add a message to the component.
        If any of the messages is an error, the component will turn sad.

        For an error message, the haveError callable passed at
        construction time (if any) is called with the message.

        @type message: L{flumotion.common.messages.Message}
        """
        self.state.append('messages', message)
        if message.level == messages.ERROR:
            self.debug('error message, turning sad')
            self.setMood(moods.sad)
            if self._haveError:
                self._haveError(message)

    def fixRenamedProperties(self, properties, list):
        """
        Fix properties that have been renamed from a previous version,
        and add a warning for them.

        @param properties: properties; will be modified as a result.
        @type  properties: dict
        @param list:       list of (old, new) tuples of property names.
        @type  list:       list of tuple of (str, str)
        """
        # 'list' shadows the builtin, but is part of the public signature
        found = [(old, new) for old, new in list if old in properties]

        if found:
            m = messages.Warning(T_(N_(
                "Your configuration uses deprecated properties.  "
                "Please update your configuration and correct them.\n")),
                id="deprecated")
            for old, new in found:
                m.add(T_(N_(
                    "Please rename '%s' to '%s'.\n"),
                        old, new))
                self.debug("Setting new property '%s' to %r", new,
                    properties[old])
                properties[new] = properties[old]
                del properties[old]
            self.addMessage(m)

    def adminCallRemote(self, methodName, *args, **kwargs):
        """
        Call a remote method on all admin client views on this component.

        This gets serialized through the manager and multiplexed to all
        admin clients, and from there on to all views connected to each
        admin client model.

        Because there can be any number of admin clients that this call
        will go out to, it does not make sense to have one return value.
        This function will return None always.
        """
        if self.medium:
            self.medium.callRemote("adminCallRemote", methodName,
                                   *args, **kwargs)
        else:
            self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                       'no manager.'
                % (methodName, args, kwargs))

    def _pollCPU(self):
        """
        Update the 'cpu-percent' key of the UI state.

        The value set is the time.clock() delta divided by the time.time()
        delta since the last sample that was reported.
        """
        # NOTE: time.clock() was removed in Python 3.8
        # update CPU time stats
        nowTime = time.time()
        nowClock = time.clock()
        deltaTime = nowTime - self._lastTime
        deltaClock = nowClock - self._lastClock

        if deltaClock < 0 or deltaTime <= 0:
            # time.clock() wrapped around, or the wall clock did not move
            # forward.  Take this sample as the new baseline: keeping the
            # old one would make every following delta negative too, and
            # a zero deltaTime would divide by zero below.
            self._lastTime = nowTime
            self._lastClock = nowClock
            return
        if deltaClock == 0:
            # nothing measured; keep the baseline and report next time
            return

        CPU = deltaClock/deltaTime
        self.log('latest CPU use: %r', CPU)
        self.uiState.set('cpu-percent', CPU)
        self._lastTime = nowTime
        self._lastClock = nowClock
