# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper that allows method execution in parallel.

This class wraps a list of objects of the same type, emulates their
interface, and executes any functions called on the objects in parallel
in ReraiserThreads.

This means that, given a list of objects:

  class Foo:
    def __init__(self):
      self.baz = Baz()

    def bar(self, my_param):
      # do something

  list_of_foos = [Foo(), Foo(), Foo()]

we can take a sequential operation on that list of objects:

  for f in list_of_foos:
    f.bar('Hello')

and run it in parallel across all of the objects:

  Parallelizer(list_of_foos).bar('Hello')

It can also handle (non-method) attributes of objects, so that this:

  for f in list_of_foos:
    f.baz.myBazMethod()

can be run in parallel with:

  Parallelizer(list_of_foos).baz.myBazMethod()

Because it emulates the interface of the wrapped objects, a Parallelizer
can be passed to a method or function that takes objects of that type:

  def DoesSomethingWithFoo(the_foo):
    the_foo.bar('Hello')
    the_foo.bar('world')
    the_foo.baz.myBazMethod()

  DoesSomethingWithFoo(Parallelizer(list_of_foos))

Note that this class spins up a thread for each object. Using this class
to parallelize operations that are already fast will incur a net performance
penalty.

"""
# pylint: disable=protected-access

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer

_DEFAULT_TIMEOUT = 30
_DEFAULT_RETRIES = 3


def _DisplayName(obj):
  """Returns a printable name for |obj|.

  Not every object has a |__name__| (e.g. functools.partial objects, callable
  instances, or plain values), so fall back to repr() instead of letting the
  attribute lookup fail.

  Args:
    obj: Any object.
  Returns:
    |obj.__name__| if it exists, otherwise |repr(obj)|.
  """
  try:
    return obj.__name__
  except AttributeError:
    return repr(obj)


class Parallelizer(object):
  """Allows parallel execution of method calls across a group of objects."""

  def __init__(self, objs):
    """Wraps |objs|.

    Args:
      objs: The list of objects to wrap.
    """
    # |_orig_objs| always refers to the objects originally wrapped; it is used
    # to label threads and to seed derived Parallelizers. |_objs| is what this
    # instance currently emulates: the original objects, values derived from
    # them, or a ReraiserThreadGroup for a call that is still in flight.
    self._orig_objs = objs
    self._objs = objs

  def __getattr__(self, name):
    """Emulate getting the |name| attribute of |self|.

    Only invoked for names that normal attribute lookup does not find on the
    Parallelizer itself.

    Args:
      name: The name of the attribute to retrieve.
    Returns:
      A Parallelizer emulating the |name| attribute of |self|.
    """
    # Resolve any outstanding asynchronous call so |_objs| holds real values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [getattr(o, name) for o in self._objs]
    return r

  def __getitem__(self, index):
    """Emulate getting the value of |self| at |index|.

    Args:
      index: The index (or key) to look up on each wrapped object.
    Returns:
      A Parallelizer emulating the value of |self| at |index|.
    """
    # Resolve any outstanding asynchronous call so |_objs| holds real values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [o[index] for o in self._objs]
    return r

  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the call in
      parallel.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    self.pGet(None)

    for o in self._objs:
      if not callable(o):
        # Non-callables frequently have no |__name__|, so use a safe name to
        # make sure the intended message is what gets raised.
        raise AttributeError("'%s' is not callable" % _DisplayName(o))

    r = type(self)(self._orig_objs)
    # One thread per wrapped object, named "<original object>.<callable>".
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            o,
            args=args,
            kwargs=kwargs,
            name='%s.%s' % (str(d), _DisplayName(o)))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def pFinish(self, timeout):
    """Finish any outstanding asynchronous operations.

    Args:
      timeout: The maximum number of seconds to wait for an individual
               result to return, or None to wait forever.
    Returns:
      self, now emulating the return values.
    """
    self._assertNoShadow('pFinish')
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # NOTE(review): JoinAll() is called without a watcher before the timed
      # GetAllReturnValues() call, so |timeout| may not actually bound the
      # wait -- confirm against ReraiserThreadGroup.JoinAll's defaults before
      # relying on it.
      self._objs.JoinAll()
      self._objs = self._objs.GetAllReturnValues(
          watchdog_timer.WatchdogTimer(timeout))
    return self

  def pGet(self, timeout):
    """Get the current wrapped objects.

    Args:
      timeout: Same as |pFinish|.
    Returns:
      A list of the results, in the order of the originally wrapped objects.
    Raises:
      Any exception raised by any of the called functions.
    """
    self._assertNoShadow('pGet')
    self.pFinish(timeout)
    return self._objs

  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the map in
      parallel.
    """
    self._assertNoShadow('pMap')
    # |f| may be a functools.partial or a callable instance without a
    # |__name__|; compute a safe label once for all thread names.
    f_name = _DisplayName(f)
    r = type(self)(self._orig_objs)
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            f, args=(o,) + args, kwargs=kwargs, name='%s(%s)' % (f_name, d))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def _assertNoShadow(self, attr_name):
    """Ensures that |attr_name| isn't shadowing part of the wrapped objects.

    If the wrapped objects _do_ have an |attr_name| attribute, it will be
    inaccessible to clients.

    Args:
      attr_name: The attribute to check.
    Raises:
      AssertionError if the wrapped objects have an attribute named 'attr_name'
      or '_assertNoShadow'.
    """
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # A call is in flight: the thing being emulated is the thread group.
      assert not hasattr(self._objs, '_assertNoShadow')
      assert not hasattr(self._objs, attr_name)
    else:
      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
      assert not any(hasattr(o, attr_name) for o in self._objs)


class SyncParallelizer(Parallelizer):
  """A Parallelizer that blocks on function calls."""

  def __enter__(self):
    """Emulate entering the context of |self|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from entering into the
      context of |self|.
    """
    r = type(self)(self._orig_objs)
    # Enter the objects this instance currently emulates (|self._objs|), not
    # the freshly constructed wrapper's, which are always the original
    # objects.
    r._objs = [o.__enter__ for o in self._objs]
    return r.__call__()

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Emulate exiting the context of |self|.

    Note that this call is synchronous. The values returned by the wrapped
    objects' __exit__ methods are discarded.

    Args:
      exc_type: the exception type.
      exc_val: the exception value.
      exc_tb: the exception traceback.
    """
    r = type(self)(self._orig_objs)
    # Mirror __enter__: exit the objects this instance currently emulates.
    r._objs = [o.__exit__ for o in self._objs]
    r.__call__(exc_type, exc_val, exc_tb)

  # override
  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from calling |self| with
      |args| and |kwargs|.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
    r.pFinish(None)
    return r

  # override
  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is synchronous.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer emulating the values returned by the map, which has
      already finished by the time this returns.
    """
    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
    r.pFinish(None)
    return r