# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import threading

try:
    from six.moves import range  # pylint: disable=redefined-builtin
except ImportError:
    # six only supplies a lazy range() for Python 2; the builtin range()
    # gives the same result for list(range(n)), so the module still works.
    pass


class MultithreadedProcessor(object):
    """
    An instance of this class allows to execute a given function many times.
    Function's calls are parallelized by using Python threads. The function
    must take exactly one parameter: unique task id.
    There is a simple example in the file test_multithreaded_processor.py.
    Please note that there is no Python implementation that allows for "real"
    multithreading. Employing this class makes sense only if the given function
    stalls on some I/O operations and/or significant amount of execution time
    is beyond Python interpreter.

    Per-run state (pending task ids, outputs, first error) is stored on the
    instance, so a single instance must not execute run() concurrently from
    several threads.

    """

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.

        """
        assert number_of_threads > 0
        self._number_of_threads = number_of_threads
        self._critical_section = threading.Lock()


    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in separate thread (depending on the number of threads
        set in __init__(...)). This is blocking method; it exits only if
        all threads finish. It also stops on a first error (exception raised
        in any task). In this case the function cancels all unstarted tasks,
        waits for all threads to finish (started tasks cannot be stopped) and
        raises an exception.

        @param function_task: a function to execute, it must take exactly
                one parameter: a task_id. It is an unique number from
                the range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                exactly once.
        @param number_of_tasks: a number of tasks to execute

        @return: A list of outputs from all tasks, an index of every element
                corresponds to task id.

        @throws Exception if at least one of the tasks threw any Exception.
                The message contains the type and the text of the first
                error that was spotted.

        """
        self._tasks_ids = list(range(number_of_tasks))  # tasks ids to process
        self._outputs = [None] * number_of_tasks
        self._error = None

        # Starting more threads than there are tasks is pointless: the
        # surplus threads would find the task list empty and exit at once.
        number_of_threads = min(self._number_of_threads, number_of_tasks)

        # creating and starting threads
        threads = []
        for _ in range(number_of_threads):
            thread = threading.Thread(target=self._thread,
                                      args=[function_task])
            threads.append(thread)
            thread.start()

        # waiting for all threads to finish
        for thread in threads:
            thread.join()

        # the self._error is set <=> at least one of the tasks failed
        if self._error is not None:
            raise Exception('One of threads failed with the following error: '
                            + self._error)

        # no errors - the list of outputs is returned
        return self._outputs


    @staticmethod
    def _describe_error(exception):
        """
        Returns a human-readable description of the given exception.

        It never raises: a failing __str__ (or, on Python 2, a unicode
        message that cannot be encoded) must not prevent a worker thread
        from recording the failure.

        @param exception: the exception raised by a task.

        @return: "TypeName: message", or just "TypeName" when the message
                is empty or cannot be converted to a string.

        """
        type_name = type(exception).__name__
        try:
            detail = str(exception)
        except Exception:  # pylint: disable=broad-except
            detail = ''
        if detail:
            return '%s: %s' % (type_name, detail)
        return type_name


    def _thread(self, function_task):
        """
        An internal method representing single thread. It processes available
        tasks. It exits when there is no more tasks to process or when a task
        threw an exception. This method is not supposed to throw any
        exceptions.

        @param function_task: a task function to execute, it must take exactly
                one parameter: a task_id. These identifiers are taken from
                the list self._tasks_ids.

        """
        try:
            while True:
                # critical section is used here to synchronize access to
                # shared variables
                with self._critical_section:
                    # exit if there is no more tasks to process
                    if not self._tasks_ids:
                        return
                    # otherwise take task id to process
                    task_id = self._tasks_ids.pop()
                # run task with assigned task id (outside the lock, so tasks
                # from different threads can run concurrently)
                self._outputs[task_id] = function_task(task_id)

        except BaseException as exception:
            # Probably the task being processed raised an exception. The
            # description is built with a helper that cannot raise, so the
            # error is always recorded and run() always reports it.
            description = self._describe_error(exception)
            # critical section is used to synchronize access to shared fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... we cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and save the error as string
                    self._error = description
            # exit on the first spotted error
            return