# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Objects for use in testing gRPC Python-using application code."""

import abc

from google.protobuf import descriptor
import grpc


class UnaryUnaryChannelRpc(abc.ABC):
    """Test handle for a unary-unary RPC invoked by a system under test.

    Through this object test code can "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC that is to be
            "sent" to the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError()

    @abc.abstractmethod
    def terminate(self, response, trailing_metadata, code, details):
        """Brings the RPC to an end.

        Args:
          response: The RPC's response.
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError()


class UnaryStreamChannelRpc(abc.ABC):
    """Test handle for a unary-stream RPC invoked by a system under test.

    Through this object test code can "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC that is to be
            "sent" to the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_response(self, response):
        """Delivers one response to the system under test.

        Args:
          response: The response message that is to be "sent" to the system
            under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError()

    @abc.abstractmethod
    def terminate(self, trailing_metadata, code, details):
        """Brings the RPC to an end.

        Args:
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError()


class StreamUnaryChannelRpc(abc.ABC):
    """Test handle for a stream-unary RPC invoked by a system under test.

    Through this object test code can "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC that is to be
            "sent" to the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_request(self):
        """Takes one request that the system under test added to the RPC.

        The call blocks until the system under test has added the request
        that is going to be returned.

        Requests come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A request message that the system under test added to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Blocks until the system under test closes the request stream."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError()

    @abc.abstractmethod
    def terminate(self, response, trailing_metadata, code, details):
        """Brings the RPC to an end.

        Args:
          response: The RPC's response.
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError()


class StreamStreamChannelRpc(abc.ABC):
    """Test handle for a stream-stream RPC invoked by a system under test.

    Through this object test code can "play server" for the RPC.
    """

    @abc.abstractmethod
    def send_initial_metadata(self, initial_metadata):
        """Delivers the RPC's initial metadata to the system under test.

        Args:
          initial_metadata: The initial metadata of the RPC that is to be
            "sent" to the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_request(self):
        """Takes one request that the system under test added to the RPC.

        The call blocks until the system under test has added the request
        that is going to be returned.

        Requests come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A request message that the system under test added to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_response(self, response):
        """Delivers one response to the system under test.

        Args:
          response: The response message that is to be "sent" to the system
            under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Blocks until the system under test closes the request stream."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancelled(self):
        """Blocks until the RPC has been cancelled by the system under test."""
        raise NotImplementedError()

    @abc.abstractmethod
    def terminate(self, trailing_metadata, code, details):
        """Brings the RPC to an end.

        Args:
          trailing_metadata: The trailing metadata of the RPC.
          code: The status code of the RPC.
          details: The status details of the RPC.
        """
        raise NotImplementedError()


class Channel(grpc.Channel, metaclass=abc.ABCMeta):
    """A stand-in for grpc.Channel used to test a system that invokes RPCs."""

    @abc.abstractmethod
    def take_unary_unary(self, method_descriptor):
        """Takes an RPC that the system under test is currently making.

        When the descriptor matches no RPC currently being made by the
        system under test, the call blocks until the system under test
        invokes such an RPC.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a unary-unary
            RPC method.

        Returns:
          An (invocation_metadata, request, unary_unary_channel_rpc) tuple:
            the invocation metadata of the RPC, the request of the RPC, and
            a UnaryUnaryChannelRpc with which to "play server" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_unary_stream(self, method_descriptor):
        """Takes an RPC that the system under test is currently making.

        When the descriptor matches no RPC currently being made by the
        system under test, the call blocks until the system under test
        invokes such an RPC.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            unary-stream RPC method.

        Returns:
          An (invocation_metadata, request, unary_stream_channel_rpc) tuple:
            the invocation metadata of the RPC, the request of the RPC, and
            a UnaryStreamChannelRpc with which to "play server" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_stream_unary(self, method_descriptor):
        """Takes an RPC that the system under test is currently making.

        When the descriptor matches no RPC currently being made by the
        system under test, the call blocks until the system under test
        invokes such an RPC.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            stream-unary RPC method.

        Returns:
          An (invocation_metadata, stream_unary_channel_rpc) tuple: the
            invocation metadata of the RPC and a StreamUnaryChannelRpc with
            which to "play server" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_stream_stream(self, method_descriptor):
        """Takes an RPC that the system under test is currently making.

        When the descriptor matches no RPC currently being made by the
        system under test, the call blocks until the system under test
        invokes such an RPC.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            stream-stream RPC method.

        Returns:
          An (invocation_metadata, stream_stream_channel_rpc) tuple: the
            invocation metadata of the RPC and a StreamStreamChannelRpc with
            which to "play server" for the RPC.
        """
        raise NotImplementedError()


