Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import signal 
 24   
 25  from twisted.python import failure 
 26  from twisted.internet import reactor, protocol, defer 
 27  from flumotion.common import log as flog 
 28   
 29  """ 
 30  Framework for writing automated integration tests. 
 31   
 32  This module provides a way of writing automated integration tests from 
 33  within Twisted's unit testing framework, trial. Test cases are 
 34  constructed as subclasses of the normal trial 
 35  L{twisted.trial.unittest.TestCase} class. 
 36   
 37  Integration tests look like normal test methods, except that they are 
 38  decorated with L{integration.test}, take an extra "plan" argument, and 
 39  do not return anything. For example: 
 40   
 41    from twisted.trial import unittest 
 42    from flumotion.twisted import integration 
 43   
 44    class IntegrationTestExample(unittest.TestCase): 
 45        @integration.test 
 46        def testEchoFunctionality(self, plan): 
 47            process = plan.spawn('echo', 'hello world') 
 48            plan.wait(process, 0) 
 49   
 50  This example will spawn a process, as if you typed "echo 'hello world'" 
 51  at the shell prompt. It then waits for the process to exit, expecting 
 52  the exit status to be 0. 
 53   
 54  The example illustrates two of the fundamental plan operators, spawn and 
 55  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 56  The other operators are "spawnPar", which spawns a number of processes 
 57  in parallel, "waitPar", which waits for a number of processes in 
 58  parallel, and "kill", which kills one or more processes via SIGTERM and 
 59  then waits for them to exit. 
 60   
 61  It is evident that this framework is most appropriate for testing the 
 62  integration of multiple processes, and is not suitable for in-process 
 63  tests. The plan that is built up is only executed after the test method 
 64  exits, via the L{integration.test} decorator; the writer of the 
 65  integration test does not have access to the plan's state. 
 66   
 67  Note that all process exits must be anticipated. If at any point the 
 68  integration tester receives SIGCHLD, the next operation must be a wait 
 69  for that process. If this is not the case, the test is interpreted as 
 70  having failed. 
 71   
 72  Also note that while the test is running, the stdout and stderr of each 
 73  spawned process is redirected into log files in a subdirectory of where 
 74  the test is located. For example, in the previous example, the following 
 75  files will be created: 
 76   
 77    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout 
 78    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr 
 79   
 80  In the case that multiple echo commands are run in the same plan, the 
 81  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 82  successful completion of the test case, the log directory will be 
 83  deleted. 
 84  """ 
 85   
 86  # Twisted's reactor.iterate() is defined like this: 
 87  # 
 88  #     def iterate(self, delay=0): 
 89  #        """See twisted.internet.interfaces.IReactorCore.iterate. 
 90  #        """ 
 91  #        self.runUntilCurrent() 
 92  #        self.doIteration(delay) 
 93  # 
 94  # runUntilCurrent runs all the procs on the threadCallQueue. So if 
 95  # something is added to the threadCallQueue between runUntilCurrent() 
 96  # and doIteration(), the reactor needs to have an fd ready for reading 
 97  # to shortcut the select(). This is done by callFromThread() calling 
 98  # reactor.wakeUp(), which will write on the wakeup FD. 
 99  # 
