• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 the V8 project authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from .result import SKIPPED
6
7
8"""
9Pipeline
10
11Test processors are chained together and communicate with each other by
12calling previous/next processor in the chain.
13     ----next_test()---->     ----next_test()---->
14Proc1                    Proc2                    Proc3
15     <---result_for()----     <---result_for()----
16
For every next_test call there is exactly one result_for call.
If a processor ignores the test, it has to return SkippedResult.
If it created multiple subtests for one test and wants to pass all of them to
the previous processor, it can enclose them in GroupedResult.
21
22
23Subtests
24
When a test processor needs to modify the test or create some variants of the
test, it creates subtests and sends them to the next processor.
27Each subtest has:
28- procid - globally unique id that should contain id of the parent test and
29          some suffix given by test processor, e.g. its name + subtest type.
30- processor - which created it
31- origin - pointer to the parent (sub)test
32"""
33
34
# Requirement levels handed to `TestProc.setup`. They are plain ordered
# integers: `setup` merges them with max() and compares them with `<`, so the
# numeric order below is significant.
# NOTE(review): judging by the names, each level presumably states how much of
# a result may be dropped -- confirm against the output processor.
(
  DROP_RESULT,
  DROP_OUTPUT,
  DROP_PASS_OUTPUT,
  DROP_PASS_STDOUT,
) = range(4)
39
40
class TestProc(object):
  """Base class for one link of the test processor chain.

  A processor knows its previous and its next neighbour. Tests are handed
  forward with `next_test`, results are handed back with `result_for`.
  """

  def __init__(self):
    # Neighbours in the chain; both stay None until the processor is linked.
    self._prev_proc = None
    self._next_proc = None
    self._stopped = False
    # Requirement of this processor itself and the one received via setup().
    self._requirement = DROP_RESULT
    self._prev_requirement = None
    # Applied to results before they are sent back; identity until setup()
    # decides that results can be dropped.
    self._reduce_result = lambda res: res

  def connect_to(self, next_proc):
    """Links `next_proc` directly behind this processor."""
    next_proc._prev_proc = self
    self._next_proc = next_proc

  def remove_from_chain(self):
    """Wires the two neighbours of this processor to each other.

    The processor's own neighbour references are left untouched.
    """
    before, after = self._prev_proc, self._next_proc
    if before:
      before._next_proc = after
    if after:
      after._prev_proc = before

  def setup(self, requirement=DROP_RESULT):
    """
    Tells the processor which part of the result may be ignored.

    Called by the previous processor or by whoever builds the pipeline. The
    stricter of `requirement` and the processor's own requirement is passed
    on to the next processor.
    """
    self._prev_requirement = requirement
    downstream = self._next_proc
    if downstream:
      downstream.setup(max(requirement, self._requirement))

    # Dropping only a part of the result gains nothing, so a result is either
    # dropped entirely or passed on unchanged. The actual reduction happens
    # when the result is created (in the output processor), which is why the
    # result can be treated as immutable here.
    received = self._prev_requirement
    drop_everything = (received < self._requirement and
                       received == DROP_RESULT)
    if drop_everything:
      self._reduce_result = lambda _: None

  def next_test(self, test):
    """
    Receives a new test from the previous processor.

    Only the previous processor is supposed to call this method. It returns
    a boolean telling whether the test was loaded into the execution queue
    successfully.
    """
    raise NotImplementedError()

  def result_for(self, test, result):
    """
    Receives the result of a test from the next processor.

    Only the next processor is supposed to call this method.
    """
    raise NotImplementedError()

  def heartbeat(self):
    """Forwards the heartbeat to the previous processor, if there is one."""
    upstream = self._prev_proc
    if upstream:
      upstream.heartbeat()

  def stop(self):
    """Marks the processor as stopped and stops both neighbours.

    The `_stopped` flag is set before propagating, so the call coming back
    from a neighbour returns immediately.
    """
    if self._stopped:
      return

    self._stopped = True
    if self._prev_proc:
      self._prev_proc.stop()
    if self._next_proc:
      self._next_proc.stop()

  @property
  def is_stopped(self):
    """Whether `stop` has been called on this processor."""
    return self._stopped

  ### Communication

  def notify_previous(self, event):
    """Handles `event` locally, then passes it to the previous processor."""
    self._on_event(event)
    upstream = self._prev_proc
    if upstream:
      upstream.notify_previous(event)

  def _on_event(self, event):
    """Hook invoked when processors to the right signal an event, e.g.
    termination. Does nothing by default.

    Args:
      event: A text describing the signalled event.
    """

  def _send_test(self, test):
    """Hands `test` to the next processor and returns its answer."""
    return self._next_proc.next_test(test)

  def _send_result(self, test, result):
    """Hands `result` for `test` back to the previous processor.

    The result is run through `_reduce_result` first unless the test has
    `keep_output` set.
    """
    outgoing = result if test.keep_output else self._reduce_result(result)
    self._prev_proc.result_for(test, outgoing)