class UnaryUnaryServerRpc(abc.ABC):
    """Test handle for a unary-unary RPC serviced by a system under test.

    Through this object test code can "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        The call blocks until the system under test has added initial
        metadata to the RPC. Providing one or more response messages or
        terminating the RPC also unblocks it, because either of those makes
        gRPC Python synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (response, trailing_metadata, code, details) sequence holding
            the RPC's response, trailing metadata, code, and details.
        """
        raise NotImplementedError()


class UnaryStreamServerRpc(abc.ABC):
    """Test handle for a unary-stream RPC serviced by a system under test.

    Through this object test code can "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        The call blocks until the system under test has added initial
        metadata to the RPC. Providing one or more response messages or
        terminating the RPC also unblocks it, because either of those makes
        gRPC Python synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Takes one response that the system under test added to the RPC.

        Responses come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A response message that the system under test added to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (trailing_metadata, code, details) sequence holding the RPC's
            trailing metadata, code, and details.
        """
        raise NotImplementedError()


class StreamUnaryServerRpc(abc.ABC):
    """Test handle for a stream-unary RPC serviced by a system under test.

    Through this object test code can "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        The call blocks until the system under test has added initial
        metadata to the RPC. Providing one or more response messages or
        terminating the RPC also unblocks it, because either of those makes
        gRPC Python synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_request(self, request):
        """Delivers one request to the system under test.

        Args:
          request: The request message for the RPC that is to be "sent" to
            the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Signals that the RPC's request stream has ended."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (response, trailing_metadata, code, details) sequence holding
            the RPC's response, trailing metadata, code, and details.
        """
        raise NotImplementedError()


class StreamStreamServerRpc(abc.ABC):
    """Test handle for a stream-stream RPC serviced by a system under test.

    Through this object test code can "play client" for the RPC.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Returns the initial metadata emitted by the system under test.

        The call blocks until the system under test has added initial
        metadata to the RPC. Providing one or more response messages or
        terminating the RPC also unblocks it, because either of those makes
        gRPC Python synthesize initial metadata for the RPC.

        Returns:
          The RPC's initial metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_request(self, request):
        """Delivers one request to the system under test.

        Args:
          request: The request message for the RPC that is to be "sent" to
            the system under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Signals that the RPC's request stream has ended."""
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Takes one response that the system under test added to the RPC.

        Responses come back from successive calls in the same order in which
        the system under test added them to the RPC.

        Returns:
          A response message that the system under test added to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def termination(self):
        """Blocks until the RPC has been terminated by the system under test.

        Returns:
          A (trailing_metadata, code, details) sequence holding the RPC's
            trailing metadata, code, and details.
        """
        raise NotImplementedError()


