• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example gRPC Python-using server-side application."""
15
16import threading
17
18import grpc
19
20# requests_pb2 is a semantic dependency of this module.
21from tests.testing import _application_common
22from tests.testing.proto import requests_pb2  # pylint: disable=unused-import
23from tests.testing.proto import services_pb2
24from tests.testing.proto import services_pb2_grpc
25
26
27class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
28    """Services RPCs."""
29
30    def __init__(self):
31        self._abort_lock = threading.RLock()
32        self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE
33
34    def UnUn(self, request, context):
35        if request == _application_common.UNARY_UNARY_REQUEST:
36            return _application_common.UNARY_UNARY_RESPONSE
37        elif request == _application_common.ABORT_REQUEST:
38            with self._abort_lock:
39                try:
40                    context.abort(
41                        grpc.StatusCode.PERMISSION_DENIED,
42                        "Denying permission to test abort.",
43                    )
44                except Exception as e:  # pylint: disable=broad-except
45                    self._abort_response = (
46                        _application_common.ABORT_SUCCESS_RESPONSE
47                    )
48                else:
49                    self._abort_status = (
50                        _application_common.ABORT_FAILURE_RESPONSE
51                    )
52            return None  # NOTE: For the linter.
53        elif request == _application_common.ABORT_SUCCESS_QUERY:
54            with self._abort_lock:
55                return self._abort_response
56        else:
57            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
58            context.set_details("Something is wrong with your request!")
59            return services_pb2.Down()
60
61    def UnStre(self, request, context):
62        if _application_common.UNARY_STREAM_REQUEST != request:
63            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
64            context.set_details("Something is wrong with your request!")
65        return
66        yield services_pb2.Strange()  # pylint: disable=unreachable
67
68    def StreUn(self, request_iterator, context):
69        context.send_initial_metadata(
70            (
71                (
72                    "server_application_metadata_key",
73                    "Hi there!",
74                ),
75            )
76        )
77        for request in request_iterator:
78            if request != _application_common.STREAM_UNARY_REQUEST:
79                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
80                context.set_details("Something is wrong with your request!")
81                return services_pb2.Strange()
82            elif not context.is_active():
83                return services_pb2.Strange()
84        else:
85            return _application_common.STREAM_UNARY_RESPONSE
86
87    def StreStre(self, request_iterator, context):
88        valid_requests = (
89            _application_common.STREAM_STREAM_REQUEST,
90            _application_common.STREAM_STREAM_MUTATING_REQUEST,
91        )
92        for request in request_iterator:
93            if request not in valid_requests:
94                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
95                context.set_details("Something is wrong with your request!")
96                return
97            elif not context.is_active():
98                return
99            elif request == _application_common.STREAM_STREAM_REQUEST:
100                yield _application_common.STREAM_STREAM_RESPONSE
101                yield _application_common.STREAM_STREAM_RESPONSE
102            elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
103                response = services_pb2.Bottom()
104                for i in range(
105                    _application_common.STREAM_STREAM_MUTATING_COUNT
106                ):
107                    response.first_bottom_field = i
108                    yield response
109