import functools
import time
import inspect
import collections
import types
import itertools

try:
    # Retained from the original import block.  No code visible in this
    # module needs it (``print_yielded`` drains its iterable with a plain
    # loop), so a missing vendored package must not prevent this module
    # from being imported.
    import pkg_resources.extern.more_itertools  # noqa: F401
except ImportError:  # pragma: no cover - depends on the environment
    pass

from typing import Callable, TypeVar


CallableT = TypeVar("CallableT", bound=Callable[..., object])


def compose(*funcs):
    """
    Compose any number of unary functions into a single unary function.

    >>> import textwrap
    >>> expected = str.strip(textwrap.dedent(compose.__doc__))
    >>> strip_and_dedent = compose(str.strip, textwrap.dedent)
    >>> strip_and_dedent(compose.__doc__) == expected
    True

    Compose also allows the innermost function to take arbitrary arguments.

    >>> round_three = lambda x: round(x, ndigits=3)
    >>> f = compose(round_three, int.__truediv__)
    >>> [f(3*x, x+1) for x in range(1,10)]
    [1.5, 2.0, 2.25, 2.4, 2.5, 2.571, 2.625, 2.667, 2.7]

    Functions are applied right to left: the last function receives the
    call arguments and each earlier function receives the previous result.
    At least one function is required; with none, ``functools.reduce``
    raises ``TypeError``.
    """

    def compose_two(f1, f2):
        # Only the inner function (f2) sees the caller's arguments.
        return lambda *args, **kwargs: f1(f2(*args, **kwargs))

    return functools.reduce(compose_two, funcs)


def method_caller(method_name, *args, **kwargs):
    """
    Return a function that will call a named method on the
    target object with optional positional and keyword
    arguments.

    >>> lower = method_caller('lower')
    >>> lower('MyString')
    'mystring'

    The method is looked up with ``getattr`` each time the returned
    function is called, so a missing method raises ``AttributeError``
    at call time.
    """

    def call_method(target):
        func = getattr(target, method_name)
        return func(*args, **kwargs)

    return call_method


