• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import functools
2import time
3import inspect
4import collections
5import types
6import itertools
7
8import pkg_resources.extern.more_itertools
9
10from typing import Callable, TypeVar
11
12
# Type variable bound to any callable; it lets a decorator's annotations
# state that the callable it returns has the same type as the one it received.
CallableT = TypeVar("CallableT", bound=Callable[..., object])
14
15
def compose(*funcs):
    """
    Chain callables right-to-left into a single callable.

    ``compose(f, g, h)(*args, **kwargs)`` evaluates
    ``f(g(h(*args, **kwargs)))``: only the last (innermost) function
    receives the call arguments; every other function is fed the result
    of the one to its right.

    >>> import textwrap
    >>> expected = str.strip(textwrap.dedent(compose.__doc__))
    >>> strip_and_dedent = compose(str.strip, textwrap.dedent)
    >>> strip_and_dedent(compose.__doc__) == expected
    True

    The innermost function may take arbitrary arguments.

    >>> round_three = lambda x: round(x, ndigits=3)
    >>> f = compose(round_three, int.__truediv__)
    >>> [f(3*x, x+1) for x in range(1,10)]
    [1.5, 2.0, 2.25, 2.4, 2.5, 2.571, 2.625, 2.667, 2.7]

    A single function is returned unchanged; calling ``compose()`` with
    no functions raises ``TypeError`` (from ``functools.reduce``).
    """

    def pipe(outer, inner):
        # Pairwise composition: run ``inner`` on the call arguments, then
        # hand its result to ``outer``.
        def composed(*args, **kwargs):
            return outer(inner(*args, **kwargs))

        return composed

    return functools.reduce(pipe, funcs)
38
39
def method_caller(method_name, *args, **kwargs):
    """
    Build a function that invokes ``method_name`` on whatever object it
    is given, forwarding the positional and keyword arguments captured
    here.

    The attribute lookup happens each time the returned function is
    called, against the target passed to it.

    >>> lower = method_caller('lower')
    >>> lower('MyString')
    'mystring'
    """

    def call_method(target):
        return getattr(target, method_name)(*args, **kwargs)

    return call_method
