• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Unit testing infrastructure for Scapy
8"""
9
10import builtins
11import bz2
12import copy
13import code
14import getopt
15import glob
16import hashlib
17import importlib
18import json
19import logging
20import os
21import os.path
22import sys
23import threading
24import time
25import traceback
26import warnings
27import zlib
28
29from scapy.consts import WINDOWS, BIG_ENDIAN
30from scapy.config import conf
31from scapy.compat import base64_bytes
32from scapy.themes import DefaultTheme, BlackAndWhite
33from scapy.utils import tex_escape
34
35
36# Check UTF-8 support #
37
def _utf8_support():
    """
    Tell whether the output stream can render UTF-8 glyphs.

    Outside Windows this is always assumed to be the case. On Windows the
    answer depends on ``sys.stdout.encoding``; a stdout object without an
    ``encoding`` attribute counts as unsupported.
    """
    if not WINDOWS:
        return True
    try:
        stdout_encoding = sys.stdout.encoding
    except AttributeError:
        return False
    return stdout_encoding == "utf-8"
48
49
# Glyphs used to decorate console output: box-drawing characters when the
# output supports UTF-8, plain ASCII stand-ins otherwise.
arrow, dash, checkmark = (
    ("\u2514", "\u2501", "\u2713")
    if _utf8_support()
    else ("->", "--", "OK")
)
58
59
60#   Util class   #
61
class Bunch:
    """Attribute bag: every keyword argument becomes an instance attribute."""

    def __init__(self, **kw):
        # The keyword dict itself is installed as the instance namespace
        self.__dict__ = kw
64
65
def retry_test(func, retries=3, delay=1):
    """Call ``func`` until it succeeds, giving up after ``retries`` attempts.

    :param func: zero-argument callable to run
    :param retries: maximum number of attempts (3 by default)
    :param delay: seconds to sleep between two attempts (1 by default)
    :returns: whatever ``func`` returns on its first successful call
    :raises Exception: the exception raised by the last attempt when every
        attempt failed

    If ``retries`` is zero or negative ``func`` is never called and None is
    returned.
    """
    last_error = None
    for attempt in range(retries):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            # Only wait when another attempt follows: sleeping after the
            # final failure would just delay the error report.
            if attempt < retries - 1:
                time.sleep(delay)

    if last_error is not None:
        # The exception still carries the traceback of the failing call
        raise last_error
79
80
def scapy_path(fname):
    """Return the absolute path of ``fname`` taken relative to the folder
    two levels above this file (scapy's root folder).

    A single leading '/' in ``fname`` is dropped first.
    """
    relative = fname[1:] if fname.startswith('/') else fname
    root = os.path.join(os.path.dirname(__file__), '../../')
    return os.path.abspath(os.path.join(root, relative))
88
89
class no_debug_dissector:
    """Context manager that temporarily overrides conf.debug_dissector.

    The setting is forced to ``reverse`` (False by default) while the
    ``with`` body runs, and put back to its previous value on exit.
    """

    def __init__(self, reverse=False):
        # Value installed in conf.debug_dissector for the duration of the block
        self.new_value = reverse

    def __enter__(self):
        # Remember the current setting so that __exit__ can restore it
        self.old_dbg = conf.debug_dissector
        conf.debug_dissector = self.new_value

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing is returned, so exceptions raised in the block propagate
        conf.debug_dissector = self.old_dbg
101
102
103#    Import tool    #
104
105
def import_module(name):
    """Import the module called ``name`` and return it.

    A trailing ".py" is removed from ``name``. The import is first tried
    with ``package="scapy"``; if that raises, it is retried without any
    package anchor.
    """
    module_name = name[:-3] if name.endswith(".py") else name
    try:
        module = importlib.import_module(module_name, package="scapy")
    except Exception:
        module = importlib.import_module(module_name)
    return module
113
114
115#    INTERNAL/EXTERNAL FILE EMBEDDING    #
116
class File:
    """An auxiliary report file, available both embedded and by URL."""

    def __init__(self, name, URL, local):
        self.name = name
        self.URL = URL
        # The embedded payload is kept as UTF-8 encoded bytes
        self.local = local.encode("utf8")

    def get_local(self):
        """Return the embedded payload after bz2 decompression.

        The payload first goes through ``base64_bytes`` (presumably a
        base64 decoder -- see scapy.compat).
        """
        packed = base64_bytes(self.local)
        return bz2.decompress(packed)

    def get_URL(self):
        """Return the remote URL of the file."""
        return self.URL

    def write(self, dir):
        """Write the decoded payload to ``<dir>/<name>``.

        An empty ``dir`` writes to ``<name>`` in the current directory.
        """
        prefix = dir + "/" if dir else dir
        with open(prefix + self.name, "wb") as fdesc:
            fdesc.write(self.get_local())
134
135
# Embed base64-encoded, bzip2-compressed copies of the js and css files
# so that reports still work when the Internet cannot be reached.
138class External_Files:
139    UTscapy_js = File("UTscapy.js", "https://scapy.net/files/UTscapy/UTscapy.js",  # noqa: E501
140                      """QlpoOTFBWSZTWWVijKQAAXxfgERUYOvAChIhBAC
141/79+qQAH8AFA0poANAMjQAAAGABo0NGEZNBo0\n0BhgAaNDRhGTQaNNAYFURJinp
142lGaKbRkJiekzSenqmpA0Gm1LFMpRUklVQlK9WUTZYpNFI1IiEWE\nFT09Sfj5uO+
143qO6S5DQwKIxM92+Zku94wL6V/1KTKan2c66Ug6SmVKy1ZIrgauxMVLF5xLH0lJRQ
144u\nKlqLF10iatlTzqvw7S9eS3+h4lu3GZyMgoOude3NJ1pQy8eo+X96IYZw+yneh
145siPj73m0rnvQ3QX\nZ9BJQiZQYQ5/uNcl2WOlC5vyQqV/BWsnr2NZYLYXQLDs/Bf
146fk4ZfR4/SH6GfA5Xlek4xHNHqbSsR\nbREOgueXo3kcYi94K6hSO3ldD2O/qJXOF
147qJ8o3TE2aQahxtQpCVUKQMvODHwu2YkaORYZC6gihEa\nllcHDIAtRPScBACAJnU
148ggYhLDX6DEko7nC9GvAw5OcEkiyDUbLdiGCzDaXWMC2DuQ2Y6sGf6NcRu\nON7QS
149bhHsPc4KKmZ/xdyRThQkGVijKQ=\n""")
150    UTscapy_css = File("UTscapy.css", "https://scapy.net/files/UTscapy/UTscapy.css",  # noqa: E501
151                       """QlpoOTFBWSZTWbpATIwAAFpfgHwQSB//+Cpj2Q
152C//9/6UAS5t7qcLut3NNDp0gxKMmpqaep6n6iP\n1J+pPU0yAAaeoaDI0BJCTJqa
153j1BoaGhoAAPSAAAJNSRqmmk8TQmj1DT1Hom1HkQABoNDmmJgATAB\nMAAJgACYJI
154hDQUzCR5Q0niRoaAGgGmZS+faw7LNbkliDG1Q52WJCd85cxRVVKegld8qCRISoto
155GD\nEGREFEYRW0CxAgTb13lodjuN7E1aCFgRFVhiEmZAZ/ek+XR0c8DWiAKpBgY2
156LNpQ1rOvlnoUI1Al\n0ySaP1w2MyFxoQqRicScCm6WnQOxDnufxk8s2deLLKlN+r
157fvxyTTCGRAWZONkVGIxVQRZGZLeAwH\nbpQXZcYj467i85knEOYWmLcokaqEGYGS
158xMCpD+cOIaL7GCxEU/aNSlWFNCvQBvzb915huAgdIdD2\nya9ZQGoqrmtommfAxu
1597FGTDBNBfir9UkAMmT1KRzxasJ0n2OE+mlgTZzJnhydbJaMtAk8DJzUuvv\nZpc3
160CJLVyr8F3NmIQO5E3SJSY3SQnk1CQwlELqFutXjeWWzmiywo7xJk5rUcVOV9+Ro4
16196WmXsUr\nkKhNocbnFztqPhesccW5kja+KuNFmzdw4DVOBJ2JPhGOYSwCUiwUe2
162kOshYBdULUmwYwToAGdgA9\n5n3bSpG85LUFIE0Cw78EYVgY0ESnYW5UdfgBhj1w
163PiiXDEG2vAtr38O9kdwg3tFU/0okilEjDYDa\nEfkomkLUSokmE8g1fMYBqQyyaP
164RWmySO3EtAuMVhQqIuMldOzLqWubl7k1MnhuBaELOgtB2TChcS\n0k7jvgdBKIef
165UkdAf3t2GO/LVSrDvkcb4l4TrwrI7JeCo8pBvXqZBqZJSqbsAziG7QDQVNqdtFGz
166\nEvMKOvKvUQ6mJFigLxBnziGQGQDEMQPSGhlV2BwAN6rZEmLwgED0OrEiSxXDcB
167MDskp36AV7IbKa\nCila/Wm1BKhBF+ZIqtiFyYpUhI1Q5+JK0zK7aVyLS9y7GaSr
168NCRpr7uaa1UgapVKs6wKKQzYCWsV\n8iCGrAkgWZEnDMJWCGUZOIpcmMle1UXSAl
169d5OoUYXNo0L7WSOcxEkSGjCcRhjvMRP1pAUuBPRCRA\n2lhC0ZgLYDAf5V2agMUa
170ki1ZgOQDXQ7aIDTdjGRTgnzPML0V1X+tIoSSZmZhrxZbluMWGEkwwky6\n0ObWIM
171cEbX4cawPPBVc6m5UUPbEmBANyjtNvTKE2ri7oOmBVKIMLqQKm+4rlmisu2uGSxW
172zTov5w\nqQDp61FkHk40wzQUKk4YcBlbQT1l8VXeZJYAVFjSJIcC8JykBYZJ1yka
173I4LDm5WP7s2NaRkhhV7A\nFVSD5zA8V/DJzfTk0QHmCT2wRgwPKjP60EqqlDUaST
174/i7kinChIXSAmRgA==\n""")
175
176    def get_local_dict(cls):
177        return {x: y.name for (x, y) in cls.__dict__.items()
178                if isinstance(y, File)}
179    get_local_dict = classmethod(get_local_dict)
180
181    def get_URL_dict(cls):
182        return {x: y.URL for (x, y) in cls.__dict__.items()
183                if isinstance(y, File)}
184    get_URL_dict = classmethod(get_URL_dict)
185
186
187#    HELPER CLASSES FOR PARAMETRING OUTPUT FORMAT    #
188
class EnumClass:
    """Base class for constant holders that can be looked up by name."""

    @classmethod
    def from_string(cls, x):
        """Return the class attribute named ``x`` (case-insensitive).

        Only attributes defined directly on ``cls`` are considered;
        an unknown name raises KeyError.
        """
        return vars(cls)[x.upper()]
193
194
class Format(EnumClass):
    """Identifiers of the supported report formats.

    ``Format.from_string("html")`` maps a user-supplied name to its value.
    """
    TEXT, ANSI, HTML = 1, 2, 3
    LATEX, XUNIT, LIVE = 4, 5, 6
202
203
204#    TEST CLASSES    #
205
class TestClass:
    """Common base of campaigns, test sets and unit tests.

    Item access is mapped to attribute access, which lets instances be used
    as the right-hand side of ``"%(name)s" % obj`` formatting.
    """

    def __getitem__(self, item):
        return getattr(self, item)

    def add_keywords(self, kws):
        """Update ``self.keywords`` from one keyword or an iterable of them.

        Keywords are lower-cased. A keyword starting with '-' removes the
        rest of the word from the set (silently if it is absent); any other
        keyword is added.
        """
        if isinstance(kws, str):
            kws = [kws]
        for raw in kws:
            keyword = raw.lower()
            if keyword.startswith('-'):
                self.keywords.discard(keyword[1:])
            else:
                self.keywords.add(keyword)
222
223
class TestCampaign(TestClass):
    """A test campaign: an ordered list of test sets plus run metadata."""

    def __init__(self, title):
        self.title = title
        self.filename = None
        self.headcomments = ""
        self.campaign = []  # the test sets, in insertion order
        self.keywords = set()
        # Digests stay None until they are computed
        self.crc = None
        self.sha = None
        self.preexec = None
        self.preexec_output = None
        self.end_pos = 0
        self.interrupted = False
        self.duration = 0.0

    def add_testset(self, testset):
        """Append ``testset`` and copy the campaign keywords into it."""
        self.campaign.append(testset)
        testset.keywords.update(self.keywords)

    def trunc(self, index):
        """Keep only the first ``index`` test sets."""
        self.campaign = self.campaign[:index]

    def startNum(self, beginpos):
        """Renumber every test consecutively, starting at ``beginpos``.

        The number following the last test is stored in ``self.end_pos``.
        """
        position = beginpos
        for testset in self:
            for unit in testset:
                unit.num = position
                position += 1
        self.end_pos = position

    def __iter__(self):
        return iter(self.campaign)

    def all_tests(self):
        """Yield every test of every test set, in order."""
        for testset in self:
            yield from testset
260
261
class TestSet(TestClass):
    """A named group of unit tests inside a campaign."""

    def __init__(self, name):
        self.name = name
        self.tests = []  # the unit tests, in insertion order
        self.comments = ""
        self.keywords = set()
        self.crc = None
        self.expand = 1

    def add_test(self, test):
        """Append ``test`` and copy the test set keywords into it."""
        self.tests.append(test)
        test.keywords.update(self.keywords)

    def trunc(self, index):
        """Keep only the first ``index`` tests."""
        self.tests = self.tests[:index]

    def __iter__(self):
        return iter(self.tests)
280
281
class UnitTest(TestClass):
    """A single test: its source, its metadata and the outcome of its run.

    An instance is truthy exactly when its ``result`` is "passed", which is
    also the initial state.
    """

    def __init__(self, name):
        self.name = name
        self.test = ""
        self.comments = ""
        self.result = "passed"
        self.fresult = ""  # themed rendering of `result`, set by prepare()
        self.duration = 0
        self.output = ""
        self.num = -1
        self.keywords = set()
        self.crc = None
        self.expand = 1

    def prepare(self, theme):
        """Store in ``fresult`` the result styled by ``theme``
        (success style when passed, fail style otherwise)."""
        formatter = theme.success if self.result == "passed" else theme.fail
        self.fresult = formatter(self.result)

    def __bool__(self):
        return self.result == "passed"
    # Legacy alias kept for code that still refers to the old name
    __nonzero__ = __bool__
306
307
308# Careful note: all data not included will be set by default.
309# Use -c as first argument !!
def parse_config_file(config_path, verb=3):
    """Load a JSON configuration file and return its settings as a Bunch.

    :param config_path: path of the JSON file
    :param verb: verbosity; above 2 a "Loaded config file" line is printed
    :returns: a Bunch with one attribute per setting

    Every key is optional. Missing keys take these defaults:
    {
      "testfiles": [],
      "remove_testfiles": [],
      "breakfailed": true,
      "onlyfailed": false,
      "verb": 3,
      "dump": 0,
      "docs": 0,
      "crc": 1,
      "preexec": {},
      "global_preexec": "",
      "outputfile": <sys.stdout>,
      "local": false,
      "num": null,
      "modules": [],
      "kw_ok": [],
      "kw_ko": [],
      "format": "ansi"
    }
    The "outputfile" key is exposed as the ``outfile`` attribute; all the
    other attributes carry the name of their key.
    """
    with open(config_path) as config_file:
        data = json.load(config_file)
        if verb > 2:
            print(" %s Loaded config file" % arrow, config_path)

    # (Bunch attribute, JSON key, default value)
    option_table = [
        ("testfiles", "testfiles", []),
        ("breakfailed", "breakfailed", True),
        ("remove_testfiles", "remove_testfiles", []),
        ("onlyfailed", "onlyfailed", False),
        ("verb", "verb", 3),
        ("dump", "dump", 0),
        ("crc", "crc", 1),
        ("docs", "docs", 0),
        ("preexec", "preexec", {}),
        ("global_preexec", "global_preexec", ""),
        ("outfile", "outputfile", sys.stdout),
        ("local", "local", False),
        ("num", "num", None),
        ("modules", "modules", []),
        ("kw_ok", "kw_ok", []),
        ("kw_ko", "kw_ko", []),
        ("format", "format", "ansi"),
    ]
    settings = {
        attr: (data[key] if key in data else default)
        for attr, key, default in option_table
    }
    return Bunch(**settings)
356
357#    PARSE CAMPAIGN    #
358
359
def parse_campaign_file(campaign_file):
    """Build a TestCampaign from an open campaign file.

    The first character of each line selects its meaning:
      '#' ignored line
      '~' keywords for the current test, else test set, else campaign
      '%' campaign title
      '+' start of a new test set
      '=' start of a new test (numbered from 0 in file order)
      '*' comment attached to the current test, else test set, else campaign
    Any other line is appended to the source of the current test.

    :param campaign_file: file object providing ``name`` and ``readlines()``
    :returns: the populated TestCampaign
    :raises getopt.GetoptError: a test starts before any test set
    :raises ValueError: non-blank content appears before the first test
        of a test set
    """
    test_campaign = TestCampaign("Test campaign")
    test_campaign.filename = campaign_file.name
    current_set = None
    current_test = None
    next_num = 0

    for line in campaign_file.readlines():
        marker = line[0]
        rest = line[1:]
        if marker == '#':
            continue
        if marker == "~":
            target = current_test or current_set or test_campaign
            target.add_keywords(rest.split())
        elif marker == "%":
            test_campaign.title = rest.strip()
        elif marker == "+":
            current_set = TestSet(rest.strip())
            test_campaign.add_testset(current_set)
            # Following lines belong to the set until a test begins
            current_test = None
        elif marker == "=":
            if current_set is None:
                error_m = "Please create a test set (i.e. '+' section)."
                raise getopt.GetoptError(error_m)
            current_test = UnitTest(rest.strip())
            current_test.num = next_num
            next_num += 1
            current_set.add_test(current_test)
        elif marker == "*":
            if current_test is not None:
                current_test.comments += rest
            elif current_set is not None:
                current_set.comments += rest
            else:
                test_campaign.headcomments += rest
        elif current_test is not None:
            current_test.test += line
        elif line.strip():
            raise ValueError("Unknown content [%s]" % line.strip())
    return test_campaign
400
401
def dump_campaign(test_campaign):
    """Print an outline of the campaign: title banner, digests (when both
    are set), source file, then every test set with its tests, keywords
    and CRCs."""
    banner = "#" * (len(test_campaign.title) + 6)
    print(banner)
    print("## %(title)s ##" % test_campaign)
    print(banner)
    if test_campaign.sha and test_campaign.crc:
        print("CRC=[%(crc)s] SHA=[%(sha)s]" % test_campaign)
    print("from file %(filename)s" % test_campaign)
    print()
    for testset in test_campaign:
        set_name = testset.name
        if testset.crc:
            rule = "-" * max(2, 80 - len(set_name) - 18)
            print("+--[%s]%s(%s)--" % (set_name, rule, testset.crc))
        else:
            rule = "-" * max(2, 80 - len(set_name) - 6)
            print("+--[%s]%s" % (set_name, rule))
        if testset.keywords:
            print("  kw=%s" % ",".join(testset.keywords))
        for unit in testset:
            print("%(num)03i %(name)s" % unit)
            crc_part = "[%(crc)s] " % unit if unit.crc else ""
            kw_part = "kw=%s" % ",".join(unit.keywords) if unit.keywords else ""
            # The detail line only shows up when there is something to say
            if crc_part or kw_part:
                print("    %s%s" % (crc_part, kw_part))
426
427
def docs_campaign(test_campaign):
    """Print the campaign as a reStructuredText-like document.

    The title, each test set and each test get an underlined heading
    followed by their comments; the source of each test is printed
    tab-indented under "Usage example::", minus the lines ending with
    '# no_docs'.
    """
    def emit_comments(text):
        # Comments are flattened to a single line; empty ones are skipped
        if text:
            print("%s" % text.strip().replace("\n", ""))
            print()

    print("%(title)s" % test_campaign)
    print("=" * (len(test_campaign.title)))
    print()
    emit_comments(test_campaign.headcomments)
    for testset in test_campaign:
        print("%s" % testset.name)
        print("-" * len(testset.name))
        print()
        emit_comments(testset.comments)
        for unit in testset:
            print("%s" % unit.name)
            print("^" * len(unit.name))
            print()
            emit_comments(unit.comments)
            print("Usage example::")
            for source_line in unit.test.split('\n'):
                if source_line.rstrip().endswith('# no_docs'):
                    continue
                print("\t%s" % source_line)
453
454
455#    COMPUTE CAMPAIGN DIGESTS    #
def crc32(x):
    """Return the CRC-32 of the UTF-8 encoding of string ``x`` as eight
    upper-case hexadecimal digits."""
    checksum = zlib.crc32(bytearray(x, "utf8")) & 0xffffffff
    return format(checksum, "08X")
458
459
def sha1(x):
    """Return the SHA-1 of the UTF-8 encoding of string ``x`` as upper-case
    hexadecimal digits."""
    digest = hashlib.sha1(x.encode("utf8"))
    hex_digest = digest.hexdigest()
    return hex_digest.upper()
462
463
def compute_campaign_digests(test_campaign):
    """Fill in the ``crc`` of every test, of every test set and of the
    campaign, plus the campaign ``sha``.

    A test CRC covers its stripped source. A test set CRC covers the
    stripped sources of its tests, each preceded by a NUL. The campaign CRC
    covers the test set strings, each preceded by NUL + 0x01. The SHA-1 is
    computed over the text of the file named by ``test_campaign.filename``.
    """
    campaign_blob = ""
    for testset in test_campaign:
        bodies = []
        for unit in testset:
            body = unit.test.strip()
            unit.crc = crc32(body)
            bodies.append(body)
        set_blob = "".join("\0" + body for body in bodies)
        testset.crc = crc32(set_blob)
        campaign_blob += "\0\x01" + set_blob
    test_campaign.crc = crc32(campaign_blob)
    with open(test_campaign.filename) as fdesc:
        file_text = fdesc.read()
    test_campaign.sha = sha1(file_text)
477
478
479#    FILTER CAMPAIGN     #
480
def filter_tests_on_numbers(test_campaign, num):
    """Keep only the tests whose ``num`` is in ``num`` and drop the test
    sets left empty. Nothing happens when ``num`` is empty or None."""
    if not num:
        return
    for testset in test_campaign:
        testset.tests = [unit for unit in testset.tests if unit.num in num]
    test_campaign.campaign = [
        testset for testset in test_campaign.campaign if testset.tests
    ]
487
488
489def _filter_tests_kw(test_campaign, kw, keep):
490    def kw_match(lst, kw):
491        return any(k for k in lst if kw == k)
492
493    if kw:
494        kw = kw.lower()
495        if keep:
496            cond = lambda x: x
497        else:
498            cond = lambda x: not x
499        for ts in test_campaign:
500            ts.tests = [t for t in ts.tests if cond(kw_match(t.keywords, kw))]
501
502
def filter_tests_keep_on_keywords(test_campaign, kw):
    """Keep only the tests carrying keyword ``kw``."""
    return _filter_tests_kw(test_campaign, kw, keep=True)
505
506
def filter_tests_remove_on_keywords(test_campaign, kw):
    """Remove the tests carrying keyword ``kw``."""
    return _filter_tests_kw(test_campaign, kw, keep=False)
509
510
def remove_empty_testsets(test_campaign):
    """Drop from the campaign the test sets that hold no test."""
    non_empty = []
    for testset in test_campaign.campaign:
        if testset.tests:
            non_empty.append(testset)
    test_campaign.campaign = non_empty
513
514
515# RUN TEST #
516
def _run_test_timeout(test, get_interactive_session, verb=3, my_globals=None):
    """Run the source ``test`` through ``get_interactive_session`` with a
    five minute timeout.

    :returns: whatever the session function returns, or the pair
        ("-- Test timed out ! --", False) when StopAutorunTimeout is raised
    """
    from scapy.autorun import StopAutorunTimeout
    five_minutes = 5 * 60
    try:
        outcome = get_interactive_session(test,
                                          timeout=five_minutes,
                                          verb=verb,
                                          my_globals=my_globals)
    except StopAutorunTimeout:
        outcome = ("-- Test timed out ! --", False)
    return outcome
527
528
def run_test(test, get_interactive_session, theme, verb=3,
             my_globals=None):
    """An internal UTScapy function to run a single test.

    The stripped test source is executed, then ``test.output``,
    ``test.result`` ("passed", "failed" or "interrupted"), ``test.duration``
    and ``test.fresult`` are filled in. With ``verb`` above 1 a one-line
    status is printed (including the duration when ``verb`` is above 2).

    :returns: True when the test passed
    :raises KeyboardInterrupt: the test output ends with a
        KeyboardInterrupt marker
    """
    started_at = time.time()
    test.output, res = _run_test_timeout(
        test.test.strip(),
        get_interactive_session,
        verb=verb,
        my_globals=my_globals,
    )
    test.result = "failed"
    try:
        # A run that produced no verdict (None) counts as a success
        if res is None or res:
            test.result = "passed"
        if test.output.endswith('KeyboardInterrupt\n'):
            test.result = "interrupted"
            raise KeyboardInterrupt
    except Exception:
        # KeyboardInterrupt is not an Exception: it goes through untouched
        test.output += "UTscapy: Error during result interpretation:\n"
        test.output += traceback.format_exc()
    finally:
        test.duration = time.time() - started_at
        if test.result == "failed":
            from scapy.sendrecv import debug
            # Add optional debugging data to log
            if debug.crashed_on:
                cls, val = debug.crashed_on
                test.output += (
                    "\n\nPACKET DISSECTION FAILED ON:\n %s(bytes.fromhex('%s'))"
                    % (cls.__name__, val.hex())
                )
                debug.crashed_on = None
        test.prepare(theme)
        if verb > 2:
            print("%(fresult)6s %(crc)s %(duration)06.2fs %(name)s" % test)
        elif verb > 1:
            print("%(fresult)6s %(crc)s %(name)s" % test)

    return bool(test)
560
561# RUN CAMPAIGN #
562
563
def import_UTscapy_tools(ses):
    """Adds UTScapy tools directly to a session.

    ``ses`` is the session namespace (a mapping). On Windows a loopback
    route is also added and the interfaces and routes of the session's
    ``conf`` are aligned on those of this module's ``conf``.
    """
    exported_tools = (
        ("Bunch", Bunch),
        ("retry_test", retry_test),
        ("scapy_path", scapy_path),
        ("no_debug_dissector", no_debug_dissector),
    )
    for tool_name, tool in exported_tools:
        ses[tool_name] = tool
    if not WINDOWS:
        return
    from scapy.arch.windows import _route_add_loopback
    _route_add_loopback()
    session_conf = ses["conf"]
    session_conf.ifaces = conf.ifaces
    session_conf.route.routes = conf.route.routes
    session_conf.route6.routes = conf.route6.routes
576
577
def run_campaign(test_campaign, get_interactive_session, theme,
                 drop_to_interpreter=False, verb=3,
                 scapy_ses=None):
    """Run every test of the campaign and record the totals.

    The campaign ``preexec`` code, if any, runs first and its output is
    stored in ``preexec_output``. A KeyboardInterrupt stops the run: it is
    counted as a failure, the campaign is truncated after the interrupted
    test and flagged ``interrupted``. ``passed``, ``failed`` and
    ``duration`` are stored on the campaign, and a summary is printed
    unless ``verb`` is 0.

    :param drop_to_interpreter: open an interactive console after each
        failed test
    :returns: the number of failed tests
    """
    passed = failed = 0
    if test_campaign.preexec:
        preexec_result = get_interactive_session(
            test_campaign.preexec.strip(),
            my_globals=scapy_ses
        )
        test_campaign.preexec_output = preexec_result[0]

    def drop(scapy_ses):
        # `current` is looked up when the console opens, i.e. it is the
        # test that has just been run
        banner = ("Test '%s' failed. "
                  "exit() to stop, Ctrl-D to leave "
                  "this interpreter and continue "
                  "with the current test campaign"
                  % current.name)
        code.interact(banner=banner, local=scapy_ses)

    try:
        for set_index, testset in enumerate(test_campaign):
            for test_index, current in enumerate(testset):
                succeeded = run_test(current, get_interactive_session, theme,
                                     verb=verb, my_globals=scapy_ses)
                if succeeded:
                    passed += 1
                else:
                    failed += 1
                    if drop_to_interpreter:
                        drop(scapy_ses)
                test_campaign.duration += current.duration
    except KeyboardInterrupt:
        failed += 1
        # Forget everything located after the interrupted test
        testset.trunc(test_index + 1)
        test_campaign.trunc(set_index + 1)
        test_campaign.interrupted = True
        if verb:
            print("Campaign interrupted!")
            if drop_to_interpreter:
                drop(scapy_ses)

    test_campaign.passed = passed
    test_campaign.failed = failed
    style = theme.fail if failed else theme.success
    detailed = verb > 2
    if detailed or verb:
        if detailed:
            print("Campaign CRC=%(crc)s in %(duration)06.2fs SHA=%(sha)s" % test_campaign)  # noqa: E501
        else:
            print("Campaign CRC=%(crc)s  SHA=%(sha)s" % test_campaign)
        print(style("PASSED=%i FAILED=%i" % (passed, failed)))
    return failed
627
628
629#    INFO LINES    #
630
def info_line(test_campaign, theme):
    """Return the "Run at ..." line of the text report.

    The line holds the current time, the campaign file name when there is
    one, and the campaign duration. The duration is styled "red" above 5
    and "bg_red+white" above 10 through ``theme.format``.
    """
    elapsed = test_campaign.duration
    if elapsed > 10:
        elapsed = theme.format(elapsed, "bg_red+white")
    elif elapsed > 5:
        elapsed = theme.format(elapsed, "red")
    source = test_campaign.filename
    now = time.strftime("%H:%M:%S")
    if source is None:
        return "Run at %s by UTscapy in %s" % (now, elapsed)
    return "Run at %s from [%s] by UTscapy in %s" % (now, source, elapsed)
649
650
def html_info_line(test_campaign):
    """Return the HTML "Run ..." line: current date, campaign file name
    when there is one, and a link to UTscapy."""
    source = test_campaign.filename
    if source is not None:
        return """Run %s from [%s] by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % (time.ctime(), source)  # noqa: E501
    return """Run %s by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % time.ctime()  # noqa: E501
657
658
def latex_info_line(test_campaign):
    """Return the pair (origin text, current date) used by the LaTeX
    report. The campaign file name, when there is one, is TeX-escaped."""
    source = test_campaign.filename
    if source is None:
        origin = """by UTscapy"""
    else:
        origin = """from %s by UTscapy""" % tex_escape(source)
    return origin, """%s""" % time.ctime()
665
666
667#    CAMPAIGN TO something    #
668
def campaign_to_TEXT(test_campaign, theme):
    """Render the campaign results as text.

    The header holds the title, the info line, the passed/failed counters
    (themed only when non-zero) and the head comments. Then comes every
    test set owning at least one test with a truthy ``expand``, with the
    number, result, name, comments and output of each such test.
    """
    def unstyled(text):
        return text

    ptheme = theme.success if test_campaign.passed else unstyled
    ftheme = theme.fail if test_campaign.failed else unstyled

    pieces = [
        theme.green("\n%(title)s\n" % test_campaign),
        dash + " " + info_line(test_campaign, theme) + "\n",
        ptheme(" " + arrow + " Passed=%(passed)i\n" % test_campaign),
        ftheme(" " + arrow + " Failed=%(failed)i\n" % test_campaign),
        "%(headcomments)s\n" % test_campaign,
    ]

    for testset in test_campaign:
        if not any(unit.expand for unit in testset):
            continue
        pieces.append("######\n## %(name)s\n######\n%(comments)s\n\n" % testset)
        for unit in testset:
            if unit.expand:
                pieces.append("###(%(num)03i)=[%(result)s] %(name)s\n%(comments)s\n%(output)s\n\n" % unit)  # noqa: E501

    return "".join(pieces)
687
688
def campaign_to_ANSI(test_campaign, theme):
    """Render the campaign for an ANSI terminal.

    The text renderer does all the work; colours come from ``theme``.
    """
    report = campaign_to_TEXT(test_campaign, theme)
    return report
691
692
def campaign_to_xUNIT(test_campaign):
    """Render the campaign results as an xUnit-style XML document.

    One <testcase> element is emitted per test, with the test set name as
    ``classname``, the test name as ``name`` and the measured run time (in
    seconds, two decimals) as ``duration``. A test that did not pass gets
    an <error> child holding its output in a CDATA section.

    :param test_campaign: iterable of test sets, themselves iterable
    :returns: the XML document as a string
    """
    from xml.sax.saxutils import escape

    def attribute(value):
        # Double quotes are turned into spaces (historical behaviour); the
        # remaining markup characters (&, <, >) must be escaped or the
        # document is not well-formed.
        return escape(value.replace('"', ' '))

    parts = ['<?xml version="1.0" encoding="UTF-8" ?>\n<testsuite>\n']
    for testset in test_campaign:
        classname = attribute(testset.name)
        for t in testset:
            parts.append(' <testcase classname="%s"\n' % classname)
            parts.append('           name="%s"\n' % attribute(t.name))
            parts.append('           duration="%.2f">\n' % t.duration)
            if not t:
                # "]]>" would close the CDATA section early: split it over
                # two consecutive sections instead.
                cdata = t.output.replace("]]>", "]]]]><![CDATA[>")
                parts.append('<error><![CDATA[%s]]></error>\n' % cdata)
            parts.append("</testcase>\n")
    parts.append('</testsuite>')
    return "".join(parts)
705
706
def campaign_to_HTML(test_campaign):
    """Render the campaign results as an HTML fragment.

    The fragment holds the title, the digests (when both are set), the
    info line, the head comments and the counters, then one <h2> section
    per test set with one <li> per test. A test whose ``expand`` is 2 is
    shown unfolded; the others start folded.
    """
    hidden = """ style="POSITION: absolute; VISIBILITY: hidden;" """  # noqa: E501
    # Toggle buttons of an unfolded test: "+" hidden, "-" visible
    buttons_unfolded = """
<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">+%(num)03i+</span>
<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')">-%(num)03i-</span>
"""
    # Toggle buttons of a folded test: "+" visible, "-" hidden
    buttons_folded = """
<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')">+%(num)03i+</span>
<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">-%(num)03i-</span>
"""

    chunks = ["""
<h1>%(title)s</h1>

<p>
""" % test_campaign]

    if test_campaign.crc is not None and test_campaign.sha is not None:
        chunks.append("CRC=<span class=crc>%(crc)s</span> SHA=<span class=crc>%(sha)s</span><br>" % test_campaign)  # noqa: E501
    chunks.append("<small><em>" + html_info_line(test_campaign) + "</em></small>")  # noqa: E501
    chunks.append(test_campaign.headcomments)
    chunks.append("\n<p>")
    chunks.append("PASSED=%(passed)i FAILED=%(failed)i" % test_campaign)
    if test_campaign.interrupted:
        chunks.append(" <span class=warn_interrupted>INTERRUPTED!</span>")
    chunks.append("<p>\n\n")

    for testset in test_campaign:
        chunks.append("<h2>")
        if testset.crc is not None:
            chunks.append("<span class=crc>%(crc)s</span> " % testset)
        chunks.append("%(name)s</h2>\n%(comments)s\n<ul>\n" % testset)
        for unit in testset:
            chunks.append("""<li class=%(result)s id="tst%(num)il">\n""" % unit)
            if unit.expand == 2:
                chunks.append(buttons_unfolded % unit)
            else:
                chunks.append(buttons_folded % unit)
            if unit.crc is not None:
                chunks.append("<span class=crc>%(crc)s</span>\n" % unit)
            chunks.append("""%(name)s\n<span class="comment %(result)s" id="tst%(num)i" """ % unit)  # noqa: E501
            if unit.expand < 2:
                chunks.append(hidden)
            chunks.append("""><br>%(comments)s
<pre>
%(output)s</pre></span>
""" % unit)
        chunks.append("\n</ul>\n\n")
    return "".join(chunks)
753
754
def pack_html_campaigns(runned_campaigns, data, local=False, title=None):
    """Wrap rendered HTML campaigns into a complete HTML page.

    The page starts with the fold/unfold controls and one navigation
    button per test, links the UTscapy stylesheet and script, and embeds
    ``data`` in its body.

    :param runned_campaigns: the campaigns that were run (used for the
        navigation buttons)
    :param data: HTML fragments of the campaigns, already concatenated
    :param local: when true, write the embedded .js and .css files next to
        the output file of the last campaign and reference them by file
        name; otherwise reference them by URL
    :param title: page title, "UTScapy tests" when empty or None
    :returns: the HTML page as a string
    """
    output = """
<html>
<head>
<title>%(title)s</title>
<h1>UTScapy tests</h1>

<span class=control_button onClick="hide_all('tst')">Shrink All</span>
<span class=control_button onClick="show_all('tst')">Expand All</span>
<span class=control_button onClick="show_passed('tst')">Expand Passed</span>
<span class=control_button onClick="show_failed('tst')">Expand Failed</span>

<p>
"""
    last_campaign = None
    for test_campaign in runned_campaigns:
        last_campaign = test_campaign
        for ts in test_campaign:
            for t in ts:
                output += """<span class=button%(result)s onClick="goto_id('tst%(num)il')">%(num)03i</span>\n""" % t  # noqa: E501

    output += """</p>\n\n
<link rel="stylesheet" href="%(UTscapy_css)s" type="text/css">
<script language="JavaScript" src="%(UTscapy_js)s" type="text/javascript"></script>
</head>
<body>
%(data)s
</body></html>
"""
    out_dict = {'data': data, 'title': title if title else "UTScapy tests"}
    if local:
        # The output file may be missing (no campaign was run) or be a
        # stream such as sys.stdout rather than a path; the local files
        # then go to the current directory.
        output_file = getattr(last_campaign, "output_file", None)
        if isinstance(output_file, (str, os.PathLike)):
            dirname = os.path.dirname(output_file)
        else:
            dirname = ""
        External_Files.UTscapy_js.write(dirname)
        External_Files.UTscapy_css.write(dirname)
        out_dict.update(External_Files.get_local_dict())
    else:
        out_dict.update(External_Files.get_URL_dict())

    output %= out_dict
    return output
793
794
def campaign_to_LATEX(test_campaign):
    """Render the campaign results as a LaTeX chapter.

    The chapter opens with the run information, the counters and the head
    comments, followed by one section per test set and one subsection per
    test with a truthy ``expand``. Note that the comments of every test are
    replaced, on the test object itself, by their TeX-escaped form.
    """
    header = r"""
\chapter{%(title)s}
Run %%s on \date{%%s}
\begin{description}
\item[Passed:] %(passed)i
\item[Failed:] %(failed)i
\end{description}

%(headcomments)s

""" % test_campaign
    # Second pass: fill the two %s left over by the doubled %% above
    header %= latex_info_line(test_campaign)

    subsection = r"""\subsection{%(name)s}

Test result: \textbf{%(result)s}\newline

%(comments)s
\begin{alltt}
%(output)s
\end{alltt}

"""
    pieces = [header]
    for testset in test_campaign:
        pieces.append("\\section{%(name)s}\n\n%(comments)s\n\n" % testset)
        for unit in testset:
            unit.comments = tex_escape(unit.comments)
            if unit.expand:
                pieces.append(subsection % unit)

    return "".join(pieces)
826
827
def pack_latex_campaigns(runned_campaigns, data, local=False, title=None):
    """Wrap rendered LaTeX campaigns into a complete LaTeX document.

    :param runned_campaigns: unused here
    :param data: LaTeX chapters of the campaigns, already concatenated
    :param local: unused here
    :param title: document title, "UTScapy tests" when empty or None
    :returns: the LaTeX document as a string
    """
    # Raw string: backslashes are LaTeX commands, not Python escapes, so
    # the final line break must be a real one and not a "\n" sequence.
    output = r"""
\documentclass{report}
\usepackage{alltt}
\usepackage{xcolor}
\usepackage{a4wide}
\usepackage{hyperref}

\title{%(title)s}

\begin{document}
\maketitle
\tableofcontents

%(data)s
\end{document}
"""

    out_dict = {'data': data, 'title': title if title else "UTScapy tests"}

    output %= out_dict
    return output
850
851
852# USAGE #
853
def usage():
    """Print the command line help, then leave by raising SystemExit."""
    help_text = """Usage: UTscapy [-m module] [-f {text|ansi|HTML|LaTeX|xUnit|live}] [-o output_file]
               [-t testfile] [-T testfile] [-k keywords [-k ...]] [-K keywords [-K ...]]
               [-l] [-b] [-d|-D] [-F] [-q[q]] [-i] [-P preexecute_python_code]
               [-c configfile]
-t\t\t: provide test files (can be used many times)
-T\t\t: if -t is used with *, remove a specific file (can be used many times)
-l\t\t: generate local .js and .css files
-F\t\t: expand only failed tests
-b\t\t: don't stop at the first failed campaign
-d\t\t: dump campaign
-D\t\t: dump campaign and stop
-R\t\t: dump campaign as reStructuredText
-C\t\t: don't calculate CRC and SHA
-c\t\t: load a .utsc config file
-i\t\t: drop into Python interpreter if test failed
-q\t\t: quiet mode
-qq\t\t: [silent mode]
-x\t\t: use pyannotate
-n <testnum>\t: only tests whose numbers are given (eg. 1,3-7,12)
-N\t\t: force non root
-m <module>\t: additional module to put in the namespace
-k <kw1>,<kw2>,...\t: include only tests with one of those keywords (can be used many times)
-K <kw1>,<kw2>,...\t: remove tests with one of those keywords (can be used many times)
-P <preexecute_python_code>
"""
    print(help_text)
    raise SystemExit
881
882
883#    MAIN    #
884
def execute_campaign(TESTFILE, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS,
                     FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER,
                     autorun_func, theme, pos_begin=0,
                     scapy_ses=None):  # noqa: E501
    """Parse, filter, run and render one campaign file.

    :param TESTFILE: open campaign file
    :param OUTPUTFILE: stored on the campaign as ``output_file``
    :param PREEXEC: code to run before the tests, if any
    :param NUM: test numbers to keep (everything when empty)
    :param KW_OK: keywords a test must carry to be kept
    :param KW_KO: keywords that exclude a test
    :param DUMP: dump the campaign; above 1, exit right after
    :param DOCS: print the campaign as documentation and exit
    :param FORMAT: one of the Format values
    :param VERB: verbosity
    :param ONLYFAILED: fold passed tests and unfold the other ones
    :param CRC: compute the campaign digests
    :param INTERPRETER: open a console after each failed test
    :param autorun_func: mapping from Format value to session runner
    :param theme: colour theme
    :param pos_begin: first test number used by the HTML report
    :param scapy_ses: namespace the tests run in
    :returns: (report, True when no test failed, the campaign)

    The process exits with status 1 when the file cannot be parsed.
    """
    # Parse test file
    try:
        test_campaign = parse_campaign_file(TESTFILE)
    except ValueError as ex:
        message = "Error while parsing '%s': '%s'" % (TESTFILE.name, ex)
        print(theme.red(message))
        sys.exit(1)

    # Report parameters
    if PREEXEC:
        test_campaign.preexec = PREEXEC

    # Compute campaign CRC and SHA
    if CRC:
        compute_campaign_digests(test_campaign)

    # Filter out unwanted tests
    filter_tests_on_numbers(test_campaign, NUM)
    for wanted in KW_OK:
        filter_tests_keep_on_keywords(test_campaign, wanted)
    for unwanted in KW_KO:
        filter_tests_remove_on_keywords(test_campaign, unwanted)
    remove_empty_testsets(test_campaign)

    # Dump campaign
    if DUMP:
        dump_campaign(test_campaign)
        if DUMP > 1:
            sys.exit()

    # Dump campaign as reStructuredText
    if DOCS:
        docs_campaign(test_campaign)
        sys.exit()

    # Run tests
    test_campaign.output_file = OUTPUTFILE
    failures = run_campaign(
        test_campaign, autorun_func[FORMAT], theme,
        drop_to_interpreter=INTERPRETER,
        verb=VERB,
        scapy_ses=scapy_ses
    )

    # Shrink passed
    if ONLYFAILED:
        for unit in test_campaign.all_tests():
            unit.expand = 0 if unit else 2

    # Generate report
    if FORMAT == Format.TEXT:
        output = campaign_to_TEXT(test_campaign, theme)
    elif FORMAT == Format.ANSI:
        output = campaign_to_ANSI(test_campaign, theme)
    elif FORMAT == Format.HTML:
        # Tests are renumbered so that ids stay unique across campaigns
        test_campaign.startNum(pos_begin)
        output = campaign_to_HTML(test_campaign)
    elif FORMAT == Format.LATEX:
        output = campaign_to_LATEX(test_campaign)
    elif FORMAT == Format.XUNIT:
        output = campaign_to_xUNIT(test_campaign)
    elif FORMAT == Format.LIVE:
        output = ""

    return output, (failures == 0), test_campaign
959
960
def resolve_testfiles(TESTFILES):
    """
    Expand, in place, the entries of TESTFILES that contain a ``*``.

    Each such entry is removed from the list and the sorted result of
    globbing it is appended at the end of the list. Entries without a
    ``*`` are left untouched.

    :param TESTFILES: list of file names / patterns (modified in place)
    :returns: the very same list object
    """
    # Snapshot the patterns first: the list is mutated while expanding
    patterns = [entry for entry in TESTFILES if "*" in entry]
    for pattern in patterns:
        TESTFILES.remove(pattern)
        matches = glob.glob(pattern)
        matches.sort()
        TESTFILES.extend(matches)
    return TESTFILES
967
968
def main():
    """
    Command line entry point of UTscapy.

    Parses the options found in ``sys.argv``, boots Scapy, runs every
    selected test campaign file through execute_campaign() and writes the
    aggregated report to stdout or to the file given with ``-o``.

    :returns: 0 if all executed campaigns succeeded, 1 otherwise
    :raises SystemExit: with status 1 on invalid command line options or
        when Scapy cannot be imported (helpers called from here, such as
        usage() and execute_campaign(), may also exit)
    :raises getopt.GetoptError: if a module requested with ``-m`` cannot be
        imported
    :raises ImportError: if ``-x`` is used while pyannotate is not installed
    """
    argv = sys.argv[1:]
    logger = logging.getLogger("scapy")
    logger.addHandler(logging.StreamHandler())

    # Treat SyntaxWarning as errors
    warnings.filterwarnings("error", category=SyntaxWarning)

    import scapy
    print(dash + " UTScapy - Scapy %s - %s" % (
        scapy.__version__, sys.version.split(" ")[0]
    ))

    # Parse arguments

    FORMAT = Format.ANSI
    OUTPUTFILE = sys.stdout
    LOCAL = 0
    NUM = None
    NON_ROOT = False
    KW_OK = []
    KW_KO = []
    DUMP = 0
    DOCS = 0
    CRC = True
    BREAKFAILED = True
    ONLYFAILED = False
    VERB = 3
    GLOB_PREEXEC = ""
    PREEXEC_DICT = {}
    MODULES = []
    TESTFILES = []
    ANNOTATIONS_MODE = False
    INTERPRETER = False
    try:
        opts = getopt.getopt(argv, "o:t:T:c:f:hbln:m:k:K:DRdCiFqNP:s:x")
        for opt, optarg in opts[0]:
            if opt == "-h":
                usage()
            elif opt == "-b":
                BREAKFAILED = False
            elif opt == "-F":
                ONLYFAILED = True
            elif opt == "-q":
                # May be repeated: each occurrence lowers the verbosity
                VERB -= 1
            elif opt == "-D":
                DUMP = 2
            elif opt == "-R":
                DOCS = 1
            elif opt == "-d":
                DUMP = 1
            elif opt == "-C":
                CRC = False
            elif opt == "-i":
                INTERPRETER = True
            elif opt == "-x":
                ANNOTATIONS_MODE = True
            elif opt == "-P":
                GLOB_PREEXEC += "\n" + optarg
            elif opt == "-f":
                try:
                    FORMAT = Format.from_string(optarg)
                except KeyError as msg:
                    raise getopt.GetoptError("Unknown output format %s" % msg)
            elif opt == "-t":
                TESTFILES.append(optarg)
                TESTFILES = resolve_testfiles(TESTFILES)
            elif opt == "-T":
                # Report an unknown file like the "-c" branch does instead
                # of letting the ValueError escape as a traceback
                try:
                    TESTFILES.remove(optarg)
                except ValueError:
                    error_m = "Cannot remove %s from test files" % optarg
                    raise getopt.GetoptError(error_m)
            elif opt == "-c":
                # The configuration file overrides most of the settings
                # gathered so far; keywords are merged instead
                data = parse_config_file(optarg, VERB)
                BREAKFAILED = data.breakfailed
                ONLYFAILED = data.onlyfailed
                VERB = data.verb
                DUMP = data.dump
                CRC = data.crc
                PREEXEC_DICT = data.preexec
                GLOB_PREEXEC = data.global_preexec
                OUTPUTFILE = data.outfile
                TESTFILES = data.testfiles
                LOCAL = 1 if data.local else 0
                NUM = data.num
                MODULES = data.modules
                KW_OK.extend(data.kw_ok)
                KW_KO.extend(data.kw_ko)
                try:
                    FORMAT = Format.from_string(data.format)
                except KeyError as msg:
                    raise getopt.GetoptError("Unknown output format %s" % msg)
                TESTFILES = resolve_testfiles(TESTFILES)
                for testfile in resolve_testfiles(data.remove_testfiles):
                    try:
                        TESTFILES.remove(testfile)
                    except ValueError:
                        error_m = "Cannot remove %s from test files" % testfile
                        raise getopt.GetoptError(error_m)
            elif opt == "-o":
                OUTPUTFILE = optarg
                if not os.access(os.path.dirname(os.path.abspath(OUTPUTFILE)), os.W_OK):
                    raise getopt.GetoptError("Cannot write to file %s" % OUTPUTFILE)
            elif opt == "-l":
                LOCAL = 1
            elif opt == "-n":
                # Accepts single numbers and inclusive ranges: 1,3-7,12
                NUM = []
                for v in (x.strip() for x in optarg.split(",")):
                    try:
                        NUM.append(int(v))
                    except ValueError:
                        try:
                            v1, v2 = [int(e) for e in v.split('-', 1)]
                        except ValueError:
                            # Neither a number nor a "<first>-<last>" range
                            raise getopt.GetoptError(
                                "Invalid test number specification '%s'" % v
                            )
                        NUM.extend(range(v1, v2 + 1))
            elif opt == "-N":
                NON_ROOT = True
            elif opt == "-m":
                MODULES.append(optarg)
            elif opt == "-k":
                KW_OK.extend(optarg.split(","))
            elif opt == "-K":
                KW_KO.extend(optarg.split(","))

    except getopt.GetoptError as msg:
        print("ERROR:", msg)
        # Non-zero status so that the failure is visible to the caller
        raise SystemExit(1)

    if FORMAT in [Format.LIVE, Format.ANSI]:
        theme = DefaultTheme()
    else:
        theme = BlackAndWhite()

    # Disable tests if needed

    try:
        if NON_ROOT or os.getuid() != 0:  # Non root
            # Discard root tests
            KW_KO.append("needs_root")
            if VERB > 2:
                print(" " + arrow + " Non-root mode")
    except AttributeError:
        # os.getuid() is not available on this platform: keep root tests
        pass

    if BIG_ENDIAN:
        KW_KO.append("little_endian_only")

    if conf.use_pcap or WINDOWS:
        KW_KO.append("not_libpcap")
        if VERB > 2:
            print(" " + arrow + " libpcap mode")

    KW_KO.append("disabled")

    if ANNOTATIONS_MODE:
        try:
            from pyannotate_runtime import collect_types
        except ImportError:
            raise ImportError("Please install pyannotate !")
        collect_types.init_types_collection()
        collect_types.start()

    if VERB > 2:
        print(" " + arrow + " Booting scapy...")
    try:
        # Rebinds the local name "scapy" to the scapy.all namespace
        from scapy import all as scapy
    except Exception as e:
        print("[CRITICAL]: Cannot import Scapy: %s" % e)
        traceback.print_exc()
        sys.exit(1)  # Abort the tests

    # Expose the content of the additional modules (-m) as builtins.
    # NOTE: this runs after the option handler above, so the GetoptError
    # raised here propagates to the caller of main().
    for m in MODULES:
        try:
            mod = import_module(m)
            builtins.__dict__.update(mod.__dict__)
        except ImportError as e:
            raise getopt.GetoptError("cannot import [%s]: %s" % (m, e))

    autorun_func = {
        Format.TEXT: scapy.autorun_get_text_interactive_session,
        Format.ANSI: scapy.autorun_get_ansi_interactive_session,
        Format.HTML: scapy.autorun_get_html_interactive_session,
        Format.LATEX: scapy.autorun_get_latex_interactive_session,
        Format.XUNIT: scapy.autorun_get_text_interactive_session,
        Format.LIVE: scapy.autorun_get_live_interactive_session,
    }

    if VERB > 2:
        print(" " + arrow + " Discovering tests files...")

    glob_output = ""
    glob_result = 0
    glob_title = None

    UNIQUE = len(TESTFILES) == 1

    # Resolve tags and asterisks: a pre-execution entry whose key is a glob
    # pattern is replaced by one entry per matching file, with "%name%"
    # substituted by the file's base name (without extension)
    for prex in list(PREEXEC_DICT.keys()):
        if "*" in prex:
            pycode = PREEXEC_DICT[prex]
            del PREEXEC_DICT[prex]
            for gl in glob.iglob(prex):
                _pycode = pycode.replace("%name%", os.path.splitext(os.path.split(gl)[1])[0])  # noqa: E501
                PREEXEC_DICT[gl] = _pycode

    pos_begin = 0

    runned_campaigns = []

    from scapy.main import _scapy_builtins
    scapy_ses = _scapy_builtins()
    import_UTscapy_tools(scapy_ses)

    # Execute all files
    for TESTFILE in TESTFILES:
        if VERB > 2:
            print(theme.green(dash + " Loading: %s" % TESTFILE))
        # File specific pre-execution code takes precedence over the global
        PREEXEC = PREEXEC_DICT.get(TESTFILE, GLOB_PREEXEC)
        with open(TESTFILE) as testfile:
            # Each campaign gets its own shallow copy of the session
            output, result, campaign = execute_campaign(
                testfile, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS,
                FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER,
                autorun_func, theme,
                pos_begin=pos_begin,
                scapy_ses=copy.copy(scapy_ses)
            )
        runned_campaigns.append(campaign)
        pos_begin = campaign.end_pos
        if UNIQUE:
            glob_title = campaign.title
        glob_output += output
        if not result:
            glob_result = 1
            if BREAKFAILED:
                break

    if VERB > 2:
        print(
            checkmark + " All campaigns executed. Writing output..."
        )

    if ANNOTATIONS_MODE:
        collect_types.stop()
        collect_types.dump_stats("pyannotate_results")

    # Concatenate outputs
    if FORMAT == Format.HTML:
        glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title)
    if FORMAT == Format.LATEX:
        glob_output = pack_latex_campaigns(runned_campaigns, glob_output, LOCAL, glob_title)

    # Write the final output
    if OUTPUTFILE == sys.stdout:
        print(glob_output, file=OUTPUTFILE)
    else:
        # The file is opened in binary mode: encode the report ourselves,
        # dropping the characters that cannot be encoded
        with open(OUTPUTFILE, "wb") as f:
            f.write(glob_output.encode("utf8", "ignore"))

    # Print end message
    if VERB > 2:
        if glob_result == 0:
            print(theme.green("UTscapy ended successfully"))
        else:
            print(theme.red("UTscapy ended with error code %s" % glob_result))

    # Check active threads
    if VERB > 2:
        if threading.active_count() > 1:
            print("\nWARNING: UNFINISHED THREADS")
            print(threading.enumerate())
        import multiprocessing
        processes = multiprocessing.active_children()
        if processes:
            print("\nWARNING: UNFINISHED PROCESSES")
            print(processes)

    sys.stdout.flush()

    # Return state
    return glob_result
1247
1248
if __name__ == "__main__":
    # Without any -W option on the interpreter, simply run the campaigns
    if not sys.warnoptions:
        sys.exit(main())

    # Warning mode: start from a clean filter list and turn every warning
    # into an error while the campaigns run
    with warnings.catch_warnings(record=True) as caught:
        warnings.resetwarnings()
        # Let's discover the garbage waste
        warnings.simplefilter('error')
        print("### Warning mode enabled ###")
        exit_code = main()
        # Any warning that still got recorded makes the run fail
        if caught:
            exit_code = 1
    sys.exit(exit_code)
1262