• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 the V8 project authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from collections import deque
6
7from . import base
8
9
class SequenceProc(base.TestProc):
  """Test processor that throttles how many heavy tests run at once.

  Tests flagged as heavy are counted while they are further down the
  pipeline. Once the configured limit is reached, additional heavy tests wait
  in a FIFO buffer and are forwarded as results of earlier heavy tests come
  back. Tests that are not heavy are forwarded immediately.
  """

  def __init__(self, max_heavy):
    """Create a processor with an empty buffer and no heavy tests in flight.

    Args:
      max_heavy: Positive upper bound on the number of heavy tests that may be
                 further down the pipeline at the same time.
    """
    super(SequenceProc, self).__init__()
    assert max_heavy > 0
    self.max_heavy = max_heavy
    # Number of heavy tests currently sent on and not yet returned.
    self.n_heavy = 0
    # Heavy tests waiting for a free slot, oldest first.
    self.buffer = deque()

  def next_test(self, test):
    """Forward a test, or park it if the heavy-test limit is reached.

    Args:
      test: The test to process; its `is_heavy` flag selects the path taken.

    Returns:
      The outcome of `_send_test` when the test was forwarded, or False when
      the test was buffered instead.
    """
    if not test.is_heavy:
      # Light tests are never throttled.
      return self._send_test(test)

    if self.n_heavy >= self.max_heavy:
      # No free slot. Keep the test for later and report that it did not reach
      # the execution queue, so the test loader will try to send more tests.
      self.buffer.append(test)
      return False

    # A slot is available, but the test may still be filtered further down;
    # only count it when it was actually taken.
    accepted = self._send_test(test)
    if accepted:
      self.n_heavy += 1
    return accepted

  def result_for(self, test, result):
    """Handle a finished test and refill the freed heavy slot if possible.

    Args:
      test: The test that produced the result.
      result: The result to pass on via `_send_result`.
    """
    if test.is_heavy:
      # The finished heavy test frees one slot. Walk the buffer in FIFO order
      # until one buffered test is accepted; rejected ones are dropped.
      self.n_heavy -= 1
      while self.buffer:
        if self._send_test(self.buffer.popleft()):
          self.n_heavy += 1
          break

    self._send_result(test, result)
60