100  # HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it 
101  # is being called from another thread. This is obviously borked in the 
102  # signal-handling case, when a signal arrives between runUntilCurrent() 
103  # and doIteration(), and is processed via reactor.callFromThread(), as 
104  # is the case with SIGCHLD. So we monkeypatch the reactor to always wake 
105  # the waker. This is twisted bug #1997. 
106  reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp() 
107   
108 -def log(format, *args):
109 flog.doLog(flog.LOG, None, 'integration', format, args, -2)
110 -def debug(format, *args):
111 flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
112 -def info(format, *args):
113 flog.doLog(flog.INFO, None, 'integration', format, args, -2)
114 -def warning(format, *args):
115 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
116 -def error(format, *args):
117 flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
118
119 -def _which(executable):
120 if os.sep in executable: 121 if os.access(os.path.abspath(executable), os.X_OK): 122 return os.path.abspath(executable) 123 elif os.getenv('PATH'): 124 for path in os.getenv('PATH').split(os.pathsep): 125 if os.access(os.path.join(path, executable), os.X_OK): 126 return os.path.join(path, executable) 127 raise CommandNotFoundException(executable)
128
129 -class UnexpectedExitCodeException(Exception):
130 - def __init__(self, process, expectedCode, actualCode):
131 Exception.__init__(self) 132 self.process = process 133 self.expected = expectedCode 134 self.actual = actualCode
135 - def __str__(self):
136 return ('Expected exit code %r from %r, but got %r' 137 % (self.expected, self.process, self.actual))
138
139 -class UnexpectedExitException(Exception):
140 - def __init__(self, process):
141 Exception.__init__(self) 142 self.process = process
143 - def __str__(self):
144 return 'The process %r exited prematurely.' % self.process
145
146 -class CommandNotFoundException(Exception):
147 - def __init__(self, command):
148 Exception.__init__(self) 149 self.command = command
150 - def __str__(self):
151 return 'Command %r not found in the PATH.' % self.command
152
153 -class ProcessesStillRunningException(Exception):
154 - def __init__(self, processes):
155 Exception.__init__(self) 156 self.processes = processes
157 - def __str__(self):
158 return ('Processes still running at end of test: %r' 159 % (self.processes,))
160
161 -class TimeoutException(Exception):
162 - def __init__(self, process, status):
163 self.process = process 164 self.status = status
165
166 - def __str__(self):
167 return ('Timed out waiting for %r to exit with status %r' 168 % (self.process, self.status))
169
170 -class ProcessProtocol(protocol.ProcessProtocol):
171 - def __init__(self):
172 self.exitDeferred = defer.Deferred() 173 self.timedOut = False
174
175 - def getDeferred(self):
176 return self.exitDeferred
177
178 - def timeout(self, process, status):
179 info('forcing timeout for process protocol %r', self) 180 self.timedOut = True 181 self.exitDeferred.errback(TimeoutException(process, status))
182
183 - def processEnded(self, status):
184 info('process ended with status %r, exit code %r', status, status.value.exitCode) 185 if self.timedOut: 186 warning('already timed out??') 187 print 'already timed out quoi?' 188 else: 189 info('process ended with status %r, exit code %r', status, status.value.exitCode) 190 self.exitDeferred.callback(status.value.exitCode)
191
192 -class Process:
193 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED' 194
195 - def __init__(self, name, argv, testDir):
196 self.name = name 197 self.argv = (_which(argv[0]),) + argv[1:] 198 self.testDir = testDir 199 200 self.pid = None 201 self.protocol = None 202 self.state = self.NOT_STARTED 203 self._timeoutDC = None 204 205 log('created process object %r', self)
206
207 - def start(self):
208 assert self.state == self.NOT_STARTED 209 210 self.protocol = ProcessProtocol() 211 212 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w') 213 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w') 214 # don't give it a stdin, output to log files 215 childFDs = {1: stdout.fileno(), 2: stderr.fileno()} 216 # There's a race condition in twisted.internet.process, whereby 217 # signals received between the fork() and exec() in the child 218 # are handled with the twisted handlers, i.e. postponed, but 219 # they never get called because of the exec(). The end is they 220 # are ignored. 221 # 222 # So, work around that by resetting the sigterm handler to the 223 # default so if we self.kill() immediately after self.start(), 224 # that the subprocess won't ignore the signal. This is a window 225 # in the parent in which SIGTERM will cause immediate 226 # termination instead of the twisted nice termination, but 227 # that's better than the kid missing the signal. 228 info('spawning process %r, argv=%r', self, self.argv) 229 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL) 230 env = dict(os.environ) 231 env['FLU_DEBUG'] = '5' 232 process = reactor.spawnProcess(self.protocol, self.argv[0], 233 env=env, args=self.argv, 234 childFDs=childFDs) 235 signal.signal(signal.SIGTERM, termHandler) 236 # close our handles on the log files 237 stdout.close() 238 stderr.close() 239 240 # it's possible the process *already* exited, from within the 241 # spawnProcess itself. So set our state to STARTED, *then* 242 # attach the callback. 243 self.pid = process.pid 244 self.state = self.STARTED 245 246 def got_exit(res): 247 self.state = self.STOPPED 248 info('process %r has stopped', self) 249 return res
250 self.protocol.getDeferred().addCallback(got_exit)
251
252 - def kill(self, sig=signal.SIGTERM):
253 assert self.state == self.STARTED 254 info('killing process %r, signal %d', self, sig) 255 os.kill(self.pid, sig)
256
257 - def wait(self, status, timeout=20):
258 assert self.state != self.NOT_STARTED 259 info('waiting for process %r to exit', self) 260 d = self.protocol.getDeferred() 261 def got_exit(res): 262 debug('process %r exited with status %r', self, res) 263 if res != status: 264 warning('expected exit code %r for process %r, but got %r', 265 status, self, res) 266 raise UnexpectedExitCodeException(self, status, res)
267 d.addCallback(got_exit) 268 if self.state == self.STARTED: 269 self._timeoutDC = reactor.callLater(timeout, 270 self.protocol.timeout, 271 self, 272 status) 273 def cancel_timeout(res): 274 debug('cancelling timeout for %r', self) 275 if self._timeoutDC.active(): 276 self._timeoutDC.cancel() 277 return res 278 d.addCallbacks(cancel_timeout, cancel_timeout) 279 return d 280
281 - def __repr__(self):
282 return '<Process %s in state %s>' % (self.name, self.state)
283
284 -class PlanExecutor:
285 # both the vm and its ops 286
287 - def __init__(self):
288 self.processes = [] 289 self.timeout = 20
290
291 - def spawn(self, process):
292 assert process not in self.processes 293 self.processes.append(process) 294 process.start() 295 return defer.succeed(True)
296
297 - def checkExits(self, expectedExits):
298 for process in self.processes: 299 if (process.state != process.STARTED 300 and process not in expectedExits): 301 raise UnexpectedExitException(process)
302
303 - def kill(self, process):
304 assert process in self.processes 305 process.kill() 306 return defer.succeed(True)
307
308 - def wait(self, process, exitCode):
309 assert process in self.processes 310 def remove_from_processes_list(_): 311 self.processes.remove(process)
312 d = process.wait(exitCode, timeout=self.timeout) 313 d.addCallback(remove_from_processes_list) 314 return d
315
316 - def _checkProcesses(self, failure=None):
317 if self.processes: 318 warning('processes still running at end of test: %r', 319 self.processes) 320 e = ProcessesStillRunningException(self.processes) 321 dlist = [] 322 # reap all processes, and once we have them reaped, errback 323 for p in self.processes: 324 if p.state != p.STARTED: 325 continue 326 d = defer.Deferred() 327 dlist.append(d) 328 def callbacker(d): 329 return lambda status: d.callback(status.value.exitCode)
330 p.protocol.processEnded = callbacker(d) 331 p.kill(sig=signal.SIGKILL) 332 d = defer.DeferredList(dlist) 333 def error(_): 334 if failure: 335 return failure 336 else: 337 raise e 338 d.addCallback(error) 339 return d 340 return failure 341
342 - def run(self, ops, timeout=20):
343 self.timeout = timeout 344 d = defer.Deferred() 345 def run_op(_, op): 346 # print 'Last result: %r' % (_,) 347 # print 'Now running: %s(%r)' % (op[0].__name__, op[1:]) 348 return op[0](*op[1:])
349 for op in ops: 350 d.addCallback(run_op, op) 351 d.addCallbacks(lambda _: self._checkProcesses(failure=None), 352 lambda failure: self._checkProcesses(failure=failure)) 353 354 # We should only spawn processes when twisted has set up its 355 # sighandlers. It does that *after* firing the reactor startup 356 # event and before entering the reactor loop. So, make sure 357 # twisted is ready for us by firing the plan in a callLater. 358 reactor.callLater(0, d.callback, None) 359 return d 360
361 -class Plan:
362 - def __init__(self, testCase, testName):
363 self.name = testName 364 self.testCaseName = testCase.__class__.__name__ 365 self.processes = {} 366 self.outputDir = self._makeOutputDir(os.getcwd()) 367 368 # put your boots on monterey jacks, cause this gravy just made a 369 # virtual machine whose instructions are python methods 370 self.vm = PlanExecutor() 371 self.ops = [] 372 self.timeout = 20
373
374 - def _makeOutputDir(self, testDir):
375 # ensure that testDir exists 376 try: 377 os.mkdir(testDir) 378 except OSError: 379 pass 380 tail = '%s-%s' % (self.testCaseName, self.name) 381 outputDir = os.path.join(testDir, tail) 382 os.mkdir(outputDir) 383 return outputDir
384
385 - def _cleanOutputDir(self):
386 for root, dirs, files in os.walk(self.outputDir, topdown=False): 387 for name in files: 388 os.remove(os.path.join(root, name)) 389 for name in dirs: 390 os.rmdir(os.path.join(root, name)) 391 os.rmdir(self.outputDir) 392 self.outputDir = None
393
394 - def _allocProcess(self, args):
395 command = args[0] 396 name = command 397 i = 0 398 while name in self.processes: 399 i += 1 400 name = '%s-%d' % (command, i) 401 process = Process(name, args, self.outputDir) 402 self.processes[name] = process 403 return process
404
405 - def _appendOp(self, *args):
406 self.ops.append(args)
407
408 - def setTimeout(self, timeout):
409 self.timeout = timeout
410
411 - def spawn(self, command, *args):
412 allArgs = (command,) + args 413 process, = self.spawnPar(allArgs) 414 return process
415
416 - def spawnPar(self, *argvs):
417 processes = [] 418 self._appendOp(self.vm.checkExits, ()) 419 for argv in argvs: 420 assert isinstance(argv, tuple), \ 421 'all arguments to spawnPar must be tuples' 422 for arg in argv: 423 assert isinstance(arg, str), \ 424 'all subarguments to spawnPar must be strings' 425 processes.append(self._allocProcess(argv)) 426 for process in processes: 427 self._appendOp(self.vm.spawn, process) 428 return tuple(processes)
429
430 - def wait(self, process, status):
431 self.waitPar((process, status))
432
433 - def waitPar(self, *processStatusPairs):
434 processes = tuple([p for p,s in processStatusPairs]) 435 self._appendOp(self.vm.checkExits, processes) 436 for process, status in processStatusPairs: 437 self._appendOp(self.vm.wait, process, status)
438
439 - def kill(self, process, status=None):
440 self._appendOp(self.vm.checkExits, ()) 441 self._appendOp(self.vm.kill, process) 442 self._appendOp(self.vm.wait, process, status)
443
444 - def execute(self):
445 d = self.vm.run(self.ops, timeout=self.timeout) 446 d.addCallback(lambda _: self._cleanOutputDir()) 447 return d
448
449 -def test(proc):
450 testName = proc.__name__ 451 def wrappedtest(self): 452 plan = Plan(self, testName) 453 proc(self, plan) 454 return plan.execute()
455 try: 456 wrappedtest.__name__ = testName 457 except Exception: 458 # can only set procedure names in python >= 2.4 459 pass 460 # trial seems to require a timeout, at least in twisted 2.4, so give 461 # it a nice one 462 wrappedtest.timeout = 666 463 return wrappedtest 464