• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example gRPC Python-using client-side application."""
15
16import collections
17import enum
18import threading
19import time
20
21import grpc
22from tests.unit.framework.common import test_constants
23
24from tests.testing.proto import requests_pb2
25from tests.testing.proto import services_pb2
26from tests.testing.proto import services_pb2_grpc
27
28from tests.testing import _application_common
29
30
31@enum.unique
32class Scenario(enum.Enum):
33    UNARY_UNARY = 'unary unary'
34    UNARY_STREAM = 'unary stream'
35    STREAM_UNARY = 'stream unary'
36    STREAM_STREAM = 'stream stream'
37    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
38    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
39    CANCEL_UNARY_UNARY = 'cancel unary unary'
40    CANCEL_UNARY_STREAM = 'cancel unary stream'
41    INFINITE_REQUEST_STREAM = 'infinite request stream'
42
43
44class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
45    """Outcome of a client application scenario.
46
47    Attributes:
48      kind: A Kind value describing the overall kind of scenario execution.
49      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
50      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
51    """
52
53    @enum.unique
54    class Kind(enum.Enum):
55        SATISFACTORY = 'satisfactory'
56        UNSATISFACTORY = 'unsatisfactory'
57        RPC_ERROR = 'rpc error'
58
59
60_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
61_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
62
63
64class _Pipe(object):
65
66    def __init__(self):
67        self._condition = threading.Condition()
68        self._values = []
69        self._open = True
70
71    def __iter__(self):
72        return self
73
74    def _next(self):
75        with self._condition:
76            while True:
77                if self._values:
78                    return self._values.pop(0)
79                elif not self._open:
80                    raise StopIteration()
81                else:
82                    self._condition.wait()
83
84    def __next__(self):  # (Python 3 Iterator Protocol)
85        return self._next()
86
87    def next(self):  # (Python 2 Iterator Protocol)
88        return self._next()
89
90    def add(self, value):
91        with self._condition:
92            self._values.append(value)
93            self._condition.notify_all()
94
95    def close(self):
96        with self._condition:
97            self._open = False
98            self._condition.notify_all()
99
100
101def _run_unary_unary(stub):
102    response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
103    if _application_common.UNARY_UNARY_RESPONSE == response:
104        return _SATISFACTORY_OUTCOME
105    else:
106        return _UNSATISFACTORY_OUTCOME
107
108
109def _run_unary_stream(stub):
110    response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
111    try:
112        next(response_iterator)
113    except StopIteration:
114        return _SATISFACTORY_OUTCOME
115    else:
116        return _UNSATISFACTORY_OUTCOME
117
118
119def _run_stream_unary(stub):
120    response, call = stub.StreUn.with_call(
121        iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
122    if (_application_common.STREAM_UNARY_RESPONSE == response and
123            call.code() is grpc.StatusCode.OK):
124        return _SATISFACTORY_OUTCOME
125    else:
126        return _UNSATISFACTORY_OUTCOME
127
128
129def _run_stream_stream(stub):
130    request_pipe = _Pipe()
131    response_iterator = stub.StreStre(iter(request_pipe))
132    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
133    first_responses = next(response_iterator), next(response_iterator),
134    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
135    second_responses = next(response_iterator), next(response_iterator),
136    request_pipe.close()
137    try:
138        next(response_iterator)
139    except StopIteration:
140        unexpected_extra_response = False
141    else:
142        unexpected_extra_response = True
143    if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
144            second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
145            and not unexpected_extra_response):
146        return _SATISFACTORY_OUTCOME
147    else:
148        return _UNSATISFACTORY_OUTCOME
149
150
151def _run_concurrent_stream_unary(stub):
152    future_calls = tuple(
153        stub.StreUn.future(
154            iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
155        for _ in range(test_constants.THREAD_CONCURRENCY))
156    for future_call in future_calls:
157        if future_call.code() is grpc.StatusCode.OK:
158            response = future_call.result()
159            if _application_common.STREAM_UNARY_RESPONSE != response:
160                return _UNSATISFACTORY_OUTCOME
161        else:
162            return _UNSATISFACTORY_OUTCOME
163    else:
164        return _SATISFACTORY_OUTCOME
165
166
167def _run_concurrent_stream_stream(stub):
168    condition = threading.Condition()
169    outcomes = [None] * test_constants.RPC_CONCURRENCY
170
171    def run_stream_stream(index):
172        outcome = _run_stream_stream(stub)
173        with condition:
174            outcomes[index] = outcome
175            condition.notify()
176
177    for index in range(test_constants.RPC_CONCURRENCY):
178        thread = threading.Thread(target=run_stream_stream, args=(index,))
179        thread.start()
180    with condition:
181        while True:
182            if all(outcomes):
183                for outcome in outcomes:
184                    if outcome.kind is not Outcome.Kind.SATISFACTORY:
185                        return _UNSATISFACTORY_OUTCOME
186                else:
187                    return _SATISFACTORY_OUTCOME
188            else:
189                condition.wait()
190
191
192def _run_cancel_unary_unary(stub):
193    response_future_call = stub.UnUn.future(
194        _application_common.UNARY_UNARY_REQUEST)
195    initial_metadata = response_future_call.initial_metadata()
196    cancelled = response_future_call.cancel()
197    if initial_metadata is not None and cancelled:
198        return _SATISFACTORY_OUTCOME
199    else:
200        return _UNSATISFACTORY_OUTCOME
201
202
203def _run_infinite_request_stream(stub):
204
205    def infinite_request_iterator():
206        while True:
207            yield _application_common.STREAM_UNARY_REQUEST
208
209    response_future_call = stub.StreUn.future(
210        infinite_request_iterator(),
211        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
212    if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
213        return _SATISFACTORY_OUTCOME
214    else:
215        return _UNSATISFACTORY_OUTCOME
216
217
218_IMPLEMENTATIONS = {
219    Scenario.UNARY_UNARY: _run_unary_unary,
220    Scenario.UNARY_STREAM: _run_unary_stream,
221    Scenario.STREAM_UNARY: _run_stream_unary,
222    Scenario.STREAM_STREAM: _run_stream_stream,
223    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
224    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
225    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
226    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
227}
228
229
230def run(scenario, channel):
231    stub = services_pb2_grpc.FirstServiceStub(channel)
232    try:
233        return _IMPLEMENTATIONS[scenario](stub)
234    except grpc.RpcError as rpc_error:
235        return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
236                       rpc_error.details())
237