1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from concurrent.futures import ThreadPoolExecutor 18from grpc import RpcError 19import logging 20 21 22class EventCallbackStream(object): 23 """ 24 A an object that translate a gRPC stream of events to a Python stream of 25 callbacks. 26 27 All callbacks are non-sticky. This means that user will only receive callback 28 generated after EventCallbackStream is registered and will not receive any 29 callback after EventCallbackStream is unregistered 30 31 You would need a new EventCallbackStream and anything that depends on this 32 object once shutdown() is called 33 """ 34 35 def __init__(self, server_stream_call): 36 """ 37 Construct this object, call the |grpc_lambda| and trigger event_callback on 38 the thread used to create this object until |destroy| is called when this 39 object can no longer be used 40 :param server_stream_call: A server stream call object returned from 41 calling a gRPC server stream RPC API. The 42 object must support iterator interface (i.e. 43 next() method) and the grpc.Call interface 44 so that we can cancel it 45 :param event_callback: callback to be invoked with the only argument as 46 the generated event. The callback will be invoked 47 on a separate thread created within this object 48 """ 49 if server_stream_call is None: 50 raise ValueError("server_stream_call must not be None") 51 self.server_stream_call = server_stream_call 52 self.handlers = [] 53 self.executor = ThreadPoolExecutor() 54 self.future = self.executor.submit(EventCallbackStream._event_loop, 55 self) 56 57 def __enter__(self): 58 return self 59 60 def __exit__(self, type, value, traceback): 61 self.shutdown() 62 if traceback is None: 63 return True 64 else: 65 return False 66 67 def __del__(self): 68 self.shutdown() 69 70 def register_callback(self, callback, matcher_fn=None): 71 """ 72 Register a callback to handle events. Event will be handled by callback 73 if matcher_fn(event) returns True 74 75 callback and matcher are registered as a tuple. Hence the same callback 76 with different matcher are considered two different handler units. Same 77 matcher, but different callback are also considered different handling 78 unit 79 80 Callback will be invoked on a ThreadPoolExecutor owned by this 81 EventCallbackStream 82 83 :param callback: Will be called as callback(event) 84 :param matcher_fn: A boolean function that returns True or False when 85 calling matcher_fn(event), if None, all event will 86 be matched 87 """ 88 if callback is None: 89 raise ValueError("callback must not be None") 90 self.handlers.append((callback, matcher_fn)) 91 92 def unregister_callback(self, callback, matcher_fn=None): 93 """ 94 Unregister callback and matcher_fn from the event stream. Both objects 95 must match exactly the ones when calling register_callback() 96 97 :param callback: callback used in register_callback() 98 :param matcher_fn: matcher_fn used in register_callback() 99 :raises ValueError when (callback, matcher_fn) tuple is not found 100 """ 101 if callback is None: 102 raise ValueError("callback must not be None") 103 self.handlers.remove((callback, matcher_fn)) 104 105 def shutdown(self): 106 """ 107 Stop the gRPC lambda so that event_callback will not be invoked after th 108 method returns. 109 110 This object will be useless after this call as there is no way to restart 111 the gRPC callback. You would have to create a new EventCallbackStream 112 113 :return: None on success, exception object on failure 114 """ 115 while not self.server_stream_call.done(): 116 self.server_stream_call.cancel() 117 exception_for_return = None 118 try: 119 result = self.future.result() 120 if result: 121 logging.warning("Inner loop error %s" % result) 122 raise result 123 except Exception as exp: 124 logging.warning("Exception: %s" % (exp)) 125 exception_for_return = exp 126 self.executor.shutdown() 127 return exception_for_return 128 129 def _event_loop(self): 130 """ 131 Main loop for consuming the gRPC stream events. 132 Blocks until computation is cancelled 133 :return: None on success, exception object on failure 134 """ 135 try: 136 for event in self.server_stream_call: 137 for (callback, matcher_fn) in self.handlers: 138 if not matcher_fn or matcher_fn(event): 139 callback(event) 140 return None 141 except RpcError as exp: 142 if self.server_stream_call.cancelled(): 143 logging.debug("Cancelled") 144 return None 145 else: 146 logging.warning("Some RPC error not due to cancellation") 147 return exp 148