1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example gRPC Python-using client-side application.""" 15 16import collections 17import enum 18import threading 19import time 20 21import grpc 22 23from tests.testing import _application_common 24from tests.testing.proto import requests_pb2 25from tests.testing.proto import services_pb2 26from tests.testing.proto import services_pb2_grpc 27from tests.unit.framework.common import test_constants 28 29 30@enum.unique 31class Scenario(enum.Enum): 32 UNARY_UNARY = "unary unary" 33 UNARY_STREAM = "unary stream" 34 STREAM_UNARY = "stream unary" 35 STREAM_STREAM = "stream stream" 36 CONCURRENT_STREAM_UNARY = "concurrent stream unary" 37 CONCURRENT_STREAM_STREAM = "concurrent stream stream" 38 CANCEL_UNARY_UNARY = "cancel unary unary" 39 CANCEL_UNARY_STREAM = "cancel unary stream" 40 INFINITE_REQUEST_STREAM = "infinite request stream" 41 42 43class Outcome(collections.namedtuple("Outcome", ("kind", "code", "details"))): 44 """Outcome of a client application scenario. 45 46 Attributes: 47 kind: A Kind value describing the overall kind of scenario execution. 48 code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR. 49 details: A status details string. Only valid if kind is Kind.RPC_ERROR. 50 """ 51 52 @enum.unique 53 class Kind(enum.Enum): 54 SATISFACTORY = "satisfactory" 55 UNSATISFACTORY = "unsatisfactory" 56 RPC_ERROR = "rpc error" 57 58 59_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None) 60_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None) 61 62 63class _Pipe(object): 64 def __init__(self): 65 self._condition = threading.Condition() 66 self._values = [] 67 self._open = True 68 69 def __iter__(self): 70 return self 71 72 def _next(self): 73 with self._condition: 74 while True: 75 if self._values: 76 return self._values.pop(0) 77 elif not self._open: 78 raise StopIteration() 79 else: 80 self._condition.wait() 81 82 def __next__(self): # (Python 3 Iterator Protocol) 83 return self._next() 84 85 def next(self): # (Python 2 Iterator Protocol) 86 return self._next() 87 88 def add(self, value): 89 with self._condition: 90 self._values.append(value) 91 self._condition.notify_all() 92 93 def close(self): 94 with self._condition: 95 self._open = False 96 self._condition.notify_all() 97 98 99def _run_unary_unary(stub): 100 response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST) 101 if _application_common.UNARY_UNARY_RESPONSE == response: 102 return _SATISFACTORY_OUTCOME 103 else: 104 return _UNSATISFACTORY_OUTCOME 105 106 107def _run_unary_stream(stub): 108 response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST) 109 try: 110 next(response_iterator) 111 except StopIteration: 112 return _SATISFACTORY_OUTCOME 113 else: 114 return _UNSATISFACTORY_OUTCOME 115 116 117def _run_stream_unary(stub): 118 response, call = stub.StreUn.with_call( 119 iter((_application_common.STREAM_UNARY_REQUEST,) * 3) 120 ) 121 if ( 122 _application_common.STREAM_UNARY_RESPONSE == response 123 and call.code() is grpc.StatusCode.OK 124 ): 125 return _SATISFACTORY_OUTCOME 126 else: 127 return _UNSATISFACTORY_OUTCOME 128 129 130def _run_stream_stream(stub): 131 request_pipe = _Pipe() 132 response_iterator = stub.StreStre(iter(request_pipe)) 133 request_pipe.add(_application_common.STREAM_STREAM_REQUEST) 134 first_responses = next(response_iterator), next(response_iterator) 135 request_pipe.add(_application_common.STREAM_STREAM_REQUEST) 136 second_responses = next(response_iterator), next(response_iterator) 137 request_pipe.close() 138 try: 139 next(response_iterator) 140 except StopIteration: 141 unexpected_extra_response = False 142 else: 143 unexpected_extra_response = True 144 if ( 145 first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES 146 and second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES 147 and not unexpected_extra_response 148 ): 149 return _SATISFACTORY_OUTCOME 150 else: 151 return _UNSATISFACTORY_OUTCOME 152 153 154def _run_concurrent_stream_unary(stub): 155 future_calls = tuple( 156 stub.StreUn.future( 157 iter((_application_common.STREAM_UNARY_REQUEST,) * 3) 158 ) 159 for _ in range(test_constants.THREAD_CONCURRENCY) 160 ) 161 for future_call in future_calls: 162 if future_call.code() is grpc.StatusCode.OK: 163 response = future_call.result() 164 if _application_common.STREAM_UNARY_RESPONSE != response: 165 return _UNSATISFACTORY_OUTCOME 166 else: 167 return _UNSATISFACTORY_OUTCOME 168 else: 169 return _SATISFACTORY_OUTCOME 170 171 172def _run_concurrent_stream_stream(stub): 173 condition = threading.Condition() 174 outcomes = [None] * test_constants.RPC_CONCURRENCY 175 176 def run_stream_stream(index): 177 outcome = _run_stream_stream(stub) 178 with condition: 179 outcomes[index] = outcome 180 condition.notify() 181 182 for index in range(test_constants.RPC_CONCURRENCY): 183 thread = threading.Thread(target=run_stream_stream, args=(index,)) 184 thread.start() 185 with condition: 186 while True: 187 if all(outcomes): 188 for outcome in outcomes: 189 if outcome.kind is not Outcome.Kind.SATISFACTORY: 190 return _UNSATISFACTORY_OUTCOME 191 else: 192 return _SATISFACTORY_OUTCOME 193 else: 194 condition.wait() 195 196 197def _run_cancel_unary_unary(stub): 198 response_future_call = stub.UnUn.future( 199 _application_common.UNARY_UNARY_REQUEST 200 ) 201 initial_metadata = response_future_call.initial_metadata() 202 cancelled = response_future_call.cancel() 203 if initial_metadata is not None and cancelled: 204 return _SATISFACTORY_OUTCOME 205 else: 206 return _UNSATISFACTORY_OUTCOME 207 208 209def _run_infinite_request_stream(stub): 210 def infinite_request_iterator(): 211 while True: 212 yield _application_common.STREAM_UNARY_REQUEST 213 214 response_future_call = stub.StreUn.future( 215 infinite_request_iterator(), 216 timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT, 217 ) 218 if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED: 219 return _SATISFACTORY_OUTCOME 220 else: 221 return _UNSATISFACTORY_OUTCOME 222 223 224_IMPLEMENTATIONS = { 225 Scenario.UNARY_UNARY: _run_unary_unary, 226 Scenario.UNARY_STREAM: _run_unary_stream, 227 Scenario.STREAM_UNARY: _run_stream_unary, 228 Scenario.STREAM_STREAM: _run_stream_stream, 229 Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary, 230 Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream, 231 Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary, 232 Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream, 233} 234 235 236def run(scenario, channel): 237 stub = services_pb2_grpc.FirstServiceStub(channel) 238 try: 239 return _IMPLEMENTATIONS[scenario](stub) 240 except grpc.RpcError as rpc_error: 241 return Outcome( 242 Outcome.Kind.RPC_ERROR, rpc_error.code(), rpc_error.details() 243 ) 244