1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines behavior for WHEN clients send requests. 15 16Each client exposes a non-blocking send_request() method that the 17ClientRunner invokes either periodically or in response to some event. 18""" 19 20import abc 21import threading 22import time 23 24 25class ClientRunner: 26 """Abstract interface for sending requests from clients.""" 27 28 __metaclass__ = abc.ABCMeta 29 30 def __init__(self, client): 31 self._client = client 32 33 @abc.abstractmethod 34 def start(self): 35 raise NotImplementedError() 36 37 @abc.abstractmethod 38 def stop(self): 39 raise NotImplementedError() 40 41 42class OpenLoopClientRunner(ClientRunner): 43 44 def __init__(self, client, interval_generator): 45 super(OpenLoopClientRunner, self).__init__(client) 46 self._is_running = False 47 self._interval_generator = interval_generator 48 self._dispatch_thread = threading.Thread(target=self._dispatch_requests, 49 args=()) 50 51 def start(self): 52 self._is_running = True 53 self._client.start() 54 self._dispatch_thread.start() 55 56 def stop(self): 57 self._is_running = False 58 self._client.stop() 59 self._dispatch_thread.join() 60 self._client = None 61 62 def _dispatch_requests(self): 63 while self._is_running: 64 self._client.send_request() 65 time.sleep(next(self._interval_generator)) 66 67 68class ClosedLoopClientRunner(ClientRunner): 69 70 def __init__(self, client, request_count, no_ping_pong): 71 super(ClosedLoopClientRunner, self).__init__(client) 72 self._is_running = False 73 self._request_count = request_count 74 # For server-streaming RPC, don't spawn new RPC after each responses. 75 # This yield at most ~17% for single RPC scenarios. 76 if not no_ping_pong: 77 # Send a new request on each response for closed loop 78 self._client.add_response_callback(self._send_request) 79 80 def start(self): 81 self._is_running = True 82 self._client.start() 83 for _ in range(self._request_count): 84 self._client.send_request() 85 86 def stop(self): 87 self._is_running = False 88 self._client.stop() 89 self._client = None 90 91 def _send_request(self, client, unused_response_time): 92 if self._is_running: 93 client.send_request() 94