class Server(abc.ABC):
    """A server used to test a system that services RPCs."""

    @abc.abstractmethod
    def invoke_unary_unary(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Starts an RPC that the system under test is to service.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a unary-unary
            RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          request: The request of the RPC.
          timeout: The RPC's duration in seconds, or None when the RPC has
            no time limit.

        Returns:
          A UnaryUnaryServerRpc with which to "play client" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def invoke_unary_stream(
        self, method_descriptor, invocation_metadata, request, timeout
    ):
        """Starts an RPC that the system under test is to service.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            unary-stream RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          request: The request of the RPC.
          timeout: The RPC's duration in seconds, or None when the RPC has
            no time limit.

        Returns:
          A UnaryStreamServerRpc with which to "play client" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def invoke_stream_unary(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Starts an RPC that the system under test is to service.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            stream-unary RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          timeout: The RPC's duration in seconds, or None when the RPC has
            no time limit.

        Returns:
          A StreamUnaryServerRpc with which to "play client" for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def invoke_stream_stream(
        self, method_descriptor, invocation_metadata, timeout
    ):
        """Starts an RPC that the system under test is to service.

        Args:
          method_descriptor: The descriptor.MethodDescriptor of a
            stream-stream RPC method.
          invocation_metadata: The invocation metadata of the RPC.
          timeout: The RPC's duration in seconds, or None when the RPC has
            no time limit.

        Returns:
          A StreamStreamServerRpc with which to "play client" for the RPC.
        """
        raise NotImplementedError()


class Time(abc.ABC):
    """A simulation of time.

    An implementation does not have to be tied to the real time reported by
    the Python interpreter. Provided that systems under test query RPC
    liveness through RpcContext.is_active and RpcContext.time_remaining,
    implementations may be used to change the passage of time in tests.
    """

    @abc.abstractmethod
    def time(self):
        """Returns the current test time.

        Returns:
          The current test time (over which this object has authority).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def call_in(self, behavior, delay):
        """Schedules a behavior to be called once some time has passed.

        Args:
          behavior: The behavior to call; it is called with no arguments.
          delay: How long to wait, in seconds, before calling the behavior.

        Returns:
          A grpc.Future through which the call of the behavior may be
            cancelled before it is executed.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def call_at(self, behavior, time):
        """Schedules a behavior to be called at a specific time.

        Args:
          behavior: The behavior to call; it is called with no arguments.
          time: The test time at which the behavior is to be called.

        Returns:
          A grpc.Future through which the call of the behavior may be
            cancelled before it is executed.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def sleep_for(self, duration):
        """Blocks for a given length of test time.

        Args:
          duration: How much test time, in seconds, to block for.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def sleep_until(self, time):
        """Blocks until a given test time.

        Args:
          time: The test time until which to block.
        """
        raise NotImplementedError()


def strict_real_time():
    """Builds a Time backed by the Python interpreter's time.

    The instance returned is "strict" about the callbacks submitted to it:
    every callback registered to be called at time t is guaranteed to have
    been called before the instance describes the time as having advanced
    beyond t.

    Returns:
      A Time backed by the "system" (Python interpreter's) time.
    """
    # The implementation module is imported at call time, not at module load.
    # NOTE(review): presumably this avoids a circular import with this
    # package -- confirm before hoisting it to the top of the file.
    from grpc_testing import _time as time_impl

    return time_impl.StrictRealTime()


def strict_fake_time(now):
    """Builds a Time that test code can manipulate.

    The instance returned keeps its own internal representation of time,
    independent of real time. That representation advances only when user
    code calls the instance's sleep_for and sleep_until methods.

    The instance returned is "strict" about the callbacks submitted to it:
    every callback registered to be called at time t is guaranteed to have
    been called before the instance describes the time as having advanced
    beyond t.

    Args:
      now: Forwarded unchanged to the fake time implementation.
        NOTE(review): presumably the initial test time -- confirm against
        grpc_testing._time.StrictFakeTime.

    Returns:
      A Time that simulates the passage of time.
    """
    # The implementation module is imported at call time, not at module load.
    # NOTE(review): presumably this avoids a circular import with this
    # package -- confirm before hoisting it to the top of the file.
    from grpc_testing import _time as time_impl

    return time_impl.StrictFakeTime(now)


def channel(service_descriptors, time):
    """Builds a Channel for use in tests of a gRPC Python-using system.

    Args:
      service_descriptors: An iterable of descriptor.ServiceDescriptors that
        describe the RPCs the system under test will make on the returned
        Channel.
      time: The Time to be used for tests.

    Returns:
      A Channel for use in tests.
    """
    # The implementation module is imported at call time, not at module load.
    # NOTE(review): presumably this avoids a circular import with this
    # package -- confirm before hoisting it to the top of the file.
    from grpc_testing import _channel as channel_impl

    return channel_impl.testing_channel(service_descriptors, time)


def server_from_dictionary(descriptors_to_servicers, time):
    """Builds a Server for use in tests of a gRPC Python-using system.

    Args:
      descriptors_to_servicers: A dictionary that maps
        descriptor.ServiceDescriptors defining RPC services to the servicer
        objects implementing those services (usually instances of classes
        that implement "Servicer" interfaces defined in generated
        "_pb2_grpc" modules).
      time: The Time to be used for tests.

    Returns:
      A Server for use in tests.
    """
    # The implementation module is imported at call time, not at module load.
    # NOTE(review): presumably this avoids a circular import with this
    # package -- confirm before hoisting it to the top of the file.
    from grpc_testing import _server as server_impl

    return server_impl.server_from_dictionary(descriptors_to_servicers, time)