# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the TestServicer."""

import time

import grpc

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
_US_IN_A_SECOND = 1000 * 1000


def _maybe_echo_metadata(servicer_context):
    """Echoes the test metadata entries of the call back to the caller.

    The invocation metadata is collapsed into a dict. If it contains
    _INITIAL_METADATA_KEY, that key/value pair is passed to
    send_initial_metadata; if it contains _TRAILING_METADATA_KEY, that
    key/value pair is passed to set_trailing_metadata. Absent keys are
    ignored.

    Args:
      servicer_context: The context object of the RPC being served.
    """
    received = dict(servicer_context.invocation_metadata())
    if _INITIAL_METADATA_KEY in received:
        servicer_context.send_initial_metadata(
            ((_INITIAL_METADATA_KEY, received[_INITIAL_METADATA_KEY]),))
    if _TRAILING_METADATA_KEY in received:
        servicer_context.set_trailing_metadata(
            ((_TRAILING_METADATA_KEY, received[_TRAILING_METADATA_KEY]),))


def _maybe_echo_status_and_message(request, servicer_context):
    """Applies the status requested by the message to the RPC context.

    Does nothing unless the request has its 'response_status' field set; in
    that case the field's code and message become the context's code and
    details.

    Args:
      request: The request message to inspect.
      servicer_context: The context object of the RPC being served.
    """
    if not request.HasField('response_status'):
        return
    requested_status = request.response_status
    servicer_context.set_code(requested_status.code)
    servicer_context.set_details(requested_status.message)


class TestService(test_pb2_grpc.TestServiceServicer):
    """Servicer implementing the RPC methods of TestServiceServicer."""

    def EmptyCall(self, request, context):
        """Echoes metadata if requested and returns an Empty message."""
        _maybe_echo_metadata(context)
        return empty_pb2.Empty()

    def UnaryCall(self, request, context):
        """Returns a SimpleResponse with a zero-filled payload.

        Metadata and status are echoed first when the call asks for them.
        The payload is COMPRESSABLE and request.response_size bytes long.
        """
        _maybe_echo_metadata(context)
        _maybe_echo_status_and_message(request, context)
        zero_body = b'\x00' * request.response_size
        response_payload = messages_pb2.Payload(type=messages_pb2.COMPRESSABLE,
                                                body=zero_body)
        return messages_pb2.SimpleResponse(payload=response_payload)

    def StreamingOutputCall(self, request, context):
        """Yields one zero-filled response per requested response parameter.

        Before each response the handler sleeps for interval_us divided by
        _US_IN_A_SECOND seconds when interval_us is non-zero. The payload
        type of every response is request.response_type.
        """
        _maybe_echo_status_and_message(request, context)
        for parameters in request.response_parameters:
            delay_us = parameters.interval_us
            if delay_us != 0:
                time.sleep(delay_us / _US_IN_A_SECOND)
            zero_body = b'\x00' * parameters.size
            yield messages_pb2.StreamingOutputCallResponse(
                payload=messages_pb2.Payload(type=request.response_type,
                                             body=zero_body))

    def StreamingInputCall(self, request_iterator, context):
        """Returns the total number of payload body bytes received.

        Requests whose payload is None or whose payload body is empty do not
        contribute to the total.
        """
        total_size = sum(
            len(request.payload.body)
            for request in request_iterator
            if request.payload is not None and request.payload.body)
        return messages_pb2.StreamingInputCallResponse(
            aggregated_payload_size=total_size)

    def FullDuplexCall(self, request_iterator, context):
        """Answers each incoming request with its requested responses.

        Metadata is echoed once, up front. For every request the status is
        echoed if asked for, then one zero-filled response is yielded per
        entry in request.response_parameters, after an optional sleep of
        interval_us divided by _US_IN_A_SECOND seconds. The payload type is
        taken from request.payload.type (StreamingOutputCall uses
        request.response_type instead).
        """
        _maybe_echo_metadata(context)
        for request in request_iterator:
            _maybe_echo_status_and_message(request, context)
            for parameters in request.response_parameters:
                delay_us = parameters.interval_us
                if delay_us != 0:
                    time.sleep(delay_us / _US_IN_A_SECOND)
                zero_body = b'\x00' * parameters.size
                yield messages_pb2.StreamingOutputCallResponse(
                    payload=messages_pb2.Payload(type=request.payload.type,
                                                 body=zero_body))

    # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
    # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
    def HalfDuplexCall(self, request_iterator, context):
        """Delegates to FullDuplexCall with the same arguments."""
        responses = self.FullDuplexCall(request_iterator, context)
        return responses