1# Copyright 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import json 5import multiprocessing 6import sys 7import tempfile 8 9from perf_insights import map_single_trace 10from perf_insights.mre import file_handle 11from perf_insights.mre import mre_result 12from perf_insights.mre import reduce_map_results 13from perf_insights.mre import threaded_work_queue 14from perf_insights.results import gtest_progress_reporter 15 16AUTO_JOB_COUNT = -1 17 18 19class MapError(Exception): 20 21 def __init__(self, *args): 22 super(MapError, self).__init__(*args) 23 self.canonical_url = None 24 25 26class MapRunner(object): 27 def __init__(self, trace_handles, job, 28 stop_on_error=False, progress_reporter=None, 29 jobs=AUTO_JOB_COUNT, 30 output_formatters=None): 31 self._job = job 32 self._stop_on_error = stop_on_error 33 self._failed_canonical_url_to_dump = None 34 if progress_reporter is None: 35 self._progress_reporter = gtest_progress_reporter.GTestProgressReporter( 36 sys.stdout) 37 else: 38 self._progress_reporter = progress_reporter 39 self._output_formatters = output_formatters or [] 40 41 self._trace_handles = trace_handles 42 self._num_traces_merged_into_results = 0 43 self._map_results = None 44 self._map_results_file = None 45 46 if jobs == AUTO_JOB_COUNT: 47 jobs = multiprocessing.cpu_count() 48 self._wq = threaded_work_queue.ThreadedWorkQueue(num_threads=jobs) 49 50 def _ProcessOneTrace(self, trace_handle): 51 canonical_url = trace_handle.canonical_url 52 run_reporter = self._progress_reporter.WillRun(canonical_url) 53 result = map_single_trace.MapSingleTrace( 54 trace_handle, 55 self._job) 56 57 had_failure = len(result.failures) > 0 58 59 for f in result.failures: 60 run_reporter.DidAddFailure(f) 61 run_reporter.DidRun(had_failure) 62 63 self._wq.PostMainThreadTask(self._MergeResultIntoMaster, result) 64 65 def _MergeResultIntoMaster(self, result): 66 self._map_results.append(result) 67 68 had_failure = len(result.failures) > 0 69 if self._stop_on_error and had_failure: 70 err = MapError("Mapping error") 71 self._AbortMappingDueStopOnError(err) 72 raise err 73 74 self._num_traces_merged_into_results += 1 75 if self._num_traces_merged_into_results == len(self._trace_handles): 76 self._wq.PostMainThreadTask(self._AllMappingDone) 77 78 def _AbortMappingDueStopOnError(self, err): 79 self._wq.Stop(err) 80 81 def _AllMappingDone(self): 82 self._wq.Stop() 83 84 def RunMapper(self): 85 self._map_results = [] 86 87 if not self._trace_handles: 88 err = MapError("No trace handles specified.") 89 raise err 90 91 if self._job.map_function_handle: 92 for trace_handle in self._trace_handles: 93 self._wq.PostAnyThreadTask(self._ProcessOneTrace, trace_handle) 94 95 self._wq.Run() 96 97 return self._map_results 98 99 def _Reduce(self, job_results, key, map_results_file_name): 100 reduce_map_results.ReduceMapResults(job_results, key, 101 map_results_file_name, self._job) 102 103 def RunReducer(self, reduce_handles_with_keys): 104 if self._job.reduce_function_handle: 105 self._wq.Reset() 106 107 job_results = mre_result.MreResult() 108 109 for cur in reduce_handles_with_keys: 110 handle = cur['handle'] 111 for key in cur['keys']: 112 self._wq.PostAnyThreadTask( 113 self._Reduce, job_results, key, handle) 114 115 def _Stop(): 116 self._wq.Stop() 117 118 self._wq.PostAnyThreadTask(_Stop) 119 self._wq.Run() 120 121 return job_results 122 return None 123 124 def _ConvertResultsToFileHandlesAndKeys(self, results_list): 125 handles_and_keys = [] 126 for current_result in results_list: 127 _, path = tempfile.mkstemp() 128 with open(path, 'w') as results_file: 129 json.dump(current_result.AsDict(), results_file) 130 rh = file_handle.URLFileHandle(path, 'file://' + path) 131 handles_and_keys.append( 132 {'handle': rh, 'keys': current_result.pairs.keys()}) 133 return handles_and_keys 134 135 def Run(self): 136 mapper_results = self.RunMapper() 137 reduce_handles = self._ConvertResultsToFileHandlesAndKeys(mapper_results) 138 reducer_results = self.RunReducer(reduce_handles) 139 140 if reducer_results: 141 results = [reducer_results] 142 else: 143 results = mapper_results 144 145 for of in self._output_formatters: 146 of.Format(results) 147 148 return results 149