• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Import tests
2~ not_pypy
3
4+ Import tests
5~ imports
6
7= Prepare importing all scapy files
8
9import os
10import glob
11import subprocess
12import re
13import time
14import sys
15from scapy.consts import WINDOWS, OPENBSD
16
17# DEV: to add your file to this list, make sure you have
18# a GREAT reason.
# Modules left out of the standalone import check. An entry ending in "*"
# is matched as a prefix, any other entry as an exact dotted module name.
# "scapy.layers.tuntap" is only added to the list when WINDOWS is set.
EXCEPTIONS = [
    "scapy.__main__",
    "scapy.all",
    "scapy.contrib.automotive*",
    "scapy.contrib.cansocket*",
    "scapy.contrib.isotp*",
    "scapy.contrib.scada*",
    "scapy.layers.all",
    "scapy.main",
] + (["scapy.layers.tuntap"] if WINDOWS else [])
32
# First-level sub-packages of scapy whose modules are never imported
# individually by this test.
EXCEPTION_PACKAGES = ["arch", "libs", "modules", "tools"]
39
# Step 1: turn the path of every "*.py" file found by the recursive glob
# into a dotted module name ("scapy.<sub>.<module>").
ALL_FILES = [
    "scapy." + ".".join(
        re.match(".*scapy\\" + os.path.sep + "(.*)\\.py$", path)
        .group(1)
        .split(os.path.sep)
    )
    for path in glob.iglob(scapy_path('/scapy/**/*.py'), recursive=True)
]
# Step 2: keep a module only if no EXCEPTIONS rule matches it (prefix match
# for rules ending in "*", exact match otherwise) and its first sub-package
# is not listed in EXCEPTION_PACKAGES.
ALL_FILES = [
    module for module in ALL_FILES
    if all(
        module != rule if rule[-1] != "*" else not module.startswith(rule[:-1])
        for rule in EXCEPTIONS
    )
    and module.split(".")[1] not in EXCEPTION_PACKAGES
]
49
# Maximum number of import subprocesses kept alive at once:
# a single one when WINDOWS or OPENBSD is set, four otherwise.
NB_PROC = 4 if not (WINDOWS or OPENBSD) else 1
51
def append_processes(processes, filename):
    """Start ``import <filename>`` in a fresh interpreter and track it.

    A ``(Popen, start_timestamp, filename)`` tuple is appended to
    ``processes``. The child's stderr is captured through a pipe and decoded
    as UTF-8 text.
    """
    command = [sys.executable, "-c", "import %s" % filename]
    child = subprocess.Popen(command, stderr=subprocess.PIPE, encoding="utf8")
    started_at = time.time()
    processes.append((child, started_at, filename))
59
def check_processes(processes):
    """Reap at most one finished import subprocess from ``processes``.

    ``processes`` is a list of ``(Popen, start_timestamp, module_name)``
    tuples. Each entry is polled in turn for up to 0.5 second:

    - an entry that is still running and is younger than 30 seconds is
      left alone;
    - an entry that is still running after more than 30 seconds is killed,
      reaped, and reported with a "Timed out" message;
    - the first entry found finished with exit status 0 is removed from the
      list and the function returns.

    :raises Exception: if a subprocess exited with a non-zero status (this
        includes one killed on timeout). Every tracked process is killed
        before raising.
    """
    for i, (proc, start_ts, file) in enumerate(processes):
        try:
            _, errs = proc.communicate(timeout=0.5)
        except subprocess.TimeoutExpired:
            if time.time() - start_ts <= 30:
                # Still within its time budget: look at the next process.
                continue
            proc.kill()
            # kill() does not set returncode by itself; reap the child now
            # so the timeout is reported by this call, with its message,
            # instead of surfacing later without it.
            proc.communicate()
            errs = "Timed out (>30s)!"
        print("Finished %s with %d after %f sec" %
              (file, proc.returncode, time.time() - start_ts))
        if proc.returncode != 0:
            # One failure aborts the whole run: stop every other import.
            for p in processes:
                p[0].kill()
            raise Exception(
                "Importing the file '%s' failed !\\n%s" % (file, errs))
        # The list was modified, so stop iterating over it.
        del processes[i]
        return
82
83
def import_all(FILES):
    """Import every module named in ``FILES``, each in its own subprocess.

    At most ``NB_PROC`` subprocesses run at the same time. The function only
    returns once every spawned subprocess has been checked.

    :param FILES: iterable of dotted module names to import.
    :raises Exception: propagated from ``check_processes`` when an import
        fails or times out.
    """
    processes = list()
    for filename in FILES:
        # Wait for a free slot first, so that no file is skipped when the
        # pool is full.
        while len(processes) >= NB_PROC:
            check_processes(processes)
        append_processes(processes, filename)
    # Drain the pool: imports still running here must be checked as well,
    # otherwise their failures would go unnoticed.
    while processes:
        check_processes(processes)
92
93
94= Try importing all core separately
95
# Core modules: every name containing neither "layers" nor "contrib".
import_all(
    name for name in ALL_FILES
    if not ("layers" in name or "contrib" in name)
)
97
98= Try importing all layers separately
99
# Layer modules: every name containing "layers".
import_all(name for name in ALL_FILES if "layers" in name)
101
102= Try importing all contribs separately
103
# Contrib modules: every name containing "contrib".
import_all(name for name in ALL_FILES if "contrib" in name)
105