# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import threading


class MultithreadedProcessor():
    """
    An instance of this class allows to execute a given function many times.
    Function's calls are parallelized by using Python threads. The function
    must take exactly one parameter: unique task id.
    There is a simple example in the file test_multithreaded_processor.py.
    Please note that there is no Python implementation that allows for "real"
    multithreading. Employing this class makes sense only if the given function
    stalls on some I/O operations and/or significant amount of execution time
    is beyond Python interpreter.

    """

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.

        @raises ValueError if number_of_threads is not larger than 0.

        """
        # Explicit check instead of `assert`: asserts are stripped when the
        # interpreter runs with optimizations (-O), which would silently
        # allow an invalid thread count.
        if number_of_threads <= 0:
            raise ValueError('number_of_threads must be larger than 0')
        self._number_of_threads = number_of_threads
        # Guards all shared state (_tasks_ids, _outputs, _error) accessed by
        # worker threads.
        self._critical_section = threading.Lock()


    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in separate thread (depending on the number of threads
        set in __init__(...)). This is blocking method; it exits only if
        all threads finish. It also stops on a first error (exception raised
        in any task). In this case the function cancels all unstarted tasks,
        waits for all threads to finish (started tasks cannot be stopped) and
        raises an exception.

        @param function_task: a function to execute, it must take exactly
                one parameter: a task_id. It is an unique number from
                the range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                exactly once.
        @param number_of_tasks: a number of tasks to execute

        @return: A list of outputs from all tasks, an index of every element
                corresponds to task id.

        @throws Exception if at least one of the tasks threw any Exception.

        """
        # Must be a real list: worker threads consume it with pop(). A bare
        # range object (as in Python 2's range->list behavior) has no pop()
        # in Python 3 and would crash every worker.
        self._tasks_ids = list(range(number_of_tasks))
        self._outputs = [None]*number_of_tasks
        self._error = None

        # creating and starting threads
        threads = []
        while len(threads) < self._number_of_threads:
            thread = threading.Thread(target=self._thread,
                    args=[function_task])
            threads.append(thread)
            thread.start()

        # waiting for all threads to finish
        for thread in threads:
            thread.join()

        # the self._error is set <=> at least one of the tasks failed
        if self._error is not None:
            message = 'One of threads failed with the following error: '
            message += self._error
            raise Exception(message)

        # no errors - the list of outputs is returned
        return self._outputs


    def _thread(self, function_task):
        """
        An internal method representing single thread. It processes available
        tasks. It exits when there is no more tasks to process or when a task
        threw an exception. This method is not supposed to throw any
        exceptions.

        @param function_task: a task function to execute, it must take exactly
                one parameter: a task_id. These identifiers are taken from
                the list self._tasks_ids.

        """
        try:

            while True:
                # critical section is used here to synchronize access to
                # shared variables
                with self._critical_section:
                    # exit if there is no more tasks to process
                    if len(self._tasks_ids) == 0:
                        return
                    # otherwise take task id to process
                    task_id = self._tasks_ids.pop()
                # run task with assigned task id; executed OUTSIDE the lock
                # so tasks actually run in parallel
                self._outputs[task_id] = function_task(task_id)

        except BaseException as exception:
            # probably the task being processed raised an exception
            # critical section is used to synchronized access to shared fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... we cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and saved the error as string
                    self._error = str(exception)
            # exit on the first spotted error
            return