1# Copyright 2014 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5""" Wrapper that allows method execution in parallel. 6 7This class wraps a list of objects of the same type, emulates their 8interface, and executes any functions called on the objects in parallel 9in ReraiserThreads. 10 11This means that, given a list of objects: 12 13 class Foo: 14 def __init__(self): 15 self.baz = Baz() 16 17 def bar(self, my_param): 18 // do something 19 20 list_of_foos = [Foo(1), Foo(2), Foo(3)] 21 22we can take a sequential operation on that list of objects: 23 24 for f in list_of_foos: 25 f.bar('Hello') 26 27and run it in parallel across all of the objects: 28 29 Parallelizer(list_of_foos).bar('Hello') 30 31It can also handle (non-method) attributes of objects, so that this: 32 33 for f in list_of_foos: 34 f.baz.myBazMethod() 35 36can be run in parallel with: 37 38 Parallelizer(list_of_foos).baz.myBazMethod() 39 40Because it emulates the interface of the wrapped objects, a Parallelizer 41can be passed to a method or function that takes objects of that type: 42 43 def DoesSomethingWithFoo(the_foo): 44 the_foo.bar('Hello') 45 the_foo.bar('world') 46 the_foo.baz.myBazMethod 47 48 DoesSomethingWithFoo(Parallelizer(list_of_foos)) 49 50Note that this class spins up a thread for each object. Using this class 51to parallelize operations that are already fast will incur a net performance 52penalty. 53 54""" 55# pylint: disable=protected-access 56 57from devil.utils import reraiser_thread 58from devil.utils import watchdog_timer 59 60_DEFAULT_TIMEOUT = 30 61_DEFAULT_RETRIES = 3 62 63 64class Parallelizer(object): 65 """Allows parallel execution of method calls across a group of objects.""" 66 67 def __init__(self, objs): 68 self._orig_objs = objs 69 self._objs = objs 70 71 def __getattr__(self, name): 72 """Emulate getting the |name| attribute of |self|. 73 74 Args: 75 name: The name of the attribute to retrieve. 76 Returns: 77 A Parallelizer emulating the |name| attribute of |self|. 78 """ 79 self.pGet(None) 80 81 r = type(self)(self._orig_objs) 82 r._objs = [getattr(o, name) for o in self._objs] 83 return r 84 85 def __getitem__(self, index): 86 """Emulate getting the value of |self| at |index|. 87 88 Returns: 89 A Parallelizer emulating the value of |self| at |index|. 90 """ 91 self.pGet(None) 92 93 r = type(self)(self._orig_objs) 94 r._objs = [o[index] for o in self._objs] 95 return r 96 97 def __call__(self, *args, **kwargs): 98 """Emulate calling |self| with |args| and |kwargs|. 99 100 Note that this call is asynchronous. Call pFinish on the return value to 101 block until the call finishes. 102 103 Returns: 104 A Parallelizer wrapping the ReraiserThreadGroup running the call in 105 parallel. 106 Raises: 107 AttributeError if the wrapped objects aren't callable. 108 """ 109 self.pGet(None) 110 111 for o in self._objs: 112 if not callable(o): 113 raise AttributeError("'%s' is not callable" % o.__name__) 114 115 r = type(self)(self._orig_objs) 116 r._objs = reraiser_thread.ReraiserThreadGroup( 117 [reraiser_thread.ReraiserThread( 118 o, args=args, kwargs=kwargs, 119 name='%s.%s' % (str(d), o.__name__)) 120 for d, o in zip(self._orig_objs, self._objs)]) 121 r._objs.StartAll() 122 return r 123 124 def pFinish(self, timeout): 125 """Finish any outstanding asynchronous operations. 126 127 Args: 128 timeout: The maximum number of seconds to wait for an individual 129 result to return, or None to wait forever. 130 Returns: 131 self, now emulating the return values. 132 """ 133 self._assertNoShadow('pFinish') 134 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): 135 self._objs.JoinAll() 136 self._objs = self._objs.GetAllReturnValues( 137 watchdog_timer.WatchdogTimer(timeout)) 138 return self 139 140 def pGet(self, timeout): 141 """Get the current wrapped objects. 142 143 Args: 144 timeout: Same as |pFinish|. 145 Returns: 146 A list of the results, in order of the provided devices. 147 Raises: 148 Any exception raised by any of the called functions. 149 """ 150 self._assertNoShadow('pGet') 151 self.pFinish(timeout) 152 return self._objs 153 154 def pMap(self, f, *args, **kwargs): 155 """Map a function across the current wrapped objects in parallel. 156 157 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. 158 159 Note that this call is asynchronous. Call pFinish on the return value to 160 block until the call finishes. 161 162 Args: 163 f: The function to call. 164 args: The positional args to pass to f. 165 kwargs: The keyword args to pass to f. 166 Returns: 167 A Parallelizer wrapping the ReraiserThreadGroup running the map in 168 parallel. 169 """ 170 self._assertNoShadow('pMap') 171 r = type(self)(self._orig_objs) 172 r._objs = reraiser_thread.ReraiserThreadGroup( 173 [reraiser_thread.ReraiserThread( 174 f, args=tuple([o] + list(args)), kwargs=kwargs, 175 name='%s(%s)' % (f.__name__, d)) 176 for d, o in zip(self._orig_objs, self._objs)]) 177 r._objs.StartAll() 178 return r 179 180 def _assertNoShadow(self, attr_name): 181 """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. 182 183 If the wrapped objects _do_ have an |attr_name| attribute, it will be 184 inaccessible to clients. 185 186 Args: 187 attr_name: The attribute to check. 188 Raises: 189 AssertionError if the wrapped objects have an attribute named 'attr_name' 190 or '_assertNoShadow'. 191 """ 192 if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): 193 assert not hasattr(self._objs, '_assertNoShadow') 194 assert not hasattr(self._objs, attr_name) 195 else: 196 assert not any(hasattr(o, '_assertNoShadow') for o in self._objs) 197 assert not any(hasattr(o, attr_name) for o in self._objs) 198 199 200class SyncParallelizer(Parallelizer): 201 """A Parallelizer that blocks on function calls.""" 202 203 def __enter__(self): 204 """Emulate entering the context of |self|. 205 206 Note that this call is synchronous. 207 208 Returns: 209 A Parallelizer emulating the value returned from entering into the 210 context of |self|. 211 """ 212 r = type(self)(self._orig_objs) 213 r._objs = [o.__enter__ for o in r._objs] 214 return r.__call__() 215 216 def __exit__(self, exc_type, exc_val, exc_tb): 217 """Emulate exiting the context of |self|. 218 219 Note that this call is synchronous. 220 221 Args: 222 exc_type: the exception type. 223 exc_val: the exception value. 224 exc_tb: the exception traceback. 225 """ 226 r = type(self)(self._orig_objs) 227 r._objs = [o.__exit__ for o in r._objs] 228 r.__call__(exc_type, exc_val, exc_tb) 229 230 # override 231 def __call__(self, *args, **kwargs): 232 """Emulate calling |self| with |args| and |kwargs|. 233 234 Note that this call is synchronous. 235 236 Returns: 237 A Parallelizer emulating the value returned from calling |self| with 238 |args| and |kwargs|. 239 Raises: 240 AttributeError if the wrapped objects aren't callable. 241 """ 242 r = super(SyncParallelizer, self).__call__(*args, **kwargs) 243 r.pFinish(None) 244 return r 245 246 # override 247 def pMap(self, f, *args, **kwargs): 248 """Map a function across the current wrapped objects in parallel. 249 250 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. 251 252 Note that this call is synchronous. 253 254 Args: 255 f: The function to call. 256 args: The positional args to pass to f. 257 kwargs: The keyword args to pass to f. 258 Returns: 259 A Parallelizer wrapping the ReraiserThreadGroup running the map in 260 parallel. 261 """ 262 r = super(SyncParallelizer, self).pMap(f, *args, **kwargs) 263 r.pFinish(None) 264 return r 265 266