• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5""" Wrapper that allows method execution in parallel.
6
7This class wraps a list of objects of the same type, emulates their
8interface, and executes any functions called on the objects in parallel
9in ReraiserThreads.
10
11This means that, given a list of objects:
12
13  class Foo:
14    def __init__(self):
15      self.baz = Baz()
16
17    def bar(self, my_param):
18      // do something
19
20  list_of_foos = [Foo(1), Foo(2), Foo(3)]
21
22we can take a sequential operation on that list of objects:
23
24  for f in list_of_foos:
25    f.bar('Hello')
26
27and run it in parallel across all of the objects:
28
29  Parallelizer(list_of_foos).bar('Hello')
30
31It can also handle (non-method) attributes of objects, so that this:
32
33  for f in list_of_foos:
34    f.baz.myBazMethod()
35
36can be run in parallel with:
37
38  Parallelizer(list_of_foos).baz.myBazMethod()
39
40Because it emulates the interface of the wrapped objects, a Parallelizer
41can be passed to a method or function that takes objects of that type:
42
43  def DoesSomethingWithFoo(the_foo):
44    the_foo.bar('Hello')
45    the_foo.bar('world')
46    the_foo.baz.myBazMethod
47
48  DoesSomethingWithFoo(Parallelizer(list_of_foos))
49
50Note that this class spins up a thread for each object. Using this class
51to parallelize operations that are already fast will incur a net performance
52penalty.
53
54"""
55# pylint: disable=protected-access
56
57from devil.utils import reraiser_thread
58from devil.utils import watchdog_timer
59
60_DEFAULT_TIMEOUT = 30
61_DEFAULT_RETRIES = 3
62
63
64class Parallelizer(object):
65  """Allows parallel execution of method calls across a group of objects."""
66
67  def __init__(self, objs):
68    assert (objs is not None and len(objs) > 0), (
69        "Passed empty list to 'Parallelizer'")
70    self._orig_objs = objs
71    self._objs = objs
72
73  def __getattr__(self, name):
74    """Emulate getting the |name| attribute of |self|.
75
76    Args:
77      name: The name of the attribute to retrieve.
78    Returns:
79      A Parallelizer emulating the |name| attribute of |self|.
80    """
81    self.pGet(None)
82
83    r = type(self)(self._orig_objs)
84    r._objs = [getattr(o, name) for o in self._objs]
85    return r
86
87  def __getitem__(self, index):
88    """Emulate getting the value of |self| at |index|.
89
90    Returns:
91      A Parallelizer emulating the value of |self| at |index|.
92    """
93    self.pGet(None)
94
95    r = type(self)(self._orig_objs)
96    r._objs = [o[index] for o in self._objs]
97    return r
98
99  def __call__(self, *args, **kwargs):
100    """Emulate calling |self| with |args| and |kwargs|.
101
102    Note that this call is asynchronous. Call pFinish on the return value to
103    block until the call finishes.
104
105    Returns:
106      A Parallelizer wrapping the ReraiserThreadGroup running the call in
107      parallel.
108    Raises:
109      AttributeError if the wrapped objects aren't callable.
110    """
111    self.pGet(None)
112
113    if not self._objs:
114      raise AttributeError('Nothing to call.')
115    for o in self._objs:
116      if not callable(o):
117        raise AttributeError("'%s' is not callable" % o.__name__)
118
119    r = type(self)(self._orig_objs)
120    r._objs = reraiser_thread.ReraiserThreadGroup(
121        [reraiser_thread.ReraiserThread(
122            o, args=args, kwargs=kwargs,
123            name='%s.%s' % (str(d), o.__name__))
124         for d, o in zip(self._orig_objs, self._objs)])
125    r._objs.StartAll()  # pylint: disable=W0212
126    return r
127
128  def pFinish(self, timeout):
129    """Finish any outstanding asynchronous operations.
130
131    Args:
132      timeout: The maximum number of seconds to wait for an individual
133               result to return, or None to wait forever.
134    Returns:
135      self, now emulating the return values.
136    """
137    self._assertNoShadow('pFinish')
138    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
139      self._objs.JoinAll()
140      self._objs = self._objs.GetAllReturnValues(
141          watchdog_timer.WatchdogTimer(timeout))
142    return self
143
144  def pGet(self, timeout):
145    """Get the current wrapped objects.
146
147    Args:
148      timeout: Same as |pFinish|.
149    Returns:
150      A list of the results, in order of the provided devices.
151    Raises:
152      Any exception raised by any of the called functions.
153    """
154    self._assertNoShadow('pGet')
155    self.pFinish(timeout)
156    return self._objs
157
158  def pMap(self, f, *args, **kwargs):
159    """Map a function across the current wrapped objects in parallel.
160
161    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
162
163    Note that this call is asynchronous. Call pFinish on the return value to
164    block until the call finishes.
165
166    Args:
167      f: The function to call.
168      args: The positional args to pass to f.
169      kwargs: The keyword args to pass to f.
170    Returns:
171      A Parallelizer wrapping the ReraiserThreadGroup running the map in
172      parallel.
173    """
174    self._assertNoShadow('pMap')
175    r = type(self)(self._orig_objs)
176    r._objs = reraiser_thread.ReraiserThreadGroup(
177        [reraiser_thread.ReraiserThread(
178            f, args=tuple([o] + list(args)), kwargs=kwargs,
179            name='%s(%s)' % (f.__name__, d))
180         for d, o in zip(self._orig_objs, self._objs)])
181    r._objs.StartAll()  # pylint: disable=W0212
182    return r
183
184  def _assertNoShadow(self, attr_name):
185    """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
186
187    If the wrapped objects _do_ have an |attr_name| attribute, it will be
188    inaccessible to clients.
189
190    Args:
191      attr_name: The attribute to check.
192    Raises:
193      AssertionError if the wrapped objects have an attribute named 'attr_name'
194      or '_assertNoShadow'.
195    """
196    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
197      assert not hasattr(self._objs, '_assertNoShadow')
198      assert not hasattr(self._objs, attr_name)
199    else:
200      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
201      assert not any(hasattr(o, attr_name) for o in self._objs)
202
203
204class SyncParallelizer(Parallelizer):
205  """A Parallelizer that blocks on function calls."""
206
207  # override
208  def __call__(self, *args, **kwargs):
209    """Emulate calling |self| with |args| and |kwargs|.
210
211    Note that this call is synchronous.
212
213    Returns:
214      A Parallelizer emulating the value returned from calling |self| with
215      |args| and |kwargs|.
216    Raises:
217      AttributeError if the wrapped objects aren't callable.
218    """
219    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
220    r.pFinish(None)
221    return r
222
223  # override
224  def pMap(self, f, *args, **kwargs):
225    """Map a function across the current wrapped objects in parallel.
226
227    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
228
229    Note that this call is synchronous.
230
231    Args:
232      f: The function to call.
233      args: The positional args to pass to f.
234      kwargs: The keyword args to pass to f.
235    Returns:
236      A Parallelizer wrapping the ReraiserThreadGroup running the map in
237      parallel.
238    """
239    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
240    r.pFinish(None)
241    return r
242
243