• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# mypy: disallow-untyped-defs
2
import functools
import logging
import os
import re
import shlex
import subprocess
import time
from threading import Lock
from typing import Any, List, Optional, Sequence
11
12
logger = logging.getLogger("strobelight_function_profiler")

console_handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
)
console_handler.setFormatter(formatter)

# getLogger() returns the same object every time this module is executed
# (e.g. after importlib.reload), so only attach the console handler when none
# is present yet; otherwise every message would be printed once per execution.
if not logger.handlers:
    logger.addHandler(console_handler)

logger.setLevel(logging.INFO)
# Emit only through this logger's own handler, not through the root logger.
logger.propagate = False
24
25
class StrobelightCLIProfilerError(Exception):
    """Signals a failure while driving a strobelight profiling run.

    Raised when a ``strobeclient`` invocation fails or produces output that
    cannot be interpreted (starting, polling, stopping, or collecting results).
    """
30
31
def _pid_namespace_link(pid: Optional[int] = None) -> str:
    """Read the ``/proc/<pid>/ns/pid`` symlink, e.g. ``pid:[4026531836]``.

    A falsy ``pid`` (``None`` or ``0``) selects the calling process.
    """
    target_pid = pid if pid else os.getpid()
    return os.readlink(f"/proc/{target_pid}/ns/pid")
37
38
def _pid_namespace(pid: Optional[int] = None) -> int:
    """Return the numeric pid-namespace id of a process.

    The id is the number between the brackets of the namespace link (for
    ``pid:[4026531836]`` this is 4026531836). A falsy ``pid`` selects the
    calling process.
    """
    target_pid = pid if pid else os.getpid()
    namespace_link = _pid_namespace_link(target_pid)
    # Slice from just after "[" up to (excluding) the final character.
    id_start = namespace_link.find("[") + 1
    return int(namespace_link[id_start:-1])
44
45
def _command_to_string(command: Sequence[str]) -> str:
    """Render an argv list as one shell-quoted string for logging.

    Arguments containing whitespace or shell metacharacters are quoted, so the
    logged line is unambiguous and can be pasted into a shell to reproduce the
    exact argv (a plain ``" ".join`` cannot distinguish ``["a b"]`` from
    ``["a", "b"]``).
    """
    return shlex.join(command)
