1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import signal
24
25 from twisted.python import failure
26 from twisted.internet import reactor, protocol, defer
27 from flumotion.common import log as flog
28
29 """
30 Framework for writing automated integration tests.
31
32 This module provides a way of writing automated integration tests from
33 within Twisted's unit testing framework, trial. Test cases are
34 constructed as subclasses of the normal trial
35 L{twisted.trial.unittest.TestCase} class.
36
37 Integration tests look like normal test methods, except that they are
38 decorated with L{integration.test}, take an extra "plan" argument, and
39 do not return anything. For example:
40
41 from twisted.trial import unittest
42 from flumotion.twisted import integration
43
44 class IntegrationTestExample(unittest.TestCase):
45 @integration.test
46 def testEchoFunctionality(self, plan):
47 process = plan.spawn('echo', 'hello world')
48 plan.wait(process, 0)
49
50 This example will spawn a process, as if you typed "echo 'hello world'"
51 at the shell prompt. It then waits for the process to exit, expecting
52 the exit status to be 0.
53
54 The example illustrates two of the fundamental plan operators, spawn and
55 wait. "spawn" spawns a process. "wait" waits for a process to finish.
56 The other operators are "spawnPar", which spawns a number of processes
57 in parallel, "waitPar", which waits for a number of processes in
58 parallel, and "kill", which kills one or more processes via SIGTERM and
59 then waits for them to exit.
60
61 It is evident that this framework is most appropriate for testing the
62 integration of multiple processes, and is not suitable for in-process
63 tests. The plan that is built up is only executed after the test method
64 exits, via the L{integration.test} decorator; the writer of the
65 integration test does not have access to the plan's state.
66
67 Note that all process exits must be anticipated. If at any point the
68 integration tester receives SIGCHLD, the next operation must be a wait
69 for that process. If this is not the case, the test is interpreted as
70 having failed.
71
Also note that while the test is running, the stdout and stderr of each
spawned process are redirected into log files in a subdirectory of the
directory where the test is located. In the previous example, the
following files will be created:
76
77 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
78 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
79
If multiple echo commands are run in the same plan, the subsequent
commands will be named echo-1, echo-2, and so on. Upon successful
completion of the test case, the log directory will be deleted.
84 """
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
def _wakeReactor():
    """
    Call C{reactor.waker.wakeUp()} if the reactor currently has a waker.

    @returns: the falsy C{reactor.waker} value when no waker is set,
              otherwise whatever the waker's C{wakeUp()} returns.
    """
    waker = reactor.waker
    if not waker:
        return waker
    return waker.wakeUp()


# Install the helper on the global reactor as C{reactor.wakeUp}.
# NOTE(review): the reason for patching the reactor is not visible here;
# presumably it lets signal handling interrupt a blocked reactor -- confirm.
reactor.wakeUp = _wakeReactor
107
def log(format, *args):
    """
    Emit a message at LOG level under the 'integration' category.

    @param format: format string of the message.
    @param args:   values handed to the logger together with C{format}.
    """
    # The trailing -2 is passed straight through to doLog; presumably it
    # tells the logger how far up the stack the real caller sits (confirm
    # against flumotion.common.log), so the call stays directly in here.
    level, category = flog.LOG, 'integration'
    flog.doLog(level, None, category, format, args, -2)
def debug(format, *args):
    """
    Emit a message at DEBUG level under the 'integration' category.

    @param format: format string of the message.
    @param args:   values handed to the logger together with C{format}.
    """
    # The trailing -2 is passed straight through to doLog; presumably it
    # tells the logger how far up the stack the real caller sits (confirm
    # against flumotion.common.log), so the call stays directly in here.
    level, category = flog.DEBUG, 'integration'
    flog.doLog(level, None, category, format, args, -2)
def info(format, *args):
    """
    Emit a message at INFO level under the 'integration' category.

    @param format: format string of the message.
    @param args:   values handed to the logger together with C{format}.
    """
    # The trailing -2 is passed straight through to doLog; presumably it
    # tells the logger how far up the stack the real caller sits (confirm
    # against flumotion.common.log), so the call stays directly in here.
    level, category = flog.INFO, 'integration'
    flog.doLog(level, None, category, format, args, -2)
115 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
def error(format, *args):
    """
    Emit a message at ERROR level under the 'integration' category.

    @param format: format string of the message.
    @param args:   values handed to the logger together with C{format}.
    """
    # The trailing -2 is passed straight through to doLog; presumably it
    # tells the logger how far up the stack the real caller sits (confirm
    # against flumotion.common.log), so the call stays directly in here.
    level, category = flog.ERROR, 'integration'
    flog.doLog(level, None, category, format, args, -2)
