Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import sys 
 28  import signal 
 29   
 30  from twisted.cred import portal 
 31  from twisted.internet import defer, reactor 
 32  from twisted.spread import pb 
 33  from zope.interface import implements 
 34   
 35  from flumotion.common import errors, log 
 36  from flumotion.common import common, worker, startset 
 37  from flumotion.twisted import checkers, fdserver 
 38  from flumotion.twisted import pb as fpb 
 39   
 40  JOB_SHUTDOWN_TIMEOUT = 5 
 41   
 42   
def _getSocketPath():
    """
    Reserve a unique file name to be used for the worker's socket.

    A temporary file is created with mkstemp, using the prefix
    C{flumotion.worker.} and the current process id as suffix, and is
    closed again straight away.  The file itself is left on disk, so the
    caller needs to delete it before using the name.

    @returns: path of the temporary file that was created
    @rtype:   str
    """
    # FIXME: there is no mkstemp for sockets, so there is a small window
    # in which the socket could be created by something else.
    # Preparing a socket file with that name up front did not work either.
    from tempfile import mkstemp

    handle, socketPath = mkstemp(suffix='.%d' % os.getpid(),
                                 prefix='flumotion.worker.')
    # only the name is of interest, not the open descriptor
    os.close(handle)
    return socketPath
54 55
class JobInfo(object):
    """
    I am a plain record describing one job.

    My attributes are restricted to the names listed in C{__slots__}.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        """
        Store every argument on the attribute of the same name.
        """
        # the constructor arguments are in the same order as __slots__
        values = (pid, avatarId, type, moduleName, methodName, nice,
                  bundles)
        for slotName, value in zip(JobInfo.__slots__, values):
            setattr(self, slotName, value)
87
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol for a job process spawned by the worker.

    When the process ends I settle the pending entries of the start set
    for my avatar and tell the heaven that the job has stopped.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven spawning the process; it must
                         provide getWorkerName()
        @param avatarId: avatar identification string of the job
        @param startSet: start set tracking pending creates and shutdowns
        """
        self._startSet = startSet
        # remembered so processEnded can tell whether the create is
        # still pending
        self._deferredStart = startSet.createRegistered(avatarId)
        workerName = heaven.getWorkerName()
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component', workerName)

    def sendMessage(self, message):
        """
        Pass a message for my avatar on to the worker brain through the
        remote call 'componentAddMessage'.
        """
        # NOTE(review): self.loggable is presumably the heaven handed to
        # worker.ProcessProtocol.__init__ -- confirm in the base class
        brain = self.loggable.brain
        brain.callRemote('componentAddMessage', self.avatarId, message)

    def processEnded(self, status):
        """
        Handle the end of the job process.

        @param status: failure describing how the process ended; its
                       value carries the signal number, if any
        """
        jobHeaven = self.loggable
        startSet = self._startSet
        signalNumber = status.value.signal

        # If the job died before logging in to the worker, the create
        # deferred is still the one registered in __init__; it has to
        # fail, otherwise the manager keeps thinking the job is starting
        # up while it is dead.  If the job already attached to the
        # worker, the create deferred has already called back.
        pending = startSet.createRegistered(self.avatarId)
        if pending is self._deferredStart:
            reason = "unknown reason"
            if signalNumber:
                reason = "received signal %d" % signalNumber
            template = ("Component '%s' has exited early (%s). "
                        "This is sometimes triggered by a corrupt "
                        "GStreamer registry.")
            failure = errors.ComponentCreateError(
                template % (self.avatarId, reason))
            startSet.createFailed(self.avatarId, failure)

        if startSet.shutdownRegistered(self.avatarId):
            startSet.shutdownSuccess(self.avatarId)

        jobHeaven.jobStopped(self.pid)

        # let the parent class do its own handling
        worker.ProcessProtocol.processEnded(self, status)
