• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8import threading
9from six.moves import range
10
11
class MultithreadedProcessor():
    """
    An instance of this class allows to execute a given function many times.
    Function's calls are parallelized by using Python threads. The function
    must take exactly one parameter: unique task id.
    There is a simple example in the file test_multithreaded_processor.py.
    Please note that Python threads usually do not execute Python code truly
    in parallel. Employing this class makes sense only if the given function
    stalls on some I/O operations and/or significant amount of execution time
    is spent outside the Python interpreter.

    The state of a run is stored in fields of the instance, so run(...) must
    not be called concurrently on the same object.

    """

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.

        """
        # kept as an assert so that callers see the same exception type
        assert number_of_threads > 0
        self._number_of_threads = number_of_threads
        # guards self._tasks_ids and self._error during run(...)
        self._critical_section = threading.Lock()


    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in separate thread (depending on the number of threads
        set in __init__(...)). This is blocking method; it exits only if
        all threads finish. It also stops on a first error (exception raised
        in any task). In this case the function cancels all unstarted tasks,
        waits for all threads to finish (started tasks cannot be stopped) and
        raises an exception.

        No more threads are started than there are tasks to process. If
        starting of a thread fails, unstarted tasks are cancelled, already
        started threads are joined and the original exception is re-raised.

        @param function_task: a function to execute, it must take exactly
                one parameter: a task_id. It is an unique number from
                the range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                exactly once.
        @param number_of_tasks: a number of tasks to execute

        @return: A list of outputs from all tasks, an index of every element
                corresponds to task id.

        @throws Exception if at least one of the tasks threw any Exception.

        """
        self._tasks_ids = list(range(number_of_tasks))  # tasks ids to process
        self._outputs = [None] * number_of_tasks
        self._error = None

        # a thread without a task would exit immediately, so there is no
        # point in starting more threads than tasks
        threads_limit = min(self._number_of_threads, number_of_tasks)

        # creating and starting threads
        threads = []
        try:
            while len(threads) < threads_limit:
                thread = threading.Thread(target=self._thread,
                                          args=[function_task])
                thread.start()
                # only successfully started threads can be joined later
                threads.append(thread)
        except BaseException:
            # a thread could not be started; cancel all unassigned tasks to
            # let the threads already running finish as soon as possible
            with self._critical_section:
                self._tasks_ids = []
            raise
        finally:
            # waiting for all (started) threads to finish
            for thread in threads:
                thread.join()

        # the self._error is set <=> at least one of the tasks failed
        if self._error is not None:
            message = 'One of threads failed with the following error: '
            message += self._error
            raise Exception(message)

        # no errors - the list of outputs is returned
        return self._outputs


    def _thread(self, function_task):
        """
        An internal method representing single thread. It processes available
        tasks. It exits when there is no more tasks to process or when a task
        threw an exception. This method is not supposed to throw any
        exceptions.

        @param function_task: a task function to execute, it must take exactly
                one parameter: a task_id. These identifiers are taken from
                the list self._tasks_ids.

        """
        try:

            while True:
                # critical section is used here to synchronize access to
                # shared variables
                with self._critical_section:
                    # exit if there is no more tasks to process
                    if not self._tasks_ids:
                        return
                    # otherwise take task id to process (from the end of
                    # the list)
                    task_id = self._tasks_ids.pop()
                # run task with assigned task id; every task id is handed out
                # once, so each thread writes to a different element
                self._outputs[task_id] = function_task(task_id)

        except BaseException as exception:
            # probably the task being processed raised an exception
            # critical section is used to synchronize access to shared fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... we cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and save the error as string
                    self._error = str(exception)
                # exit on the first spotted error
                return
124