48
49
class StrobelightCLIFunctionProfiler:
    """
    Note: this is a meta only tool.

    StrobelightCLIFunctionProfiler can be used to profile a python function and
    generate a strobelight link with the results. It works on meta servers but
    does not require an fbcode target.
    When stop_at_error is False (default), an error during profiling does not
    prevent the work function from running.

    Check function_profiler_example.py for an example.
    """

    # Ensures only one thread runs the profiler at any point. As a class
    # attribute it is shared by every instance.
    _lock = Lock()

    # Seconds to sleep between consecutive getRunStatus polls.
    _POLL_INTERVAL_SEC = 10
    # Maximum number of additional polls while a run is still PREPARING.
    _MAX_WAIT_FOR_RUNNING_POLLS = 20

    def __init__(
        self,
        *,
        stop_at_error: bool = False,
        max_profile_duration_sec: int = 60 * 10,
        sample_each: float = 1e7,  # sample each sample_each cycles.
        run_user_name: str = "pytorch-strobelight-ondemand",
        timeout_wait_for_running_sec: int = 60,
        timeout_wait_for_finished_sec: int = 60,
        recorded_env_variables: Optional[List[str]] = None,
        sample_tags: Optional[List[str]] = None,
        stack_max_len: int = 127,
        async_stack_max_len: int = 127,
    ) -> None:
        """
        Args:
            stop_at_error: raise StrobelightCLIProfilerError instead of running
                the work function unprofiled when profiling cannot start.
            max_profile_duration_sec: passed to strobeclient as --duration-ms.
            sample_each: sampling interval in cycles (--sample-interval).
            run_user_name: stored; not currently passed to strobeclient.
            timeout_wait_for_running_sec: max seconds to wait for a PREPARING
                run to become RUNNING.
            timeout_wait_for_finished_sec: max seconds to wait for a
                PROCESSING run to finish before results are logged.
            recorded_env_variables: stored; not currently passed to strobeclient.
            sample_tags: joined with "," and passed as --sample-tags.
            stack_max_len: stored; not currently passed to strobeclient.
            async_stack_max_len: stored; not currently passed to strobeclient.
        """
        self.stop_at_error = stop_at_error
        self.max_profile_duration_sec = max_profile_duration_sec
        self.sample_each = sample_each
        self.run_user_name = run_user_name
        self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
        self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
        self.recorded_env_variables = recorded_env_variables
        self.sample_tags = sample_tags
        self.stack_max_len = stack_max_len
        self.async_stack_max_len = async_stack_max_len
        # Strobelight run id of the most recent run (None until one starts).
        self.current_run_id: Optional[int] = None

    @staticmethod
    def _run_command(command: List[str], error_prefix: str) -> str:
        """Run ``command`` and return its decoded stderr (the stream parsed here).

        Raises:
            StrobelightCLIProfilerError: the command exited with a non-zero
                status; the message is ``error_prefix`` followed by the output.
        """
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        # errors="replace": a stray non-UTF-8 byte must not hide the real status.
        output = result.stderr.decode("utf-8", errors="replace")
        logger.debug("output:\n{%s}", output)
        if result.returncode != 0:
            raise StrobelightCLIProfilerError(f"{error_prefix}{output}")
        return output

    def _run_async(self) -> None:
        """Start an asynchronous pyperf run on this process; store its run id.

        Raises:
            StrobelightCLIProfilerError: strobeclient failed or printed no run id.
        """
        process_id = os.getpid()
        namespace = _pid_namespace(process_id)
        command = [
            "strobeclient",
            "run",
            "--profiler",
            "pyperf",
            "--event",
            "cycles",
            "--async",
            "--sample-interval",
            f"{int(self.sample_each)}",
            "--duration-ms",
            f"{int(self.max_profile_duration_sec * 1000)}",
            "--pid",
            f"{namespace}:{process_id}",
        ]
        if self.sample_tags:
            command += ["--sample-tags", ",".join(self.sample_tags)]

        output = self._run_command(
            command, "failed to start strobelight profiling, error in run_async:"
        )
        match = re.search(r"INFO Run Id: (-?\d+)", output)
        if match is None:
            raise StrobelightCLIProfilerError(
                f"failed to start strobelight profiling, unexpected result {output}"
            )
        self.current_run_id = int(match.group(1))

    def _wait_for_running(self, counter: int = 0) -> None:
        """Block until the current run reports RUNNING.

        While the run is PREPARING, polls every _POLL_INTERVAL_SEC seconds for at
        most timeout_wait_for_running_sec seconds and at most
        _MAX_WAIT_FOR_RUNNING_POLLS additional polls. ``counter`` is the number
        of polls already made.

        Raises:
            StrobelightCLIProfilerError: non-zero exit, unexpected output or
                phase, or the run did not start in time.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
        deadline = time.monotonic() + self.timeout_wait_for_running_sec
        while True:
            if counter > self._MAX_WAIT_FOR_RUNNING_POLLS:
                raise StrobelightCLIProfilerError(
                    f"wait_for_running called more than "
                    f"{self._MAX_WAIT_FOR_RUNNING_POLLS} times"
                )
            output = self._run_command(
                command,
                "failed to start strobelight profiling, error in wait_for_running:",
            )
            match = re.search("Profile run status: (.*)", output)
            if match is None:
                raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
            # strip(): "." also matches a trailing "\r" on CRLF output.
            current_status = match.group(1).strip()
            if current_status == "RUNNING":
                return
            if current_status != "PREPARING":
                raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise StrobelightCLIProfilerError(
                    "failed to start strobelight profiling, run still preparing "
                    f"after {self.timeout_wait_for_running_sec} seconds"
                )
            time.sleep(min(self._POLL_INTERVAL_SEC, remaining))
            counter += 1

    def _stop_run(self) -> None:
        """Ask strobelight to stop the current run.

        Raises:
            StrobelightCLIProfilerError: non-zero exit, unexpected output, or a
                status line without "Success!".
        """
        command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
        output = self._run_command(
            command, "failed to stop strobelight profiling, return code is not 0 :"
        )
        match = re.search("INFO ::1:(.*)", output)
        if match is None:
            raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
        current_status = match.group(1)
        if "Success!" not in current_status:
            raise StrobelightCLIProfilerError(
                f"failed to stop strobelight profiling, got {current_status} result"
            )

    def _get_results(self) -> None:
        """Wait for the run to finish processing, then log its result lines.

        Polls every _POLL_INTERVAL_SEC seconds while the run reports PROCESSING,
        for at most timeout_wait_for_finished_sec seconds (previously this
        recursed without bound).

        Raises:
            StrobelightCLIProfilerError: non-zero exit, timeout, or a status
                other than a successful finish.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
        deadline = time.monotonic() + self.timeout_wait_for_finished_sec
        while True:
            output = self._run_command(
                command,
                "failed to extract profiling results, return code is not 0 : ",
            )
            match = re.search("INFO ::1:(.*)", output)
            if match is None:
                # No status line: still log whatever result lines are present.
                break
            current_status = match.group(1)
            if "Profile run status: PROCESSING" not in current_status:
                if "Profile run finished with SUCCESS" not in current_status:
                    raise StrobelightCLIProfilerError(
                        f"failed to extract profiling results, unexpected response {output}"
                    )
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise StrobelightCLIProfilerError(
                    "failed to extract profiling results, run still processing "
                    f"after {self.timeout_wait_for_finished_sec} seconds"
                )
            time.sleep(min(self._POLL_INTERVAL_SEC, remaining))

        for line in re.findall(
            r"Total samples.*|GraphProfiler.*|Icicle view \(python stack\).*",
            output,
        ):
            logger.info(line)

    def _stop_strobelight_no_throw(
        self,
        collect_results: bool,
    ) -> None:
        """Stop the current run and optionally log its results; never raises.

        Any failure is logged as a warning and swallowed so cleanup cannot mask
        the work function's result or exception.
        """
        try:
            self._stop_run()
            logger.info("strobelight profiling stopped")
            logger.debug("collection stopped")
            if collect_results:
                self._get_results()
        except Exception:
            logger.warning("error during stop_strobelight", exc_info=True)

    def _start_strobelight(self) -> bool:
        """Start a run and wait until it is RUNNING; never raises.

        Returns True on success. On failure the error is logged, a run that was
        already created is stopped (without collecting results), and False is
        returned.
        """
        strobelight_started = False
        try:
            self._run_async()
            strobelight_started = True
            logger.info("strobelight run id is: %s", self.current_run_id)
            self._wait_for_running()
            logger.info("strobelight profiling running")
            return True
        except Exception:
            logger.warning("error during start_strobelight:", exc_info=True)
            if strobelight_started:
                self._stop_strobelight_no_throw(collect_results=False)
            return False

    def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
        """Run ``work_function(*args, **kwargs)`` under strobelight; return its result.

        Only one profile may be active at a time (class-wide lock). If another
        profile is in progress, or strobelight fails to start, the work function
        still runs unprofiled, unless ``stop_at_error`` is set, in which case
        StrobelightCLIProfilerError is raised instead. If the work function
        raises, profiling is stopped without collecting results and the
        exception propagates. The lock is always released.
        """
        if not StrobelightCLIFunctionProfiler._lock.acquire(False):
            # Another profile is active; leave current_run_id alone, since this
            # instance may be the one driving that run (e.g. shared decorator).
            if self.stop_at_error:
                raise StrobelightCLIProfilerError("concurrent runs not supported")
            logger.warning("concurrent runs not supported")
            return work_function(*args, **kwargs)

        try:
            self.current_run_id = None
            if not self._start_strobelight():
                if self.stop_at_error:
                    raise StrobelightCLIProfilerError(
                        "failed to start strobelight profiling"
                    )
                return work_function(*args, **kwargs)

            logger.debug("collection started")
            try:
                result = work_function(*args, **kwargs)
            except BaseException:
                # BaseException so the remote run is also stopped on e.g.
                # KeyboardInterrupt before the exception propagates.
                logger.warning("work function throw exception", exc_info=True)
                self._stop_strobelight_no_throw(collect_results=False)
                raise
            self._stop_strobelight_no_throw(collect_results=True)
            return result
        finally:
            StrobelightCLIFunctionProfiler._lock.release()
282
283
def strobelight(
    profiler: "Optional[StrobelightCLIFunctionProfiler]" = None, **kwargs: Any
) -> Any:
    """Decorator that runs the wrapped function through ``profiler.profile``.

    If no profiler is given, one is built from ``kwargs``. Usage::

        @strobelight()
        @strobelight(profiler=StrobelightCLIFunctionProfiler(stop_at_error=True, ...))
        @strobelight(stop_at_error=True, ...)

    The profiler is created once, when ``strobelight(...)`` is called, and is
    shared by every call of the decorated function.
    """
    # Bind a non-Optional local so the closure below is well-typed.
    active_profiler = (
        profiler if profiler is not None else StrobelightCLIFunctionProfiler(**kwargs)
    )

    def strobelight_inner(work_function: Any) -> Any:
        @functools.wraps(work_function)
        def wrapper_function(*args: Any, **call_kwargs: Any) -> Any:
            return active_profiler.profile(work_function, *args, **call_kwargs)

        return wrapper_function

    return strobelight_inner
303