130 131
class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the worker.

    Subclasses must set L{avatarClass} before L{listen} is called.

    @cvar avatarClass: class instantiated for every job that logs in
    @ivar avatars:     dict of avatarId -> avatar
    @type avatars:     dict of str -> L{base.BaseJobAvatar}
    @ivar brain:       the worker brain
    @type brain:       L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{WorkerBrain}
        """
        self.avatars = {} # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        self._onShutdown = None # If set, a deferred to fire when our last
                                # child process exits

        self._jobInfos = {} # processid -> JobInfo

        self._startSet = startset.StartSet(lambda x: x in self.avatars,
                                           errors.ComponentAlreadyStartingError,
                                           errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start listening on my socket path for job processes logging in.

        May only be called while not yet listening, and only when
        avatarClass has been set.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        # The path was reserved as a regular file by _getSocketPath; it has
        # to go before the socket can take its place.  Removal is best
        # effort, but only file system errors are ignored so that
        # KeyboardInterrupt, SystemExit and programming errors still
        # propagate.
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register an avatar for a job that is logging in.

        @returns: tuple of (pb.IPerspective, avatar, logout callable)
        @raises NotImplementedError: if pb.IPerspective is not among the
                                     requested interfaces
        """
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        """
        Forget the avatar with the given id; warn if it is not known.
        """
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Register the job info for the given process id, replacing any
        info already stored for it.
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        @returns: the job info registered for the given process id
        @raises KeyError: if no job is registered for that process id
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        @returns: the job infos of all registered jobs
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        @returns: the process ids of all registered jobs
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Hand the current stdout and stderr file descriptors of this
        process to every avatar.
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the job info for a process that has ended.

        If a shutdown is pending and this was the last registered job,
        the shutdown deferred is fired.  Warns if the pid is not known.
        """
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]

            if not self._jobInfos and self._onShutdown:
                self.debug("Last child exited")
                self._onShutdown.callback(None)
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    def shutdown(self):
        """
        Ask all jobs to stop and stop listening once they are gone.

        If there are avatars, jobs that have not exited after
        JOB_SHUTDOWN_TIMEOUT seconds are killed.

        @returns: a deferred that fires after the last job has exited (or
                  immediately when there are no avatars) and the port, if
                  any, has stopped listening
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            # fired by jobStopped when the last job info is removed
            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()
        ret.addCallback(stopListening)
        return ret

    def kill(self, signum=signal.SIGKILL):
        """
        Send a signal to every registered job.

        @param signum: the signal to send, SIGKILL by default
        """
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send a signal to the job registered under the given pid.

        @raises errors.UnknownComponentError: if no job is registered
                                              for that pid
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        common.signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send a signal to every registered job with the given avatar id.
        Nothing happens if no job matches.
        """
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)
291 292
class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.

    Subclasses must implement L{stop} and may override L{haveMind}.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type avatarId: str
        @param mind:    reference to the job's JobMedium
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        # not filled in here
        self.pid = None

    def setMind(self, mind):
        """
        Store the mind and notify myself through L{haveMind}.

        @param mind: reference to the job's JobMedium on which we can call
        @type  mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        """
        Hook called from L{setMind} after the mind has been set.
        Does nothing by default.
        """
        # implement me in subclasses
        pass

    def logout(self):
        """
        Called when the job disconnects; removes me from the heaven.
        """
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job.  Must be implemented by subclasses.

        returns: a deferred marking completed stop.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Send a file descriptor with the given message over the transport
        of my mind's broker.

        @returns: True if the descriptor was handed to the transport,
                  False if a RuntimeError was raised while doing so
        @rtype:   bool
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except RuntimeError:
            # RuntimeError is what is thrown by the C code doing this
            # when there are issues.  The exception is fetched through
            # sys.exc_info() so this clause is valid syntax on every
            # Python version.
            e = sys.exc_info()[1]
            self.warning("RuntimeError %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.

        Nothing is sent when I have no mind.

        @param stdout: file descriptor the job should use as stdout
        @type  stdout: int
        @param stderr: file descriptor the job should use as stderr
        @type  stderr: int
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # each stream gets its own descriptor; stderr must not be
            # redirected to the stdout descriptor
            self._sendFileDescriptor(stderr, "redirectStderr")
354