# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

""" Wrapper that allows method execution in parallel.

This class wraps a list of objects of the same type, emulates their
interface, and executes any functions called on the objects in parallel
in ReraiserThreads.

This means that, given a list of objects:

  class Foo:
    def __init__(self):
      self.baz = Baz()

    def bar(self, my_param):
      # do something

  list_of_foos = [Foo(), Foo(), Foo()]

we can take a sequential operation on that list of objects:

  for f in list_of_foos:
    f.bar('Hello')

and run it in parallel across all of the objects:

  Parallelizer(list_of_foos).bar('Hello')

It can also handle (non-method) attributes of objects, so that this:

  for f in list_of_foos:
    f.baz.myBazMethod()

can be run in parallel with:

  Parallelizer(list_of_foos).baz.myBazMethod()

Because it emulates the interface of the wrapped objects, a Parallelizer
can be passed to a method or function that takes objects of that type:

  def DoesSomethingWithFoo(the_foo):
    the_foo.bar('Hello')
    the_foo.bar('world')
    the_foo.baz.myBazMethod()

  DoesSomethingWithFoo(Parallelizer(list_of_foos))

Note that this class spins up a thread for each object. Using this class
to parallelize operations that are already fast will incur a net performance
penalty.

"""
# pylint: disable=protected-access

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer

_DEFAULT_TIMEOUT = 30
_DEFAULT_RETRIES = 3


def _NameOf(obj):
  """Returns a printable name for |obj|.

  Not every object has a __name__ (e.g. functools.partial objects, callable
  instances, plain values), so fall back to repr() rather than raising.

  Args:
    obj: Any object.
  Returns:
    obj.__name__ if it exists, otherwise repr(obj).
  """
  try:
    return obj.__name__
  except AttributeError:
    return repr(obj)


class Parallelizer(object):
  """Allows parallel execution of method calls across a group of objects."""

  def __init__(self, objs):
    """Wraps |objs|.

    Args:
      objs: A non-empty sequence of objects to operate on in parallel.
    Raises:
      AssertionError if |objs| is None or empty.
    """
    assert (objs is not None and len(objs) > 0), (
        "Passed empty list to 'Parallelizer'")
    # |_orig_objs| always refers to the objects originally passed in (it is
    # used to name threads); |_objs| is whatever is currently being emulated:
    # the objects themselves, attributes of them, a running thread group, or
    # the values returned by a finished call.
    self._orig_objs = objs
    self._objs = objs

  def __getattr__(self, name):
    """Emulate getting the |name| attribute of |self|.

    Args:
      name: The name of the attribute to retrieve.
    Returns:
      A Parallelizer emulating the |name| attribute of |self|.
    Raises:
      AttributeError if |name| is one of this class's own state attributes
      and the instance has not been initialized.
    """
    # __getattr__ only runs when normal lookup fails, so reaching here for one
    # of our own state attributes means __init__ has not run (e.g. while the
    # instance is being copied or unpickled). Forwarding in that state would
    # re-enter __getattr__ through pGet() and recurse without bound.
    if name in ('_objs', '_orig_objs'):
      raise AttributeError(name)

    # Block on any outstanding asynchronous call first.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [getattr(o, name) for o in self._objs]
    return r

  def __getitem__(self, index):
    """Emulate getting the value of |self| at |index|.

    Args:
      index: The index (or key) to look up in each wrapped object.
    Returns:
      A Parallelizer emulating the value of |self| at |index|.
    """
    # Block on any outstanding asynchronous call first.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [o[index] for o in self._objs]
    return r

  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the call in
      parallel.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    self.pGet(None)

    if not self._objs:
      raise AttributeError('Nothing to call.')
    for o in self._objs:
      if not callable(o):
        # Non-callables frequently have no __name__, so use a safe lookup to
        # make sure the intended message is the one that gets raised.
        raise AttributeError("'%s' is not callable" % _NameOf(o))

    r = type(self)(self._orig_objs)
    # One thread per wrapped object, named '<original object>.<callable>'.
    r._objs = reraiser_thread.ReraiserThreadGroup(
        [reraiser_thread.ReraiserThread(
            o, args=args, kwargs=kwargs,
            name='%s.%s' % (str(d), _NameOf(o)))
         for d, o in zip(self._orig_objs, self._objs)])
    r._objs.StartAll()  # pylint: disable=W0212
    return r

  def pFinish(self, timeout):
    """Finish any outstanding asynchronous operations.

    Args:
      timeout: The maximum number of seconds to wait for an individual
               result to return, or None to wait forever.
    Returns:
      self, now emulating the return values.
    """
    self._assertNoShadow('pFinish')
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # NOTE(review): JoinAll() is called without the watchdog, so |timeout|
      # appears to be applied only by GetAllReturnValues() -- confirm against
      # reraiser_thread that this is intended.
      self._objs.JoinAll()
      self._objs = self._objs.GetAllReturnValues(
          watchdog_timer.WatchdogTimer(timeout))
    return self

  def pGet(self, timeout):
    """Get the current wrapped objects.

    Args:
      timeout: Same as |pFinish|.
    Returns:
      A list of the results, in order of the provided objects.
    Raises:
      Any exception raised by any of the called functions.
    """
    self._assertNoShadow('pGet')
    self.pFinish(timeout)
    return self._objs

  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the map in
      parallel.
    """
    self._assertNoShadow('pMap')
    # |f| may be any callable (e.g. a functools.partial), which need not have
    # a __name__; resolve the display name once, safely.
    f_name = _NameOf(f)
    r = type(self)(self._orig_objs)
    # One thread per wrapped object, named '<function>(<original object>)'.
    r._objs = reraiser_thread.ReraiserThreadGroup(
        [reraiser_thread.ReraiserThread(
            f, args=(o,) + args, kwargs=kwargs,
            name='%s(%s)' % (f_name, d))
         for d, o in zip(self._orig_objs, self._objs)])
    r._objs.StartAll()  # pylint: disable=W0212
    return r

  def _assertNoShadow(self, attr_name):
    """Ensures that |attr_name| isn't shadowing part of the wrapped objects.

    If the wrapped objects _do_ have an |attr_name| attribute, it will be
    inaccessible to clients.

    Args:
      attr_name: The attribute to check.
    Raises:
      AssertionError if the wrapped objects have an attribute named 'attr_name'
      or '_assertNoShadow'.
    """
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # While a call is in flight, the thread group is the wrapped object.
      assert not hasattr(self._objs, '_assertNoShadow')
      assert not hasattr(self._objs, attr_name)
    else:
      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
      assert not any(hasattr(o, attr_name) for o in self._objs)


class SyncParallelizer(Parallelizer):
  """A Parallelizer that blocks on function calls."""

  # override
  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from calling |self| with
      |args| and |kwargs|.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
    r.pFinish(None)
    return r

  # override
  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is synchronous.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer emulating the values returned from calling f on each of
      the wrapped objects.
    """
    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
    r.pFinish(None)
    return r