# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import threading


class MultithreadedProcessor():
    """
    An instance of this class executes a given function many times.
    The function's calls are parallelized using Python threads. The function
    must take exactly one parameter: a unique task id.
    There is a simple example in the file test_multithreaded_processor.py;
    an illustrative usage sketch is also included at the end of this file.
    Please note that, because of the Global Interpreter Lock, CPython threads
    do not run Python code in parallel. Employing this class makes sense only
    if the given function stalls on I/O operations and/or spends a significant
    amount of its execution time outside the Python interpreter.

    """

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.

        """
        assert number_of_threads > 0
        self._number_of_threads = number_of_threads
        self._critical_section = threading.Lock()


    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in a separate thread (depending on the number of threads
        set in __init__(...)). This is a blocking method; it exits only when
        all threads finish. It also stops on the first error (an exception
        raised in any task). In this case the method cancels all unstarted
        tasks, waits for all threads to finish (started tasks cannot be
        stopped) and raises an exception.

        @param function_task: a function to execute; it must take exactly
                one parameter: a task_id. It is a unique number from
                the range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                exactly once.
        @param number_of_tasks: the number of tasks to execute

        @return: A list of outputs from all tasks; the index of every element
                corresponds to its task id.

        @throws Exception if at least one of the tasks threw any exception.

        """
        self._tasks_ids = list(range(number_of_tasks))  # task ids to process
        self._outputs = [None]*number_of_tasks
        self._error = None

        # creating and starting threads
        threads = []
        while len(threads) < self._number_of_threads:
            thread = threading.Thread(target=self._thread, args=[function_task])
            threads.append(thread)
            thread.start()

        # waiting for all threads to finish
        for thread in threads:
            thread.join()

        # self._error is set if and only if at least one of the tasks failed
        if self._error is not None:
            message = 'One of the threads failed with the following error: '
            message += self._error
            raise Exception(message)

        # no errors - the list of outputs is returned
        return self._outputs


    def _thread(self, function_task):
        """
        An internal method representing a single thread. It processes
        available tasks and exits when there are no more tasks to process or
        when a task threw an exception. This method is not supposed to throw
        any exceptions.

        @param function_task: a task function to execute; it must take
                exactly one parameter: a task_id. These identifiers are taken
                from the list self._tasks_ids.

        """
        try:

            while True:
                # critical section is used here to synchronize access to
                # shared variables
                with self._critical_section:
                    # exit if there are no more tasks to process
                    if len(self._tasks_ids) == 0:
                        return
                    # otherwise take a task id to process
                    task_id = self._tasks_ids.pop()
                # run the task with the assigned task id
                self._outputs[task_id] = function_task(task_id)

        except BaseException as exception:
            # probably the task being processed raised an exception;
            # critical section is used to synchronize access to shared fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... we cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and save the error as a string
                    self._error = str(exception)
                # exit on the first spotted error
                return
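
# ---------------------------------------------------------------------------
# Illustrative usage sketch (NOT part of the original Chromium OS module;
# added here only to demonstrate the API described in the docstrings above).
# Each task simulates I/O-bound work by sleeping briefly, which releases the
# GIL and lets the threads overlap; the value returned by task i is stored at
# index i of the list returned by run(). The names example_task and processor
# are placeholders chosen for this demo.
if __name__ == '__main__':
    import time

    def example_task(task_id):
        # Placeholder for real I/O-bound work (network, disk, subprocess).
        time.sleep(0.1)
        return task_id * task_id

    processor = MultithreadedProcessor(number_of_threads=4)
    outputs = processor.run(example_task, number_of_tasks=10)
    # outputs[i] == i * i for every i in range(10)
    print(outputs)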