56
57
def once(func):
    """
    Decorate func so it's only ever called the first time.

    This decorator can ensure that an expensive or non-idempotent function
    will not be expensive on subsequent calls and is idempotent. The
    result of the first call is stored on the wrapper as ``saved_result``
    and returned for every later call, whatever arguments are passed.

    >>> add_three = once(lambda a: a+3)
    >>> add_three(3)
    6
    >>> add_three(9)
    6
    >>> add_three('12')
    6

    To reset the stored value, simply clear the property ``saved_result``.

    >>> del add_three.saved_result
    >>> add_three(9)
    12
    >>> add_three(8)
    12

    Or invoke 'reset()' on it.

    >>> add_three.reset()
    >>> add_three(-3)
    0
    >>> add_three(0)
    0

    ``reset()`` is safe to call when nothing is stored (before the first
    call, or twice in a row).

    >>> fresh = once(lambda: 'value')
    >>> fresh.reset()
    >>> fresh()
    'value'
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The presence of the attribute (not its value) marks "already
        # called", so results such as None or 0 are cached too.
        if not hasattr(wrapper, 'saved_result'):
            wrapper.saved_result = func(*args, **kwargs)
        return wrapper.saved_result

    def reset():
        # pop() with a default makes reset a no-op when no result is stored,
        # instead of raising KeyError('saved_result').
        vars(wrapper).pop('saved_result', None)

    wrapper.reset = reset
    return wrapper
98
99
def method_cache(
    method: CallableT,
    cache_wrapper: Callable[
        [CallableT], CallableT
    ] = functools.lru_cache(),  # type: ignore[assignment]
) -> CallableT:
    """
    Cache a method's results per instance rather than per class.

    On the first call through an instance, the method is bound to that
    instance, wrapped with ``cache_wrapper`` (``functools.lru_cache()``
    by default) and stored on the instance under the method's own name,
    so later calls go straight to the cached bound method. This replaces
    the common pattern of hand-saving a result on an underscore-prefixed
    attribute.

    >>> class MyClass:
    ...     calls = 0
    ...
    ...     @method_cache
    ...     def method(self, value):
    ...         self.calls += 1
    ...         return value

    >>> a = MyClass()
    >>> a.method(3)
    3
    >>> for x in range(75):
    ...     res = a.method(x)
    >>> a.calls
    75

    The visible behavior matches ``lru_cache`` except that each instance
    owns its cache: values in one instance do not evict values of
    another, and the cached values go away with the instance.

    >>> b = MyClass()
    >>> for x in range(35):
    ...     res = b.method(x)
    >>> b.calls
    35
    >>> a.method(0)
    0
    >>> a.calls
    75

    Had ``method`` been decorated with ``functools.lru_cache()`` directly,
    ``a.calls`` would have been 76 (the cached value for 0 would have been
    flushed by the 'b' instance).

    Clear the cache with ``.cache_clear()``

    >>> a.method.cache_clear()

    That also works for a method that hasn't been called yet.

    >>> c = MyClass()
    >>> c.method.cache_clear()

    A different cache wrapper may be supplied:

    >>> cache = functools.lru_cache(maxsize=2)
    >>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache)
    >>> a = MyClass()
    >>> a.method2()
    3

    Caution - do not subsequently wrap the method with another decorator, such
    as ``@property``, which changes the semantics of the function.

    See also
    http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/
    for another implementation and additional justification.
    """

    # Special methods get a dedicated proxy; for any other method the
    # helper returns None and the generic wrapper below is used.
    special = _special_method_cache(method, cache_wrapper)
    if special:
        return special  # type: ignore[return-value]

    def wrapper(self: object, *args: object, **kwargs: object) -> object:
        # Reached only until the instance attribute set below shadows this
        # class-level function with the cached bound method.
        bound: CallableT = types.MethodType(  # type: ignore[assignment]
            method, self
        )
        cached = cache_wrapper(bound)
        setattr(self, method.__name__, cached)
        return cached(*args, **kwargs)

    # Allow ``cache_clear()`` before any per-instance cache exists.
    wrapper.cache_clear = lambda: None  # type: ignore[attr-defined]

    return wrapper  # type: ignore[return-value]
188
189
def _special_method_cache(method, cache_wrapper):
    """
    Build a caching proxy for the special methods ``__getattr__`` and
    ``__getitem__``; return ``None`` for any other method.

    Python treats special methods differently, so an instance attribute
    cannot stand in for them. The cached bound method is therefore stored
    on the instance under a separate name (``'__cached'`` + method name)
    and the returned proxy forwards every call to it.

    https://github.com/jaraco/jaraco.functools/issues/5
    """
    method_name = method.__name__
    if method_name not in ('__getattr__', '__getitem__'):
        return None

    wrapper_name = '__cached' + method_name

    def proxy(self, *args, **kwargs):
        if wrapper_name in vars(self):
            cache = getattr(self, wrapper_name)
        else:
            # First use on this instance: create its cache and remember it.
            cache = cache_wrapper(types.MethodType(method, self))
            setattr(self, wrapper_name, cache)
        return cache(*args, **kwargs)

    return proxy
218
219
def apply(transform):
    """
    Build a decorator that post-processes a function's return value.

    The decorated function's result is passed through ``transform`` and
    the transformed value is returned; the original function's metadata
    (name, docstring, ...) is carried over to the result.

    >>> @apply(reversed)
    ... def get_numbers(start):
    ...     "doc for get_numbers"
    ...     return range(start, start+3)
    >>> list(get_numbers(4))
    [6, 5, 4]
    >>> get_numbers.__doc__
    'doc for get_numbers'
    """

    def wrap(func):
        piped = compose(transform, func)
        return functools.update_wrapper(piped, func)

    return wrap
239
240
def result_invoke(action):
    r"""
    Build a decorator that calls ``action`` on each result of the
    decorated function, purely for its side effect, and then returns
    that result unchanged. Whatever ``action`` returns is discarded.

    >>> @result_invoke(print)
    ... def add_two(a, b):
    ...     return a + b
    >>> x = add_two(2, 3)
    5
    >>> x
    5
    """

    def wrap(func):
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)
            action(outcome)
            return outcome

        return functools.wraps(func)(wrapper)

    return wrap
267
268
def call_aside(f, *args, **kwargs):
    """
    Invoke ``f`` once, right away, for its side effect, then hand ``f``
    back unchanged so this can be used as a decorator.

    >>> @call_aside
    ... def func(): print("called")
    called
    >>> func()
    called

    Use functools.partial to pass parameters to the initial call

    >>> @functools.partial(call_aside, name='bingo')
    ... def func(name): print("called with", name)
    called with bingo
    """
    # The return value of this initial call is intentionally discarded.
    _ = f(*args, **kwargs)
    return f
287
288
class Throttler:
    """
    Rate-limit a function (or other callable).

    Each call first sleeps, if needed, so that at least ``1 / max_rate``
    seconds separate it from the previous call, then invokes the wrapped
    callable and returns its result. With the default ``max_rate`` of
    infinity no waiting takes place.
    """

    def __init__(self, func, max_rate=float('Inf')):
        # Throttling a Throttler re-wraps the underlying callable rather
        # than stacking one throttle on top of another.
        self.func = func.func if isinstance(func, Throttler) else func
        self.max_rate = max_rate
        self.reset()

    def reset(self):
        "Forget the last call time so the next call proceeds without delay."
        self.last_called = 0

    def __call__(self, *args, **kwargs):
        self._wait()
        return self.func(*args, **kwargs)

    def _wait(self):
        "Sleep until at least 1/max_rate seconds have passed since the last call."
        since_last = time.time() - self.last_called
        remaining = 1 / self.max_rate - since_last
        # A negative remainder means enough time has already passed.
        time.sleep(max(0, remaining))
        self.last_called = time.time()

    def __get__(self, obj, type=None):
        # Descriptor hook: when looked up as an attribute, return a function
        # that waits and then calls ``func`` with ``obj`` as first argument.
        return first_invoke(self._wait, functools.partial(self.func, obj))
317
318
def first_invoke(func1, func2):
    """
    Combine two callables into one: the returned function first calls
    ``func1`` with no arguments (for its side effect, result discarded),
    then calls ``func2`` with the arguments it was given and returns
    ``func2``'s result.
    """

    def wrapper(*args, **kwargs):
        func1()
        outcome = func2(*args, **kwargs)
        return outcome

    return wrapper
331
332
def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
    """
    Call ``func`` with no arguments, retrying when it raises one of the
    exceptions in ``trap``.

    Up to ``retries`` attempts are made with the trapped exceptions
    suppressed, and ``cleanup`` is called after each such failure. Pass
    ``retries=float('inf')`` to keep retrying indefinitely. Once the
    retries are used up, one final attempt is made whose exceptions
    propagate to the caller.
    """
    unlimited = retries == float('inf')
    remaining = itertools.count() if unlimited else range(retries)
    for _ in remaining:
        try:
            outcome = func()
        except trap:
            cleanup()
        else:
            return outcome

    # Final attempt: nothing is trapped here.
    return func()
348
349
def retry(*r_args, **r_kwargs):
    """
    Decorator form of retry_call. Takes the same arguments as retry_call
    apart from ``func`` and returns a decorator; each call of the
    decorated function is run through retry_call with those arguments.

    Ex:

    >>> @retry(retries=3)
    ... def my_func(a, b):
    ...     "this is my funk"
    ...     print(a, b)
    >>> my_func.__doc__
    'this is my funk'
    """

    def decorate(func):
        def wrapper(*f_args, **f_kwargs):
            # retry_call invokes its callable without arguments, so freeze
            # this call's arguments into a zero-argument callable first.
            return retry_call(
                functools.partial(func, *f_args, **f_kwargs),
                *r_args,
                **r_kwargs,
            )

        return functools.wraps(func)(wrapper)

    return decorate
374
375
def print_yielded(func):
    """
    Convert a generator into a function that prints all yielded elements

    The returned function calls ``func`` with the arguments it receives,
    prints each yielded item in order, and returns ``None``.

    >>> @print_yielded
    ... def x():
    ...     yield 3; yield None
    >>> x()
    3
    None
    """

    # Implemented with a plain loop: the name ``more_itertools`` is not
    # bound by ``import pkg_resources.extern.more_itertools`` (that statement
    # binds only ``pkg_resources``), so referring to it here would fail.
    @functools.wraps(func)
    def print_results(*args, **kwargs):
        for item in func(*args, **kwargs):
            print(item)

    return print_results
390
391
def pass_none(func):
    """
    Wrap func so that it is skipped when its first argument is None.

    In that case the wrapper returns ``None`` without calling ``func``;
    otherwise every argument is forwarded and ``func``'s result returned.

    >>> print_text = pass_none(print)
    >>> print_text('text')
    text
    >>> print_text(None)
    """

    @functools.wraps(func)
    def wrapper(param, *args, **kwargs):
        if param is None:
            return None
        return func(param, *args, **kwargs)

    return wrapper
408
409
def assign_params(func, namespace):
    """
    Bind to func those entries of namespace whose keys match the names
    of its parameters, returning a ``functools.partial``. Entries that
    func does not ask for are ignored.

    >>> def func(x, y=3):
    ...     print(x, y)
    >>> assigned = assign_params(func, dict(x=2, z=4))
    >>> assigned()
    2 3

    The usual errors are raised if a function doesn't receive
    its required parameters:

    >>> assigned = assign_params(func, dict(y=3, z=4))
    >>> assigned()
    Traceback (most recent call last):
    TypeError: func() ...argument...

    It even works on methods:

    >>> class Handler:
    ...     def meth(self, arg):
    ...         print(arg)
    >>> assign_params(Handler().meth, dict(arg='crystal', foo='clear'))()
    crystal
    """
    accepted = inspect.signature(func).parameters
    matched = {}
    for name in accepted:
        if name in namespace:
            matched[name] = namespace[name]
    return functools.partial(func, **matched)
440
441
def save_method_args(method):
    """
    Wrap a method so that each call records its positional and keyword
    arguments on the instance, under ``_saved_<method name>``, before the
    method itself runs. The record is a named tuple with ``args`` and
    ``kwargs`` fields and is overwritten on every call.

    >>> class MyClass:
    ...     @save_method_args
    ...     def method(self, a, b):
    ...         print(a, b)
    >>> my_ob = MyClass()
    >>> my_ob.method(1, 2)
    1 2
    >>> my_ob._saved_method.args
    (1, 2)
    >>> my_ob._saved_method.kwargs
    {}
    >>> my_ob.method(a=3, b='foo')
    3 foo
    >>> my_ob._saved_method.args
    ()
    >>> my_ob._saved_method.kwargs == dict(a=3, b='foo')
    True

    Because the record lives on the instance, different instances keep
    different saved arguments.

    >>> your_ob = MyClass()
    >>> your_ob.method({str('x'): 3}, b=[4])
    {'x': 3} [4]
    >>> your_ob._saved_method.args
    ({'x': 3},)
    >>> my_ob._saved_method.args
    ()
    """
    args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')

    def wrapper(self, *args, **kwargs):
        # Record first, so the arguments are saved even if the method raises.
        setattr(self, '_saved_' + method.__name__, args_and_kwargs(args, kwargs))
        return method(self, *args, **kwargs)

    return functools.wraps(method)(wrapper)
486
487
def except_(*exceptions, replace=None, use=None):
    """
    Replace the indicated exceptions, if raised, with the indicated
    literal replacement or evaluated expression (if present).

    Returns a decorator. When the decorated function raises one of
    ``exceptions``, the wrapper returns a substitute value instead of
    propagating the exception; any other exception propagates normally.

    >>> safe_int = except_(ValueError)(int)
    >>> safe_int('five')
    >>> safe_int('5')
    5

    Specify a literal replacement with ``replace``.

    >>> safe_int_r = except_(ValueError, replace=0)(int)
    >>> safe_int_r('five')
    0

    Provide an expression to ``use`` to pass through particular parameters.
    The expression is a string evaluated with ``eval`` inside the wrapper,
    where ``args`` and ``kwargs`` are the arguments of the failed call.

    >>> safe_int_pt = except_(ValueError, use='args[0]')(int)
    >>> safe_int_pt('five')
    'five'

    Note that ``replace`` is also returned whenever evaluating ``use``
    raises ``TypeError``, including a ``TypeError`` raised by the
    expression itself.
    """

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions:
                try:
                    # NOTE(review): ``use`` is passed to eval() and runs as
                    # code in this frame; it must never come from untrusted
                    # input.
                    return eval(use)
                except TypeError:
                    # eval(None) raises TypeError, so this is the path taken
                    # when no ``use`` expression was supplied.
                    return replace

        return wrapper

    return decorate
526