Package flumotion :: Package job :: Module job
[hide private]

Source Code for Module flumotion.job.job

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  the job-side half of the worker-job connection 
 24  """ 
 25   
 26  import os 
 27  import resource 
 28  import sys 
 29   
 30  # I've read somewhere that importing the traceback module messes up the 
 31  # exception state, so it's better to import it globally instead of in the 
 32  # exception handler 
 33  # import traceback 
 34   
 35  from twisted.cred import credentials 
 36  from twisted.internet import reactor, defer 
 37  from twisted.python import failure 
 38  from twisted.spread import pb 
 39  from zope.interface import implements 
 40   
 41  from flumotion.common import config, errors, interfaces, log, registry, keycards 
 42  from flumotion.common import medium, package 
 43  from flumotion.common.reflectcall import createComponent, reflectCallCatching 
 44  from flumotion.component import component 
 45   
 46  from flumotion.twisted import fdserver 
 47  from flumotion.twisted import pb as fpb 
 48  from flumotion.twisted import defer as fdefer 
 49   
50 -class JobMedium(medium.BaseMedium):
51 """ 52 I am a medium between the job and the worker's job avatar. 53 I live in the job process. 54 55 @cvar component: the component this is a medium for; created as part of 56 L{remote_create} 57 @type component: L{flumotion.component.component.BaseComponent} 58 """ 59 logCategory = 'jobmedium' 60 remoteLogName = 'jobavatar' 61 62 implements(interfaces.IJobMedium) 63
64 - def __init__(self):
65 self.avatarId = None 66 self.logName = None 67 self.component = None 68 69 self._workerName = None 70 self._managerHost = None 71 self._managerPort = None 72 self._managerTransport = None 73 self._managerKeycard = None 74 self._componentClientFactory = None # from component to manager 75 76 self._hasStoppedReactor = False
77 78 ### pb.Referenceable remote methods called on by the WorkerBrain
79 - def remote_bootstrap(self, workerName, host, port, transport, authenticator, 80 packagePaths):
81 """ 82 I receive the information on how to connect to the manager. I also set 83 up package paths to be able to run the component. 84 85 Called by the worker's JobAvatar. 86 87 @param workerName: the name of the worker running this job 88 @type workerName: str 89 @param host: the host that is running the manager 90 @type host: str 91 @param port: port on which the manager is listening 92 @type port: int 93 @param transport: 'tcp' or 'ssl' 94 @type transport: str 95 @param authenticator: remote reference to the worker-side authenticator 96 @type authenticator: L{twisted.spread.pb.RemoteReference} to a 97 L{flumotion.twisted.pb.Authenticator} 98 @param packagePaths: ordered list of 99 (package name, package path) tuples 100 @type packagePaths: list of (str, str) 101 """ 102 self._workerName = workerName 103 self._managerHost = host 104 self._managerPort = port 105 self._managerTransport = transport 106 if authenticator: 107 self._authenticator = fpb.RemoteAuthenticator(authenticator) 108 else: 109 self.debug('no authenticator, will not be able to log ' 110 'into manager') 111 self._authenticator = None 112 113 packager = package.getPackager() 114 for name, path in packagePaths: 115 self.debug('registering package path for %s' % name) 116 self.log('... from path %s' % path) 117 packager.registerPackagePath(path, name)
118
119 - def remote_getPid(self):
120 return os.getpid()
121
122 - def remote_runFunction(self, moduleName, methodName, *args, **kwargs):
123 """ 124 I am called on by the worker's JobAvatar to run a function, 125 normally on behalf of the flumotion wizard. 126 127 @param moduleName: name of the module containing the function 128 @type moduleName: str 129 @param methodName: the method to run 130 @type methodName: str 131 @param args: args to pass to the method 132 @type args: tuple 133 @param kwargs: kwargs to pass to the method 134 @type kwargs: dict 135 136 @returns: the result of invoking the method 137 """ 138 self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName, 139 args, kwargs)) 140 # FIXME: do we want to do this? 141 self._enableCoreDumps() 142 143 return reflectCallCatching(errors.RemoteRunError, moduleName, 144 methodName, *args, **kwargs)
145
146 - def remote_create(self, avatarId, type, moduleName, methodName, 147 nice, conf):
148 """ 149 I am called on by the worker's JobAvatar to create a component. 150 151 @param avatarId: avatarId for component to log in to manager 152 @type avatarId: str 153 @param type: type of component to start 154 @type type: str 155 @param moduleName: name of the module to create the component from 156 @type moduleName: str 157 @param methodName: the factory method to use to create the component 158 @type methodName: str 159 @param nice: the nice level 160 @type nice: int 161 @param conf: the component configuration 162 @type conf: dict 163 """ 164 self.avatarId = avatarId 165 self.logName = avatarId 166 167 self.component = self._createComponent(avatarId, type, moduleName, 168 methodName, nice, conf) 169 self.component.setShutdownHook(self._componentStopped)
170
171 - def _componentStopped(self):
172 # stop reactor from a callLater so remote methods finish nicely 173 reactor.callLater(0, self.shutdown)
174
175 - def remote_stop(self):
176 if self.component: 177 self.debug('stopping component and shutting down') 178 self.component.stop() 179 else: 180 reactor.callLater(0, self.shutdown)
181
182 - def shutdownHandler(self):
183 dlist = [] 184 if self.hasRemoteReference(): 185 # tell the worker we are shutting down 186 dlist.append(self.callRemote("cleanShutdown")) 187 if self.component: 188 medium = self.component.medium 189 if medium.hasRemoteReference(): 190 dlist.append(medium.callRemote("cleanShutdown")) 191 192 # We mustn't fire the deferred returned from here except from a 193 # callLater. 194 dl = defer.DeferredList(dlist, fireOnOneErrback=False) 195 return fdefer.defer_call_later(dl)
196 197 ### our methods
198 - def shutdown(self):
199 """ 200 Shut down the job process completely, cleaning up the component 201 so the reactor can be left from. 202 """ 203 if self._hasStoppedReactor: 204 self.debug("Not stopping reactor again, already shutting down") 205 else: 206 self._hasStoppedReactor = True 207 self.info("Stopping reactor in job process") 208 reactor.stop()
209
210 - def _setNice(self, nice):
211 if not nice: 212 return 213 214 try: 215 os.nice(nice) 216 except OSError, e: 217 self.warning('Failed to set nice level: %s' % str(e)) 218 else: 219 self.debug('Nice level set to %d' % nice)
220
221 - def _enableCoreDumps(self):
222 soft, hard = resource.getrlimit(resource.RLIMIT_CORE) 223 if hard != resource.RLIM_INFINITY: 224 self.warning('Could not set unlimited core dump sizes, ' 225 'setting to %d instead' % hard) 226 else: 227 self.debug('Enabling core dumps of unlimited size') 228 229 resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
230
231 - def _createComponent(self, avatarId, type, moduleName, methodName, 232 nice, conf):
233 """ 234 Create a component of the given type. 235 Log in to the manager with the given avatarId. 236 237 @param avatarId: avatarId component will use to log in to manager 238 @type avatarId: str 239 @param type: type of component to start 240 @type type: str 241 @param moduleName: name of the module that contains the entry point 242 @type moduleName: str 243 @param methodName: name of the factory method to create the component 244 @type methodName: str 245 @param nice: the nice level to run with 246 @type nice: int 247 @param conf: the component configuration 248 @type conf: dict 249 """ 250 self.info('Creating component "%s" of type "%s"', avatarId, type) 251 252 self._setNice(nice) 253 self._enableCoreDumps() 254 255 try: 256 comp = createComponent(moduleName, methodName, conf) 257 except Exception, e: 258 msg = "Exception %s during createComponent: %s" % ( 259 e.__class__.__name__, " ".join(e.args)) 260 # traceback.print_exc() 261 # a ComponentCreateError is already formatted 262 if isinstance(e, errors.ComponentCreateError): 263 msg = e.args[0] 264 self.warning( 265 "raising ComponentCreateError(%s) and stopping job" % msg) 266 # This is a Nasty Hack. We raise ComponentCreateError, which can be 267 # caught on the other side and marshalled as a reasonably 268 # comprehensible error message. However, if we shutdown immediately, 269 # the PB connection won't be available, so the worker will just get 270 # an error about that! So, instead, we shut down in a tenth of a 271 # second, usually allowing the worker to get scheduled and read the 272 # exception over PB. Ick! 
273 reactor.callLater(0.1, self.shutdown) 274 raise errors.ComponentCreateError(msg) 275 276 comp.setWorkerName(self._workerName) 277 278 # make component log in to manager 279 self.debug('creating ComponentClientFactory') 280 managerClientFactory = component.ComponentClientFactory(comp) 281 self._componentClientFactory = managerClientFactory 282 self.debug('created ComponentClientFactory %r' % managerClientFactory) 283 self._authenticator.avatarId = avatarId 284 managerClientFactory.startLogin(self._authenticator) 285 286 host = self._managerHost 287 port = self._managerPort 288 transport = self._managerTransport 289 self.debug('logging in with authenticator %r' % self._authenticator) 290 if transport == "ssl": 291 from flumotion.common import common 292 common.assertSSLAvailable() 293 from twisted.internet import ssl 294 self.info('Connecting to manager %s:%d with SSL' % (host, port)) 295 reactor.connectSSL(host, port, managerClientFactory, 296 ssl.ClientContextFactory()) 297 elif transport == "tcp": 298 self.info('Connecting to manager %s:%d with TCP' % (host, port)) 299 reactor.connectTCP(host, port, managerClientFactory) 300 else: 301 self.warning('Unknown transport protocol %s' % self._managerTransport) 302 303 return comp
304
class JobClientBroker(pb.Broker, log.Loggable):
    """
    I am a pb.Broker that, next to the normal PB data stream, accepts file
    descriptors (each with an accompanying message) over the same
    connection.
    Depending on the message, a received FD is handed to the component of
    the factory's medium for feeding or eating, or it replaces stdout or
    stderr of this process.
    """

    def __init__(self, connectionClass, **kwargs):
        """
        @param connectionClass: a subclass of
                                L{twisted.internet.tcp.Connection}
        @param kwargs:          keyword arguments handed on unchanged to
                                L{twisted.spread.pb.Broker}
        """
        pb.Broker.__init__(self, **kwargs)
        self._connectionClass = connectionClass

    def fileDescriptorsReceived(self, fds, message):
        """
        Act on file descriptors delivered together with a message.
        Only the first descriptor is used.

        Handled messages:
         - 'sendFeed feedName [eaterId]': passed to the component's feedToFD
         - 'receiveFeed eaterAlias [feedId]': passed to the component's
           eatFromFD
         - 'redirectStdout' / 'redirectStderr': the descriptor is duplicated
           over stdout / stderr and then closed
        Anything else is logged as a warning.

        @param fds:     the received file descriptors
        @type  fds:     list of int
        @param message: the message that came with the descriptors
        @type  message: str
        """
        self.debug('received fds %r, message %r' % (fds, message))

        if message.startswith('sendFeed '):
            # the eater id is optional; the leading keyword is dropped
            def splitSendFeed(_, feedName, eaterId=None):
                return feedName, eaterId

            feedName, eaterId = splitSendFeed(*message.split(' '))
            comp = self.factory.medium.component
            comp.feedToFD(feedName, fds[0], os.close, eaterId)
            return

        if message.startswith('receiveFeed '):
            # the feed id is optional; the leading keyword is dropped
            def splitReceiveFeed(_, eaterAlias, feedId=None):
                return eaterAlias, feedId

            eaterAlias, feedId = splitReceiveFeed(*message.split(' '))
            comp = self.factory.medium.component
            comp.eatFromFD(eaterAlias, feedId, fds[0])
            return

        if message == 'redirectStdout':
            newFd = fds[0]
            self.debug('told to rotate stdout to fd %d', newFd)
            os.dup2(newFd, sys.stdout.fileno())
            os.close(newFd)
            self.debug('rotated stdout')
            return

        if message == 'redirectStderr':
            newFd = fds[0]
            self.debug('told to rotate stderr to fd %d', newFd)
            os.dup2(newFd, sys.stderr.fileno())
            os.close(newFd)
            self.info('rotated stderr')
            return

        self.warning('Unknown message received: %r' % message)

