1# Copyright 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import multiprocessing 5import sys 6 7from tracing.mre import map_single_trace 8from tracing.mre import threaded_work_queue 9from tracing.mre import gtest_progress_reporter 10 11AUTO_JOB_COUNT = -1 12 13 14class MapError(Exception): 15 16 def __init__(self, *args): 17 super(MapError, self).__init__(*args) 18 self.canonical_url = None 19 20 21class MapRunner(object): 22 def __init__(self, trace_handles, job, 23 stop_on_error=False, progress_reporter=None, 24 jobs=AUTO_JOB_COUNT, 25 output_formatters=None, 26 extra_import_options=None): 27 self._job = job 28 self._stop_on_error = stop_on_error 29 self._failed_canonical_url_to_dump = None 30 if progress_reporter is None: 31 self._progress_reporter = gtest_progress_reporter.GTestProgressReporter( 32 sys.stdout) 33 else: 34 self._progress_reporter = progress_reporter 35 self._output_formatters = output_formatters or [] 36 self._extra_import_options = extra_import_options 37 38 self._trace_handles = trace_handles 39 self._num_traces_merged_into_results = 0 40 self._map_results = None 41 self._map_results_file = None 42 43 if jobs == AUTO_JOB_COUNT: 44 jobs = multiprocessing.cpu_count() 45 self._wq = threaded_work_queue.ThreadedWorkQueue(num_threads=jobs) 46 47 def _ProcessOneTrace(self, trace_handle): 48 canonical_url = trace_handle.canonical_url 49 run_reporter = self._progress_reporter.WillRun(canonical_url) 50 result = map_single_trace.MapSingleTrace( 51 trace_handle, 52 self._job, 53 extra_import_options=self._extra_import_options) 54 55 had_failure = len(result.failures) > 0 56 57 for f in result.failures: 58 run_reporter.DidAddFailure(f) 59 run_reporter.DidRun(had_failure) 60 61 self._wq.PostMainThreadTask( 62 self._MergeResultIntoMaster, result, trace_handle) 63 64 def _MergeResultIntoMaster(self, result, trace_handle): 65 self._map_results[trace_handle.canonical_url] = result 66 67 had_failure = len(result.failures) > 0 68 if self._stop_on_error and had_failure: 69 err = MapError("Mapping error") 70 self._AbortMappingDueStopOnError(err) 71 raise err 72 73 self._num_traces_merged_into_results += 1 74 if self._num_traces_merged_into_results == len(self._trace_handles): 75 self._wq.PostMainThreadTask(self._AllMappingDone) 76 77 def _AbortMappingDueStopOnError(self, err): 78 self._wq.Stop(err) 79 80 def _AllMappingDone(self): 81 self._wq.Stop() 82 83 def RunMapper(self): 84 self._map_results = {} 85 86 if not self._trace_handles: 87 err = MapError("No trace handles specified.") 88 raise err 89 90 if self._job.map_function_handle: 91 for trace_handle in self._trace_handles: 92 self._wq.PostAnyThreadTask(self._ProcessOneTrace, trace_handle) 93 94 self._wq.Run() 95 96 return self._map_results 97 98 def Run(self): 99 results_by_trace = self.RunMapper() 100 results = results_by_trace.values() 101 102 for of in self._output_formatters: 103 of.Format(results) 104 105 return results 106