1# Copyright 2021 the V8 project authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from collections import deque 6 7from . import base 8 9 10class SequenceProc(base.TestProc): 11 """Processor ensuring heavy tests are sent sequentially into the execution 12 pipeline. 13 14 The class keeps track of the number of tests in the pipeline marked heavy 15 and permits only a configurable amount. An excess amount is queued and sent 16 as soon as other heavy tests return. 17 """ 18 def __init__(self, max_heavy): 19 """Initialize the processor. 20 21 Args: 22 max_heavy: The maximum number of heavy tests that will be sent further 23 down the pipeline simultaneously. 24 """ 25 super(SequenceProc, self).__init__() 26 assert max_heavy > 0 27 self.max_heavy = max_heavy 28 self.n_heavy = 0 29 self.buffer = deque() 30 31 def next_test(self, test): 32 if test.is_heavy: 33 if self.n_heavy < self.max_heavy: 34 # Enough space to send more heavy tests. Check if the test is not 35 # filtered otherwise. 36 used = self._send_test(test) 37 if used: 38 self.n_heavy += 1 39 return used 40 else: 41 # Too many tests in the pipeline. Buffer the test and indicate that 42 # this test didn't end up in the execution queue (i.e. test loader 43 # will try to send more tests). 44 self.buffer.append(test) 45 return False 46 else: 47 return self._send_test(test) 48 49 def result_for(self, test, result): 50 if test.is_heavy: 51 # A heavy test finished computing. Try to send one from the buffer. 52 self.n_heavy -= 1 53 while self.buffer: 54 next_test = self.buffer.popleft() 55 if self._send_test(next_test): 56 self.n_heavy += 1 57 break 58 59 self._send_result(test, result) 60