class JobClientFactory(pb.PBClientFactory, log.Loggable):
    """
    I am the client factory through which the job logs in to the
    WorkerBrain.
    I live in the flumotion-job process spawned by the worker.

    @cvar medium: the medium for the JobHeaven to access us through
    @type medium: L{JobMedium}
    """
    logCategory = "job"
    perspectiveInterface = interfaces.IJobMedium

    def __init__(self, id):
        """
        Create the job medium and start logging in right away.

        @param id: the avatar id used for logging into the workerbrain
        @type  id: str
        """
        pb.PBClientFactory.__init__(self)

        self.medium = JobMedium()
        self.logName = id
        self.login(id)

        # use an FD-passing broker instead
        self.protocol = JobClientBroker

    ### pb.PBClientFactory methods

    def buildProtocol(self, addr):
        """
        Instantiate our protocol class with L{fdserver.FDServer} and make
        it point back to this factory.

        @param addr: not used here
        @returns:    the new protocol instance
        """
        broker = self.protocol(fdserver.FDServer)
        broker.factory = self
        return broker

    # FIXME: might be nice if jobs got a password to use to log in to brain

    def login(self, username):
        """
        Log in with the given user name and an empty password, passing our
        medium as the client object. Once logged in, the medium is given
        the resulting remote reference.

        @param username: the name to log in with
        @type  username: str

        @returns: the deferred of the login, with our callback added
        """
        def _loggedIn(perspective):
            self.info('Logged in to worker')
            self.debug('perspective %r connected', perspective)
            self.medium.setRemoteReference(perspective)

        self.info('Logging in to worker')
        creds = credentials.UsernamePassword(username, '')
        loginDeferred = pb.PBClientFactory.login(self, creds, self.medium)
        loginDeferred.addCallback(_loggedIn)
        return loginDeferred

    # the only way stopFactory can be called is if the WorkerBrain closes
    # the pb server.  Ideally though we would have gotten a notice before.
    # This ensures we shut down the component/job in ALL cases where the worker
    # goes away.

    def stopFactory(self):
        """
        Shut down the medium when this factory is stopped.
        """
        self.debug('shutting down medium')
        self.medium.shutdown()
        self.debug('shut down medium')
401