• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example gRPC Python-using client-side application."""
15
16import collections
17import enum
18import threading
19import time
20
21import grpc
22
23from tests.testing import _application_common
24from tests.testing.proto import requests_pb2
25from tests.testing.proto import services_pb2
26from tests.testing.proto import services_pb2_grpc
27from tests.unit.framework.common import test_constants
28
29
30@enum.unique
31class Scenario(enum.Enum):
32    UNARY_UNARY = "unary unary"
33    UNARY_STREAM = "unary stream"
34    STREAM_UNARY = "stream unary"
35    STREAM_STREAM = "stream stream"
36    CONCURRENT_STREAM_UNARY = "concurrent stream unary"
37    CONCURRENT_STREAM_STREAM = "concurrent stream stream"
38    CANCEL_UNARY_UNARY = "cancel unary unary"
39    CANCEL_UNARY_STREAM = "cancel unary stream"
40    INFINITE_REQUEST_STREAM = "infinite request stream"
41
42
43class Outcome(collections.namedtuple("Outcome", ("kind", "code", "details"))):
44    """Outcome of a client application scenario.
45
46    Attributes:
47      kind: A Kind value describing the overall kind of scenario execution.
48      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
49      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
50    """
51
52    @enum.unique
53    class Kind(enum.Enum):
54        SATISFACTORY = "satisfactory"
55        UNSATISFACTORY = "unsatisfactory"
56        RPC_ERROR = "rpc error"
57
58
59_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
60_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
61
62
63class _Pipe(object):
64    def __init__(self):
65        self._condition = threading.Condition()
66        self._values = []
67        self._open = True
68
69    def __iter__(self):
70        return self
71
72    def _next(self):
73        with self._condition:
74            while True:
75                if self._values:
76                    return self._values.pop(0)
77                elif not self._open:
78                    raise StopIteration()
79                else:
80                    self._condition.wait()
81
82    def __next__(self):  # (Python 3 Iterator Protocol)
83        return self._next()
84
85    def next(self):  # (Python 2 Iterator Protocol)
86        return self._next()
87
88    def add(self, value):
89        with self._condition:
90            self._values.append(value)
91            self._condition.notify_all()
92
93    def close(self):
94        with self._condition:
95            self._open = False
96            self._condition.notify_all()
97
98
99def _run_unary_unary(stub):
100    response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
101    if _application_common.UNARY_UNARY_RESPONSE == response:
102        return _SATISFACTORY_OUTCOME
103    else:
104        return _UNSATISFACTORY_OUTCOME
105
106
107def _run_unary_stream(stub):
108    response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
109    try:
110        next(response_iterator)
111    except StopIteration:
112        return _SATISFACTORY_OUTCOME
113    else:
114        return _UNSATISFACTORY_OUTCOME
115
116
117def _run_stream_unary(stub):
118    response, call = stub.StreUn.with_call(
119        iter((_application_common.STREAM_UNARY_REQUEST,) * 3)
120    )
121    if (
122        _application_common.STREAM_UNARY_RESPONSE == response
123        and call.code() is grpc.StatusCode.OK
124    ):
125        return _SATISFACTORY_OUTCOME
126    else:
127        return _UNSATISFACTORY_OUTCOME
128
129
130def _run_stream_stream(stub):
131    request_pipe = _Pipe()
132    response_iterator = stub.StreStre(iter(request_pipe))
133    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
134    first_responses = next(response_iterator), next(response_iterator)
135    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
136    second_responses = next(response_iterator), next(response_iterator)
137    request_pipe.close()
138    try:
139        next(response_iterator)
140    except StopIteration:
141        unexpected_extra_response = False
142    else:
143        unexpected_extra_response = True
144    if (
145        first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
146        and second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
147        and not unexpected_extra_response
148    ):
149        return _SATISFACTORY_OUTCOME
150    else:
151        return _UNSATISFACTORY_OUTCOME
152
153
154def _run_concurrent_stream_unary(stub):
155    future_calls = tuple(
156        stub.StreUn.future(
157            iter((_application_common.STREAM_UNARY_REQUEST,) * 3)
158        )
159        for _ in range(test_constants.THREAD_CONCURRENCY)
160    )
161    for future_call in future_calls:
162        if future_call.code() is grpc.StatusCode.OK:
163            response = future_call.result()
164            if _application_common.STREAM_UNARY_RESPONSE != response:
165                return _UNSATISFACTORY_OUTCOME
166        else:
167            return _UNSATISFACTORY_OUTCOME
168    else:
169        return _SATISFACTORY_OUTCOME
170
171
172def _run_concurrent_stream_stream(stub):
173    condition = threading.Condition()
174    outcomes = [None] * test_constants.RPC_CONCURRENCY
175
176    def run_stream_stream(index):
177        outcome = _run_stream_stream(stub)
178        with condition:
179            outcomes[index] = outcome
180            condition.notify()
181
182    for index in range(test_constants.RPC_CONCURRENCY):
183        thread = threading.Thread(target=run_stream_stream, args=(index,))
184        thread.start()
185    with condition:
186        while True:
187            if all(outcomes):
188                for outcome in outcomes:
189                    if outcome.kind is not Outcome.Kind.SATISFACTORY:
190                        return _UNSATISFACTORY_OUTCOME
191                else:
192                    return _SATISFACTORY_OUTCOME
193            else:
194                condition.wait()
195
196
197def _run_cancel_unary_unary(stub):
198    response_future_call = stub.UnUn.future(
199        _application_common.UNARY_UNARY_REQUEST
200    )
201    initial_metadata = response_future_call.initial_metadata()
202    cancelled = response_future_call.cancel()
203    if initial_metadata is not None and cancelled:
204        return _SATISFACTORY_OUTCOME
205    else:
206        return _UNSATISFACTORY_OUTCOME
207
208
209def _run_infinite_request_stream(stub):
210    def infinite_request_iterator():
211        while True:
212            yield _application_common.STREAM_UNARY_REQUEST
213
214    response_future_call = stub.StreUn.future(
215        infinite_request_iterator(),
216        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT,
217    )
218    if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
219        return _SATISFACTORY_OUTCOME
220    else:
221        return _UNSATISFACTORY_OUTCOME
222
223
224_IMPLEMENTATIONS = {
225    Scenario.UNARY_UNARY: _run_unary_unary,
226    Scenario.UNARY_STREAM: _run_unary_stream,
227    Scenario.STREAM_UNARY: _run_stream_unary,
228    Scenario.STREAM_STREAM: _run_stream_stream,
229    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
230    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
231    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
232    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
233}
234
235
236def run(scenario, channel):
237    stub = services_pb2_grpc.FirstServiceStub(channel)
238    try:
239        return _IMPLEMENTATIONS[scenario](stub)
240    except grpc.RpcError as rpc_error:
241        return Outcome(
242            Outcome.Kind.RPC_ERROR, rpc_error.code(), rpc_error.details()
243        )
244