• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18from grpc import RpcError
19import logging
20
21
22class EventCallbackStream(object):
23    """
24    A an object that translate a gRPC stream of events to a Python stream of
25    callbacks.
26
27    All callbacks are non-sticky. This means that user will only receive callback
28    generated after EventCallbackStream is registered and will not receive any
29    callback after EventCallbackStream is unregistered
30
31    You would need a new EventCallbackStream and anything that depends on this
32    object once shutdown() is called
33    """
34
35    def __init__(self, server_stream_call):
36        """
37        Construct this object, call the |grpc_lambda| and trigger event_callback on
38        the thread used to create this object until |destroy| is called when this
39        object can no longer be used
40        :param server_stream_call: A server stream call object returned from
41                                   calling a gRPC server stream RPC API. The
42                                   object must support iterator interface (i.e.
43                                   next() method) and the grpc.Call interface
44                                   so that we can cancel it
45        :param event_callback: callback to be invoked with the only argument as
46                               the generated event. The callback will be invoked
47                               on a separate thread created within this object
48        """
49        if server_stream_call is None:
50            raise ValueError("server_stream_call must not be None")
51        self.server_stream_call = server_stream_call
52        self.handlers = []
53        self.executor = ThreadPoolExecutor()
54        self.future = self.executor.submit(EventCallbackStream._event_loop,
55                                           self)
56
57    def __enter__(self):
58        return self
59
60    def __exit__(self, type, value, traceback):
61        self.shutdown()
62        if traceback is None:
63            return True
64        else:
65            return False
66
67    def __del__(self):
68        self.shutdown()
69
70    def register_callback(self, callback, matcher_fn=None):
71        """
72        Register a callback to handle events. Event will be handled by callback
73        if matcher_fn(event) returns True
74
75        callback and matcher are registered as a tuple. Hence the same callback
76        with different matcher are considered two different handler units. Same
77        matcher, but different callback are also considered different handling
78        unit
79
80        Callback will be invoked on a ThreadPoolExecutor owned by this
81        EventCallbackStream
82
83        :param callback: Will be called as callback(event)
84        :param matcher_fn: A boolean function that returns True or False when
85                           calling matcher_fn(event), if None, all event will
86                           be matched
87        """
88        if callback is None:
89            raise ValueError("callback must not be None")
90        self.handlers.append((callback, matcher_fn))
91
92    def unregister_callback(self, callback, matcher_fn=None):
93        """
94        Unregister callback and matcher_fn from the event stream. Both objects
95        must match exactly the ones when calling register_callback()
96
97        :param callback: callback used in register_callback()
98        :param matcher_fn: matcher_fn used in register_callback()
99        :raises ValueError when (callback, matcher_fn) tuple is not found
100        """
101        if callback is None:
102            raise ValueError("callback must not be None")
103        self.handlers.remove((callback, matcher_fn))
104
105    def shutdown(self):
106        """
107        Stop the gRPC lambda so that event_callback will not be invoked after th
108        method returns.
109
110        This object will be useless after this call as there is no way to restart
111        the gRPC callback. You would have to create a new EventCallbackStream
112
113        :return: None on success, exception object on failure
114        """
115        while not self.server_stream_call.done():
116            self.server_stream_call.cancel()
117        exception_for_return = None
118        try:
119            result = self.future.result()
120            if result:
121                logging.warning("Inner loop error %s" % result)
122                raise result
123        except Exception as exp:
124            logging.warning("Exception: %s" % (exp))
125            exception_for_return = exp
126        self.executor.shutdown()
127        return exception_for_return
128
129    def _event_loop(self):
130        """
131        Main loop for consuming the gRPC stream events.
132        Blocks until computation is cancelled
133        :return: None on success, exception object on failure
134        """
135        try:
136            for event in self.server_stream_call:
137                for (callback, matcher_fn) in self.handlers:
138                    if not matcher_fn or matcher_fn(event):
139                        callback(event)
140            return None
141        except RpcError as exp:
142            if self.server_stream_call.cancelled():
143                logging.debug("Cancelled")
144                return None
145            else:
146                logging.warning("Some RPC error not due to cancellation")
147            return exp
148