# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Steering stage unittest.

Part of the Chrome build flags optimization.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import multiprocessing
import unittest

from generation import Generation
from mock_task import IdentifierMockTask
import pipeline_process
import steering

# Pick an integer at random.
STEERING_TEST_STAGE = -8

# The number of generations to be used to do the testing.
NUMBER_OF_GENERATIONS = 20

# The number of tasks to be put in each generation to be tested.
NUMBER_OF_TASKS = 20

# The stride of permutation used to shuffle the input list of tasks. Should be
# relatively prime with NUMBER_OF_TASKS.
STRIDE = 7


class MockGeneration(Generation):
    """This class emulates an actual generation.

    It will output the next_generations when the method Next is called. The
    next_generations is initiated when the MockGeneration instance is
    constructed.
    """

    def __init__(self, tasks, next_generations):
        """Set up the next generations for this task.

        Args:
          tasks: A set of tasks to be run.
          next_generations: A list of generations as the next generation of
            the current generation.
        """
        Generation.__init__(self, tasks, None)
        self._next_generations = next_generations

    def Next(self, _):
        """Return the next generations supplied at construction time.

        The single positional argument is accepted and ignored.
        """
        return self._next_generations

    def IsImproved(self):
        """Return True if and only if there is a non-empty next generation."""
        return bool(self._next_generations)


class SteeringTest(unittest.TestCase):
    """This class tests the steering method.

    The steering algorithm should return if there is no new task in the
    initial generation. The steering algorithm should send all the tasks to
    the next stage and should terminate once there is no pending generation.
    A generation is pending if it contains a pending task. A task is pending
    if its (test) result is not ready.
    """

    def testSteering(self):
        """Test that the steering algorithm processes all the tasks properly.

        Test that the steering algorithm sends all the tasks to the next
        stage. Test that the steering algorithm terminates once all the tasks
        have been processed, i.e., the results for the tasks are all ready.
        """

        # A list of generations used to test the steering stage.
        generations = []

        task_index = 0
        previous_generations = None

        # Generate a sequence of generations to be tested. Each generation
        # will output the next generation in reverse order of the list when
        # the "Next" method is called.
        for _ in range(NUMBER_OF_GENERATIONS):
            # Use a consecutive sequence of numbers as identifiers for the set
            # of tasks put into a generation.
            test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
            tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t)
                     for t in test_ranges]
            steering_tasks = set(tasks)

            # Make the previously built generation the offspring of the
            # current generation.
            current_generation = MockGeneration(steering_tasks,
                                                previous_generations)
            generations.insert(0, current_generation)
            previous_generations = [current_generation]

            task_index += NUMBER_OF_TASKS

        # If there is no generation at all, the unittest returns right away.
        # Checking the list (rather than the loop variable) keeps this safe
        # when NUMBER_OF_GENERATIONS is 0 and the loop body never ran.
        if not generations:
            return

        # The most recently built generation sits at the front of the list
        # and is the one the steering stage starts from.
        initial_generation = generations[0]

        # Set up the input and result queue for the steering method.
        manager = multiprocessing.Manager()
        input_queue = manager.Queue()
        result_queue = manager.Queue()

        steering_process = multiprocessing.Process(
            target=steering.Steering,
            args=(set(), [initial_generation], input_queue, result_queue))
        steering_process.start()

        # Test that each generation is processed properly. I.e., the
        # generations are processed in order.
        for generation in generations:
            pending_tasks = list(generation.Pool())

            # Test that all the tasks are processed once and only once.
            while pending_tasks:
                task = result_queue.get()

                # unittest assertions are used instead of bare asserts so the
                # checks still run under "python -O".
                self.assertIn(task, pending_tasks)
                pending_tasks.remove(task)

                # Feed the task back so the steering stage sees its result.
                input_queue.put(task)

        task = result_queue.get()

        # Test that the steering algorithm returns properly after processing
        # all the generations.
        self.assertEqual(task, pipeline_process.POISONPILL)

        steering_process.join()

    def testCache(self):
        """The steering algorithm returns immediately if there are no new tasks.

        If all the new tasks have been cached before, the steering algorithm
        does not have to execute these tasks again and thus can terminate
        right away.
        """

        # Put a set of tasks in the cache and add this set to the initial
        # generation.
        test_ranges = range(NUMBER_OF_TASKS)
        tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t)
                 for t in test_ranges]
        steering_tasks = set(tasks)

        current_generation = MockGeneration(steering_tasks, None)

        # Set up the input and result queue for the steering method.
        manager = multiprocessing.Manager()
        input_queue = manager.Queue()
        result_queue = manager.Queue()

        steering_process = multiprocessing.Process(
            target=steering.Steering,
            args=(steering_tasks, [current_generation], input_queue,
                  result_queue))

        steering_process.start()

        # Test that the steering method returns right away.
        self.assertEqual(result_queue.get(), pipeline_process.POISONPILL)
        steering_process.join()


if __name__ == '__main__':
    unittest.main()