# mypy: disallow-untyped-defs

import functools
import logging
import os
import re
import subprocess
import time
from threading import Lock
from typing import Any, List, Optional, Sequence


logger = logging.getLogger("strobelight_function_profiler")

console_handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
)
console_handler.setFormatter(formatter)

logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
logger.propagate = False


class StrobelightCLIProfilerError(Exception):
    """
    Raised when an error happens during strobelight profiling
    """


def _pid_namespace_link(pid: Optional[int] = None) -> str:
    """Returns the link to the process's namespace, example: pid:[4026531836]"""
    PID_NAMESPACE_PATH = "/proc/{}/ns/pid"
    pid = pid or os.getpid()
    return os.readlink(PID_NAMESPACE_PATH.format(pid))


def _pid_namespace(pid: Optional[int] = None) -> int:
    """Returns the process's namespace id (the number between the brackets)."""
    pid = pid or os.getpid()
    link = _pid_namespace_link(pid)
    return int(link[link.find("[") + 1 : -1])


def _command_to_string(command: Sequence[str]) -> str:
    """Join a command's arguments with spaces, for logging only (no quoting)."""
    return " ".join(command)


class StrobelightCLIFunctionProfiler:
    """
    Note: this is a Meta-only tool.

    StrobelightCLIFunctionProfiler can be used to profile a python function and
    generate a strobelight link with the results. It works on Meta servers but
    does not require an fbcode target.
    When stop_at_error is False (the default), an error during profiling does not
    prevent the work function from running.

    Check function_profiler_example.py for an example.
    """

    # This lock is used to make sure only one thread is running the profiler at any point.
    _lock = Lock()

    def __init__(
        self,
        *,
        stop_at_error: bool = False,
        max_profile_duration_sec: int = 60 * 10,
        sample_each: float = 1e7,  # sample each sample_each cycles.
        run_user_name: str = "pytorch-strobelight-ondemand",
        timeout_wait_for_running_sec: int = 60,
        timeout_wait_for_finished_sec: int = 60,
        recorded_env_variables: Optional[List[str]] = None,
        sample_tags: Optional[List[str]] = None,
        stack_max_len: int = 127,
        async_stack_max_len: int = 127,
    ):
        """Configure the profiler.

        Args:
            stop_at_error: raise StrobelightCLIProfilerError instead of just
                logging when profiling cannot be started or runs concurrently.
            max_profile_duration_sec: passed to strobeclient as --duration-ms.
            sample_each: passed (truncated to int) as --sample-interval.
            sample_tags: if non-empty, passed comma-joined as --sample-tags.

        NOTE(review): run_user_name and the two timeout_* values are stored but
        not read by any code visible here; recorded_env_variables, stack_max_len
        and async_stack_max_len are accepted but not used at all.
        """
        self.stop_at_error = stop_at_error
        self.max_profile_duration_sec = max_profile_duration_sec
        self.sample_each = sample_each
        self.run_user_name = run_user_name
        self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
        self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
        # Results of the most recent run.
        # Tracks the strobelight run id of the most recent run
        self.current_run_id: Optional[int] = None
        self.sample_tags = sample_tags

    def _run_async(self) -> None:
        """Start an async pyperf run on this process and record its run id.

        Raises:
            StrobelightCLIProfilerError: if strobeclient fails or its output
                does not contain a run id.
        """
        processId = os.getpid()
        namespace = _pid_namespace(processId)
        command = [
            "strobeclient",
            "run",
            "--profiler",
            "pyperf",
            "--event",
            "cycles",
            "--async",
            "--sample-interval",
            f"{int(self.sample_each)}",
            "--duration-ms",
            f"{int(self.max_profile_duration_sec * 1000)}",
            "--pid",
            f"{namespace}:{processId}",
        ]

        if self.sample_tags:
            command.append("--sample-tags")
            command.append(",".join(self.sample_tags))

        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        # strobeclient reports its results on stderr.
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(
                f"failed to start strobelight profiling, error in run_async:{output}"
            )

        if match := re.search(r"INFO Run Id: (-?\d+)", output):
            self.current_run_id = int(match.group(1))
            return

        raise StrobelightCLIProfilerError(
            f"failed to start strobelight profiling, unexpected result {output}"
        )

    def _wait_for_running(self, counter: int = 0) -> None:
        """Poll the run status every 10 s until it is RUNNING.

        Gives up after the 21st poll (``counter`` > 20).

        Raises:
            StrobelightCLIProfilerError: on strobeclient failure, an unexpected
                status, unparseable output, or too many polls.
        """
        if counter > 20:
            raise StrobelightCLIProfilerError(
                "wait_for_running called more than 20 times"
            )

        command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(
                f"failed to start strobelight profiling, error in wait_for_running:{output}"
            )

        if match := re.search("Profile run status: (.*)", output):
            current_status = match.group(1)
            if current_status == "RUNNING":
                return
            elif current_status == "PREPARING":
                time.sleep(10)
                self._wait_for_running(counter + 1)
                return
            else:
                raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")

        raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

    def _stop_run(self) -> None:
        """Ask strobeclient to stop the current run.

        Raises:
            StrobelightCLIProfilerError: unless the output reports "Success!".
        """
        command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(
                f"failed to stop strobelight profiling, return code is not 0 :{output}"
            )

        if match := re.search("INFO ::1:(.*)", output):
            current_status = match.group(1)
            if "Success!" in current_status:
                return
            else:
                raise StrobelightCLIProfilerError(
                    f"failed to stop strobelight profiling, got {current_status} result"
                )

        raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

    def _get_results(self) -> None:
        """Wait for the run to finish processing, then log its summary lines.

        The lines logged are those matching "Total samples",
        "GraphProfiler" or "Icicle view (python stack)".

        Raises:
            StrobelightCLIProfilerError: on strobeclient failure or when the run
                did not finish with SUCCESS.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(
                f"failed to extract profiling results, return code is not 0 : {output}"
            )

        if match := re.search("INFO ::1:(.*)", output):
            current_status = match.group(1)
            if "Profile run status: PROCESSING" in current_status:
                # NOTE(review): unlike _wait_for_running there is no poll cap
                # here; only the interpreter's recursion limit bounds the wait.
                time.sleep(10)
                self._get_results()
                return
            elif "Profile run finished with SUCCESS" not in current_status:
                raise StrobelightCLIProfilerError(
                    f"failed to extract profiling results, unexpected response {output}"
                )

        for item in re.findall(
            r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
            output,
        ):
            logger.info(item[0])

    def _stop_strobelight_no_throw(
        self,
        collect_results: bool,
    ) -> None:
        """Stop the current run and optionally log its results; never raises
        an Exception (failures are logged as warnings)."""
        try:
            # call stop run
            self._stop_run()
            logger.info("strobelight profiling stopped")

            logger.debug("collection stopped")

            if not collect_results:
                return

            self._get_results()
        except Exception:
            logger.warning("error during stop_strobelight", exc_info=True)

    def _start_strobelight(self) -> bool:
        """Return True if strobelight started and is running. Never throws.

        If the run was created but never reached RUNNING, it is stopped
        (without collecting results) before returning False.
        """
        strobelight_started = False
        try:
            self._run_async()
            strobelight_started = True
            logger.info("strobelight run id is: %s", self.current_run_id)
            self._wait_for_running()
            logger.info("strobelight profiling running")
            return True

        except Exception:
            logger.warning("error during start_strobelight:", exc_info=True)
            if strobelight_started:
                self._stop_strobelight_no_throw(collect_results=False)
            return False

    def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
        """Run ``work_function(*args, **kwargs)`` under strobelight and return its result.

        Only one profile may run at a time (class-wide lock). When the lock is
        busy or profiling fails to start, the work function still runs
        unprofiled unless ``stop_at_error`` is set.

        Raises:
            StrobelightCLIProfilerError: if ``stop_at_error`` is set and another
                profile is in progress or profiling could not be started.
            Any exception raised by ``work_function`` is re-raised unchanged.
        """
        lock = StrobelightCLIFunctionProfiler._lock
        if not lock.acquire(blocking=False):
            if self.stop_at_error:
                raise StrobelightCLIProfilerError("concurrent runs not supported")

            logger.warning("concurrent runs not supported")
            return work_function(*args, **kwargs)

        # Everything past this point holds the lock; the finally guarantees it is
        # released even if the work function raises.
        try:
            # Reset only while holding the lock: the decorator shares one profiler
            # instance, so resetting earlier could clobber an in-flight run's id.
            self.current_run_id = None

            if not self._start_strobelight():
                if self.stop_at_error:
                    raise StrobelightCLIProfilerError(
                        "failed to start strobelight profiling"
                    )
                return work_function(*args, **kwargs)

            logger.debug("collection started")
            try:
                result = work_function(*args, **kwargs)
            except BaseException:
                # BaseException so that e.g. KeyboardInterrupt also stops the run.
                logger.warning("work function throw exception", exc_info=True)
                self._stop_strobelight_no_throw(collect_results=False)
                raise
            self._stop_strobelight_no_throw(collect_results=True)
            return result
        finally:
            lock.release()


# A function decorator that wraps profile, if no profiler is provided one with
# default args is created. A function can be annotated as:
# @strobelight()
# @strobelight(profiler = StrobelightCLIFunctionProfiler(stop_at_error=True,..))
# @strobelight(stop_at_error=True,...)
def strobelight(
    profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
) -> Any:
    """Build a decorator that runs the decorated function via ``profiler.profile``.

    When ``profiler`` is falsy, a new StrobelightCLIFunctionProfiler is built
    from ``**kwargs`` and shared by every call of the decorated function;
    otherwise ``**kwargs`` is ignored.
    """
    active_profiler = profiler if profiler else StrobelightCLIFunctionProfiler(**kwargs)

    def decorate(work_function: Any) -> Any:
        @functools.wraps(work_function)
        def profiled(*call_args: Any, **call_kwargs: Any) -> Any:
            return active_profiler.profile(work_function, *call_args, **call_kwargs)

        return profiled

    return decorate