• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Objects for use in testing gRPC Python-using application code."""
15
16import abc
17
18from google.protobuf import descriptor
19import grpc
20
21
22class UnaryUnaryChannelRpc(abc.ABC):
23    """Fixture for a unary-unary RPC invoked by a system under test.
24
25    Enables users to "play server" for the RPC.
26    """
27
28    @abc.abstractmethod
29    def send_initial_metadata(self, initial_metadata):
30        """Sends the RPC's initial metadata to the system under test.
31
32        Args:
33          initial_metadata: The RPC's initial metadata to be "sent" to
34            the system under test.
35        """
36        raise NotImplementedError()
37
38    @abc.abstractmethod
39    def cancelled(self):
40        """Blocks until the system under test has cancelled the RPC."""
41        raise NotImplementedError()
42
43    @abc.abstractmethod
44    def terminate(self, response, trailing_metadata, code, details):
45        """Terminates the RPC.
46
47        Args:
48          response: The response for the RPC.
49          trailing_metadata: The RPC's trailing metadata.
50          code: The RPC's status code.
51          details: The RPC's status details.
52        """
53        raise NotImplementedError()
54
55
56class UnaryStreamChannelRpc(abc.ABC):
57    """Fixture for a unary-stream RPC invoked by a system under test.
58
59    Enables users to "play server" for the RPC.
60    """
61
62    @abc.abstractmethod
63    def send_initial_metadata(self, initial_metadata):
64        """Sends the RPC's initial metadata to the system under test.
65
66        Args:
67          initial_metadata: The RPC's initial metadata to be "sent" to
68            the system under test.
69        """
70        raise NotImplementedError()
71
72    @abc.abstractmethod
73    def send_response(self, response):
74        """Sends a response to the system under test.
75
76        Args:
77          response: A response message to be "sent" to the system under test.
78        """
79        raise NotImplementedError()
80
81    @abc.abstractmethod
82    def cancelled(self):
83        """Blocks until the system under test has cancelled the RPC."""
84        raise NotImplementedError()
85
86    @abc.abstractmethod
87    def terminate(self, trailing_metadata, code, details):
88        """Terminates the RPC.
89
90        Args:
91          trailing_metadata: The RPC's trailing metadata.
92          code: The RPC's status code.
93          details: The RPC's status details.
94        """
95        raise NotImplementedError()
96
97
98class StreamUnaryChannelRpc(abc.ABC):
99    """Fixture for a stream-unary RPC invoked by a system under test.
100
101    Enables users to "play server" for the RPC.
102    """
103
104    @abc.abstractmethod
105    def send_initial_metadata(self, initial_metadata):
106        """Sends the RPC's initial metadata to the system under test.
107
108        Args:
109          initial_metadata: The RPC's initial metadata to be "sent" to
110            the system under test.
111        """
112        raise NotImplementedError()
113
114    @abc.abstractmethod
115    def take_request(self):
116        """Draws one of the requests added to the RPC by the system under test.
117
118        This method blocks until the system under test has added to the RPC
119        the request to be returned.
120
121        Successive calls to this method return requests in the same order in
122        which the system under test added them to the RPC.
123
124        Returns:
125          A request message added to the RPC by the system under test.
126        """
127        raise NotImplementedError()
128
129    @abc.abstractmethod
130    def requests_closed(self):
131        """Blocks until the system under test has closed the request stream."""
132        raise NotImplementedError()
133
134    @abc.abstractmethod
135    def cancelled(self):
136        """Blocks until the system under test has cancelled the RPC."""
137        raise NotImplementedError()
138
139    @abc.abstractmethod
140    def terminate(self, response, trailing_metadata, code, details):
141        """Terminates the RPC.
142
143        Args:
144          response: The response for the RPC.
145          trailing_metadata: The RPC's trailing metadata.
146          code: The RPC's status code.
147          details: The RPC's status details.
148        """
149        raise NotImplementedError()
150
151
152class StreamStreamChannelRpc(abc.ABC):
153    """Fixture for a stream-stream RPC invoked by a system under test.
154
155    Enables users to "play server" for the RPC.
156    """
157
158    @abc.abstractmethod
159    def send_initial_metadata(self, initial_metadata):
160        """Sends the RPC's initial metadata to the system under test.
161
162        Args:
163          initial_metadata: The RPC's initial metadata to be "sent" to the
164            system under test.
165        """
166        raise NotImplementedError()
167
168    @abc.abstractmethod
169    def take_request(self):
170        """Draws one of the requests added to the RPC by the system under test.
171
172        This method blocks until the system under test has added to the RPC
173        the request to be returned.
174
175        Successive calls to this method return requests in the same order in
176        which the system under test added them to the RPC.
177
178        Returns:
179          A request message added to the RPC by the system under test.
180        """
181        raise NotImplementedError()
182
183    @abc.abstractmethod
184    def send_response(self, response):
185        """Sends a response to the system under test.
186
187        Args:
188          response: A response messages to be "sent" to the system under test.
189        """
190        raise NotImplementedError()
191
192    @abc.abstractmethod
193    def requests_closed(self):
194        """Blocks until the system under test has closed the request stream."""
195        raise NotImplementedError()
196
197    @abc.abstractmethod
198    def cancelled(self):
199        """Blocks until the system under test has cancelled the RPC."""
200        raise NotImplementedError()
201
202    @abc.abstractmethod
203    def terminate(self, trailing_metadata, code, details):
204        """Terminates the RPC.
205
206        Args:
207          trailing_metadata: The RPC's trailing metadata.
208          code: The RPC's status code.
209          details: The RPC's status details.
210        """
211        raise NotImplementedError()
212
213
214class Channel(grpc.Channel, metaclass=abc.ABCMeta):
215    """A grpc.Channel double with which to test a system that invokes RPCs."""
216
217    @abc.abstractmethod
218    def take_unary_unary(self, method_descriptor):
219        """Draws an RPC currently being made by the system under test.
220
221        If the given descriptor does not identify any RPC currently being made
222        by the system under test, this method blocks until the system under
223        test invokes such an RPC.
224
225        Args:
226          method_descriptor: A descriptor.MethodDescriptor describing a
227            unary-unary RPC method.
228
229        Returns:
230          A (invocation_metadata, request, unary_unary_channel_rpc) tuple of
231            the RPC's invocation metadata, its request, and a
232            UnaryUnaryChannelRpc with which to "play server" for the RPC.
233        """
234        raise NotImplementedError()
235
236    @abc.abstractmethod
237    def take_unary_stream(self, method_descriptor):
238        """Draws an RPC currently being made by the system under test.
239
240        If the given descriptor does not identify any RPC currently being made
241        by the system under test, this method blocks until the system under
242        test invokes such an RPC.
243
244        Args:
245          method_descriptor: A descriptor.MethodDescriptor describing a
246            unary-stream RPC method.
247
248        Returns:
249          A (invocation_metadata, request, unary_stream_channel_rpc) tuple of
250            the RPC's invocation metadata, its request, and a
251            UnaryStreamChannelRpc with which to "play server" for the RPC.
252        """
253        raise NotImplementedError()
254
255    @abc.abstractmethod
256    def take_stream_unary(self, method_descriptor):
257        """Draws an RPC currently being made by the system under test.
258
259        If the given descriptor does not identify any RPC currently being made
260        by the system under test, this method blocks until the system under
261        test invokes such an RPC.
262
263        Args:
264          method_descriptor: A descriptor.MethodDescriptor describing a
265            stream-unary RPC method.
266
267        Returns:
268          A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's
269            invocation metadata and a StreamUnaryChannelRpc with which to "play
270            server" for the RPC.
271        """
272        raise NotImplementedError()
273
274    @abc.abstractmethod
275    def take_stream_stream(self, method_descriptor):
276        """Draws an RPC currently being made by the system under test.
277
278        If the given descriptor does not identify any RPC currently being made
279        by the system under test, this method blocks until the system under
280        test invokes such an RPC.
281
282        Args:
283          method_descriptor: A descriptor.MethodDescriptor describing a
284            stream-stream RPC method.
285
286        Returns:
287          A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's
288            invocation metadata and a StreamStreamChannelRpc with which to
289            "play server" for the RPC.
290        """
291        raise NotImplementedError()
292
293
294class UnaryUnaryServerRpc(abc.ABC):
295    """Fixture for a unary-unary RPC serviced by a system under test.
296
297    Enables users to "play client" for the RPC.
298    """
299
300    @abc.abstractmethod
301    def initial_metadata(self):
302        """Accesses the initial metadata emitted by the system under test.
303
304        This method blocks until the system under test has added initial
305        metadata to the RPC (or has provided one or more response messages or
306        has terminated the RPC, either of which will cause gRPC Python to
307        synthesize initial metadata for the RPC).
308
309        Returns:
310          The initial metadata for the RPC.
311        """
312        raise NotImplementedError()
313
314    @abc.abstractmethod
315    def cancel(self):
316        """Cancels the RPC."""
317        raise NotImplementedError()
318
319    @abc.abstractmethod
320    def termination(self):
321        """Blocks until the system under test has terminated the RPC.
322
323        Returns:
324          A (response, trailing_metadata, code, details) sequence with the RPC's
325            response, trailing metadata, code, and details.
326        """
327        raise NotImplementedError()
328
329
330class UnaryStreamServerRpc(abc.ABC):
331    """Fixture for a unary-stream RPC serviced by a system under test.
332
333    Enables users to "play client" for the RPC.
334    """
335
336    @abc.abstractmethod
337    def initial_metadata(self):
338        """Accesses the initial metadata emitted by the system under test.
339
340        This method blocks until the system under test has added initial
341        metadata to the RPC (or has provided one or more response messages or
342        has terminated the RPC, either of which will cause gRPC Python to
343        synthesize initial metadata for the RPC).
344
345        Returns:
346          The initial metadata for the RPC.
347        """
348        raise NotImplementedError()
349
350    @abc.abstractmethod
351    def take_response(self):
352        """Draws one of the responses added to the RPC by the system under test.
353
354        Successive calls to this method return responses in the same order in
355        which the system under test added them to the RPC.
356
357        Returns:
358          A response message added to the RPC by the system under test.
359        """
360        raise NotImplementedError()
361
362    @abc.abstractmethod
363    def cancel(self):
364        """Cancels the RPC."""
365        raise NotImplementedError()
366
367    @abc.abstractmethod
368    def termination(self):
369        """Blocks until the system under test has terminated the RPC.
370
371        Returns:
372          A (trailing_metadata, code, details) sequence with the RPC's trailing
373            metadata, code, and details.
374        """
375        raise NotImplementedError()
376
377
378class StreamUnaryServerRpc(abc.ABC):
379    """Fixture for a stream-unary RPC serviced by a system under test.
380
381    Enables users to "play client" for the RPC.
382    """
383
384    @abc.abstractmethod
385    def initial_metadata(self):
386        """Accesses the initial metadata emitted by the system under test.
387
388        This method blocks until the system under test has added initial
389        metadata to the RPC (or has provided one or more response messages or
390        has terminated the RPC, either of which will cause gRPC Python to
391        synthesize initial metadata for the RPC).
392
393        Returns:
394          The initial metadata for the RPC.
395        """
396        raise NotImplementedError()
397
398    @abc.abstractmethod
399    def send_request(self, request):
400        """Sends a request to the system under test.
401
402        Args:
403          request: A request message for the RPC to be "sent" to the system
404            under test.
405        """
406        raise NotImplementedError()
407
408    @abc.abstractmethod
409    def requests_closed(self):
410        """Indicates the end of the RPC's request stream."""
411        raise NotImplementedError()
412
413    @abc.abstractmethod
414    def cancel(self):
415        """Cancels the RPC."""
416        raise NotImplementedError()
417
418    @abc.abstractmethod
419    def termination(self):
420        """Blocks until the system under test has terminated the RPC.
421
422        Returns:
423          A (response, trailing_metadata, code, details) sequence with the RPC's
424            response, trailing metadata, code, and details.
425        """
426        raise NotImplementedError()
427
428
429class StreamStreamServerRpc(abc.ABC):
430    """Fixture for a stream-stream RPC serviced by a system under test.
431
432    Enables users to "play client" for the RPC.
433    """
434
435    @abc.abstractmethod
436    def initial_metadata(self):
437        """Accesses the initial metadata emitted by the system under test.
438
439        This method blocks until the system under test has added initial
440        metadata to the RPC (or has provided one or more response messages or
441        has terminated the RPC, either of which will cause gRPC Python to
442        synthesize initial metadata for the RPC).
443
444        Returns:
445          The initial metadata for the RPC.
446        """
447        raise NotImplementedError()
448
449    @abc.abstractmethod
450    def send_request(self, request):
451        """Sends a request to the system under test.
452
453        Args:
454          request: A request message for the RPC to be "sent" to the system
455            under test.
456        """
457        raise NotImplementedError()
458
459    @abc.abstractmethod
460    def requests_closed(self):
461        """Indicates the end of the RPC's request stream."""
462        raise NotImplementedError()
463
464    @abc.abstractmethod
465    def take_response(self):
466        """Draws one of the responses added to the RPC by the system under test.
467
468        Successive calls to this method return responses in the same order in
469        which the system under test added them to the RPC.
470
471        Returns:
472          A response message added to the RPC by the system under test.
473        """
474        raise NotImplementedError()
475
476    @abc.abstractmethod
477    def cancel(self):
478        """Cancels the RPC."""
479        raise NotImplementedError()
480
481    @abc.abstractmethod
482    def termination(self):
483        """Blocks until the system under test has terminated the RPC.
484
485        Returns:
486          A (trailing_metadata, code, details) sequence with the RPC's trailing
487            metadata, code, and details.
488        """
489        raise NotImplementedError()
490
491
492class Server(abc.ABC):
493    """A server with which to test a system that services RPCs."""
494
495    @abc.abstractmethod
496    def invoke_unary_unary(
497        self, method_descriptor, invocation_metadata, request, timeout
498    ):
499        """Invokes an RPC to be serviced by the system under test.
500
501        Args:
502          method_descriptor: A descriptor.MethodDescriptor describing a unary-unary
503            RPC method.
504          invocation_metadata: The RPC's invocation metadata.
505          request: The RPC's request.
506          timeout: A duration of time in seconds for the RPC or None to
507            indicate that the RPC has no time limit.
508
509        Returns:
510          A UnaryUnaryServerRpc with which to "play client" for the RPC.
511        """
512        raise NotImplementedError()
513
514    @abc.abstractmethod
515    def invoke_unary_stream(
516        self, method_descriptor, invocation_metadata, request, timeout
517    ):
518        """Invokes an RPC to be serviced by the system under test.
519
520        Args:
521          method_descriptor: A descriptor.MethodDescriptor describing a unary-stream
522            RPC method.
523          invocation_metadata: The RPC's invocation metadata.
524          request: The RPC's request.
525          timeout: A duration of time in seconds for the RPC or None to
526            indicate that the RPC has no time limit.
527
528        Returns:
529          A UnaryStreamServerRpc with which to "play client" for the RPC.
530        """
531        raise NotImplementedError()
532
533    @abc.abstractmethod
534    def invoke_stream_unary(
535        self, method_descriptor, invocation_metadata, timeout
536    ):
537        """Invokes an RPC to be serviced by the system under test.
538
539        Args:
540          method_descriptor: A descriptor.MethodDescriptor describing a stream-unary
541            RPC method.
542          invocation_metadata: The RPC's invocation metadata.
543          timeout: A duration of time in seconds for the RPC or None to
544            indicate that the RPC has no time limit.
545
546        Returns:
547          A StreamUnaryServerRpc with which to "play client" for the RPC.
548        """
549        raise NotImplementedError()
550
551    @abc.abstractmethod
552    def invoke_stream_stream(
553        self, method_descriptor, invocation_metadata, timeout
554    ):
555        """Invokes an RPC to be serviced by the system under test.
556
557        Args:
558          method_descriptor: A descriptor.MethodDescriptor describing a stream-stream
559            RPC method.
560          invocation_metadata: The RPC's invocation metadata.
561          timeout: A duration of time in seconds for the RPC or None to
562            indicate that the RPC has no time limit.
563
564        Returns:
565          A StreamStreamServerRpc with which to "play client" for the RPC.
566        """
567        raise NotImplementedError()
568
569
570class Time(abc.ABC):
571    """A simulation of time.
572
573    Implementations needn't be connected with real time as provided by the
574    Python interpreter, but as long as systems under test use
575    RpcContext.is_active and RpcContext.time_remaining for querying RPC liveness
576    implementations may be used to change passage of time in tests.
577    """
578
579    @abc.abstractmethod
580    def time(self):
581        """Accesses the current test time.
582
583        Returns:
584          The current test time (over which this object has authority).
585        """
586        raise NotImplementedError()
587
588    @abc.abstractmethod
589    def call_in(self, behavior, delay):
590        """Adds a behavior to be called after some time.
591
592        Args:
593          behavior: A behavior to be called with no arguments.
594          delay: A duration of time in seconds after which to call the behavior.
595
596        Returns:
597          A grpc.Future with which the call of the behavior may be cancelled
598            before it is executed.
599        """
600        raise NotImplementedError()
601
602    @abc.abstractmethod
603    def call_at(self, behavior, time):
604        """Adds a behavior to be called at a specific time.
605
606        Args:
607          behavior: A behavior to be called with no arguments.
608          time: The test time at which to call the behavior.
609
610        Returns:
611          A grpc.Future with which the call of the behavior may be cancelled
612            before it is executed.
613        """
614        raise NotImplementedError()
615
616    @abc.abstractmethod
617    def sleep_for(self, duration):
618        """Blocks for some length of test time.
619
620        Args:
621          duration: A duration of test time in seconds for which to block.
622        """
623        raise NotImplementedError()
624
625    @abc.abstractmethod
626    def sleep_until(self, time):
627        """Blocks until some test time.
628
629        Args:
630          time: The test time until which to block.
631        """
632        raise NotImplementedError()
633
634
635def strict_real_time():
636    """Creates a Time backed by the Python interpreter's time.
637
638    The returned instance will be "strict" with respect to callbacks
639    submitted to it: it will ensure that all callbacks registered to
640    be called at time t have been called before it describes the time
641    as having advanced beyond t.
642
643    Returns:
644      A Time backed by the "system" (Python interpreter's) time.
645    """
646    from grpc_testing import _time
647
648    return _time.StrictRealTime()
649
650
651def strict_fake_time(now):
652    """Creates a Time that can be manipulated by test code.
653
654    The returned instance maintains an internal representation of time
655    independent of real time. This internal representation only advances
656    when user code calls the instance's sleep_for and sleep_until methods.
657
658    The returned instance will be "strict" with respect to callbacks
659    submitted to it: it will ensure that all callbacks registered to
660    be called at time t have been called before it describes the time
661    as having advanced beyond t.
662
663    Returns:
664      A Time that simulates the passage of time.
665    """
666    from grpc_testing import _time
667
668    return _time.StrictFakeTime(now)
669
670
671def channel(service_descriptors, time):
672    """Creates a Channel for use in tests of a gRPC Python-using system.
673
674    Args:
675      service_descriptors: An iterable of descriptor.ServiceDescriptors
676        describing the RPCs that will be made on the returned Channel by the
677        system under test.
678      time: A Time to be used for tests.
679
680    Returns:
681      A Channel for use in tests.
682    """
683    from grpc_testing import _channel
684
685    return _channel.testing_channel(service_descriptors, time)
686
687
688def server_from_dictionary(descriptors_to_servicers, time):
689    """Creates a Server for use in tests of a gRPC Python-using system.
690
691    Args:
692      descriptors_to_servicers: A dictionary from descriptor.ServiceDescriptors
693        defining RPC services to servicer objects (usually instances of classes
694        that implement "Servicer" interfaces defined in generated "_pb2_grpc"
695        modules) implementing those services.
696      time: A Time to be used for tests.
697
698    Returns:
699      A Server for use in tests.
700    """
701    from grpc_testing import _server
702
703    return _server.server_from_dictionary(descriptors_to_servicers, time)
704