118
120 if os.sep in executable:
121 if os.access(os.path.abspath(executable), os.X_OK):
122 return os.path.abspath(executable)
123 elif os.getenv('PATH'):
124 for path in os.getenv('PATH').split(os.pathsep):
125 if os.access(os.path.join(path, executable), os.X_OK):
126 return os.path.join(path, executable)
127 raise CommandNotFoundException(executable)
128
130 - def __init__(self, process, expectedCode, actualCode):
131 Exception.__init__(self)
132 self.process = process
133 self.expected = expectedCode
134 self.actual = actualCode
136 return ('Expected exit code %r from %r, but got %r'
137 % (self.expected, self.process, self.actual))
138
144 return 'The process %r exited prematurely.' % self.process
145
151 return 'Command %r not found in the PATH.' % self.command
152
155 Exception.__init__(self)
156 self.processes = processes
158 return ('Processes still running at end of test: %r'
159 % (self.processes,))
160
165
167 return ('Timed out waiting for %r to exit with status %r'
168 % (self.process, self.status))
169
172 self.exitDeferred = defer.Deferred()
173 self.timedOut = False
174
176 return self.exitDeferred
177
178 - def timeout(self, process, status):
182
191
193 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
194
195 - def __init__(self, name, argv, testDir):
196 self.name = name
197 self.argv = (_which(argv[0]),) + argv[1:]
198 self.testDir = testDir
199
200 self.pid = None
201 self.protocol = None
202 self.state = self.NOT_STARTED
203 self._timeoutDC = None
204
205 log('created process object %r', self)
206
208 assert self.state == self.NOT_STARTED
209
210 self.protocol = ProcessProtocol()
211
212 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
213 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
214
215 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
216
217
218
219
220
221
222
223
224
225
226
227
228 info('spawning process %r, argv=%r', self, self.argv)
229 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
230 env = dict(os.environ)
231 env['FLU_DEBUG'] = '5'
232 process = reactor.spawnProcess(self.protocol, self.argv[0],
233 env=env, args=self.argv,
234 childFDs=childFDs)
235 signal.signal(signal.SIGTERM, termHandler)
236
237 stdout.close()
238 stderr.close()
239
240
241
242
243 self.pid = process.pid
244 self.state = self.STARTED
245
246 def got_exit(res):
247 self.state = self.STOPPED
248 info('process %r has stopped', self)
249 return res
250 self.protocol.getDeferred().addCallback(got_exit)
251
252 - def kill(self, sig=signal.SIGTERM):
253 assert self.state == self.STARTED
254 info('killing process %r, signal %d', self, sig)
255 os.kill(self.pid, sig)
256
257 - def wait(self, status, timeout=20):
267 d.addCallback(got_exit)
268 if self.state == self.STARTED:
269 self._timeoutDC = reactor.callLater(timeout,
270 self.protocol.timeout,
271 self,
272 status)
273 def cancel_timeout(res):
274 debug('cancelling timeout for %r', self)
275 if self._timeoutDC.active():
276 self._timeoutDC.cancel()
277 return res
278 d.addCallbacks(cancel_timeout, cancel_timeout)
279 return d
280
282 return '<Process %s in state %s>' % (self.name, self.state)
283
285
286
288 self.processes = []
289 self.timeout = 20
290
291 - def spawn(self, process):
296
302
303 - def kill(self, process):
307
308 - def wait(self, process, exitCode):
309 assert process in self.processes
310 def remove_from_processes_list(_):
311 self.processes.remove(process)
312 d = process.wait(exitCode, timeout=self.timeout)
313 d.addCallback(remove_from_processes_list)
314 return d
315
330 p.protocol.processEnded = callbacker(d)
331 p.kill(sig=signal.SIGKILL)
332 d = defer.DeferredList(dlist)
333 def error(_):
334 if failure:
335 return failure
336 else:
337 raise e
338 d.addCallback(error)
339 return d
340 return failure
341
342 - def run(self, ops, timeout=20):
343 self.timeout = timeout
344 d = defer.Deferred()
345 def run_op(_, op):
346
347
348 return op[0](*op[1:])
349 for op in ops:
350 d.addCallback(run_op, op)
351 d.addCallbacks(lambda _: self._checkProcesses(failure=None),
352 lambda failure: self._checkProcesses(failure=failure))
353
354
355
356
357
358 reactor.callLater(0, d.callback, None)
359 return d
360
362 - def __init__(self, testCase, testName):
363 self.name = testName
364 self.testCaseName = testCase.__class__.__name__
365 self.processes = {}
366 self.outputDir = self._makeOutputDir(os.getcwd())
367
368
369
370 self.vm = PlanExecutor()
371 self.ops = []
372 self.timeout = 20
373
375
376 try:
377 os.mkdir(testDir)
378 except OSError:
379 pass
380 tail = '%s-%s' % (self.testCaseName, self.name)
381 outputDir = os.path.join(testDir, tail)
382 os.mkdir(outputDir)
383 return outputDir
384
386 for root, dirs, files in os.walk(self.outputDir, topdown=False):
387 for name in files:
388 os.remove(os.path.join(root, name))
389 for name in dirs:
390 os.rmdir(os.path.join(root, name))
391 os.rmdir(self.outputDir)
392 self.outputDir = None
393
404
407
410
411 - def spawn(self, command, *args):
415
417 processes = []
418 self._appendOp(self.vm.checkExits, ())
419 for argv in argvs:
420 assert isinstance(argv, tuple), \
421 'all arguments to spawnPar must be tuples'
422 for arg in argv:
423 assert isinstance(arg, str), \
424 'all subarguments to spawnPar must be strings'
425 processes.append(self._allocProcess(argv))
426 for process in processes:
427 self._appendOp(self.vm.spawn, process)
428 return tuple(processes)
429
430 - def wait(self, process, status):
432
433 - def waitPar(self, *processStatusPairs):
438
439 - def kill(self, process, status=None):
443
448
450 testName = proc.__name__
451 def wrappedtest(self):
452 plan = Plan(self, testName)
453 proc(self, plan)
454 return plan.execute()
455 try:
456 wrappedtest.__name__ = testName
457 except Exception:
458
459 pass
460
461
462 wrappedtest.timeout = 666
463 return wrappedtest
464