# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The base interface of RPC Framework.

Implementations of this interface support the conduct of "operations":
exchanges between two distinct ends of an arbitrary number of data payloads
and metadata such as a name for the operation, initial and terminal metadata
in each direction, and flow control. These operations may be used for transfers
of data, remote procedure calls, status indication, or anything else
applications choose.
"""

# threading is referenced from specification in this module.
import abc
import enum
import threading  # pylint: disable=unused-import

# pylint: disable=too-many-arguments


class NoSuchMethodError(Exception):
    """Indicates that an unrecognized operation has been called.

    Attributes:
      code: A code value to communicate to the other side of the operation
        along with indication of operation termination. May be None.
      details: A details value to communicate to the other side of the
        operation along with indication of operation termination. May be None.
    """

    def __init__(self, code, details):
        """Constructor.

        Args:
          code: A code value to communicate to the other side of the operation
            along with indication of operation termination. May be None.
          details: A details value to communicate to the other side of the
            operation along with indication of operation termination. May be
            None.
        """
        # Forward both values to Exception.__init__ so that ``args`` holds
        # them. With empty ``args`` the exception has an empty str()/repr()
        # and cannot be copied or unpickled, because reconstruction calls
        # this two-argument constructor with ``args``.
        super().__init__(code, details)
        self.code = code
        self.details = details


class Outcome:
    """The outcome of an operation.

    Attributes:
      kind: A Kind value coarsely identifying how the operation terminated.
      code: An application-specific code value or None if no such value was
        provided.
      details: An application-specific details value or None if no such value
        was provided.
    """

    @enum.unique
    class Kind(enum.Enum):
        """Ways in which an operation can terminate."""

        COMPLETED = "completed"
        CANCELLED = "cancelled"
        EXPIRED = "expired"
        LOCAL_SHUTDOWN = "local shutdown"
        REMOTE_SHUTDOWN = "remote shutdown"
        RECEPTION_FAILURE = "reception failure"
        TRANSMISSION_FAILURE = "transmission failure"
        LOCAL_FAILURE = "local failure"
        REMOTE_FAILURE = "remote failure"


class Completion(abc.ABC):
    """An aggregate of the values exchanged upon operation completion.

    Attributes:
      terminal_metadata: A terminal metadata value for the operation.
      code: A code value for the operation.
      message: A message value for the operation.
    """


class OperationContext(abc.ABC):
    """Provides operation-related information and action."""

    @abc.abstractmethod
    def outcome(self):
        """Indicates the operation's outcome (or that the operation is ongoing).

        Returns:
          None if the operation is still active or the Outcome value for the
            operation if it has terminated.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def add_termination_callback(self, callback):
        """Adds a function to be called upon operation termination.

        Args:
          callback: A callable to be passed an Outcome value on operation
            termination.

        Returns:
          None if the operation has not yet terminated and the passed callback
            will later be called when it does terminate, or if the operation
            has already terminated an Outcome value describing the operation
            termination and the passed callback will not be called as a result
            of this method call.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def time_remaining(self):
        """Describes the length of allowed time remaining for the operation.

        Returns:
          A nonnegative float indicating the length of allowed time in seconds
          remaining for the operation to complete before it is considered to
          have timed out. Zero is returned if the operation has terminated.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the operation if the operation has not yet terminated."""
        raise NotImplementedError()

    @abc.abstractmethod
    def fail(self, exception):
        """Indicates that the operation has failed.

        Args:
          exception: An exception germane to the operation failure. May be
            None.
        """
        raise NotImplementedError()


class Operator(abc.ABC):
    """An interface through which to participate in an operation."""

    @abc.abstractmethod
    def advance(
        self,
        initial_metadata=None,
        payload=None,
        completion=None,
        allowance=None,
    ):
        """Progresses the operation.

        Args:
          initial_metadata: An initial metadata value. Only one may ever be
            communicated in each direction for an operation, and they must be
            communicated no later than either the first payload or the
            completion.
          payload: A payload value.
          completion: A Completion value. May only ever be non-None once in
            either direction, and no payloads may be passed after it has been
            communicated.
          allowance: A positive integer communicating the number of additional
            payloads allowed to be passed by the remote side of the operation.
        """
        raise NotImplementedError()


class ProtocolReceiver(abc.ABC):
    """A means of receiving protocol values during an operation."""

    @abc.abstractmethod
    def context(self, protocol_context):
        """Accepts the protocol context object for the operation.

        Args:
          protocol_context: The protocol context object for the operation.
        """
        raise NotImplementedError()


class Subscription(abc.ABC):
    """Describes customer code's interest in values from the other side.

    Attributes:
      kind: A Kind value describing the overall kind of this value.
      termination_callback: A callable to be passed the Outcome associated with
        the operation after it has terminated. Must be non-None if kind is
        Kind.TERMINATION_ONLY. Must be None otherwise.
      allowance: A callable behavior that accepts positive integers
        representing the number of additional payloads allowed to be passed to
        the other side of the operation. Must be None if kind is Kind.FULL.
        Must not be None otherwise.
      operator: An Operator to be passed values from the other side of the
        operation. Must be non-None if kind is Kind.FULL. Must be None
        otherwise.
      protocol_receiver: A ProtocolReceiver to be passed protocol objects as
        they become available during the operation. Must be non-None if kind
        is Kind.FULL.
    """

    @enum.unique
    class Kind(enum.Enum):
        """The kinds of Subscription.

        The Subscription attribute documentation states which of the other
        attributes must be set for each kind.
        """

        NONE = "none"
        TERMINATION_ONLY = "termination only"
        FULL = "full"


class Servicer(abc.ABC):
    """Interface for service implementations."""

    @abc.abstractmethod
    def service(self, group, method, context, output_operator):
        """Services an operation.

        Args:
          group: The group identifier of the operation to be serviced.
          method: The method identifier of the operation to be serviced.
          context: An OperationContext object affording contextual information
            and actions.
          output_operator: An Operator that will accept output values of the
            operation.

        Returns:
          A Subscription via which this object may or may not accept more
            values of the operation.

        Raises:
          NoSuchMethodError: If this Servicer does not handle operations with
            the given group and method.
          abandonment.Abandoned: If the operation has been aborted and there no
            longer is any reason to service the operation.
        """
        raise NotImplementedError()


class End(abc.ABC):
    """Common type for entry-point objects on both sides of an operation."""

    @abc.abstractmethod
    def start(self):
        """Starts this object's service of operations."""
        raise NotImplementedError()

    @abc.abstractmethod
    def stop(self, grace):
        """Stops this object's service of operations.

        This object will refuse service of new operations as soon as this
        method is called but operations under way at the time of the call may
        be given a grace period during which they are allowed to finish.

        Args:
          grace: A duration of time in seconds to allow ongoing operations to
            terminate before being forcefully terminated by the stopping of
            this End. May be zero to terminate all ongoing operations and
            immediately stop.

        Returns:
          A threading.Event that will be set to indicate all operations having
            terminated and this End having completely stopped. The returned
            event may not be set until after the full grace period (if some
            ongoing operation continues for the full length of the period) or
            it may be set much sooner (if for example this End had no
            operations in progress at the time its stop method was called).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def operate(
        self,
        group,
        method,
        subscription,
        timeout,
        initial_metadata=None,
        payload=None,
        completion=None,
        protocol_options=None,
    ):
        """Commences an operation.

        Args:
          group: The group identifier of the invoked operation.
          method: The method identifier of the invoked operation.
          subscription: A Subscription to which the results of the operation
            will be passed.
          timeout: A length of time in seconds to allow for the operation.
          initial_metadata: An initial metadata value to be sent to the other
            side of the operation. May be None if the initial metadata will be
            later passed via the returned operator or if there will be no
            initial metadata passed at all.
          payload: An initial payload for the operation.
          completion: A Completion value indicating the end of transmission to
            the other side of the operation.
          protocol_options: A value specified by the provider of a Base
            interface implementation affording custom state and behavior.

        Returns:
          A pair of objects affording information about the operation and
            action continuing the operation. The first element of the returned
            pair is an OperationContext for the operation and the second
            element of the returned pair is an Operator to which operation
            values not passed in this call should later be passed.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def operation_stats(self):
        """Reports the number of terminated operations broken down by outcome.

        Returns:
          A dictionary from Outcome.Kind value to an integer identifying the
            number of operations that terminated with that outcome kind.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def add_idle_action(self, action):
        """Adds an action to be called when this End has no ongoing operations.

        Args:
          action: A callable that accepts no arguments.
        """
        raise NotImplementedError()