1""" 2TestCmd.py: a testing framework for commands and scripts. 3 4The TestCmd module provides a framework for portable automated testing of 5executable commands and scripts (in any language, not just Python), especially 6commands and scripts that require file system interaction. 7 8In addition to running tests and evaluating conditions, the TestCmd module 9manages and cleans up one or more temporary workspace directories, and provides 10methods for creating files and directories in those workspace directories from 11in-line data, here-documents), allowing tests to be completely self-contained. 12 13A TestCmd environment object is created via the usual invocation: 14 15 test = TestCmd() 16 17The TestCmd module provides pass_test(), fail_test(), and no_result() unbound 18methods that report test results for use with the Aegis change management 19system. These methods terminate the test immediately, reporting PASSED, FAILED 20or NO RESULT respectively and exiting with status 0 (success), 1 or 2 21respectively. This allows for a distinction between an actual failed test and a 22test that could not be properly evaluated because of an external condition (such 23as a full file system or incorrect permissions). 24 25""" 26 27# Copyright 2000 Steven Knight 28# This module is free software, and you may redistribute it and/or modify 29# it under the same terms as Python itself, so long as this copyright message 30# and disclaimer are retained in their original form. 31# 32# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, 33# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF 34# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH 35# DAMAGE. 36# 37# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT 38# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 39# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, 40# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, 41# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 42 43# Copyright 2002-2003 Vladimir Prus. 44# Copyright 2002-2003 Dave Abrahams. 45# Copyright 2006 Rene Rivera. 46# Distributed under the Boost Software License, Version 1.0. 47# (See accompanying file LICENSE_1_0.txt or copy at 48# http://www.boost.org/LICENSE_1_0.txt) 49 50from __future__ import print_function 51 52__author__ = "Steven Knight <knight@baldmt.com>" 53__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software" 54__version__ = "0.02" 55 56from types import * 57 58import os 59import os.path 60import re 61import shutil 62import stat 63import subprocess 64import sys 65import tempfile 66import traceback 67 68 69tempfile.template = 'testcmd.' 70 71_Cleanup = [] 72 73def _clean(): 74 global _Cleanup 75 list = _Cleanup[:] 76 _Cleanup = [] 77 list.reverse() 78 for test in list: 79 test.cleanup() 80 81sys.exitfunc = _clean 82 83 84def caller(tblist, skip): 85 string = "" 86 arr = [] 87 for file, line, name, text in tblist: 88 if file[-10:] == "TestCmd.py": 89 break 90 arr = [(file, line, name, text)] + arr 91 atfrom = "at" 92 for file, line, name, text in arr[skip:]: 93 if name == "?": 94 name = "" 95 else: 96 name = " (" + name + ")" 97 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) 98 atfrom = "\tfrom" 99 return string 100 101 102def fail_test(self=None, condition=True, function=None, skip=0): 103 """Cause the test to fail. 104 105 By default, the fail_test() method reports that the test FAILED and exits 106 with a status of 1. If a condition argument is supplied, the test fails 107 only if the condition is true. 108 109 """ 110 if not condition: 111 return 112 if not function is None: 113 function() 114 of = "" 115 desc = "" 116 sep = " " 117 if not self is None: 118 if self.program: 119 of = " of " + " ".join(self.program) 120 sep = "\n\t" 121 if self.description: 122 desc = " [" + self.description + "]" 123 sep = "\n\t" 124 125 at = caller(traceback.extract_stack(), skip) 126 127 sys.stderr.write("FAILED test" + of + desc + sep + at + """ 128in directory: """ + os.getcwd() ) 129 sys.exit(1) 130 131 132def no_result(self=None, condition=True, function=None, skip=0): 133 """Causes a test to exit with no valid result. 134 135 By default, the no_result() method reports NO RESULT for the test and 136 exits with a status of 2. If a condition argument is supplied, the test 137 fails only if the condition is true. 138 139 """ 140 if not condition: 141 return 142 if not function is None: 143 function() 144 of = "" 145 desc = "" 146 sep = " " 147 if not self is None: 148 if self.program: 149 of = " of " + self.program 150 sep = "\n\t" 151 if self.description: 152 desc = " [" + self.description + "]" 153 sep = "\n\t" 154 155 at = caller(traceback.extract_stack(), skip) 156 sys.stderr.write("NO RESULT for test" + of + desc + sep + at) 157 sys.exit(2) 158 159 160def pass_test(self=None, condition=True, function=None): 161 """Causes a test to pass. 162 163 By default, the pass_test() method reports PASSED for the test and exits 164 with a status of 0. If a condition argument is supplied, the test passes 165 only if the condition is true. 166 167 """ 168 if not condition: 169 return 170 if not function is None: 171 function() 172 sys.stderr.write("PASSED\n") 173 sys.exit(0) 174 175class MatchError(object): 176 def __init__(self, message): 177 self.message = message 178 def __nonzero__(self): 179 return False 180 def __bool__(self): 181 return False 182 183def match_exact(lines=None, matches=None): 184 """ 185 Returns whether the given lists or strings containing lines separated 186 using newline characters contain exactly the same data. 187 188 """ 189 if not type(lines) is list: 190 lines = lines.split("\n") 191 if not type(matches) is list: 192 matches = matches.split("\n") 193 if len(lines) != len(matches): 194 return 195 for i in range(len(lines)): 196 if lines[i] != matches[i]: 197 return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % 198 (i+1, matches[i], lines[i])) 199 if len(lines) < len(matches): 200 return MatchError("Missing lines at line %d\n- %s" % 201 (len(lines), "\n- ".join(matches[len(lines):]))) 202 if len(lines) > len(matches): 203 return MatchError("Extra lines at line %d\n+ %s" % 204 (len(matches), "\n+ ".join(lines[len(matches):]))) 205 return 1 206 207 208def match_re(lines=None, res=None): 209 """ 210 Given lists or strings contain lines separated using newline characters. 211 This function matches those lines one by one, interpreting the lines in the 212 res parameter as regular expressions. 213 214 """ 215 if not type(lines) is list: 216 lines = lines.split("\n") 217 if not type(res) is list: 218 res = res.split("\n") 219 for i in range(min(len(lines), len(res))): 220 if not re.compile("^" + res[i] + "$").search(lines[i]): 221 return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % 222 (i+1, res[i], lines[i])) 223 if len(lines) < len(res): 224 return MatchError("Missing lines at line %d\n- %s" % 225 (len(lines), "\n- ".join(res[len(lines):]))) 226 if len(lines) > len(res): 227 return MatchError("Extra lines at line %d\n+ %s" % 228 (len(res), "\n+ ".join(lines[len(res):]))) 229 return 1 230 231 232class TestCmd: 233 def __init__(self, description=None, program=None, workdir=None, 234 subdir=None, verbose=False, match=None, inpath=None): 235 236 self._cwd = os.getcwd() 237 self.description_set(description) 238 self.program_set(program, inpath) 239 self.verbose_set(verbose) 240 if match is None: 241 self.match_func = match_re 242 else: 243 self.match_func = match 244 self._dirlist = [] 245 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} 246 env = os.environ.get('PRESERVE') 247 if env: 248 self._preserve['pass_test'] = env 249 self._preserve['fail_test'] = env 250 self._preserve['no_result'] = env 251 else: 252 env = os.environ.get('PRESERVE_PASS') 253 if env is not None: 254 self._preserve['pass_test'] = env 255 env = os.environ.get('PRESERVE_FAIL') 256 if env is not None: 257 self._preserve['fail_test'] = env 258 env = os.environ.get('PRESERVE_PASS') 259 if env is not None: 260 self._preserve['PRESERVE_NO_RESULT'] = env 261 self._stdout = [] 262 self._stderr = [] 263 self.status = None 264 self.condition = 'no_result' 265 self.workdir_set(workdir) 266 self.subdir(subdir) 267 268 def __del__(self): 269 self.cleanup() 270 271 def __repr__(self): 272 return "%x" % id(self) 273 274 def cleanup(self, condition=None): 275 """ 276 Removes any temporary working directories for the specified TestCmd 277 environment. If the environment variable PRESERVE was set when the 278 TestCmd environment was created, temporary working directories are not 279 removed. If any of the environment variables PRESERVE_PASS, 280 PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd 281 environment was created, then temporary working directories are not 282 removed if the test passed, failed or had no result, respectively. 283 Temporary working directories are also preserved for conditions 284 specified via the preserve method. 285 286 Typically, this method is not called directly, but is used when the 287 script exits to clean up temporary working directories as appropriate 288 for the exit status. 289 290 """ 291 if not self._dirlist: 292 return 293 if condition is None: 294 condition = self.condition 295 if self._preserve[condition]: 296 for dir in self._dirlist: 297 print("Preserved directory %s" % dir) 298 else: 299 list = self._dirlist[:] 300 list.reverse() 301 for dir in list: 302 self.writable(dir, 1) 303 shutil.rmtree(dir, ignore_errors=1) 304 305 self._dirlist = [] 306 self.workdir = None 307 os.chdir(self._cwd) 308 try: 309 global _Cleanup 310 _Cleanup.remove(self) 311 except (AttributeError, ValueError): 312 pass 313 314 def description_set(self, description): 315 """Set the description of the functionality being tested.""" 316 self.description = description 317 318 def fail_test(self, condition=True, function=None, skip=0): 319 """Cause the test to fail.""" 320 if not condition: 321 return 322 self.condition = 'fail_test' 323 fail_test(self = self, 324 condition = condition, 325 function = function, 326 skip = skip) 327 328 def match(self, lines, matches): 329 """Compare actual and expected file contents.""" 330 return self.match_func(lines, matches) 331 332 def match_exact(self, lines, matches): 333 """Compare actual and expected file content exactly.""" 334 return match_exact(lines, matches) 335 336 def match_re(self, lines, res): 337 """Compare file content with a regular expression.""" 338 return match_re(lines, res) 339 340 def no_result(self, condition=True, function=None, skip=0): 341 """Report that the test could not be run.""" 342 if not condition: 343 return 344 self.condition = 'no_result' 345 no_result(self = self, 346 condition = condition, 347 function = function, 348 skip = skip) 349 350 def pass_test(self, condition=True, function=None): 351 """Cause the test to pass.""" 352 if not condition: 353 return 354 self.condition = 'pass_test' 355 pass_test(self, condition, function) 356 357 def preserve(self, *conditions): 358 """ 359 Arrange for the temporary working directories for the specified 360 TestCmd environment to be preserved for one or more conditions. If no 361 conditions are specified, arranges for the temporary working 362 directories to be preserved for all conditions. 363 364 """ 365 if conditions is (): 366 conditions = ('pass_test', 'fail_test', 'no_result') 367 for cond in conditions: 368 self._preserve[cond] = 1 369 370 def program_set(self, program, inpath): 371 """Set the executable program or script to be tested.""" 372 if not inpath and program and not os.path.isabs(program[0]): 373 program[0] = os.path.join(self._cwd, program[0]) 374 self.program = program 375 376 def read(self, file, mode='rb'): 377 """ 378 Reads and returns the contents of the specified file name. The file 379 name may be a list, in which case the elements are concatenated with 380 the os.path.join() method. The file is assumed to be under the 381 temporary working directory unless it is an absolute path name. The I/O 382 mode for the file may be specified and must begin with an 'r'. The 383 default is 'rb' (binary read). 384 385 """ 386 if type(file) is list: 387 file = os.path.join(*file) 388 if not os.path.isabs(file): 389 file = os.path.join(self.workdir, file) 390 if mode[0] != 'r': 391 raise ValueError("mode must begin with 'r'") 392 return open(file, mode).read() 393 394 def run(self, program=None, arguments=None, chdir=None, stdin=None, 395 universal_newlines=True): 396 """ 397 Runs a test of the program or script for the test environment. 398 Standard output and error output are saved for future retrieval via the 399 stdout() and stderr() methods. 400 401 'universal_newlines' parameter controls how the child process 402 input/output streams are opened as defined for the same named Python 403 subprocess.POpen constructor parameter. 404 405 """ 406 if chdir: 407 if not os.path.isabs(chdir): 408 chdir = os.path.join(self.workpath(chdir)) 409 if self.verbose: 410 sys.stderr.write("chdir(" + chdir + ")\n") 411 else: 412 chdir = self.workdir 413 414 cmd = [] 415 if program and program[0]: 416 if program[0] != self.program[0] and not os.path.isabs(program[0]): 417 program[0] = os.path.join(self._cwd, program[0]) 418 cmd += program 419 else: 420 cmd += self.program 421 if arguments: 422 cmd += arguments.split(" ") 423 if self.verbose: 424 sys.stderr.write("run(" + " ".join(cmd) + ")\n") 425 p = subprocess.Popen(cmd, stdin=subprocess.PIPE, 426 stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir, 427 universal_newlines=universal_newlines) 428 429 if stdin: 430 if type(stdin) is list: 431 stdin = "".join(stdin) 432 out, err = p.communicate(stdin) 433 if not type(out) is str: 434 out = out.decode() 435 if not type(err) is str: 436 err = err.decode() 437 self._stdout.append(out) 438 self._stderr.append(err) 439 self.status = p.returncode 440 441 if self.verbose: 442 sys.stdout.write(self._stdout[-1]) 443 sys.stderr.write(self._stderr[-1]) 444 445 def stderr(self, run=None): 446 """ 447 Returns the error output from the specified run number. If there is 448 no specified run number, then returns the error output of the last run. 449 If the run number is less than zero, then returns the error output from 450 that many runs back from the current run. 451 452 """ 453 if not run: 454 run = len(self._stderr) 455 elif run < 0: 456 run = len(self._stderr) + run 457 run -= 1 458 if run < 0: 459 return '' 460 return self._stderr[run] 461 462 def stdout(self, run=None): 463 """ 464 Returns the standard output from the specified run number. If there 465 is no specified run number, then returns the standard output of the 466 last run. If the run number is less than zero, then returns the 467 standard output from that many runs back from the current run. 468 469 """ 470 if not run: 471 run = len(self._stdout) 472 elif run < 0: 473 run = len(self._stdout) + run 474 run -= 1 475 if run < 0: 476 return '' 477 return self._stdout[run] 478 479 def subdir(self, *subdirs): 480 """ 481 Create new subdirectories under the temporary working directory, one 482 for each argument. An argument may be a list, in which case the list 483 elements are concatenated using the os.path.join() method. 484 Subdirectories multiple levels deep must be created using a separate 485 argument for each level: 486 487 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) 488 489 Returns the number of subdirectories actually created. 490 491 """ 492 count = 0 493 for sub in subdirs: 494 if sub is None: 495 continue 496 if type(sub) is list: 497 sub = os.path.join(*tuple(sub)) 498 new = os.path.join(self.workdir, sub) 499 try: 500 os.mkdir(new) 501 except: 502 pass 503 else: 504 count += 1 505 return count 506 507 def unlink(self, file): 508 """ 509 Unlinks the specified file name. The file name may be a list, in 510 which case the elements are concatenated using the os.path.join() 511 method. The file is assumed to be under the temporary working directory 512 unless it is an absolute path name. 513 514 """ 515 if type(file) is list: 516 file = os.path.join(*tuple(file)) 517 if not os.path.isabs(file): 518 file = os.path.join(self.workdir, file) 519 os.unlink(file) 520 521 def verbose_set(self, verbose): 522 """Set the verbose level.""" 523 self.verbose = verbose 524 525 def workdir_set(self, path): 526 """ 527 Creates a temporary working directory with the specified path name. 528 If the path is a null string (''), a unique directory name is created. 529 530 """ 531 if os.path.isabs(path): 532 self.workdir = path 533 else: 534 if path != None: 535 if path == '': 536 path = tempfile.mktemp() 537 if path != None: 538 os.mkdir(path) 539 self._dirlist.append(path) 540 global _Cleanup 541 try: 542 _Cleanup.index(self) 543 except ValueError: 544 _Cleanup.append(self) 545 # We would like to set self.workdir like this: 546 # self.workdir = path 547 # But symlinks in the path will report things differently from 548 # os.getcwd(), so chdir there and back to fetch the canonical 549 # path. 550 cwd = os.getcwd() 551 os.chdir(path) 552 self.workdir = os.getcwd() 553 os.chdir(cwd) 554 else: 555 self.workdir = None 556 557 def workpath(self, *args): 558 """ 559 Returns the absolute path name to a subdirectory or file within the 560 current temporary working directory. Concatenates the temporary working 561 directory name with the specified arguments using os.path.join(). 562 563 """ 564 return os.path.join(self.workdir, *tuple(args)) 565 566 def writable(self, top, write): 567 """ 568 Make the specified directory tree writable (write == 1) or not 569 (write == None). 570 571 """ 572 def _walk_chmod(arg, dirname, names): 573 st = os.stat(dirname) 574 os.chmod(dirname, arg(st[stat.ST_MODE])) 575 for name in names: 576 fullname = os.path.join(dirname, name) 577 st = os.stat(fullname) 578 os.chmod(fullname, arg(st[stat.ST_MODE])) 579 580 _mode_writable = lambda mode: stat.S_IMODE(mode|0o200) 581 _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0o200) 582 583 if write: 584 f = _mode_writable 585 else: 586 f = _mode_non_writable 587 try: 588 for root, _, files in os.walk(top): 589 _walk_chmod(f, root, files) 590 except: 591 pass # Ignore any problems changing modes. 592 593 def write(self, file, content, mode='wb'): 594 """ 595 Writes the specified content text (second argument) to the specified 596 file name (first argument). The file name may be a list, in which case 597 the elements are concatenated using the os.path.join() method. The file 598 is created under the temporary working directory. Any subdirectories in 599 the path must already exist. The I/O mode for the file may be specified 600 and must begin with a 'w'. The default is 'wb' (binary write). 601 602 """ 603 if type(file) is list: 604 file = os.path.join(*tuple(file)) 605 if not os.path.isabs(file): 606 file = os.path.join(self.workdir, file) 607 if mode[0] != 'w': 608 raise ValueError("mode must begin with 'w'") 609 open(file, mode).write(content) 610