• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import multiprocessing
5import sys
6
7from tracing.mre import map_single_trace
8from tracing.mre import threaded_work_queue
9from tracing.mre import gtest_progress_reporter
10
11AUTO_JOB_COUNT = -1
12
13
class MapError(Exception):
  """Exception type used to report mapping failures.

  Accepts the same positional arguments as Exception. Every instance also
  carries a canonical_url attribute, which starts out as None.
  """

  def __init__(self, *args):
    # Nothing in the constructor fills this in; it is only given a default
    # so that the attribute always exists on the instance.
    self.canonical_url = None
    super(MapError, self).__init__(*args)
19
20
class MapRunner(object):
  """Maps a job over a collection of trace handles via a work queue.

  Each trace handle is processed with map_single_trace.MapSingleTrace and the
  per-trace results are collected in a dict keyed by the handle's
  canonical_url.
  """

  def __init__(self, trace_handles, job,
               stop_on_error=False, progress_reporter=None,
               jobs=AUTO_JOB_COUNT,
               output_formatters=None,
               extra_import_options=None):
    """Stores the run configuration and creates the work queue.

    Args:
      trace_handles: Sized iterable of trace handles; each one must expose a
          canonical_url attribute.
      job: The job to run. RunMapper only maps traces when its
          map_function_handle is truthy.
      stop_on_error: If True, the first result that contains failures stops
          the work queue and raises MapError.
      progress_reporter: Reporter notified around each trace. Defaults to a
          GTestProgressReporter writing to sys.stdout.
      jobs: Number of work queue threads. AUTO_JOB_COUNT means
          multiprocessing.cpu_count().
      output_formatters: Optional list of objects with a Format(results)
          method, invoked by Run.
      extra_import_options: Passed through unchanged to MapSingleTrace.
    """
    self._job = job
    self._stop_on_error = stop_on_error
    self._failed_canonical_url_to_dump = None
    if progress_reporter is None:
      self._progress_reporter = gtest_progress_reporter.GTestProgressReporter(
          sys.stdout)
    else:
      self._progress_reporter = progress_reporter
    self._output_formatters = output_formatters or []
    self._extra_import_options = extra_import_options

    self._trace_handles = trace_handles
    self._num_traces_merged_into_results = 0
    self._map_results = None
    self._map_results_file = None

    if jobs == AUTO_JOB_COUNT:
      jobs = multiprocessing.cpu_count()
    self._wq = threaded_work_queue.ThreadedWorkQueue(num_threads=jobs)

  def _ProcessOneTrace(self, trace_handle):
    """Maps a single trace, reports progress, and queues the result merge."""
    canonical_url = trace_handle.canonical_url
    run_reporter = self._progress_reporter.WillRun(canonical_url)
    result = map_single_trace.MapSingleTrace(
        trace_handle,
        self._job,
        extra_import_options=self._extra_import_options)

    had_failure = len(result.failures) > 0

    for f in result.failures:
      run_reporter.DidAddFailure(f)
    run_reporter.DidRun(had_failure)

    # The shared results dict is only touched from the task posted here, not
    # from this method directly.
    self._wq.PostMainThreadTask(
        self._MergeResultIntoMaster, result, trace_handle)

  def _MergeResultIntoMaster(self, result, trace_handle):
    """Records one trace's result and detects completion or abort.

    Raises:
      MapError: if stop_on_error is set and the result contains failures.
    """
    self._map_results[trace_handle.canonical_url] = result

    had_failure = len(result.failures) > 0
    if self._stop_on_error and had_failure:
      err = MapError("Mapping error")
      self._AbortMappingDueStopOnError(err)
      raise err

    self._num_traces_merged_into_results += 1
    # Once every trace has been merged, schedule the queue shutdown.
    if self._num_traces_merged_into_results == len(self._trace_handles):
      self._wq.PostMainThreadTask(self._AllMappingDone)

  def _AbortMappingDueStopOnError(self, err):
    """Stops the work queue, handing it the error that caused the abort."""
    self._wq.Stop(err)

  def _AllMappingDone(self):
    """Stops the work queue after all traces have been merged."""
    self._wq.Stop()

  def RunMapper(self):
    """Runs the map function over every trace handle.

    Returns:
      A dict mapping canonical_url to the result for that trace. The dict is
      empty when the job has no map_function_handle.

    Raises:
      MapError: if no trace handles were supplied.
    """
    self._map_results = {}
    # Reset the merge counter together with the results dict. Completion is
    # detected by comparing this counter to len(self._trace_handles), so a
    # stale value left over from an earlier call would never match.
    self._num_traces_merged_into_results = 0

    if not self._trace_handles:
      err = MapError("No trace handles specified.")
      raise err

    if self._job.map_function_handle:
      for trace_handle in self._trace_handles:
        self._wq.PostAnyThreadTask(self._ProcessOneTrace, trace_handle)

      self._wq.Run()

    return self._map_results

  def Run(self):
    """Runs the mapper and passes the results to each output formatter.

    Returns:
      A list of the per-trace results.
    """
    results_by_trace = self.RunMapper()
    # Materialize the values so formatters and callers get a real list on
    # Python 3 as well, where dict.values() is a view rather than a list.
    results = list(results_by_trace.values())

    for of in self._output_formatters:
      of.Format(results)

    return results
106