# -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

"""
worker-side objects to handle worker clients
"""

import os
import sys
import signal

from twisted.cred import portal
from twisted.internet import defer, reactor
from twisted.spread import pb
from zope.interface import implements

from flumotion.common import errors, log
from flumotion.common import common, worker, startset
from flumotion.twisted import checkers, fdserver
from flumotion.twisted import pb as fpb

JOB_SHUTDOWN_TIMEOUT = 5


def _getSocketPath():
    # FIXME: there is no mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else;
    # I didn't succeed in preparing a socket file with that name either.

    # caller needs to delete name before using
    import tempfile
    fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
    os.close(fd)

    return name


class JobInfo(object):
    """
    I hold information about a job.

    @cvar pid:        PID of the child process
    @type pid:        int
    @cvar avatarId:   avatar identification string
    @type avatarId:   str
    @cvar type:       type of the component to create
    @type type:       str
    @cvar moduleName: name of the module to create the component from
    @type moduleName: str
    @cvar methodName: the factory method to use to create the component
    @type methodName: str
    @cvar nice:       the nice level to run the job as
    @type nice:       int
    @cvar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        self.pid = pid
        self.avatarId = avatarId
        self.type = type
        self.moduleName = moduleName
        self.methodName = methodName
        self.nice = nice
        self.bundles = bundles


class JobProcessProtocol(worker.ProcessProtocol):

    def __init__(self, heaven, avatarId, startSet):
        self._startSet = startSet
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        heaven = self.loggable
        heaven.brain.callRemote('componentAddMessage', self.avatarId,
                                message)

    def processEnded(self, status):
        heaven = self.loggable
        dstarts = self._startSet
        signum = status.value.signal

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead.  If the job already attached to the worker, however,
        # the create deferred will already have callbacked.
        deferred = dstarts.createRegistered(self.avatarId)
        if deferred is self._deferredStart:
            if signum:
                reason = "received signal %d" % signum
            else:
                reason = "unknown reason"
            text = "Component '%s' has exited early (%s). " \
                   "This is sometimes triggered by a corrupt " \
                   "GStreamer registry." % (self.avatarId, reason)
            dstarts.createFailed(self.avatarId,
                                 errors.ComponentCreateError(text))

        if dstarts.shutdownRegistered(self.avatarId):
            dstarts.shutdownSuccess(self.avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to, but not quite the same as, a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the worker.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain:   the worker brain
    @type brain:   L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{WorkerBrain}
        """
        self.avatars = {}  # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        self._onShutdown = None  # if set, a deferred to fire when our last
                                 # child process exits

        self._jobInfos = {}  # processid -> JobInfo

        self._startSet = startset.StartSet(lambda x: x in self.avatars,
                                           errors.ComponentAlreadyStartingError,
                                           errors.ComponentAlreadyRunningError)

    def listen(self):
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand the job process a username and password to
        # log in with instead of allowing anonymous logins
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processPid, jobInfo):
        self._jobInfos[processPid] = jobInfo

    def getJobInfo(self, processPid):
        return self._jobInfos[processPid]

    def getJobInfos(self):
        return self._jobInfos.values()

    def getJobPids(self):
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]

            if not self._jobInfos and self._onShutdown:
                self.debug("Last child exited")
                self._onShutdown.callback(None)
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    def shutdown(self):
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()

        ret.addCallback(stopListening)
        return ret

    def kill(self, signum=signal.SIGKILL):
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        common.signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)


class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type avatarId: str
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type  mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        # implemented in subclasses
        pass

    def logout(self):
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job.

        @returns: a deferred that fires when the job has stopped
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; it is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except RuntimeError, e:
            # RuntimeError is what is thrown by the C code doing this
            # when there are issues
            self.warning("RuntimeError %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            self._sendFileDescriptor(stderr, "redirectStderr")
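The listen() method above combines a realm (the heaven itself), a credentials checker that allows passwordless logins, a PB server factory, and a UNIX socket whose path comes from _getSocketPath(). For readers unfamiliar with that Twisted pattern, here is a reduced, stand-alone sketch using only stock Twisted APIs. It is not part of flumotion: the names ExampleRealm, ExampleAvatar, listenOnSocket and connectToSocket are invented for illustration, a plain username/password checker replaces flumotion's FlexibleCredentialsChecker (roughly what the FIXME in listen() suggests doing), and listenUNIX() is used where the real code needs fdserver.FDPort in order to pass file descriptors over the socket.

    # Stand-alone sketch of the PB-over-UNIX-socket pattern; not flumotion code.
    from twisted.cred import checkers, credentials, portal
    from twisted.internet import reactor
    from twisted.spread import pb
    from zope.interface import implements


    class ExampleAvatar(pb.Avatar):
        # stands in for BaseJobHeaven.avatarClass; perspective_* methods are
        # what the logged-in client can call remotely
        def perspective_ping(self):
            return 'pong'


    class ExampleRealm(object):
        implements(portal.IRealm)

        def requestAvatar(self, avatarId, mind, *interfaces):
            # same shape as BaseJobHeaven.requestAvatar:
            # return (interface, avatar, logout callable)
            if pb.IPerspective in interfaces:
                return pb.IPerspective, ExampleAvatar(), lambda: None
            raise NotImplementedError("no interface")


    def listenOnSocket(socketPath):
        # the checker plays the role of FlexibleCredentialsChecker in listen()
        checker = checkers.InMemoryUsernamePasswordDatabaseDontUseThis()
        checker.addUser('job', 'secret')
        p = portal.Portal(ExampleRealm(), [checker])
        return reactor.listenUNIX(socketPath, pb.PBServerFactory(p))


    def connectToSocket(socketPath):
        # what a job process would do to log in and obtain its perspective
        factory = pb.PBClientFactory()
        reactor.connectUNIX(socketPath, factory)
        return factory.login(credentials.UsernamePassword('job', 'secret'))

The deferred returned by connectToSocket() fires with the client's remote perspective, the counterpart of the avatar that requestAvatar() created on the server side.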