# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from .result import SKIPPED


"""
Pipeline

Test processors are chained together and communicate with each other by
calling previous/next processor in the chain.
     ----next_test()---->     ----next_test()---->
Proc1                    Proc2                    Proc3
     <---result_for()----     <---result_for()----

For every next_test there is exactly one result_for call.
If processor ignores the test it has to return SkippedResult.
If it created multiple subtests for one test and wants to pass all of them to
the previous processor it can enclose them in GroupedResult.


Subtests

When test processor needs to modify the test or create some variants of the
test it creates subtests and sends them to the next processor.
Each subtest has:
- procid - globally unique id that should contain id of the parent test and
          some suffix given by test processor, e.g. its name + subtest type.
- processor - which created it
- origin - pointer to the parent (sub)test
"""


# Requirement levels handed to TestProc.setup(). They are combined with max()
# and compared with `<`, so their numeric order is significant.
DROP_RESULT = 0
DROP_OUTPUT = 1
DROP_PASS_OUTPUT = 2
DROP_PASS_STDOUT = 3


class TestProc(object):
  """Base class of a single link in the chain of test processors."""

  def __init__(self):
    # Neighbours in the chain; wired up by connect_to().
    self._prev_proc = None
    self._next_proc = None
    self._stopped = False
    # What this processor itself requires from a result.
    self._requirement = DROP_RESULT
    # What the previous processor requires; filled in by setup().
    self._prev_requirement = None
    # Applied to results in _send_result(); identity until setup() decides
    # that results may be dropped.
    self._reduce_result = lambda result: result

  def connect_to(self, next_proc):
    """Links `next_proc` directly behind this processor."""
    self._next_proc = next_proc
    next_proc._prev_proc = self

  def remove_from_chain(self):
    """Makes the neighbours of this processor point at each other.

    The links stored on this processor itself are left untouched.
    """
    before = self._prev_proc
    after = self._next_proc
    if before:
      before._next_proc = after
    if after:
      after._prev_proc = before

  def setup(self, requirement=DROP_RESULT):
    """
    Called by the previous processor or by the creator of the pipeline to
    tell this processor which part of the result may be ignored.

    The requirement is recorded and the stricter of `requirement` and this
    processor's own requirement is forwarded to the next processor.
    """
    self._prev_requirement = requirement
    successor = self._next_proc
    if successor:
      successor.setup(max(requirement, self._requirement))

    # Since we're not winning anything by dropping part of the result we are
    # dropping the whole result or pass it as it is. The real reduction happens
    # during result creation (in the output processor), so the result is
    # immutable.
    wanted_upstream = self._prev_requirement
    if (wanted_upstream < self._requirement and
        wanted_upstream == DROP_RESULT):
      self._reduce_result = lambda _: None

  def next_test(self, test):
    """
    Called by the previous processor for every test it produces; nobody
    else is supposed to call it.

    Implementations return a boolean telling whether the test was loaded
    into the execution queue successfully.
    """
    raise NotImplementedError()

  def result_for(self, test, result):
    """
    Called by the next processor once it has a result for `test`; nobody
    else is supposed to call it.
    """
    raise NotImplementedError()

  def heartbeat(self):
    """Passes the heartbeat on to the previous processor, if there is one."""
    upstream = self._prev_proc
    if upstream:
      upstream.heartbeat()

  def stop(self):
    """Marks this processor as stopped and stops both neighbours.

    The flag is set before the neighbours are visited, so a processor that
    is already stopped does nothing and the recursion terminates.
    """
    if self._stopped:
      return

    self._stopped = True
    if self._prev_proc:
      self._prev_proc.stop()
    if self._next_proc:
      self._next_proc.stop()

  @property
  def is_stopped(self):
    """Whether stop() has been called on this processor."""
    return self._stopped

  ### Communication

  def notify_previous(self, event):
    """Handles `event` locally, then hands it to the previous processor."""
    self._on_event(event)
    upstream = self._prev_proc
    if upstream:
      upstream.notify_previous(event)

  def _on_event(self, event):
    """Hook invoked when processors to the right signal an event, e.g.
    termination. Does nothing by default.

    Args:
      event: A text describing the signalled event.
    """
    pass

  def _send_test(self, test):
    """Hands `test` to the next processor and returns its answer."""
    return self._next_proc.next_test(test)

  def _send_result(self, test, result):
    """Hands the result for `test` to the previous processor.

    Unless the test asks to keep its output, the result first goes through
    the reducer chosen in setup().
    """
    outgoing = result if test.keep_output else self._reduce_result(result)
    self._prev_proc.result_for(test, outgoing)


class TestProcObserver(TestProc):
  """Processor that watches the traffic without altering it."""

  def __init__(self):
    super(TestProcObserver, self).__init__()

  def next_test(self, test):
    """Runs the observer hook, then forwards `test` unchanged."""
    self._on_next_test(test)
    return self._send_test(test)

  def result_for(self, test, result):
    """Runs the observer hook, then forwards the result."""
    self._on_result_for(test, result)
    self._send_result(test, result)

  def heartbeat(self):
    """Runs the observer hook, then propagates the heartbeat."""
    self._on_heartbeat()
    super(TestProcObserver, self).heartbeat()

  def _on_next_test(self, test):
    """Hook called after a test arrived from the previous processor and
    before it is sent to the next one."""
    pass

  def _on_result_for(self, test, result):
    """Hook called after a result arrived from the next processor and
    before it is sent to the previous one."""
    pass

  def _on_heartbeat(self):
    """Hook called on every heartbeat before it is propagated."""
    pass


class TestProcProducer(TestProc):
  """Processor that creates subtests."""

  def __init__(self, name):
    super(TestProcProducer, self).__init__()
    # Used as the prefix of every subtest id made by _create_subtest().
    self._name = name

  def next_test(self, test):
    """Delegates to the subclass hook _next_test()."""
    return self._next_test(test)

  def result_for(self, subtest, result):
    """Delegates to _result_for(), adding the origin of `subtest`."""
    self._result_for(subtest.origin, subtest, result)

  ### Implementation
  def _next_test(self, test):
    """Subclass hook that handles an incoming test."""
    raise NotImplementedError()

  def _result_for(self, test, subtest, result):
    """
    result_for variant that additionally receives the `subtest`.

    Args
      test: test from which the current processor created the subtest.
      subtest: test the `result` belongs to.
      result: subtest execution result created by the output processor.
    """
    raise NotImplementedError()

  ### Managing subtests
  def _create_subtest(self, test, subtest_id, **kwargs):
    """Creates a subtest whose id is <processor name>-`subtest_id`."""
    full_id = '%s-%s' % (self._name, subtest_id)
    return test.create_subtest(self, full_id, **kwargs)


class TestProcFilter(TestProc):
  """Processor that filters tests."""

  def next_test(self, test):
    """Returns False for a filtered-out test, otherwise forwards it."""
    return False if self._filter(test) else self._send_test(test)

  def result_for(self, test, result):
    """Passes the result on to the previous processor."""
    self._send_result(test, result)

  def _filter(self, test):
    """Subclass hook: returns whether `test` should be filtered out."""
    raise NotImplementedError()