# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using client-side application."""

import collections
import enum
import threading
import time

import grpc
from tests.unit.framework.common import test_constants

from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc

from tests.testing import _application_common


@enum.unique
class Scenario(enum.Enum):
    """Identifiers of the client scenarios that `run` can be asked to execute."""

    UNARY_UNARY = 'unary unary'
    UNARY_STREAM = 'unary stream'
    STREAM_UNARY = 'stream unary'
    STREAM_STREAM = 'stream stream'
    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
    CANCEL_UNARY_UNARY = 'cancel unary unary'
    CANCEL_UNARY_STREAM = 'cancel unary stream'
    INFINITE_REQUEST_STREAM = 'infinite request stream'


class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
    """Outcome of a client application scenario.

    Attributes:
      kind: A Kind value describing the overall kind of scenario execution.
      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
    """

    @enum.unique
    class Kind(enum.Enum):
        """Overall classification of how a scenario execution went."""

        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'


# Shared outcome instances for the two kinds that carry no status information.
_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)


class _Pipe(object):
    """A closable FIFO of values that is consumed as a blocking iterator.

    Values are handed out in the order they were added. Iteration blocks while
    the pipe is open and empty, and stops once the pipe is closed and every
    buffered value has been consumed. All state is guarded by one condition.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # A deque gives O(1) removal from the front; list.pop(0) is O(n).
        self._values = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def _next(self):
        """Returns the oldest buffered value, blocking until one is available.

        Raises:
          StopIteration: If the pipe has been closed and is empty.
        """
        with self._condition:
            while True:
                if self._values:
                    return self._values.popleft()
                elif not self._open:
                    raise StopIteration()
                else:
                    self._condition.wait()

    def __next__(self):  # (Python 3 Iterator Protocol)
        return self._next()

    def next(self):  # (Python 2 Iterator Protocol)
        return self._next()

    def add(self, value):
        """Appends a value and wakes any consumer blocked in iteration."""
        with self._condition:
            self._values.append(value)
            self._condition.notify_all()

    def close(self):
        """Marks the pipe closed and wakes blocked consumers.

        Values already buffered are still delivered. Calling this more than
        once has no further effect.
        """
        with self._condition:
            self._open = False
            self._condition.notify_all()


def _run_unary_unary(stub):
    """Invokes UnUn once and checks that the expected response comes back."""
    response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    if _application_common.UNARY_UNARY_RESPONSE == response:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_unary_stream(stub):
    """Invokes UnStre and checks that the response stream yields nothing.

    The scenario is satisfactory only when the very first read of the response
    iterator raises StopIteration.
    """
    response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    try:
        next(response_iterator)
    except StopIteration:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_stream_unary(stub):
    """Sends three requests to StreUn and checks the response and status.

    The scenario is satisfactory when the response equals the expected
    stream-unary response and the call's code is grpc.StatusCode.OK.
    """
    response, call = stub.StreUn.with_call(
        iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
    if (_application_common.STREAM_UNARY_RESPONSE == response and
            call.code() is grpc.StatusCode.OK):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_stream_stream(stub):
    """Runs one StreStre call, expecting two responses for each request.

    Two requests are sent one at a time through a _Pipe; after each, two
    responses are read. The pipe is then closed and the response stream is
    expected to end without yielding anything further.

    Returns:
      _SATISFACTORY_OUTCOME if both response pairs equal
        TWO_STREAM_STREAM_RESPONSES and no extra response arrives, otherwise
        _UNSATISFACTORY_OUTCOME.
    """
    request_pipe = _Pipe()
    try:
        response_iterator = stub.StreStre(iter(request_pipe))
        request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
        first_responses = next(response_iterator), next(response_iterator)
        request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
        second_responses = next(response_iterator), next(response_iterator)
        request_pipe.close()
        try:
            next(response_iterator)
        except StopIteration:
            unexpected_extra_response = False
        else:
            unexpected_extra_response = True
    finally:
        # Always release the request pipe: if anything above raises, a
        # consumer of the request iterator would otherwise stay blocked in
        # _Pipe._next forever. close() is safe to call a second time.
        request_pipe.close()
    if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
            second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
            and not unexpected_extra_response):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_concurrent_stream_unary(stub):
    """Starts THREAD_CONCURRENCY StreUn futures and checks every one of them.

    Each call sends three requests. The scenario is satisfactory only if every
    call finishes with grpc.StatusCode.OK and the expected response.
    """
    future_calls = tuple(
        stub.StreUn.future(iter((_application_common.STREAM_UNARY_REQUEST,) *
                                3))
        for _ in range(test_constants.THREAD_CONCURRENCY))
    for future_call in future_calls:
        if future_call.code() is grpc.StatusCode.OK:
            response = future_call.result()
            if _application_common.STREAM_UNARY_RESPONSE != response:
                return _UNSATISFACTORY_OUTCOME
        else:
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME


def _run_concurrent_stream_stream(stub):
    """Runs RPC_CONCURRENCY stream-stream scenarios, one per thread.

    Returns:
      _SATISFACTORY_OUTCOME if every threaded run was satisfactory, otherwise
        _UNSATISFACTORY_OUTCOME.

    Raises:
      Exception: Whatever a worker's _run_stream_stream raised, re-raised here
        once all workers have finished so that the caller sees the same
        exceptions as in the single-threaded scenario.
    """
    condition = threading.Condition()
    # One slot per worker; holds an Outcome, or the exception the worker hit.
    results = [None] * test_constants.RPC_CONCURRENCY

    def run_stream_stream(index):
        try:
            result = _run_stream_stream(stub)
        except Exception as error:  # pylint: disable=broad-except
            # Record the failure rather than letting the thread die silently:
            # an unfilled slot would leave the waiting thread blocked forever.
            result = error
        with condition:
            results[index] = result
            condition.notify()

    for index in range(test_constants.RPC_CONCURRENCY):
        thread = threading.Thread(target=run_stream_stream, args=(index,))
        thread.start()
    with condition:
        while any(result is None for result in results):
            condition.wait()
    for result in results:
        if isinstance(result, Exception):
            raise result
    for result in results:
        if result.kind is not Outcome.Kind.SATISFACTORY:
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME


def _run_cancel_unary_unary(stub):
    """Starts a UnUn future, reads its initial metadata, then cancels it.

    The scenario is satisfactory when the initial metadata is not None and
    cancel() reports success.
    """
    response_future_call = stub.UnUn.future(
        _application_common.UNARY_UNARY_REQUEST)
    initial_metadata = response_future_call.initial_metadata()
    cancelled = response_future_call.cancel()
    if initial_metadata is not None and cancelled:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_infinite_request_stream(stub):
    """Feeds StreUn a never-ending request stream under a timeout.

    The scenario is satisfactory when the call ends with
    grpc.StatusCode.DEADLINE_EXCEEDED.
    """

    def infinite_request_iterator():
        while True:
            yield _application_common.STREAM_UNARY_REQUEST

    response_future_call = stub.StreUn.future(
        infinite_request_iterator(),
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
    if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


# Maps each supported Scenario to the function that executes it.
# NOTE(review): Scenario.CANCEL_UNARY_STREAM has no entry here, so asking
# `run` for it raises KeyError - confirm whether an implementation is missing.
_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}


def run(scenario, channel):
    """Executes a scenario against a FirstService stub built on the channel.

    Args:
      scenario: The Scenario value selecting what to execute.
      channel: The channel passed to services_pb2_grpc.FirstServiceStub.

    Returns:
      The Outcome of the scenario. A grpc.RpcError raised while the scenario
        runs is reported as an Outcome of kind Kind.RPC_ERROR carrying the
        error's code and details.

    Raises:
      KeyError: If the scenario has no entry in _IMPLEMENTATIONS.
    """
    stub = services_pb2_grpc.FirstServiceStub(channel)
    try:
        return _IMPLEMENTATIONS[scenario](stub)
    except grpc.RpcError as rpc_error:
        return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
                       rpc_error.details())