135
class TestProcObserver(TestProc):
  """Pass-through processor offering hooks to watch tests and results.

  Tests, results and heartbeats are forwarded unchanged; subclasses override
  the `_on_*` hooks to look at them on the way.
  """

  def __init__(self):
    super(TestProcObserver, self).__init__()

  def next_test(self, test):
    """Runs the `_on_next_test` hook, then forwards `test`."""
    self._on_next_test(test)
    return self._send_test(test)

  def result_for(self, test, result):
    """Runs the `_on_result_for` hook, then sends `result` back."""
    self._on_result_for(test, result)
    self._send_result(test, result)

  def heartbeat(self):
    """Runs the `_on_heartbeat` hook, then propagates the heartbeat."""
    self._on_heartbeat()
    super(TestProcObserver, self).heartbeat()

  def _on_next_test(self, test):
    """Hook called once a test arrived from the previous processor and before
    it is sent to the next one. Does nothing by default."""

  def _on_result_for(self, test, result):
    """Hook called once a result arrived from the next processor and before
    it is sent to the previous one. Does nothing by default."""

  def _on_heartbeat(self):
    """Hook called on every heartbeat before it is propagated. Does nothing
    by default."""
165
166
class TestProcProducer(TestProc):
  """Base for processors that derive subtests from incoming tests."""

  def __init__(self, name):
    super(TestProcProducer, self).__init__()
    # Used as the prefix of every subtest id this processor creates.
    self._name = name

  def next_test(self, test):
    """Delegates to `_next_test` and returns its answer."""
    return self._next_test(test)

  def result_for(self, subtest, result):
    """Delegates to `_result_for`, adding the subtest's origin as the test."""
    parent = subtest.origin
    self._result_for(parent, subtest, result)

  ### Implementation
  def _next_test(self, test):
    """Handles an incoming test; has to be provided by subclasses."""
    raise NotImplementedError()

  def _result_for(self, test, subtest, result):
    """
    Variant of result_for that also receives the parent `test`; has to be
    provided by subclasses.

    Args
      test: test used by current processor to create the subtest.
      subtest: test for which the `result` is.
      result: subtest execution result created by the output processor.
    """
    raise NotImplementedError()

  ### Managing subtests
  def _create_subtest(self, test, subtest_id, **kwargs):
    """Creates a subtest of `test` with the id <processor name>-`subtest_id`.

    Extra keyword arguments are passed through to `test.create_subtest`.
    """
    full_id = '%s-%s' % (self._name, subtest_id)
    return test.create_subtest(self, full_id, **kwargs)
200
201
class TestProcFilter(TestProc):
  """Base for processors that let only some of the tests through."""

  def next_test(self, test):
    """Forwards `test` unless `_filter` rejects it.

    Returns False for a rejected test, otherwise whatever the next processor
    returns.
    """
    if not self._filter(test):
      return self._send_test(test)

    return False

  def result_for(self, test, result):
    """Passes the result back to the previous processor."""
    self._send_result(test, result)

  def _filter(self, test):
    """Returns whether test should be filtered out; has to be provided by
    subclasses."""
    raise NotImplementedError()
217