Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Framework for writing automated integration tests. 
 24   
 25  This module provides a way of writing automated integration tests from 
 26  within Twisted's unit testing framework, trial. Test cases are 
 27  constructed as subclasses of the normal trial 
 28  L{twisted.trial.unittest.TestCase} class. 
 29   
 30  Integration tests look like normal test methods, except that they are 
 31  decorated with L{integration.test}, take an extra "plan" argument, and 
 32  do not return anything. For example:: 
 33   
 34    from twisted.trial import unittest 
 35    from flumotion.twisted import integration 
 36   
 37    class IntegrationTestExample(unittest.TestCase): 
 38        @integration.test 
 39        def testEchoFunctionality(self, plan): 
 40            process = plan.spawn('echo', 'hello world') 
 41            plan.wait(process, 0) 
 42   
 43  This example will spawn a process, as if you typed "echo 'hello world'" 
 44  at the shell prompt. It then waits for the process to exit, expecting 
 45  the exit status to be 0. 
 46   
 47  The example illustrates two of the fundamental plan operators, spawn and 
 48  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 49  The other operators are "spawnPar", which spawns a number of processes 
 50  in parallel, "waitPar", which waits for a number of processes in 
 51  parallel, and "kill", which kills one or more processes via SIGTERM and 
 52  then waits for them to exit. 
 53   
 54  It is evident that this framework is most appropriate for testing the 
 55  integration of multiple processes, and is not suitable for in-process 
 56  tests. The plan that is built up is only executed after the test method 
 57  exits, via the L{integration.test} decorator; the writer of the 
 58  integration test does not have access to the plan's state. 
 59   
 60  Note that all process exits must be anticipated. If at any point the 
 61  integration tester receives SIGCHLD, the next operation must be a wait 
 62  for that process. If this is not the case, the test is interpreted as 
 63  having failed. 
 64   
 65  Also note that while the test is running, the stdout and stderr of each 
 66  spawned process is redirected into log files in a subdirectory of where 
 67  the test is located. For example, in the previous example, the following 
 68  files will be created:: 
 69   
 70    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout 
 71    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr 
 72   
 73  In the case that multiple echo commands are run in the same plan, the 
 74  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 75  successful completion of the test case, the log directory will be 
 76  deleted. 
 77  """ 
 78   
 79  import os 
 80  import signal 
 81  import tempfile 
 82   
 83  from twisted.python import failure 
 84  from twisted.internet import reactor, protocol, defer 
 85  from flumotion.common import log as flog 
 86   
 87  __version__ = "$Rev$" 
 88   
 89   
 90  # Twisted's reactor.iterate() is defined like this: 
 91  # 
 92  #     def iterate(self, delay=0): 
 93  #        """See twisted.internet.interfaces.IReactorCore.iterate. 
 94  #        """ 
 95  #        self.runUntilCurrent() 
 96  #        self.doIteration(delay) 
 97  # 
 98  # runUntilCurrent runs all the procs on the threadCallQueue. So if 
 99  # something is added to the threadCallQueue between runUntilCurrent() 
