• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import sys
3import time
4
5from subprocess import CalledProcessError, check_call
6from typing import List, IO, Optional, Tuple
7
8
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
    """
    Locate ``command`` and return the path it was found at.

    ``command`` is returned unchanged when it already names an existing
    path.  Otherwise every directory of ``paths`` (an ``os.pathsep``
    separated string; the PATH environment variable when omitted, and
    ``os.defpath`` when that is empty) is probed, trying each candidate
    suffix.  Returns None when nothing matches.
    """
    search_path = os.environ.get('PATH', '') if paths is None else paths

    # A command that is already a valid path wins over any lookup.
    if os.path.exists(command):
        return command

    # An empty search string falls back to the platform default.
    if not search_path:
        search_path = os.defpath

    # Suffixes are taken from PATHEXT only where ';' separates path entries;
    # on Cygwin 'PATHEXT' may exist but it should not be used.
    suffixes = (os.environ.get('PATHEXT', '').split(';')
                if os.pathsep == ';' else [''])

    # Candidates are generated lazily in directory-major order, so the first
    # hit is the same one a nested loop would find.
    candidates = (os.path.join(directory, command + suffix)
                  for directory in search_path.split(os.pathsep)
                  for suffix in suffixes)
    return next((candidate for candidate in candidates
                 if os.path.exists(candidate)), None)
39
40
def has_no_extension(file_name: str) -> bool:
    """
    Tell whether ``file_name`` has an empty extension according to
    ``os.path.splitext``.
    """
    return not os.path.splitext(file_name)[1]
44
45
def is_valid_single_input_file(file_name: str) -> bool:
    """
    Tell whether the extension of ``file_name`` is one of the accepted
    single-input suffixes (an empty extension is accepted too).
    """
    accepted_suffixes = (
        ".i",
        ".ii",
        ".c",
        ".cpp",
        ".m",
        "",
    )
    suffix = os.path.splitext(file_name)[1]
    return suffix in accepted_suffixes
49
50
def time_to_str(time: float) -> str:
    """
    Render a duration given in seconds as text with two decimal places
    followed by an "s" suffix.
    """
    seconds = format(time, ".2f")
    return seconds + "s"
56
57
def memory_to_str(memory: int) -> str:
    """
    Render a byte count as a human-readable string.

    A zero (falsy) value is reported as "N/A".  When the optional
    ``humanize`` package can be imported its GNU-style formatting is used;
    otherwise the plain number with a "B" suffix is returned.
    """
    # A reading of 0 means the measurement did not succeed.
    if not memory:
        return "N/A"

    try:
        import humanize
        return humanize.naturalsize(memory, gnu=True)
    except ImportError:
        # No formatter available, so report the raw byte count.
        return f"{memory}B"
72
73
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
    """
    Run command with arguments.  Wait for command to complete and measure
    execution time and peak memory consumption.
    If the exit code was zero then return, otherwise raise
    CalledProcessError.  The CalledProcessError object will have the
    return code in the returncode attribute.

    The arguments are the same as for the call and check_call functions.

    Return a tuple of execution time and peak memory.

    The execution time is wall-clock seconds taken with time.time() around
    the whole call.  The peak memory is the largest sampled sum of the
    resident set sizes (rss) of the process and all of its descendants.
    Sampling happens once every half a second, so shorter spikes can be
    missed and the reported time can include part of the last sleep.

    Memory can only be measured when psutil is importable; otherwise the
    command is run through check_call and the peak memory is reported as 0.
    """
    # 0 doubles as the "not measured" value returned by the fallback path.
    peak_mem = 0
    start_time = time.time()

    try:
        # Optional dependency: an ImportError here selects the fallback in
        # the except clause at the bottom.
        import psutil as ps

        def get_memory(process: ps.Process) -> int:
            """Sum the rss of ``process`` and all of its descendants."""
            mem = 0

            # we want to gather memory usage from all of the child processes
            descendants = list(process.children(recursive=True))
            descendants.append(process)

            for subprocess in descendants:
                try:
                    mem += subprocess.memory_info().rss
                except (ps.NoSuchProcess, ps.AccessDenied):
                    # Processes that cannot be queried (psutil reports them
                    # as gone or inaccessible) are left out of this sample.
                    continue

            return mem

        with ps.Popen(*popenargs, **kwargs) as process:
            # while the process is running calculate resource utilization.
            # A zombie status also ends the polling.
            while (process.is_running() and
                   process.status() != ps.STATUS_ZOMBIE):
                # track the peak utilization of the process
                peak_mem = max(peak_mem, get_memory(process))
                # Sampling interval; bounds the resolution of both results.
                time.sleep(.5)

            # Reached with is_running() still true only if the loop stopped
            # on the zombie check (or the state changed in between).
            # NOTE(review): presumably this makes sure a zombie does not
            # linger - confirm against psutil's is_running() semantics.
            if process.is_running():
                process.kill()

        # NOTE(review): relies on leaving the `with` block having waited for
        # the process so that returncode is populated - confirm for
        # psutil.Popen.
        if process.returncode != 0:
            # Mirror check_call: report the command from the "args" keyword,
            # or else from the first positional argument.
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise CalledProcessError(process.returncode, cmd)

    except ImportError:
        # back off to subprocess if we don't have psutil installed
        peak_mem = 0
        check_call(*popenargs, **kwargs)

    return time.time() - start_time, peak_mem
130
131
def run_script(script_path: str, build_log_file: IO, cwd: str,
               out=sys.stdout, err=sys.stderr, verbose: int = 0):
    """
    Run the provided script if it exists.

    The script is first made executable with ``chmod +x`` and then executed
    through the shell, both with ``cwd`` as the working directory and with
    stdout/stderr redirected to ``build_log_file``.  Nothing happens when
    ``script_path`` does not exist.

    If either command exits with a non-zero status, an error message naming
    the script and the log file is written to ``err`` and the interpreter
    exits via ``sys.exit(-1)``.

    When ``verbose`` equals 1 the script path is announced on ``out``
    before running.
    """
    # Function-scope import so the block brings its own dependency.
    import shlex

    if not os.path.exists(script_path):
        return

    # Quote the path for the shell properly.  Wrapping it in literal single
    # quotes breaks as soon as the path itself contains a single quote.
    quoted_path = shlex.quote(script_path)

    try:
        if verbose == 1:
            out.write(f"  Executing: {script_path}\n")

        check_call(f"chmod +x {quoted_path}", cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

        # Still executed through the shell, so the script is resolved and
        # launched the same way as before.
        check_call(quoted_path, cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

    except CalledProcessError:
        err.write(f"Error: Running {script_path} failed. "
                  f"See {build_log_file.name} for details.\n")
        sys.exit(-1)
156
157
def is_comment_csv_line(entries: List[str]) -> bool:
    """
    Tell whether a parsed CSV row is a comment, i.e. whether it is
    non-empty and its first field begins with '#'.
    """
    return bool(entries) and entries[0].startswith("#")
163