• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the TestServicer."""
15
16import time
17
18import grpc
19
20from src.proto.grpc.testing import empty_pb2
21from src.proto.grpc.testing import messages_pb2
22from src.proto.grpc.testing import test_pb2_grpc
23
# Invocation-metadata key whose value, when present, is echoed back to the
# caller as initial metadata.
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
# Invocation-metadata key whose value, when present, is echoed back to the
# caller as trailing metadata.
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
# Number of microseconds in one second; divides `interval_us` values to get
# the seconds passed to time.sleep().
_US_IN_A_SECOND = 10**6
27
28
def _maybe_echo_metadata(servicer_context):
    """Echoes the test metadata entries of the invocation back to the caller.

    If the invocation metadata contains _INITIAL_METADATA_KEY, that key/value
    pair is sent as initial metadata; if it contains _TRAILING_METADATA_KEY,
    that pair is set as trailing metadata. Absent keys are ignored.
    """
    received = dict(servicer_context.invocation_metadata())
    # Each echoed key is paired with the context method that returns it.
    echo_routes = (
        (_INITIAL_METADATA_KEY, servicer_context.send_initial_metadata),
        (_TRAILING_METADATA_KEY, servicer_context.set_trailing_metadata),
    )
    for key, emit in echo_routes:
        if key in received:
            # The context methods take a sequence of (key, value) pairs.
            emit(((key, received[key]),))
40
41
def _maybe_echo_status_and_message(request, servicer_context):
    """Applies the status requested by `request`, if any, to the context.

    When the request has its `response_status` field set, the field's `code`
    and `message` are passed to the context's set_code() and set_details().
    Otherwise the context is left untouched.
    """
    if not request.HasField('response_status'):
        return
    requested_status = request.response_status
    servicer_context.set_code(requested_status.code)
    servicer_context.set_details(requested_status.message)
47
48
def _zero_filled_responses(response_parameters, payload_type):
    """Yields one zero-filled streaming response per response parameter.

    Args:
      response_parameters: Iterable of parameter messages; each supplies
        `interval_us` (delay before the response, in microseconds) and
        `size` (length of the response body in bytes).
      payload_type: Value stored in the `type` field of every payload.

    Yields:
      A messages_pb2.StreamingOutputCallResponse for each parameter, in order.
    """
    for parameters in response_parameters:
        if parameters.interval_us != 0:
            # interval_us is in microseconds; time.sleep() takes seconds.
            time.sleep(parameters.interval_us / _US_IN_A_SECOND)
        yield messages_pb2.StreamingOutputCallResponse(
            payload=messages_pb2.Payload(type=payload_type,
                                         body=b'\x00' * parameters.size))


class TestService(test_pb2_grpc.TestServiceServicer):
    """Servicer that answers the TestService RPCs with zero-filled payloads."""

    def EmptyCall(self, request, context):
        """Echoes requested metadata and returns an empty message."""
        _maybe_echo_metadata(context)
        return empty_pb2.Empty()

    def UnaryCall(self, request, context):
        """Returns a zero-filled payload of `request.response_size` bytes.

        Requested metadata and status are echoed onto `context` first. The
        payload type is always COMPRESSABLE.
        """
        _maybe_echo_metadata(context)
        _maybe_echo_status_and_message(request, context)
        return messages_pb2.SimpleResponse(
            payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE,
                                         body=b'\x00' * request.response_size))

    def StreamingOutputCall(self, request, context):
        """Streams one response per entry of `request.response_parameters`.

        Each payload carries `request.response_type` as its type.
        """
        _maybe_echo_status_and_message(request, context)
        for response in _zero_filled_responses(request.response_parameters,
                                               request.response_type):
            yield response

    def StreamingInputCall(self, request_iterator, context):
        """Returns the combined length of all received payload bodies."""
        # Reading an unset protobuf message field yields an empty default
        # message rather than None, so an absent payload contributes 0.
        aggregate_size = sum(
            len(request.payload.body) for request in request_iterator)
        return messages_pb2.StreamingInputCallResponse(
            aggregated_payload_size=aggregate_size)

    def FullDuplexCall(self, request_iterator, context):
        """Answers each incoming request with its requested responses.

        Metadata is echoed once for the call; the status is echoed per
        request. Each payload carries the type of that request's own payload.
        """
        _maybe_echo_metadata(context)
        for request in request_iterator:
            _maybe_echo_status_and_message(request, context)
            for response in _zero_filled_responses(
                    request.response_parameters, request.payload.type):
                yield response

    # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
    # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
    def HalfDuplexCall(self, request_iterator, context):
        """Delegates to FullDuplexCall and returns its response iterator."""
        return self.FullDuplexCall(request_iterator, context)
97