100  # and doIteration(), the reactor needs to have an fd ready for reading 
101  # to shortcut the select(). This is done by callFromThread() calling 
102  # reactor.wakeUp(), which will write on the wakeup FD. 
103  # 
104  # HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it 
105  # is being called from another thread. This is obviously borked in the 
106  # signal-handling case, when a signal arrives between runUntilCurrent() 
107  # and doIteration(), and is processed via reactor.callFromThread(), as 
108  # is the case with SIGCHLD. So we monkeypatch the reactor to always wake 
109  # the waker. This is twisted bug #1997. 
110  reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp() 
111   
112   
113 -def log(format, *args):
114 flog.doLog(flog.LOG, None, 'integration', format, args, -2)
115 116
117 -def debug(format, *args):
118 flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
119 120
121 -def info(format, *args):
122 flog.doLog(flog.INFO, None, 'integration', format, args, -2)
123 124
125 -def warning(format, *args):
126 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
127 128
129 -def error(format, *args):
130 flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
131 132
133 -def _which(executable):
134 if os.sep in executable: 135 if os.access(os.path.abspath(executable), os.X_OK): 136 return os.path.abspath(executable) 137 elif os.getenv('PATH'): 138 for path in os.getenv('PATH').split(os.pathsep): 139 if os.access(os.path.join(path, executable), os.X_OK): 140 return os.path.join(path, executable) 141 raise CommandNotFoundException(executable)
142 143
144 -class UnexpectedExitCodeException(Exception):
145
146 - def __init__(self, process, expectedCode, actualCode):
147 Exception.__init__(self) 148 self.process = process 149 self.expected = expectedCode 150 self.actual = actualCode
151
152 - def __str__(self):
153 return ('Expected exit code %r from %r, but got %r' 154 % (self.expected, self.process, self.actual))
155 156
157 -class UnexpectedExitException(Exception):
158
159 - def __init__(self, process):
160 Exception.__init__(self) 161 self.process = process
162
163 - def __str__(self):
164 return 'The process %r exited prematurely.' % self.process
165 166
167 -class CommandNotFoundException(Exception):
168
169 - def __init__(self, command):
170 Exception.__init__(self) 171 self.command = command
172
173 - def __str__(self):
174 return 'Command %r not found in the PATH.' % self.command
175 176
177 -class ProcessesStillRunningException(Exception):
178
179 - def __init__(self, processes):
180 Exception.__init__(self) 181 self.processes = processes
182
183 - def __str__(self):
184 return ('Processes still running at end of test: %r' 185 % (self.processes, ))
186 187
188 -class TimeoutException(Exception):
189
190 - def __init__(self, process, status):
191 self.process = process 192 self.status = status
193
194 - def __str__(self):
195 return ('Timed out waiting for %r to exit with status %r' 196 % (self.process, self.status))
197 198
199 -class ProcessProtocol(protocol.ProcessProtocol):
200
201 - def __init__(self):
202 self.exitDeferred = defer.Deferred() 203 self.timedOut = False
204
205 - def getDeferred(self):
206 return self.exitDeferred
207
208 - def timeout(self, process, status):
209 info('forcing timeout for process protocol %r', self) 210 self.timedOut = True 211 self.exitDeferred.errback(TimeoutException(process, status))
212
213 - def processEnded(self, status):
214 info('process ended with status %r, exit code %r', 215 status, status.value.exitCode) 216 if self.timedOut: 217 warning('already timed out??') 218 print 'already timed out quoi?' 219 else: 220 info('process ended with status %r, exit code %r', 221 status, status.value.exitCode) 222 self.exitDeferred.callback(status.value.exitCode)
223 224
225 -class Process:
226 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED' 227
228 - def __init__(self, name, argv, testDir):
229 self.name = name 230 self.argv = (_which(argv[0]), ) + argv[1:] 231 self.testDir = testDir 232 233 self.pid = None 234 self.protocol = None 235 self.state = self.NOT_STARTED 236 self._timeoutDC = None 237 238 log('created process object %r', self)
239
240 - def start(self):
241 assert self.state == self.NOT_STARTED 242 243 self.protocol = ProcessProtocol() 244 245 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w') 246 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w') 247 # don't give it a stdin, output to log files 248 childFDs = {1: stdout.fileno(), 2: stderr.fileno()} 249 # There's a race condition in twisted.internet.process, whereby 250 # signals received between the fork() and exec() in the child 251 # are handled with the twisted handlers, i.e. postponed, but 252 # they never get called because of the exec(). The end is they 253 # are ignored. 254 # 255 # So, work around that by resetting the sigterm handler to the 256 # default so if we self.kill() immediately after self.start(), 257 # that the subprocess won't ignore the signal. This is a window 258 # in the parent in which SIGTERM will cause immediate 259 # termination instead of the twisted nice termination, but 260 # that's better than the kid missing the signal. 261 info('spawning process %r, argv=%r', self, self.argv) 262 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL) 263 env = dict(os.environ) 264 env['FLU_DEBUG'] = '5' 265 process = reactor.spawnProcess(self.protocol, self.argv[0], 266 env=env, args=self.argv, 267 childFDs=childFDs) 268 signal.signal(signal.SIGTERM, termHandler) 269 # close our handles on the log files 270 stdout.close() 271 stderr.close() 272 273 # it's possible the process *already* exited, from within the 274 # spawnProcess itself. So set our state to STARTED, *then* 275 # attach the callback. 276 self.pid = process.pid 277 self.state = self.STARTED 278 279 def got_exit(res): 280 self.state = self.STOPPED 281 info('process %r has stopped', self) 282 return res
283 self.protocol.getDeferred().addCallback(got_exit)
284
285 - def kill(self, sig=signal.SIGTERM):
286 assert self.state == self.STARTED 287 info('killing process %r, signal %d', self, sig) 288 os.kill(self.pid, sig)
289
290 - def wait(self, status, timeout=20):
291 assert self.state != self.NOT_STARTED 292 info('waiting for process %r to exit', self) 293 d = self.protocol.getDeferred() 294 295 def got_exit(res): 296 debug('process %r exited with status %r', self, res) 297 if res != status: 298 warning('expected exit code %r for process %r, but got %r', 299 status, self, res) 300 raise UnexpectedExitCodeException(self, status, res)
301 d.addCallback(got_exit) 302 if self.state == self.STARTED: 303 self._timeoutDC = reactor.callLater(timeout, 304 self.protocol.timeout, 305 self, 306 status) 307 308 def cancel_timeout(res): 309 debug('cancelling timeout for %r', self) 310 if self._timeoutDC.active(): 311 self._timeoutDC.cancel() 312 return res 313 d.addCallbacks(cancel_timeout, cancel_timeout) 314 return d 315
316 - def __repr__(self):
317 return '<Process %s in state %s>' % (self.name, self.state)
318 319
320 -class PlanExecutor:
321 # both the vm and its ops 322
323 - def __init__(self):
324 self.processes = [] 325 self.timeout = 20
326
327 - def spawn(self, process):
328 assert process not in self.processes 329 self.processes.append(process) 330 process.start() 331 return defer.succeed(True)
332
333 - def checkExits(self, expectedExits):
334 for process in self.processes: 335 if (process.state != process.STARTED 336 and process not in expectedExits): 337 raise UnexpectedExitException(process)
338
339 - def kill(self, process):
340 assert process in self.processes 341 process.kill() 342 return defer.succeed(True)
343
344 - def wait(self, process, exitCode):
345 assert process in self.processes 346 347 def remove_from_processes_list(_): 348 self.processes.remove(process)
349 d = process.wait(exitCode, timeout=self.timeout) 350 d.addCallback(remove_from_processes_list) 351 return d
352
353 - def _checkProcesses(self, failure=None):
354 if self.processes: 355 warning('processes still running at end of test: %r', 356 self.processes) 357 e = ProcessesStillRunningException(self.processes) 358 dlist = [] 359 # reap all processes, and once we have them reaped, errback 360 for p in self.processes: 361 if p.state != p.STARTED: 362 continue 363 d = defer.Deferred() 364 dlist.append(d) 365 366 def callbacker(d): 367 return lambda status: d.callback(status.value.exitCode)
368 p.protocol.processEnded = callbacker(d) 369 p.kill(sig=signal.SIGKILL) 370 d = defer.DeferredList(dlist) 371 372 def error(_): 373 if failure: 374 return failure 375 else: 376 raise e 377 d.addCallback(error) 378 return d 379 return failure 380
381 - def run(self, ops, timeout=20):
382 self.timeout = timeout 383 d = defer.Deferred() 384 385 def run_op(_, op): 386 # print 'Last result: %r' % (_, ) 387 # print 'Now running: %s(%r)' % (op[0].__name__, op[1:]) 388 return op[0](*op[1:])
389 for op in ops: 390 d.addCallback(run_op, op) 391 d.addCallbacks(lambda _: self._checkProcesses(failure=None), 392 lambda failure: self._checkProcesses(failure=failure)) 393 394 # We should only spawn processes when twisted has set up its 395 # sighandlers. It does that *after* firing the reactor startup 396 # event and before entering the reactor loop. So, make sure 397 # twisted is ready for us by firing the plan in a callLater. 398 reactor.callLater(0, d.callback, None) 399 return d 400 401
402 -class Plan:
403
404 - def __init__(self, testCase, testName):
405 self.name = testName 406 self.testCaseName = testCase.__class__.__name__ 407 self.processes = {} 408 self.testDir = self._makeTestDir() 409 self.outputDir = self._makeOutputDir(self.testDir) 410 411 # put your boots on monterey jacks, cause this gravy just made a 412 # virtual machine whose instructions are python methods 413 self.vm = PlanExecutor() 414 self.ops = [] 415 self.timeout = 20
416
417 - def _makeTestDir(self):
418 testDir = tempfile.mkdtemp(prefix="test_integration") 419 return testDir
420
421 - def _makeOutputDir(self, testDir):
422 tail = '%s-%s' % (self.testCaseName, self.name) 423 outputDir = os.path.join(testDir, tail) 424 os.mkdir(outputDir) 425 return outputDir
426
427 - def _cleanOutputDir(self):
428 for root, dirs, files in os.walk(self.outputDir, topdown=False): 429 for name in files: 430 os.remove(os.path.join(root, name)) 431 for name in dirs: 432 os.rmdir(os.path.join(root, name)) 433 os.rmdir(self.outputDir) 434 os.rmdir(self.testDir) 435 self.testDir = None 436 self.outputDir = None
437
438 - def _allocProcess(self, args):
439 command = args[0] 440 name = command 441 i = 0 442 while name in self.processes: 443 i += 1 444 name = '%s-%d' % (command, i) 445 process = Process(name, args, self.outputDir) 446 self.processes[name] = process 447 return process
448
449 - def _appendOp(self, *args):
450 self.ops.append(args)
451
452 - def setTimeout(self, timeout):
453 self.timeout = timeout
454
455 - def spawn(self, command, *args):
456 allArgs = (command, ) + args 457 process, = self.spawnPar(allArgs) 458 return process
459
460 - def spawnPar(self, *argvs):
461 processes = [] 462 self._appendOp(self.vm.checkExits, ()) 463 for argv in argvs: 464 assert isinstance(argv, tuple), \ 465 'all arguments to spawnPar must be tuples' 466 for arg in argv: 467 assert isinstance(arg, str), \ 468 'all subarguments to spawnPar must be strings' 469 processes.append(self._allocProcess(argv)) 470 for process in processes: 471 self._appendOp(self.vm.spawn, process) 472 return tuple(processes)
473
474 - def wait(self, process, status):
475 self.waitPar((process, status))
476
477 - def waitPar(self, *processStatusPairs):
478 processes = tuple([p for p, s in processStatusPairs]) 479 self._appendOp(self.vm.checkExits, processes) 480 for process, status in processStatusPairs: 481 self._appendOp(self.vm.wait, process, status)
482
483 - def kill(self, process, status=None):
484 self._appendOp(self.vm.checkExits, ()) 485 self._appendOp(self.vm.kill, process) 486 self._appendOp(self.vm.wait, process, status)
487
488 - def execute(self):
489 d = self.vm.run(self.ops, timeout=self.timeout) 490 d.addCallback(lambda _: self._cleanOutputDir()) 491 return d
492 493
494 -def test(proc):
495 testName = proc.__name__ 496 497 def wrappedtest(self): 498 plan = Plan(self, testName) 499 proc(self, plan) 500 return plan.execute()
501 try: 502 wrappedtest.__name__ = testName 503 except TypeError: 504 # can only set procedure names in python >= 2.4 505 pass 506 # trial seems to require a timeout, at least in twisted 2.4, so give 507 # it a nice one 508 wrappedtest.timeout = 666 509 return wrappedtest 510