def once(func):
    """
    Decorate func so it's only ever called the first time.

    This decorator can ensure that an expensive or non-idempotent function
    will not be expensive on subsequent calls and is idempotent.

    >>> add_three = once(lambda a: a+3)
    >>> add_three(3)
    6
    >>> add_three(9)
    6
    >>> add_three('12')
    6

    To reset the stored value, simply clear the property ``saved_result``.

    >>> del add_three.saved_result
    >>> add_three(9)
    12
    >>> add_three(8)
    12

    Or invoke 'reset()' on it.

    >>> add_three.reset()
    >>> add_three(-3)
    0
    >>> add_three(0)
    0

    Resetting when nothing is stored is a no-op.

    >>> add_three.reset()
    >>> add_three.reset()
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The result lives on the wrapper itself; arguments of later calls
        # are ignored once a result has been stored.
        if not hasattr(wrapper, 'saved_result'):
            wrapper.saved_result = func(*args, **kwargs)
        return wrapper.saved_result

    def reset():
        # ``pop`` with a default keeps reset() safe before the first call
        # and when invoked repeatedly.
        vars(wrapper).pop('saved_result', None)

    wrapper.reset = reset
    return wrapper


def method_cache(
    method: CallableT,
    cache_wrapper: Callable[
        [CallableT], CallableT
    ] = functools.lru_cache(),  # type: ignore[assignment]
) -> CallableT:
    """
    Wrap lru_cache to support storing the cache data in the object instances.

    Abstracts the common paradigm where the method explicitly saves an
    underscore-prefixed protected property on first call and returns that
    subsequently.

    >>> class MyClass:
    ...     calls = 0
    ...
    ...     @method_cache
    ...     def method(self, value):
    ...         self.calls += 1
    ...         return value

    >>> a = MyClass()
    >>> a.method(3)
    3
    >>> for x in range(75):
    ...     res = a.method(x)
    >>> a.calls
    75

    Note that the apparent behavior will be exactly like that of lru_cache
    except that the cache is stored on each instance, so values in one
    instance will not flush values from another, and when an instance is
    deleted, so are the cached values for that instance.

    >>> b = MyClass()
    >>> for x in range(35):
    ...     res = b.method(x)
    >>> b.calls
    35
    >>> a.method(0)
    0
    >>> a.calls
    75

    Note that if method had been decorated with ``functools.lru_cache()``,
    a.calls would have been 76 (due to the cached value of 0 having been
    flushed by the 'b' instance).

    Clear the cache with ``.cache_clear()``

    >>> a.method.cache_clear()

    Same for a method that hasn't yet been called.

    >>> c = MyClass()
    >>> c.method.cache_clear()

    Another cache wrapper may be supplied:

    >>> cache = functools.lru_cache(maxsize=2)
    >>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache)
    >>> a = MyClass()
    >>> a.method2()
    3

    Caution - do not subsequently wrap the method with another decorator, such
    as ``@property``, which changes the semantics of the function.

    See also
    http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/
    for another implementation and additional justification.
    """

    def wrapper(self: object, *args: object, **kwargs: object) -> object:
        # it's the first call, replace the method with a cached, bound method
        bound_method: CallableT = types.MethodType(  # type: ignore[assignment]
            method, self
        )
        cached_method = cache_wrapper(bound_method)
        # The instance attribute shadows this wrapper on later lookups, so
        # subsequent calls go straight to the cached bound method.
        setattr(self, method.__name__, cached_method)
        return cached_method(*args, **kwargs)

    # Support cache clear even before cache has been created.
    wrapper.cache_clear = lambda: None  # type: ignore[attr-defined]

    # Special methods need the proxy variant; everything else gets wrapper.
    return (  # type: ignore[return-value]
        _special_method_cache(method, cache_wrapper) or wrapper
    )


def _special_method_cache(method, cache_wrapper):
    """
    Because Python treats special methods differently, it's not
    possible to use instance attributes to implement the cached
    methods.

    Instead, install the wrapper method under a different name
    and return a simple proxy to that wrapper.

    Returns ``None`` when ``method`` is not one of the handled special
    names (``__getattr__`` and ``__getitem__``).

    https://github.com/jaraco/jaraco.functools/issues/5
    """
    name = method.__name__
    special_names = '__getattr__', '__getitem__'
    if name not in special_names:
        return

    wrapper_name = '__cached' + name

    def proxy(self, *args, **kwargs):
        # Check the instance dict directly: going through getattr for a
        # missing name could re-enter a cached ``__getattr__``.
        if wrapper_name not in vars(self):
            bound = types.MethodType(method, self)
            cache = cache_wrapper(bound)
            setattr(self, wrapper_name, cache)
        else:
            cache = getattr(self, wrapper_name)
        return cache(*args, **kwargs)

    return proxy


def apply(transform):
    """
    Decorate a function with a transform function that is
    invoked on results returned from the decorated function.

    >>> @apply(reversed)
    ... def get_numbers(start):
    ...     "doc for get_numbers"
    ...     return range(start, start+3)
    >>> list(get_numbers(4))
    [6, 5, 4]
    >>> get_numbers.__doc__
    'doc for get_numbers'
    """

    def wrap(func):
        # ``wraps`` copies func's metadata onto the composed callable.
        return functools.wraps(func)(compose(transform, func))

    return wrap


def result_invoke(action):
    r"""
    Decorate a function with an action function that is
    invoked on the results returned from the decorated
    function (for its side-effect), then return the original
    result.

    >>> @result_invoke(print)
    ... def add_two(a, b):
    ...     return a + b
    >>> x = add_two(2, 3)
    5
    >>> x
    5
    """

    def wrap(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # The action's own return value is deliberately discarded.
            action(result)
            return result

        return wrapper

    return wrap


def call_aside(f, *args, **kwargs):
    """
    Call a function for its side effect after initialization.

    >>> @call_aside
    ... def func(): print("called")
    called
    >>> func()
    called

    Use functools.partial to pass parameters to the initial call

    >>> @functools.partial(call_aside, name='bingo')
    ... def func(name): print("called with", name)
    called with bingo

    The function itself is returned unchanged; the result of the initial
    call is discarded.
    """
    f(*args, **kwargs)
    return f


class Throttler:
    """
    Rate-limit a function (or other callable)

    ``max_rate`` is the maximum number of calls per second; the default
    of infinity imposes no delay.  Wrapping an existing ``Throttler``
    re-wraps its underlying function rather than nesting throttlers.
    """

    def __init__(self, func, max_rate=float('Inf')):
        if isinstance(func, Throttler):
            func = func.func
        self.func = func
        self.max_rate = max_rate
        self.reset()

    def reset(self):
        """Forget the previous call so the next one is not delayed."""
        self.last_called = 0

    def __call__(self, *args, **kwargs):
        self._wait()
        return self.func(*args, **kwargs)

    def _wait(self):
        "ensure at least 1/max_rate seconds from last call"
        elapsed = time.time() - self.last_called
        must_wait = 1 / self.max_rate - elapsed
        # A negative value means enough time has already passed.
        time.sleep(max(0, must_wait))
        self.last_called = time.time()

    def __get__(self, obj, type=None):
        """
        Support use as a method decorator by binding ``obj`` as the first
        argument.  Accessed on the class (``obj`` is None), return the
        throttler itself rather than binding ``None``.
        """
        if obj is None:
            return self
        return first_invoke(self._wait, functools.partial(self.func, obj))


def first_invoke(func1, func2):
    """
    Return a function that when invoked will invoke func1 without
    any parameters (for its side-effect) and then invoke func2
    with whatever parameters were passed, returning its result.
    """

    def wrapper(*args, **kwargs):
        func1()
        return func2(*args, **kwargs)

    return wrapper


def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
    """
    Given a callable func, trap the indicated exceptions
    for up to 'retries' times, invoking cleanup on the
    exception. On the final attempt, allow any exceptions
    to propagate.

    Pass ``retries=float('inf')`` to retry indefinitely.
    """
    # range() cannot represent infinity, so use an endless counter instead.
    attempts = itertools.count() if retries == float('inf') else range(retries)
    for attempt in attempts:
        try:
            return func()
        except trap:
            cleanup()

    # Final attempt: not wrapped, so any exception reaches the caller.
    return func()


def retry(*r_args, **r_kwargs):
    """
    Decorator wrapper for retry_call. Accepts arguments to retry_call
    except func and then returns a decorator for the decorated function.

    Ex:

    >>> @retry(retries=3)
    ... def my_func(a, b):
    ...     "this is my funk"
    ...     print(a, b)
    >>> my_func.__doc__
    'this is my funk'
    """

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*f_args, **f_kwargs):
            # retry_call expects a zero-argument callable.
            bound = functools.partial(func, *f_args, **f_kwargs)
            return retry_call(bound, *r_args, **r_kwargs)

        return wrapper

    return decorate


def print_yielded(func):
    """
    Convert a generator into a function that prints all yielded elements

    >>> @print_yielded
    ... def x():
    ...     yield 3; yield None
    >>> x()
    3
    None

    The wrapped function returns ``None``.
    """

    @functools.wraps(func)
    def print_results(*args, **kwargs):
        # A plain loop both prints and exhausts the iterable, so no
        # third-party "consume" helper is needed.
        for item in func(*args, **kwargs):
            print(item)

    return print_results


def pass_none(func):
    """
    Wrap func so it's not called if its first param is None

    >>> print_text = pass_none(print)
    >>> print_text('text')
    text
    >>> print_text(None)

    When the first param is None the wrapper returns None.
    """

    @functools.wraps(func)
    def wrapper(param, *args, **kwargs):
        if param is not None:
            return func(param, *args, **kwargs)

    return wrapper


def assign_params(func, namespace):
    """
    Assign parameters from namespace where func solicits.

    >>> def func(x, y=3):
    ...     print(x, y)
    >>> assigned = assign_params(func, dict(x=2, z=4))
    >>> assigned()
    2 3

    The usual errors are raised if a function doesn't receive
    its required parameters:

    >>> assigned = assign_params(func, dict(y=3, z=4))
    >>> assigned()
    Traceback (most recent call last):
    TypeError: func() ...argument...

    It even works on methods:

    >>> class Handler:
    ...     def meth(self, arg):
    ...         print(arg)
    >>> assign_params(Handler().meth, dict(arg='crystal', foo='clear'))()
    crystal
    """
    sig = inspect.signature(func)
    params = sig.parameters.keys()
    # Keep only the namespace entries that match one of func's parameters.
    call_ns = {k: namespace[k] for k in params if k in namespace}
    return functools.partial(func, **call_ns)


def save_method_args(method):
    """
    Wrap a method such that when it is called, the args and kwargs are
    saved on the method.

    >>> class MyClass:
    ...     @save_method_args
    ...     def method(self, a, b):
    ...         print(a, b)
    >>> my_ob = MyClass()
    >>> my_ob.method(1, 2)
    1 2
    >>> my_ob._saved_method.args
    (1, 2)
    >>> my_ob._saved_method.kwargs
    {}
    >>> my_ob.method(a=3, b='foo')
    3 foo
    >>> my_ob._saved_method.args
    ()
    >>> my_ob._saved_method.kwargs == dict(a=3, b='foo')
    True

    The arguments are stored on the instance, allowing for
    different instance to save different args.

    >>> your_ob = MyClass()
    >>> your_ob.method({str('x'): 3}, b=[4])
    {'x': 3} [4]
    >>> your_ob._saved_method.args
    ({'x': 3},)
    >>> my_ob._saved_method.args
    ()
    """
    args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Stored as ``_saved_<method name>`` before the call, so the record
        # exists even if the method raises.
        attr_name = '_saved_' + method.__name__
        attr = args_and_kwargs(args, kwargs)
        setattr(self, attr_name, attr)
        return method(self, *args, **kwargs)

    return wrapper


def except_(*exceptions, replace=None, use=None):
    """
    Replace the indicated exceptions, if raised, with the indicated
    literal replacement or evaluated expression (if present).

    >>> safe_int = except_(ValueError)(int)
    >>> safe_int('five')
    >>> safe_int('5')
    5

    Specify a literal replacement with ``replace``.

    >>> safe_int_r = except_(ValueError, replace=0)(int)
    >>> safe_int_r('five')
    0

    Provide an expression to ``use`` to pass through particular parameters.

    >>> safe_int_pt = except_(ValueError, use='args[0]')(int)
    >>> safe_int_pt('five')
    'five'

    The ``use`` expression is evaluated with ``eval`` and can refer to the
    wrapped call's ``args`` and ``kwargs``; only pass trusted strings.
    """

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions:
                try:
                    # NOTE: eval of caller-supplied text; ``use`` must
                    # never come from untrusted input.
                    return eval(use)
                except TypeError:
                    # eval(None) raises TypeError, i.e. no ``use``
                    # expression was given: fall back to ``replace``.
                    return replace

        return wrapper

    return decorate