• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example gRPC Python-using server-side application."""
15
16import grpc
17
18import threading
19
20# requests_pb2 is a semantic dependency of this module.
21from tests.testing import _application_common
22from tests.testing.proto import requests_pb2  # pylint: disable=unused-import
23from tests.testing.proto import services_pb2
24from tests.testing.proto import services_pb2_grpc
25
26
class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
    """Services RPCs.

    Each method compares the incoming request(s) against the fixture
    messages defined in _application_common and answers with the matching
    fixture response, or flags the RPC as INVALID_ARGUMENT when a request is
    not recognized.
    """

    def __init__(self):
        # The lock guards _abort_response, which UnUn both writes (on an
        # abort request) and reads (on an abort-success query).
        self._abort_lock = threading.RLock()
        self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE

    def UnUn(self, request, context):
        """Handles a unary-unary RPC.

        Args:
          request: The request message.
          context: The servicer context of the RPC.

        Returns:
          UNARY_UNARY_RESPONSE for UNARY_UNARY_REQUEST; None for
          ABORT_REQUEST (after attempting to abort the RPC and recording the
          outcome); the recorded abort outcome for ABORT_SUCCESS_QUERY;
          otherwise a default-constructed services_pb2.Down, with the RPC
          marked INVALID_ARGUMENT.
        """
        if request == _application_common.UNARY_UNARY_REQUEST:
            return _application_common.UNARY_UNARY_RESPONSE
        elif request == _application_common.ABORT_REQUEST:
            with self._abort_lock:
                try:
                    context.abort(grpc.StatusCode.PERMISSION_DENIED,
                                  "Denying permission to test abort.")
                except Exception:  # pylint: disable=broad-except
                    # An exception out of abort() is the outcome this test
                    # application records as success.
                    self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE
                else:
                    # abort() returned normally: record the failure in the
                    # attribute that the ABORT_SUCCESS_QUERY branch reads.
                    self._abort_response = _application_common.ABORT_FAILURE_RESPONSE
            return None  # NOTE: For the linter.
        elif request == _application_common.ABORT_SUCCESS_QUERY:
            with self._abort_lock:
                return self._abort_response
        else:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Something is wrong with your request!')
            return services_pb2.Down()

    def UnStre(self, request, context):
        """Handles a unary-stream RPC; never yields a response.

        Args:
          request: The request message.
          context: The servicer context of the RPC.

        Yields:
          Nothing. The RPC is marked INVALID_ARGUMENT when the request is not
          UNARY_STREAM_REQUEST.
        """
        if _application_common.UNARY_STREAM_REQUEST != request:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Something is wrong with your request!')
        return
        # The unreachable yield below is what makes this method a generator
        # function, so calling it produces an (empty) response iterator.
        yield services_pb2.Strange()  # pylint: disable=unreachable

    def StreUn(self, request_iterator, context):
        """Handles a stream-unary RPC.

        Sends initial metadata, then consumes the request stream.

        Args:
          request_iterator: An iterator of request messages.
          context: The servicer context of the RPC.

        Returns:
          STREAM_UNARY_RESPONSE when every request equals
          STREAM_UNARY_REQUEST and the context stays active; otherwise a
          default-constructed services_pb2.Strange (with the RPC marked
          INVALID_ARGUMENT if an unexpected request was seen).
        """
        context.send_initial_metadata(((
            'server_application_metadata_key',
            'Hi there!',
        ),))
        for request in request_iterator:
            if request != _application_common.STREAM_UNARY_REQUEST:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details('Something is wrong with your request!')
                return services_pb2.Strange()
            elif not context.is_active():
                return services_pb2.Strange()
        # Reached only when the request stream was exhausted without an
        # early return above.
        return _application_common.STREAM_UNARY_RESPONSE

    def StreStre(self, request_iterator, context):
        """Handles a stream-stream RPC.

        Args:
          request_iterator: An iterator of request messages.
          context: The servicer context of the RPC.

        Yields:
          Two STREAM_STREAM_RESPONSE messages per STREAM_STREAM_REQUEST. For
          each STREAM_STREAM_MUTATING_REQUEST, a single services_pb2.Bottom
          message is yielded STREAM_STREAM_MUTATING_COUNT times, with its
          first_bottom_field set to the iteration index before each yield.
          The stream ends early on an unrecognized request (RPC marked
          INVALID_ARGUMENT) or once the context is no longer active.
        """
        valid_requests = (_application_common.STREAM_STREAM_REQUEST,
                          _application_common.STREAM_STREAM_MUTATING_REQUEST)
        for request in request_iterator:
            if request not in valid_requests:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details('Something is wrong with your request!')
                return
            elif not context.is_active():
                return
            elif request == _application_common.STREAM_STREAM_REQUEST:
                yield _application_common.STREAM_STREAM_RESPONSE
                yield _application_common.STREAM_STREAM_RESPONSE
            elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
                # The same message object is deliberately reused and mutated
                # between yields.
                response = services_pb2.Bottom()
                for i in range(
                        _application_common.STREAM_STREAM_MUTATING_COUNT):
                    response.first_bottom_field = i
                    yield response
96