• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5""" Wrapper that allows method execution in parallel.
6
7This class wraps a list of objects of the same type, emulates their
8interface, and executes any functions called on the objects in parallel
9in ReraiserThreads.
10
11This means that, given a list of objects:
12
13  class Foo:
14    def __init__(self):
15      self.baz = Baz()
16
17    def bar(self, my_param):
18      // do something
19
20  list_of_foos = [Foo(1), Foo(2), Foo(3)]
21
22we can take a sequential operation on that list of objects:
23
24  for f in list_of_foos:
25    f.bar('Hello')
26
27and run it in parallel across all of the objects:
28
29  Parallelizer(list_of_foos).bar('Hello')
30
31It can also handle (non-method) attributes of objects, so that this:
32
33  for f in list_of_foos:
34    f.baz.myBazMethod()
35
36can be run in parallel with:
37
38  Parallelizer(list_of_foos).baz.myBazMethod()
39
40Because it emulates the interface of the wrapped objects, a Parallelizer
41can be passed to a method or function that takes objects of that type:
42
43  def DoesSomethingWithFoo(the_foo):
44    the_foo.bar('Hello')
45    the_foo.bar('world')
46    the_foo.baz.myBazMethod
47
48  DoesSomethingWithFoo(Parallelizer(list_of_foos))
49
50Note that this class spins up a thread for each object. Using this class
51to parallelize operations that are already fast will incur a net performance
52penalty.
53
54"""
55# pylint: disable=protected-access
56
57from devil.utils import reraiser_thread
58from devil.utils import watchdog_timer
59
60_DEFAULT_TIMEOUT = 30
61_DEFAULT_RETRIES = 3
62
63
64class Parallelizer(object):
65  """Allows parallel execution of method calls across a group of objects."""
66
67  def __init__(self, objs):
68    self._orig_objs = objs
69    self._objs = objs
70
71  def __getattr__(self, name):
72    """Emulate getting the |name| attribute of |self|.
73
74    Args:
75      name: The name of the attribute to retrieve.
76    Returns:
77      A Parallelizer emulating the |name| attribute of |self|.
78    """
79    self.pGet(None)
80
81    r = type(self)(self._orig_objs)
82    r._objs = [getattr(o, name) for o in self._objs]
83    return r
84
85  def __getitem__(self, index):
86    """Emulate getting the value of |self| at |index|.
87
88    Returns:
89      A Parallelizer emulating the value of |self| at |index|.
90    """
91    self.pGet(None)
92
93    r = type(self)(self._orig_objs)
94    r._objs = [o[index] for o in self._objs]
95    return r
96
97  def __call__(self, *args, **kwargs):
98    """Emulate calling |self| with |args| and |kwargs|.
99
100    Note that this call is asynchronous. Call pFinish on the return value to
101    block until the call finishes.
102
103    Returns:
104      A Parallelizer wrapping the ReraiserThreadGroup running the call in
105      parallel.
106    Raises:
107      AttributeError if the wrapped objects aren't callable.
108    """
109    self.pGet(None)
110
111    for o in self._objs:
112      if not callable(o):
113        raise AttributeError("'%s' is not callable" % o.__name__)
114
115    r = type(self)(self._orig_objs)
116    r._objs = reraiser_thread.ReraiserThreadGroup(
117        [reraiser_thread.ReraiserThread(
118            o, args=args, kwargs=kwargs,
119            name='%s.%s' % (str(d), o.__name__))
120         for d, o in zip(self._orig_objs, self._objs)])
121    r._objs.StartAll()
122    return r
123
124  def pFinish(self, timeout):
125    """Finish any outstanding asynchronous operations.
126
127    Args:
128      timeout: The maximum number of seconds to wait for an individual
129               result to return, or None to wait forever.
130    Returns:
131      self, now emulating the return values.
132    """
133    self._assertNoShadow('pFinish')
134    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
135      self._objs.JoinAll()
136      self._objs = self._objs.GetAllReturnValues(
137          watchdog_timer.WatchdogTimer(timeout))
138    return self
139
140  def pGet(self, timeout):
141    """Get the current wrapped objects.
142
143    Args:
144      timeout: Same as |pFinish|.
145    Returns:
146      A list of the results, in order of the provided devices.
147    Raises:
148      Any exception raised by any of the called functions.
149    """
150    self._assertNoShadow('pGet')
151    self.pFinish(timeout)
152    return self._objs
153
154  def pMap(self, f, *args, **kwargs):
155    """Map a function across the current wrapped objects in parallel.
156
157    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
158
159    Note that this call is asynchronous. Call pFinish on the return value to
160    block until the call finishes.
161
162    Args:
163      f: The function to call.
164      args: The positional args to pass to f.
165      kwargs: The keyword args to pass to f.
166    Returns:
167      A Parallelizer wrapping the ReraiserThreadGroup running the map in
168      parallel.
169    """
170    self._assertNoShadow('pMap')
171    r = type(self)(self._orig_objs)
172    r._objs = reraiser_thread.ReraiserThreadGroup(
173        [reraiser_thread.ReraiserThread(
174            f, args=tuple([o] + list(args)), kwargs=kwargs,
175            name='%s(%s)' % (f.__name__, d))
176         for d, o in zip(self._orig_objs, self._objs)])
177    r._objs.StartAll()
178    return r
179
180  def _assertNoShadow(self, attr_name):
181    """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
182
183    If the wrapped objects _do_ have an |attr_name| attribute, it will be
184    inaccessible to clients.
185
186    Args:
187      attr_name: The attribute to check.
188    Raises:
189      AssertionError if the wrapped objects have an attribute named 'attr_name'
190      or '_assertNoShadow'.
191    """
192    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
193      assert not hasattr(self._objs, '_assertNoShadow')
194      assert not hasattr(self._objs, attr_name)
195    else:
196      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
197      assert not any(hasattr(o, attr_name) for o in self._objs)
198
199
200class SyncParallelizer(Parallelizer):
201  """A Parallelizer that blocks on function calls."""
202
203  def __enter__(self):
204    """Emulate entering the context of |self|.
205
206    Note that this call is synchronous.
207
208    Returns:
209      A Parallelizer emulating the value returned from entering into the
210      context of |self|.
211    """
212    r = type(self)(self._orig_objs)
213    r._objs = [o.__enter__ for o in r._objs]
214    return r.__call__()
215
216  def __exit__(self, exc_type, exc_val, exc_tb):
217    """Emulate exiting the context of |self|.
218
219    Note that this call is synchronous.
220
221    Args:
222      exc_type: the exception type.
223      exc_val: the exception value.
224      exc_tb: the exception traceback.
225    """
226    r = type(self)(self._orig_objs)
227    r._objs = [o.__exit__ for o in r._objs]
228    r.__call__(exc_type, exc_val, exc_tb)
229
230  # override
231  def __call__(self, *args, **kwargs):
232    """Emulate calling |self| with |args| and |kwargs|.
233
234    Note that this call is synchronous.
235
236    Returns:
237      A Parallelizer emulating the value returned from calling |self| with
238      |args| and |kwargs|.
239    Raises:
240      AttributeError if the wrapped objects aren't callable.
241    """
242    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
243    r.pFinish(None)
244    return r
245
246  # override
247  def pMap(self, f, *args, **kwargs):
248    """Map a function across the current wrapped objects in parallel.
249
250    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
251
252    Note that this call is synchronous.
253
254    Args:
255      f: The function to call.
256      args: The positional args to pass to f.
257      kwargs: The keyword args to pass to f.
258    Returns:
259      A Parallelizer wrapping the ReraiserThreadGroup running the map in
260      parallel.
261    """
262    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
263    r.pFinish(None)
264    return r
265
266