# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import multiprocessing
import sys
import tempfile

from perf_insights import map_single_trace
from perf_insights.mre import file_handle
from perf_insights.mre import mre_result
from perf_insights.mre import reduce_map_results
from perf_insights.mre import threaded_work_queue
from perf_insights.results import gtest_progress_reporter

AUTO_JOB_COUNT = -1

19class MapError(Exception):
20
21  def __init__(self, *args):
22    super(MapError, self).__init__(*args)
23    self.canonical_url = None


26class MapRunner(object):
27  def __init__(self, trace_handles, job,
28               stop_on_error=False, progress_reporter=None,
29               jobs=AUTO_JOB_COUNT,
30               output_formatters=None):
31    self._job = job
32    self._stop_on_error = stop_on_error
33    self._failed_canonical_url_to_dump = None
34    if progress_reporter is None:
35      self._progress_reporter = gtest_progress_reporter.GTestProgressReporter(
36                                    sys.stdout)
37    else:
38      self._progress_reporter = progress_reporter
39    self._output_formatters = output_formatters or []
40
41    self._trace_handles = trace_handles
42    self._num_traces_merged_into_results = 0
43    self._map_results = None
44    self._map_results_file = None
45
46    if jobs == AUTO_JOB_COUNT:
47      jobs = multiprocessing.cpu_count()
48    self._wq = threaded_work_queue.ThreadedWorkQueue(num_threads=jobs)
49
50  def _ProcessOneTrace(self, trace_handle):
51    canonical_url = trace_handle.canonical_url
52    run_reporter = self._progress_reporter.WillRun(canonical_url)
53    result = map_single_trace.MapSingleTrace(
54        trace_handle,
55        self._job)
56
57    had_failure = len(result.failures) > 0
58
59    for f in result.failures:
60      run_reporter.DidAddFailure(f)
61    run_reporter.DidRun(had_failure)
62
63    self._wq.PostMainThreadTask(self._MergeResultIntoMaster, result)
64
65  def _MergeResultIntoMaster(self, result):
66    self._map_results.append(result)
67
68    had_failure = len(result.failures) > 0
69    if self._stop_on_error and had_failure:
70      err = MapError("Mapping error")
71      self._AbortMappingDueStopOnError(err)
72      raise err
73
74    self._num_traces_merged_into_results += 1
75    if self._num_traces_merged_into_results == len(self._trace_handles):
76      self._wq.PostMainThreadTask(self._AllMappingDone)
77
78  def _AbortMappingDueStopOnError(self, err):
79    self._wq.Stop(err)
80
81  def _AllMappingDone(self):
82    self._wq.Stop()
83
84  def RunMapper(self):
85    self._map_results = []
86
87    if not self._trace_handles:
88      err = MapError("No trace handles specified.")
89      raise err
90
91    if self._job.map_function_handle:
92      for trace_handle in self._trace_handles:
93        self._wq.PostAnyThreadTask(self._ProcessOneTrace, trace_handle)
94
95      self._wq.Run()
96
97    return self._map_results
98
99  def _Reduce(self, job_results, key, map_results_file_name):
100    reduce_map_results.ReduceMapResults(job_results, key,
101                                        map_results_file_name, self._job)
102
103  def RunReducer(self, reduce_handles_with_keys):
104    if self._job.reduce_function_handle:
105      self._wq.Reset()
106
107      job_results = mre_result.MreResult()
108
109      for cur in reduce_handles_with_keys:
110        handle = cur['handle']
111        for key in cur['keys']:
112          self._wq.PostAnyThreadTask(
113              self._Reduce, job_results, key, handle)
114
115      def _Stop():
116        self._wq.Stop()
117
118      self._wq.PostAnyThreadTask(_Stop)
119      self._wq.Run()
120
121      return job_results
122    return None
123
124  def _ConvertResultsToFileHandlesAndKeys(self, results_list):
125    handles_and_keys = []
126    for current_result in results_list:
127      _, path = tempfile.mkstemp()
128      with open(path, 'w') as results_file:
129        json.dump(current_result.AsDict(), results_file)
130      rh = file_handle.URLFileHandle(path, 'file://' + path)
131      handles_and_keys.append(
132          {'handle': rh, 'keys': current_result.pairs.keys()})
133    return handles_and_keys
134
135  def Run(self):
136    mapper_results = self.RunMapper()
137    reduce_handles = self._ConvertResultsToFileHandlesAndKeys(mapper_results)
138    reducer_results = self.RunReducer(reduce_handles)
139
140    if reducer_results:
141      results = [reducer_results]
142    else:
143      results = mapper_results
144
145    for of in self._output_formatters:
146      of.Format(results)
147
148    return results