1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines behavior for WHEN clients send requests. 15 16Each client exposes a non-blocking send_request() method that the 17ClientRunner invokes either periodically or in response to some event. 18""" 19 20import abc 21import threading 22import time 23 24 25class ClientRunner: 26 """Abstract interface for sending requests from clients.""" 27 28 __metaclass__ = abc.ABCMeta 29 30 def __init__(self, client): 31 self._client = client 32 33 @abc.abstractmethod 34 def start(self): 35 raise NotImplementedError() 36 37 @abc.abstractmethod 38 def stop(self): 39 raise NotImplementedError() 40 41 42class OpenLoopClientRunner(ClientRunner): 43 44 def __init__(self, client, interval_generator): 45 super(OpenLoopClientRunner, self).__init__(client) 46 self._is_running = False 47 self._interval_generator = interval_generator 48 self._dispatch_thread = threading.Thread( 49 target=self._dispatch_requests, args=()) 50 51 def start(self): 52 self._is_running = True 53 self._client.start() 54 self._dispatch_thread.start() 55 56 def stop(self): 57 self._is_running = False 58 self._client.stop() 59 self._dispatch_thread.join() 60 self._client = None 61 62 def _dispatch_requests(self): 63 while self._is_running: 64 self._client.send_request() 65 time.sleep(next(self._interval_generator)) 66 67 68class ClosedLoopClientRunner(ClientRunner): 69 70 def __init__(self, client, request_count): 71 super(ClosedLoopClientRunner, self).__init__(client) 72 self._is_running = False 73 self._request_count = request_count 74 # Send a new request on each response for closed loop 75 self._client.add_response_callback(self._send_request) 76 77 def start(self): 78 self._is_running = True 79 self._client.start() 80 for _ in xrange(self._request_count): 81 self._client.send_request() 82 83 def stop(self): 84 self._is_running = False 85 self._client.stop() 86 self._client = None 87 88 def _send_request(self, client, response_time): 89 if self._is_running: 90 client.send_request() 91