1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7Unit testing infrastructure for Scapy 8""" 9 10import builtins 11import bz2 12import copy 13import code 14import getopt 15import glob 16import hashlib 17import importlib 18import json 19import logging 20import os 21import os.path 22import sys 23import threading 24import time 25import traceback 26import warnings 27import zlib 28 29from scapy.consts import WINDOWS, BIG_ENDIAN 30from scapy.config import conf 31from scapy.compat import base64_bytes 32from scapy.themes import DefaultTheme, BlackAndWhite 33from scapy.utils import tex_escape 34 35 36# Check UTF-8 support # 37 38def _utf8_support(): 39 """ 40 Check UTF-8 support for the output 41 """ 42 try: 43 if WINDOWS: 44 return (sys.stdout.encoding == "utf-8") 45 return True 46 except AttributeError: 47 return False 48 49 50if _utf8_support(): 51 arrow = "\u2514" 52 dash = "\u2501" 53 checkmark = "\u2713" 54else: 55 arrow = "->" 56 dash = "--" 57 checkmark = "OK" 58 59 60# Util class # 61 62class Bunch: 63 __init__ = lambda self, **kw: setattr(self, '__dict__', kw) 64 65 66def retry_test(func): 67 """Retries the passed function 3 times before failing""" 68 v = None 69 tb = None 70 for _ in range(3): 71 try: 72 return func() 73 except Exception: 74 t, v, tb = sys.exc_info() 75 time.sleep(1) 76 77 if v and tb: 78 raise v.with_traceback(tb) 79 80 81def scapy_path(fname): 82 """Resolves a path relative to scapy's root folder""" 83 if fname.startswith('/'): 84 fname = fname[1:] 85 return os.path.abspath(os.path.join( 86 os.path.dirname(__file__), '../../', fname 87 )) 88 89 90class no_debug_dissector: 91 """Context object used to disable conf.debug_dissector""" 92 def __init__(self, reverse=False): 93 self.new_value = reverse 94 95 def __enter__(self): 96 self.old_dbg = conf.debug_dissector 97 conf.debug_dissector = self.new_value 98 99 def __exit__(self, exc_type, exc_value, traceback): 100 conf.debug_dissector = self.old_dbg 101 102 103# Import tool # 104 105 106def import_module(name): 107 if name.endswith(".py"): 108 name = name[:-3] 109 try: 110 return importlib.import_module(name, package="scapy") 111 except Exception: 112 return importlib.import_module(name) 113 114 115# INTERNAL/EXTERNAL FILE EMBEDDING # 116 117class File: 118 def __init__(self, name, URL, local): 119 self.name = name 120 self.local = local.encode("utf8") 121 self.URL = URL 122 123 def get_local(self): 124 return bz2.decompress(base64_bytes(self.local)) 125 126 def get_URL(self): 127 return self.URL 128 129 def write(self, dir): 130 if dir: 131 dir += "/" 132 with open(dir + self.name, "wb") as fdesc: 133 fdesc.write(self.get_local()) 134 135 136# Embed a base64 encoded bziped version of js and css files 137# to work if you can't reach Internet. 138class External_Files: 139 UTscapy_js = File("UTscapy.js", "https://scapy.net/files/UTscapy/UTscapy.js", # noqa: E501 140 """QlpoOTFBWSZTWWVijKQAAXxfgERUYOvAChIhBAC 141/79+qQAH8AFA0poANAMjQAAAGABo0NGEZNBo0\n0BhgAaNDRhGTQaNNAYFURJinp 142lGaKbRkJiekzSenqmpA0Gm1LFMpRUklVQlK9WUTZYpNFI1IiEWE\nFT09Sfj5uO+ 143qO6S5DQwKIxM92+Zku94wL6V/1KTKan2c66Ug6SmVKy1ZIrgauxMVLF5xLH0lJRQ 144u\nKlqLF10iatlTzqvw7S9eS3+h4lu3GZyMgoOude3NJ1pQy8eo+X96IYZw+yneh 145siPj73m0rnvQ3QX\nZ9BJQiZQYQ5/uNcl2WOlC5vyQqV/BWsnr2NZYLYXQLDs/Bf 146fk4ZfR4/SH6GfA5Xlek4xHNHqbSsR\nbREOgueXo3kcYi94K6hSO3ldD2O/qJXOF 147qJ8o3TE2aQahxtQpCVUKQMvODHwu2YkaORYZC6gihEa\nllcHDIAtRPScBACAJnU 148ggYhLDX6DEko7nC9GvAw5OcEkiyDUbLdiGCzDaXWMC2DuQ2Y6sGf6NcRu\nON7QS 149bhHsPc4KKmZ/xdyRThQkGVijKQ=\n""") 150 UTscapy_css = File("UTscapy.css", "https://scapy.net/files/UTscapy/UTscapy.css", # noqa: E501 151 """QlpoOTFBWSZTWbpATIwAAFpfgHwQSB//+Cpj2Q 152C//9/6UAS5t7qcLut3NNDp0gxKMmpqaep6n6iP\n1J+pPU0yAAaeoaDI0BJCTJqa 153j1BoaGhoAAPSAAAJNSRqmmk8TQmj1DT1Hom1HkQABoNDmmJgATAB\nMAAJgACYJI 154hDQUzCR5Q0niRoaAGgGmZS+faw7LNbkliDG1Q52WJCd85cxRVVKegld8qCRISoto 155GD\nEGREFEYRW0CxAgTb13lodjuN7E1aCFgRFVhiEmZAZ/ek+XR0c8DWiAKpBgY2 156LNpQ1rOvlnoUI1Al\n0ySaP1w2MyFxoQqRicScCm6WnQOxDnufxk8s2deLLKlN+r 157fvxyTTCGRAWZONkVGIxVQRZGZLeAwH\nbpQXZcYj467i85knEOYWmLcokaqEGYGS 158xMCpD+cOIaL7GCxEU/aNSlWFNCvQBvzb915huAgdIdD2\nya9ZQGoqrmtommfAxu 1597FGTDBNBfir9UkAMmT1KRzxasJ0n2OE+mlgTZzJnhydbJaMtAk8DJzUuvv\nZpc3 160CJLVyr8F3NmIQO5E3SJSY3SQnk1CQwlELqFutXjeWWzmiywo7xJk5rUcVOV9+Ro4 16196WmXsUr\nkKhNocbnFztqPhesccW5kja+KuNFmzdw4DVOBJ2JPhGOYSwCUiwUe2 162kOshYBdULUmwYwToAGdgA9\n5n3bSpG85LUFIE0Cw78EYVgY0ESnYW5UdfgBhj1w 163PiiXDEG2vAtr38O9kdwg3tFU/0okilEjDYDa\nEfkomkLUSokmE8g1fMYBqQyyaP 164RWmySO3EtAuMVhQqIuMldOzLqWubl7k1MnhuBaELOgtB2TChcS\n0k7jvgdBKIef 165UkdAf3t2GO/LVSrDvkcb4l4TrwrI7JeCo8pBvXqZBqZJSqbsAziG7QDQVNqdtFGz 166\nEvMKOvKvUQ6mJFigLxBnziGQGQDEMQPSGhlV2BwAN6rZEmLwgED0OrEiSxXDcB 167MDskp36AV7IbKa\nCila/Wm1BKhBF+ZIqtiFyYpUhI1Q5+JK0zK7aVyLS9y7GaSr 168NCRpr7uaa1UgapVKs6wKKQzYCWsV\n8iCGrAkgWZEnDMJWCGUZOIpcmMle1UXSAl 169d5OoUYXNo0L7WSOcxEkSGjCcRhjvMRP1pAUuBPRCRA\n2lhC0ZgLYDAf5V2agMUa 170ki1ZgOQDXQ7aIDTdjGRTgnzPML0V1X+tIoSSZmZhrxZbluMWGEkwwky6\n0ObWIM 171cEbX4cawPPBVc6m5UUPbEmBANyjtNvTKE2ri7oOmBVKIMLqQKm+4rlmisu2uGSxW 172zTov5w\nqQDp61FkHk40wzQUKk4YcBlbQT1l8VXeZJYAVFjSJIcC8JykBYZJ1yka 173I4LDm5WP7s2NaRkhhV7A\nFVSD5zA8V/DJzfTk0QHmCT2wRgwPKjP60EqqlDUaST 174/i7kinChIXSAmRgA==\n""") 175 176 def get_local_dict(cls): 177 return {x: y.name for (x, y) in cls.__dict__.items() 178 if isinstance(y, File)} 179 get_local_dict = classmethod(get_local_dict) 180 181 def get_URL_dict(cls): 182 return {x: y.URL for (x, y) in cls.__dict__.items() 183 if isinstance(y, File)} 184 get_URL_dict = classmethod(get_URL_dict) 185 186 187# HELPER CLASSES FOR PARAMETRING OUTPUT FORMAT # 188 189class EnumClass: 190 def from_string(cls, x): 191 return cls.__dict__[x.upper()] 192 from_string = classmethod(from_string) 193 194 195class Format(EnumClass): 196 TEXT = 1 197 ANSI = 2 198 HTML = 3 199 LATEX = 4 200 XUNIT = 5 201 LIVE = 6 202 203 204# TEST CLASSES # 205 206class TestClass: 207 def __getitem__(self, item): 208 return getattr(self, item) 209 210 def add_keywords(self, kws): 211 if isinstance(kws, str): 212 kws = [kws.lower()] 213 for kwd in kws: 214 kwd = kwd.lower() 215 if kwd.startswith('-'): 216 try: 217 self.keywords.remove(kwd[1:]) 218 except KeyError: 219 pass 220 else: 221 self.keywords.add(kwd) 222 223 224class TestCampaign(TestClass): 225 def __init__(self, title): 226 self.title = title 227 self.filename = None 228 self.headcomments = "" 229 self.campaign = [] 230 self.keywords = set() 231 self.crc = None 232 self.sha = None 233 self.preexec = None 234 self.preexec_output = None 235 self.end_pos = 0 236 self.interrupted = False 237 self.duration = 0.0 238 239 def add_testset(self, testset): 240 self.campaign.append(testset) 241 testset.keywords.update(self.keywords) 242 243 def trunc(self, index): 244 self.campaign = self.campaign[:index] 245 246 def startNum(self, beginpos): 247 for ts in self: 248 for t in ts: 249 t.num = beginpos 250 beginpos += 1 251 self.end_pos = beginpos 252 253 def __iter__(self): 254 return self.campaign.__iter__() 255 256 def all_tests(self): 257 for ts in self: 258 for t in ts: 259 yield t 260 261 262class TestSet(TestClass): 263 def __init__(self, name): 264 self.name = name 265 self.tests = [] 266 self.comments = "" 267 self.keywords = set() 268 self.crc = None 269 self.expand = 1 270 271 def add_test(self, test): 272 self.tests.append(test) 273 test.keywords.update(self.keywords) 274 275 def trunc(self, index): 276 self.tests = self.tests[:index] 277 278 def __iter__(self): 279 return self.tests.__iter__() 280 281 282class UnitTest(TestClass): 283 def __init__(self, name): 284 self.name = name 285 self.test = "" 286 self.comments = "" 287 self.result = "passed" 288 self.fresult = "" 289 # make instance True at init to have a different truth value than None 290 self.duration = 0 291 self.output = "" 292 self.num = -1 293 self.keywords = set() 294 self.crc = None 295 self.expand = 1 296 297 def prepare(self, theme): 298 if self.result == "passed": 299 self.fresult = theme.success(self.result) 300 else: 301 self.fresult = theme.fail(self.result) 302 303 def __nonzero__(self): 304 return self.result == "passed" 305 __bool__ = __nonzero__ 306 307 308# Careful note: all data not included will be set by default. 309# Use -c as first argument !! 310def parse_config_file(config_path, verb=3): 311 """Parse provided json to get configuration 312 Empty default json: 313 { 314 "testfiles": [], 315 "breakfailed": true, 316 "onlyfailed": false, 317 "verb": 3, 318 "dump": 0, 319 "docs": 0, 320 "crc": true, 321 "preexec": {}, 322 "global_preexec": "", 323 "outputfile": null, 324 "local": true, 325 "format": "ansi", 326 "num": null, 327 "modules": [], 328 "kw_ok": [], 329 "kw_ko": [] 330 } 331 332 """ 333 with open(config_path) as config_file: 334 data = json.load(config_file) 335 if verb > 2: 336 print(" %s Loaded config file" % arrow, config_path) 337 338 def get_if_exist(key, default): 339 return data[key] if key in data else default 340 return Bunch(testfiles=get_if_exist("testfiles", []), 341 breakfailed=get_if_exist("breakfailed", True), 342 remove_testfiles=get_if_exist("remove_testfiles", []), 343 onlyfailed=get_if_exist("onlyfailed", False), 344 verb=get_if_exist("verb", 3), 345 dump=get_if_exist("dump", 0), crc=get_if_exist("crc", 1), 346 docs=get_if_exist("docs", 0), 347 preexec=get_if_exist("preexec", {}), 348 global_preexec=get_if_exist("global_preexec", ""), 349 outfile=get_if_exist("outputfile", sys.stdout), 350 local=get_if_exist("local", False), 351 num=get_if_exist("num", None), 352 modules=get_if_exist("modules", []), 353 kw_ok=get_if_exist("kw_ok", []), 354 kw_ko=get_if_exist("kw_ko", []), 355 format=get_if_exist("format", "ansi")) 356 357# PARSE CAMPAIGN # 358 359 360def parse_campaign_file(campaign_file): 361 test_campaign = TestCampaign("Test campaign") 362 test_campaign.filename = campaign_file.name 363 testset = None 364 test = None 365 testnb = 0 366 367 for line in campaign_file.readlines(): 368 if line[0] == '#': 369 continue 370 if line[0] == "~": 371 (test or testset or test_campaign).add_keywords(line[1:].split()) 372 elif line[0] == "%": 373 test_campaign.title = line[1:].strip() 374 elif line[0] == "+": 375 testset = TestSet(line[1:].strip()) 376 test_campaign.add_testset(testset) 377 test = None 378 elif line[0] == "=": 379 test = UnitTest(line[1:].strip()) 380 test.num = testnb 381 testnb += 1 382 if testset is None: 383 error_m = "Please create a test set (i.e. '+' section)." 384 raise getopt.GetoptError(error_m) 385 testset.add_test(test) 386 elif line[0] == "*": 387 if test is not None: 388 test.comments += line[1:] 389 elif testset is not None: 390 testset.comments += line[1:] 391 else: 392 test_campaign.headcomments += line[1:] 393 else: 394 if test is None: 395 if line.strip(): 396 raise ValueError("Unknown content [%s]" % line.strip()) 397 else: 398 test.test += line 399 return test_campaign 400 401 402def dump_campaign(test_campaign): 403 print("#" * (len(test_campaign.title) + 6)) 404 print("## %(title)s ##" % test_campaign) 405 print("#" * (len(test_campaign.title) + 6)) 406 if test_campaign.sha and test_campaign.crc: 407 print("CRC=[%(crc)s] SHA=[%(sha)s]" % test_campaign) 408 print("from file %(filename)s" % test_campaign) 409 print() 410 for ts in test_campaign: 411 if ts.crc: 412 print("+--[%s]%s(%s)--" % (ts.name, "-" * max(2, 80 - len(ts.name) - 18), ts.crc)) # noqa: E501 413 else: 414 print("+--[%s]%s" % (ts.name, "-" * max(2, 80 - len(ts.name) - 6))) 415 if ts.keywords: 416 print(" kw=%s" % ",".join(ts.keywords)) 417 for t in ts: 418 print("%(num)03i %(name)s" % t) 419 c = k = "" 420 if t.keywords: 421 k = "kw=%s" % ",".join(t.keywords) 422 if t.crc: 423 c = "[%(crc)s] " % t 424 if c or k: 425 print(" %s%s" % (c, k)) 426 427 428def docs_campaign(test_campaign): 429 print("%(title)s" % test_campaign) 430 print("=" * (len(test_campaign.title))) 431 print() 432 if len(test_campaign.headcomments): 433 print("%s" % test_campaign.headcomments.strip().replace("\n", "")) 434 print() 435 for ts in test_campaign: 436 print("%s" % ts.name) 437 print("-" * len(ts.name)) 438 print() 439 if len(ts.comments): 440 print("%s" % ts.comments.strip().replace("\n", "")) 441 print() 442 for t in ts: 443 print("%s" % t.name) 444 print("^" * len(t.name)) 445 print() 446 if len(t.comments): 447 print("%s" % t.comments.strip().replace("\n", "")) 448 print() 449 print("Usage example::") 450 for line in t.test.split('\n'): 451 if not line.rstrip().endswith('# no_docs'): 452 print("\t%s" % line) 453 454 455# COMPUTE CAMPAIGN DIGESTS # 456def crc32(x): 457 return "%08X" % (0xffffffff & zlib.crc32(bytearray(x, "utf8"))) 458 459 460def sha1(x): 461 return hashlib.sha1(x.encode("utf8")).hexdigest().upper() 462 463 464def compute_campaign_digests(test_campaign): 465 dc = "" 466 for ts in test_campaign: 467 dts = "" 468 for t in ts: 469 dt = t.test.strip() 470 t.crc = crc32(dt) 471 dts += "\0" + dt 472 ts.crc = crc32(dts) 473 dc += "\0\x01" + dts 474 test_campaign.crc = crc32(dc) 475 with open(test_campaign.filename) as fdesc: 476 test_campaign.sha = sha1(fdesc.read()) 477 478 479# FILTER CAMPAIGN # 480 481def filter_tests_on_numbers(test_campaign, num): 482 if num: 483 for ts in test_campaign: 484 ts.tests = [t for t in ts.tests if t.num in num] 485 test_campaign.campaign = [ts for ts in test_campaign.campaign 486 if ts.tests] 487 488 489def _filter_tests_kw(test_campaign, kw, keep): 490 def kw_match(lst, kw): 491 return any(k for k in lst if kw == k) 492 493 if kw: 494 kw = kw.lower() 495 if keep: 496 cond = lambda x: x 497 else: 498 cond = lambda x: not x 499 for ts in test_campaign: 500 ts.tests = [t for t in ts.tests if cond(kw_match(t.keywords, kw))] 501 502 503def filter_tests_keep_on_keywords(test_campaign, kw): 504 return _filter_tests_kw(test_campaign, kw, True) 505 506 507def filter_tests_remove_on_keywords(test_campaign, kw): 508 return _filter_tests_kw(test_campaign, kw, False) 509 510 511def remove_empty_testsets(test_campaign): 512 test_campaign.campaign = [ts for ts in test_campaign.campaign if ts.tests] 513 514 515# RUN TEST # 516 517def _run_test_timeout(test, get_interactive_session, verb=3, my_globals=None): 518 """Run a test with timeout""" 519 from scapy.autorun import StopAutorunTimeout 520 try: 521 return get_interactive_session(test, 522 timeout=5 * 60, # 5 min 523 verb=verb, 524 my_globals=my_globals) 525 except StopAutorunTimeout: 526 return "-- Test timed out ! --", False 527 528 529def run_test(test, get_interactive_session, theme, verb=3, 530 my_globals=None): 531 """An internal UTScapy function to run a single test""" 532 start_time = time.time() 533 test.output, res = _run_test_timeout(test.test.strip(), get_interactive_session, verb=verb, my_globals=my_globals) 534 test.result = "failed" 535 try: 536 if res is None or res: 537 test.result = "passed" 538 if test.output.endswith('KeyboardInterrupt\n'): 539 test.result = "interrupted" 540 raise KeyboardInterrupt 541 except Exception: 542 test.output += "UTscapy: Error during result interpretation:\n" 543 test.output += "".join(traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2],)) 544 finally: 545 test.duration = time.time() - start_time 546 if test.result == "failed": 547 from scapy.sendrecv import debug 548 # Add optional debugging data to log 549 if debug.crashed_on: 550 cls, val = debug.crashed_on 551 test.output += "\n\nPACKET DISSECTION FAILED ON:\n %s(bytes.fromhex('%s'))" % (cls.__name__, val.hex()) 552 debug.crashed_on = None 553 test.prepare(theme) 554 if verb > 2: 555 print("%(fresult)6s %(crc)s %(duration)06.2fs %(name)s" % test) 556 elif verb > 1: 557 print("%(fresult)6s %(crc)s %(name)s" % test) 558 559 return bool(test) 560 561# RUN CAMPAIGN # 562 563 564def import_UTscapy_tools(ses): 565 """Adds UTScapy tools directly to a session""" 566 ses["Bunch"] = Bunch 567 ses["retry_test"] = retry_test 568 ses["scapy_path"] = scapy_path 569 ses["no_debug_dissector"] = no_debug_dissector 570 if WINDOWS: 571 from scapy.arch.windows import _route_add_loopback 572 _route_add_loopback() 573 ses["conf"].ifaces = conf.ifaces 574 ses["conf"].route.routes = conf.route.routes 575 ses["conf"].route6.routes = conf.route6.routes 576 577 578def run_campaign(test_campaign, get_interactive_session, theme, 579 drop_to_interpreter=False, verb=3, 580 scapy_ses=None): 581 passed = failed = 0 582 if test_campaign.preexec: 583 test_campaign.preexec_output = get_interactive_session( 584 test_campaign.preexec.strip(), 585 my_globals=scapy_ses 586 )[0] 587 588 # Drop 589 def drop(scapy_ses): 590 code.interact(banner="Test '%s' failed. " 591 "exit() to stop, Ctrl-D to leave " 592 "this interpreter and continue " 593 "with the current test campaign" 594 % t.name, local=scapy_ses) 595 596 try: 597 for i, testset in enumerate(test_campaign): 598 for j, t in enumerate(testset): 599 if run_test(t, get_interactive_session, theme, 600 verb=verb, my_globals=scapy_ses): 601 passed += 1 602 else: 603 failed += 1 604 if drop_to_interpreter: 605 drop(scapy_ses) 606 test_campaign.duration += t.duration 607 except KeyboardInterrupt: 608 failed += 1 609 testset.trunc(j + 1) 610 test_campaign.trunc(i + 1) 611 test_campaign.interrupted = True 612 if verb: 613 print("Campaign interrupted!") 614 if drop_to_interpreter: 615 drop(scapy_ses) 616 617 test_campaign.passed = passed 618 test_campaign.failed = failed 619 style = [theme.success, theme.fail][bool(failed)] 620 if verb > 2: 621 print("Campaign CRC=%(crc)s in %(duration)06.2fs SHA=%(sha)s" % test_campaign) 622 print(style("PASSED=%i FAILED=%i" % (passed, failed))) 623 elif verb: 624 print("Campaign CRC=%(crc)s SHA=%(sha)s" % test_campaign) 625 print(style("PASSED=%i FAILED=%i" % (passed, failed))) 626 return failed 627 628 629# INFO LINES # 630 631def info_line(test_campaign, theme): 632 filename = test_campaign.filename 633 duration = test_campaign.duration 634 if duration > 10: 635 duration = theme.format(duration, "bg_red+white") 636 elif duration > 5: 637 duration = theme.format(duration, "red") 638 if filename is None: 639 return "Run at %s by UTscapy in %s" % ( 640 time.strftime("%H:%M:%S"), 641 duration 642 ) 643 else: 644 return "Run at %s from [%s] by UTscapy in %s" % ( 645 time.strftime("%H:%M:%S"), 646 filename, 647 duration 648 ) 649 650 651def html_info_line(test_campaign): 652 filename = test_campaign.filename 653 if filename is None: 654 return """Run %s by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % time.ctime() # noqa: E501 655 else: 656 return """Run %s from [%s] by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % (time.ctime(), filename) # noqa: E501 657 658 659def latex_info_line(test_campaign): 660 filename = test_campaign.filename 661 if filename is None: 662 return """by UTscapy""", """%s""" % time.ctime() 663 else: 664 return """from %s by UTscapy""" % tex_escape(filename), """%s""" % time.ctime() 665 666 667# CAMPAIGN TO something # 668 669def campaign_to_TEXT(test_campaign, theme): 670 ptheme = [lambda x: x, theme.success][bool(test_campaign.passed)] 671 ftheme = [lambda x: x, theme.fail][bool(test_campaign.failed)] 672 673 output = theme.green("\n%(title)s\n" % test_campaign) 674 output += dash + " " + info_line(test_campaign, theme) + "\n" 675 output += ptheme(" " + arrow + " Passed=%(passed)i\n" % test_campaign) 676 output += ftheme(" " + arrow + " Failed=%(failed)i\n" % test_campaign) 677 output += "%(headcomments)s\n" % test_campaign 678 679 for testset in test_campaign: 680 if any(t.expand for t in testset): 681 output += "######\n## %(name)s\n######\n%(comments)s\n\n" % testset 682 for t in testset: 683 if t.expand: 684 output += "###(%(num)03i)=[%(result)s] %(name)s\n%(comments)s\n%(output)s\n\n" % t # noqa: E501 685 686 return output 687 688 689def campaign_to_ANSI(test_campaign, theme): 690 return campaign_to_TEXT(test_campaign, theme) 691 692 693def campaign_to_xUNIT(test_campaign): 694 output = '<?xml version="1.0" encoding="UTF-8" ?>\n<testsuite>\n' 695 for testset in test_campaign: 696 for t in testset: 697 output += ' <testcase classname="%s"\n' % testset.name.replace('"', ' ') # noqa: E501 698 output += ' name="%s"\n' % t.name.replace('"', ' ') # noqa: E501 699 output += ' duration="0">\n' % t 700 if not t: 701 output += '<error><![CDATA[%(output)s]]></error>\n' % t 702 output += "</testcase>\n" 703 output += '</testsuite>' 704 return output 705 706 707def campaign_to_HTML(test_campaign): 708 output = """ 709<h1>%(title)s</h1> 710 711<p> 712""" % test_campaign 713 714 if test_campaign.crc is not None and test_campaign.sha is not None: 715 output += "CRC=<span class=crc>%(crc)s</span> SHA=<span class=crc>%(sha)s</span><br>" % test_campaign 716 output += "<small><em>" + html_info_line(test_campaign) + "</em></small>" 717 output += "".join([ 718 test_campaign.headcomments, 719 "\n<p>", 720 "PASSED=%(passed)i FAILED=%(failed)i" % test_campaign, 721 " <span class=warn_interrupted>INTERRUPTED!</span>" if test_campaign.interrupted else "", 722 "<p>\n\n", 723 ]) 724 725 for testset in test_campaign: 726 output += "<h2>" % testset 727 if testset.crc is not None: 728 output += "<span class=crc>%(crc)s</span> " % testset 729 output += "%(name)s</h2>\n%(comments)s\n<ul>\n" % testset 730 for t in testset: 731 output += """<li class=%(result)s id="tst%(num)il">\n""" % t 732 if t.expand == 2: 733 output += """ 734<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">+%(num)03i+</span> 735<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')">-%(num)03i-</span> 736""" % t 737 else: 738 output += """ 739<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')">+%(num)03i+</span> 740<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">-%(num)03i-</span> 741""" % t 742 if t.crc is not None: 743 output += "<span class=crc>%(crc)s</span>\n" % t 744 output += """%(name)s\n<span class="comment %(result)s" id="tst%(num)i" """ % t # noqa: E501 745 if t.expand < 2: 746 output += """ style="POSITION: absolute; VISIBILITY: hidden;" """ # noqa: E501 747 output += """><br>%(comments)s 748<pre> 749%(output)s</pre></span> 750""" % t 751 output += "\n</ul>\n\n" 752 return output 753 754 755def pack_html_campaigns(runned_campaigns, data, local=False, title=None): 756 output = """ 757<html> 758<head> 759<title>%(title)s</title> 760<h1>UTScapy tests</h1> 761 762<span class=control_button onClick="hide_all('tst')">Shrink All</span> 763<span class=control_button onClick="show_all('tst')">Expand All</span> 764<span class=control_button onClick="show_passed('tst')">Expand Passed</span> 765<span class=control_button onClick="show_failed('tst')">Expand Failed</span> 766 767<p> 768""" 769 for test_campaign in runned_campaigns: 770 for ts in test_campaign: 771 for t in ts: 772 output += """<span class=button%(result)s onClick="goto_id('tst%(num)il')">%(num)03i</span>\n""" % t 773 774 output += """</p>\n\n 775<link rel="stylesheet" href="%(UTscapy_css)s" type="text/css"> 776<script language="JavaScript" src="%(UTscapy_js)s" type="text/javascript"></script> 777</head> 778<body> 779%(data)s 780</body></html> 781""" 782 out_dict = {'data': data, 'title': title if title else "UTScapy tests"} 783 if local: 784 dirname = os.path.dirname(test_campaign.output_file) 785 External_Files.UTscapy_js.write(dirname) 786 External_Files.UTscapy_css.write(dirname) 787 out_dict.update(External_Files.get_local_dict()) 788 else: 789 out_dict.update(External_Files.get_URL_dict()) 790 791 output %= out_dict 792 return output 793 794 795def campaign_to_LATEX(test_campaign): 796 output = r""" 797\chapter{%(title)s} 798Run %%s on \date{%%s} 799\begin{description} 800\item[Passed:] %(passed)i 801\item[Failed:] %(failed)i 802\end{description} 803 804%(headcomments)s 805 806""" % test_campaign 807 output %= latex_info_line(test_campaign) 808 809 for testset in test_campaign: 810 output += "\\section{%(name)s}\n\n%(comments)s\n\n" % testset 811 for t in testset: 812 t.comments = tex_escape(t.comments) 813 if t.expand: 814 output += r"""\subsection{%(name)s} 815 816Test result: \textbf{%(result)s}\newline 817 818%(comments)s 819\begin{alltt} 820%(output)s 821\end{alltt} 822 823""" % t 824 825 return output 826 827 828def pack_latex_campaigns(runned_campaigns, data, local=False, title=None): 829 output = r""" 830\documentclass{report} 831\usepackage{alltt} 832\usepackage{xcolor} 833\usepackage{a4wide} 834\usepackage{hyperref} 835 836\title{%(title)s} 837 838\begin{document} 839\maketitle 840\tableofcontents 841 842%(data)s 843\end{document}\n 844""" 845 846 out_dict = {'data': data, 'title': title if title else "UTScapy tests"} 847 848 output %= out_dict 849 return output 850 851 852# USAGE # 853 854def usage(): 855 print("""Usage: UTscapy [-m module] [-f {text|ansi|HTML|LaTeX|xUnit|live}] [-o output_file] 856 [-t testfile] [-T testfile] [-k keywords [-k ...]] [-K keywords [-K ...]] 857 [-l] [-b] [-d|-D] [-F] [-q[q]] [-i] [-P preexecute_python_code] 858 [-c configfile] 859-t\t\t: provide test files (can be used many times) 860-T\t\t: if -t is used with *, remove a specific file (can be used many times) 861-l\t\t: generate local .js and .css files 862-F\t\t: expand only failed tests 863-b\t\t: don't stop at the first failed campaign 864-d\t\t: dump campaign 865-D\t\t: dump campaign and stop 866-R\t\t: dump campaign as reStructuredText 867-C\t\t: don't calculate CRC and SHA 868-c\t\t: load a .utsc config file 869-i\t\t: drop into Python interpreter if test failed 870-q\t\t: quiet mode 871-qq\t\t: [silent mode] 872-x\t\t: use pyannotate 873-n <testnum>\t: only tests whose numbers are given (eg. 1,3-7,12) 874-N\t\t: force non root 875-m <module>\t: additional module to put in the namespace 876-k <kw1>,<kw2>,...\t: include only tests with one of those keywords (can be used many times) 877-K <kw1>,<kw2>,...\t: remove tests with one of those keywords (can be used many times) 878-P <preexecute_python_code> 879""") 880 raise SystemExit 881 882 883# MAIN # 884 885def execute_campaign(TESTFILE, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS, 886 FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER, 887 autorun_func, theme, pos_begin=0, 888 scapy_ses=None): # noqa: E501 889 # Parse test file 890 try: 891 test_campaign = parse_campaign_file(TESTFILE) 892 except ValueError as ex: 893 print( 894 theme.red("Error while parsing '%s': '%s'" % (TESTFILE.name, ex)) 895 ) 896 sys.exit(1) 897 898 # Report parameters 899 if PREEXEC: 900 test_campaign.preexec = PREEXEC 901 902 # Compute campaign CRC and SHA 903 if CRC: 904 compute_campaign_digests(test_campaign) 905 906 # Filter out unwanted tests 907 filter_tests_on_numbers(test_campaign, NUM) 908 for k in KW_OK: 909 filter_tests_keep_on_keywords(test_campaign, k) 910 for k in KW_KO: 911 filter_tests_remove_on_keywords(test_campaign, k) 912 913 remove_empty_testsets(test_campaign) 914 915 # Dump campaign 916 if DUMP: 917 dump_campaign(test_campaign) 918 if DUMP > 1: 919 sys.exit() 920 921 # Dump campaign as reStructuredText 922 if DOCS: 923 docs_campaign(test_campaign) 924 sys.exit() 925 926 # Run tests 927 test_campaign.output_file = OUTPUTFILE 928 result = run_campaign( 929 test_campaign, autorun_func[FORMAT], theme, 930 drop_to_interpreter=INTERPRETER, 931 verb=VERB, 932 scapy_ses=scapy_ses 933 ) 934 935 # Shrink passed 936 if ONLYFAILED: 937 for t in test_campaign.all_tests(): 938 if t: 939 t.expand = 0 940 else: 941 t.expand = 2 942 943 # Generate report 944 if FORMAT == Format.TEXT: 945 output = campaign_to_TEXT(test_campaign, theme) 946 elif FORMAT == Format.ANSI: 947 output = campaign_to_ANSI(test_campaign, theme) 948 elif FORMAT == Format.HTML: 949 test_campaign.startNum(pos_begin) 950 output = campaign_to_HTML(test_campaign) 951 elif FORMAT == Format.LATEX: 952 output = campaign_to_LATEX(test_campaign) 953 elif FORMAT == Format.XUNIT: 954 output = campaign_to_xUNIT(test_campaign) 955 elif FORMAT == Format.LIVE: 956 output = "" 957 958 return output, (result == 0), test_campaign 959 960 961def resolve_testfiles(TESTFILES): 962 for tfile in TESTFILES[:]: 963 if "*" in tfile: 964 TESTFILES.remove(tfile) 965 TESTFILES.extend(sorted(glob.glob(tfile))) 966 return TESTFILES 967 968 969def main(): 970 argv = sys.argv[1:] 971 logger = logging.getLogger("scapy") 972 logger.addHandler(logging.StreamHandler()) 973 974 # Treat SyntaxWarning as errors 975 warnings.filterwarnings("error", category=SyntaxWarning) 976 977 import scapy 978 print(dash + " UTScapy - Scapy %s - %s" % ( 979 scapy.__version__, sys.version.split(" ")[0] 980 )) 981 982 # Parse arguments 983 984 FORMAT = Format.ANSI 985 OUTPUTFILE = sys.stdout 986 LOCAL = 0 987 NUM = None 988 NON_ROOT = False 989 KW_OK = [] 990 KW_KO = [] 991 DUMP = 0 992 DOCS = 0 993 CRC = True 994 BREAKFAILED = True 995 ONLYFAILED = False 996 VERB = 3 997 GLOB_PREEXEC = "" 998 PREEXEC_DICT = {} 999 MODULES = [] 1000 TESTFILES = [] 1001 ANNOTATIONS_MODE = False 1002 INTERPRETER = False 1003 try: 1004 opts = getopt.getopt(argv, "o:t:T:c:f:hbln:m:k:K:DRdCiFqNP:s:x") 1005 for opt, optarg in opts[0]: 1006 if opt == "-h": 1007 usage() 1008 elif opt == "-b": 1009 BREAKFAILED = False 1010 elif opt == "-F": 1011 ONLYFAILED = True 1012 elif opt == "-q": 1013 VERB -= 1 1014 elif opt == "-D": 1015 DUMP = 2 1016 elif opt == "-R": 1017 DOCS = 1 1018 elif opt == "-d": 1019 DUMP = 1 1020 elif opt == "-C": 1021 CRC = False 1022 elif opt == "-i": 1023 INTERPRETER = True 1024 elif opt == "-x": 1025 ANNOTATIONS_MODE = True 1026 elif opt == "-P": 1027 GLOB_PREEXEC += "\n" + optarg 1028 elif opt == "-f": 1029 try: 1030 FORMAT = Format.from_string(optarg) 1031 except KeyError as msg: 1032 raise getopt.GetoptError("Unknown output format %s" % msg) 1033 elif opt == "-t": 1034 TESTFILES.append(optarg) 1035 TESTFILES = resolve_testfiles(TESTFILES) 1036 elif opt == "-T": 1037 TESTFILES.remove(optarg) 1038 elif opt == "-c": 1039 data = parse_config_file(optarg, VERB) 1040 BREAKFAILED = data.breakfailed 1041 ONLYFAILED = data.onlyfailed 1042 VERB = data.verb 1043 DUMP = data.dump 1044 CRC = data.crc 1045 PREEXEC_DICT = data.preexec 1046 GLOB_PREEXEC = data.global_preexec 1047 OUTPUTFILE = data.outfile 1048 TESTFILES = data.testfiles 1049 LOCAL = 1 if data.local else 0 1050 NUM = data.num 1051 MODULES = data.modules 1052 KW_OK.extend(data.kw_ok) 1053 KW_KO.extend(data.kw_ko) 1054 try: 1055 FORMAT = Format.from_string(data.format) 1056 except KeyError as msg: 1057 raise getopt.GetoptError("Unknown output format %s" % msg) 1058 TESTFILES = resolve_testfiles(TESTFILES) 1059 for testfile in resolve_testfiles(data.remove_testfiles): 1060 try: 1061 TESTFILES.remove(testfile) 1062 except ValueError: 1063 error_m = "Cannot remove %s from test files" % testfile 1064 raise getopt.GetoptError(error_m) 1065 elif opt == "-o": 1066 OUTPUTFILE = optarg 1067 if not os.access(os.path.dirname(os.path.abspath(OUTPUTFILE)), os.W_OK): 1068 raise getopt.GetoptError("Cannot write to file %s" % OUTPUTFILE) 1069 elif opt == "-l": 1070 LOCAL = 1 1071 elif opt == "-n": 1072 NUM = [] 1073 for v in (x.strip() for x in optarg.split(",")): 1074 try: 1075 NUM.append(int(v)) 1076 except ValueError: 1077 v1, v2 = [int(e) for e in v.split('-', 1)] 1078 NUM.extend(range(v1, v2 + 1)) 1079 elif opt == "-N": 1080 NON_ROOT = True 1081 elif opt == "-m": 1082 MODULES.append(optarg) 1083 elif opt == "-k": 1084 KW_OK.extend(optarg.split(",")) 1085 elif opt == "-K": 1086 KW_KO.extend(optarg.split(",")) 1087 1088 except getopt.GetoptError as msg: 1089 print("ERROR:", msg) 1090 raise SystemExit 1091 1092 if FORMAT in [Format.LIVE, Format.ANSI]: 1093 theme = DefaultTheme() 1094 else: 1095 theme = BlackAndWhite() 1096 1097 # Disable tests if needed 1098 1099 try: 1100 if NON_ROOT or os.getuid() != 0: # Non root 1101 # Discard root tests 1102 KW_KO.append("needs_root") 1103 if VERB > 2: 1104 print(" " + arrow + " Non-root mode") 1105 except AttributeError: 1106 pass 1107 1108 if BIG_ENDIAN: 1109 KW_KO.append("little_endian_only") 1110 1111 if conf.use_pcap or WINDOWS: 1112 KW_KO.append("not_libpcap") 1113 if VERB > 2: 1114 print(" " + arrow + " libpcap mode") 1115 1116 KW_KO.append("disabled") 1117 1118 if ANNOTATIONS_MODE: 1119 try: 1120 from pyannotate_runtime import collect_types 1121 except ImportError: 1122 raise ImportError("Please install pyannotate !") 1123 collect_types.init_types_collection() 1124 collect_types.start() 1125 1126 if VERB > 2: 1127 print(" " + arrow + " Booting scapy...") 1128 try: 1129 from scapy import all as scapy 1130 except Exception as e: 1131 print("[CRITICAL]: Cannot import Scapy: %s" % e) 1132 traceback.print_exc() 1133 sys.exit(1) # Abort the tests 1134 1135 for m in MODULES: 1136 try: 1137 mod = import_module(m) 1138 builtins.__dict__.update(mod.__dict__) 1139 except ImportError as e: 1140 raise getopt.GetoptError("cannot import [%s]: %s" % (m, e)) 1141 1142 autorun_func = { 1143 Format.TEXT: scapy.autorun_get_text_interactive_session, 1144 Format.ANSI: scapy.autorun_get_ansi_interactive_session, 1145 Format.HTML: scapy.autorun_get_html_interactive_session, 1146 Format.LATEX: scapy.autorun_get_latex_interactive_session, 1147 Format.XUNIT: scapy.autorun_get_text_interactive_session, 1148 Format.LIVE: scapy.autorun_get_live_interactive_session, 1149 } 1150 1151 if VERB > 2: 1152 print(" " + arrow + " Discovering tests files...") 1153 1154 glob_output = "" 1155 glob_result = 0 1156 glob_title = None 1157 1158 UNIQUE = len(TESTFILES) == 1 1159 1160 # Resolve tags and asterix 1161 for prex in copy.copy(PREEXEC_DICT).keys(): 1162 if "*" in prex: 1163 pycode = PREEXEC_DICT[prex] 1164 del PREEXEC_DICT[prex] 1165 for gl in glob.iglob(prex): 1166 _pycode = pycode.replace("%name%", os.path.splitext(os.path.split(gl)[1])[0]) # noqa: E501 1167 PREEXEC_DICT[gl] = _pycode 1168 1169 pos_begin = 0 1170 1171 runned_campaigns = [] 1172 1173 from scapy.main import _scapy_builtins 1174 scapy_ses = _scapy_builtins() 1175 import_UTscapy_tools(scapy_ses) 1176 1177 # Execute all files 1178 for TESTFILE in TESTFILES: 1179 if VERB > 2: 1180 print(theme.green(dash + " Loading: %s" % TESTFILE)) 1181 PREEXEC = PREEXEC_DICT[TESTFILE] if TESTFILE in PREEXEC_DICT else GLOB_PREEXEC 1182 with open(TESTFILE) as testfile: 1183 output, result, campaign = execute_campaign( 1184 testfile, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS, 1185 FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER, 1186 autorun_func, theme, 1187 pos_begin=pos_begin, 1188 scapy_ses=copy.copy(scapy_ses) 1189 ) 1190 runned_campaigns.append(campaign) 1191 pos_begin = campaign.end_pos 1192 if UNIQUE: 1193 glob_title = campaign.title 1194 glob_output += output 1195 if not result: 1196 glob_result = 1 1197 if BREAKFAILED: 1198 break 1199 1200 if VERB > 2: 1201 print( 1202 checkmark + " All campaigns executed. Writing output..." 1203 ) 1204 1205 if ANNOTATIONS_MODE: 1206 collect_types.stop() 1207 collect_types.dump_stats("pyannotate_results") 1208 1209 # Concenate outputs 1210 if FORMAT == Format.HTML: 1211 glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title) 1212 if FORMAT == Format.LATEX: 1213 glob_output = pack_latex_campaigns(runned_campaigns, glob_output, LOCAL, glob_title) 1214 1215 # Write the final output 1216 # Note: on Python 2, we force-encode to ignore ascii errors 1217 # on Python 3, we need to detect the type of stream 1218 if OUTPUTFILE == sys.stdout: 1219 print(glob_output, file=OUTPUTFILE) 1220 else: 1221 with open(OUTPUTFILE, "wb") as f: 1222 f.write(glob_output.encode("utf8", "ignore") 1223 if 'b' in f.mode else glob_output) 1224 1225 # Print end message 1226 if VERB > 2: 1227 if glob_result == 0: 1228 print(theme.green("UTscapy ended successfully")) 1229 else: 1230 print(theme.red("UTscapy ended with error code %s" % glob_result)) 1231 1232 # Check active threads 1233 if VERB > 2: 1234 if threading.active_count() > 1: 1235 print("\nWARNING: UNFINISHED THREADS") 1236 print(threading.enumerate()) 1237 import multiprocessing 1238 processes = multiprocessing.active_children() 1239 if processes: 1240 print("\nWARNING: UNFINISHED PROCESSES") 1241 print(processes) 1242 1243 sys.stdout.flush() 1244 1245 # Return state 1246 return glob_result 1247 1248 1249if __name__ == "__main__": 1250 if sys.warnoptions: 1251 with warnings.catch_warnings(record=True) as cw: 1252 warnings.resetwarnings() 1253 # Let's discover the garbage waste 1254 warnings.simplefilter('error') 1255 print("### Warning mode enabled ###") 1256 res = main() 1257 if cw: 1258 res = 1 1259 sys.exit(res) 1260 else: 1261 sys.exit(main()) 1262