1# Copyright 2018 the V8 project authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from collections import defaultdict 6import time 7 8from . import base 9from ..objects import testcase 10from ..outproc import base as outproc 11 12 13class CombinerProc(base.TestProc): 14 def __init__(self, rng, min_group_size, max_group_size, count): 15 """ 16 Args: 17 rng: random number generator 18 min_group_size: minimum number of tests to combine 19 max_group_size: maximum number of tests to combine 20 count: how many tests to generate. 0 means infinite running 21 """ 22 super(CombinerProc, self).__init__() 23 24 self._rng = rng 25 self._min_size = min_group_size 26 self._max_size = max_group_size 27 self._count = count 28 29 # Index of the last generated test 30 self._current_num = 0 31 32 # {suite name: instance of TestGroups} 33 self._groups = defaultdict(TestGroups) 34 35 # {suite name: instance of TestCombiner} 36 self._combiners = {} 37 38 def setup(self, requirement=base.DROP_RESULT): 39 # Combiner is not able to pass results (even as None) to the previous 40 # processor. 41 assert requirement == base.DROP_RESULT 42 self._next_proc.setup(base.DROP_RESULT) 43 44 def next_test(self, test): 45 group_key = self._get_group_key(test) 46 if not group_key: 47 # Test not suitable for combining 48 return 49 50 self._groups[test.suite.name].add_test(group_key, test) 51 52 def _get_group_key(self, test): 53 combiner = self._get_combiner(test.suite) 54 if not combiner: 55 print ('>>> Warning: There is no combiner for %s testsuite' % 56 test.suite.name) 57 return None 58 return combiner.get_group_key(test) 59 60 def result_for(self, test, result): 61 self._send_next_test() 62 63 def generate_initial_tests(self, num=1): 64 for _ in xrange(0, num): 65 self._send_next_test() 66 67 def _send_next_test(self): 68 if self.is_stopped: 69 return 70 71 if self._count and self._current_num >= self._count: 72 return 73 74 combined_test = self._create_new_test() 75 if not combined_test: 76 # Not enough tests 77 return 78 79 self._send_test(combined_test) 80 81 def _create_new_test(self): 82 suite, combiner = self._select_suite() 83 groups = self._groups[suite] 84 85 max_size = self._rng.randint(self._min_size, self._max_size) 86 sample = groups.sample(self._rng, max_size) 87 if not sample: 88 return None 89 90 self._current_num += 1 91 return combiner.combine('%s-%d' % (suite, self._current_num), sample) 92 93 def _select_suite(self): 94 """Returns pair (suite name, combiner).""" 95 selected = self._rng.randint(0, len(self._groups) - 1) 96 for n, suite in enumerate(self._groups): 97 if n == selected: 98 return suite, self._combiners[suite] 99 100 def _get_combiner(self, suite): 101 combiner = self._combiners.get(suite.name) 102 if not combiner: 103 combiner = suite.get_test_combiner() 104 self._combiners[suite.name] = combiner 105 return combiner 106 107 108class TestGroups(object): 109 def __init__(self): 110 self._groups = defaultdict(list) 111 self._keys = [] 112 113 def add_test(self, key, test): 114 self._groups[key].append(test) 115 self._keys.append(key) 116 117 def sample(self, rng, max_size): 118 # Not enough tests 119 if not self._groups: 120 return None 121 122 group_key = rng.choice(self._keys) 123 tests = self._groups[group_key] 124 return [rng.choice(tests) for _ in xrange(0, max_size)] 125