1# Copyright 2017, Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import concurrent.futures 16import threading 17import time 18 19import mock 20import pytest 21 22from google.api_core import exceptions, retry 23from google.api_core.future import polling 24 25 26class PollingFutureImpl(polling.PollingFuture): 27 def done(self): 28 return False 29 30 def cancel(self): 31 return True 32 33 def cancelled(self): 34 return False 35 36 def running(self): 37 return True 38 39 40def test_polling_future_constructor(): 41 future = PollingFutureImpl() 42 assert not future.done() 43 assert not future.cancelled() 44 assert future.running() 45 assert future.cancel() 46 with mock.patch.object(future, "done", return_value=True): 47 future.result() 48 49 50def test_set_result(): 51 future = PollingFutureImpl() 52 callback = mock.Mock() 53 54 future.set_result(1) 55 56 assert future.result() == 1 57 future.add_done_callback(callback) 58 callback.assert_called_once_with(future) 59 60 61def test_set_exception(): 62 future = PollingFutureImpl() 63 exception = ValueError("meep") 64 65 future.set_exception(exception) 66 67 assert future.exception() == exception 68 with pytest.raises(ValueError): 69 future.result() 70 71 callback = mock.Mock() 72 future.add_done_callback(callback) 73 callback.assert_called_once_with(future) 74 75 76def test_invoke_callback_exception(): 77 future = PollingFutureImplWithPoll() 78 future.set_result(42) 79 80 # This should not raise, despite the callback causing an exception. 81 callback = mock.Mock(side_effect=ValueError) 82 future.add_done_callback(callback) 83 callback.assert_called_once_with(future) 84 85 86class PollingFutureImplWithPoll(PollingFutureImpl): 87 def __init__(self): 88 super(PollingFutureImplWithPoll, self).__init__() 89 self.poll_count = 0 90 self.event = threading.Event() 91 92 def done(self, retry=polling.DEFAULT_RETRY): 93 self.poll_count += 1 94 self.event.wait() 95 self.set_result(42) 96 return True 97 98 99def test_result_with_polling(): 100 future = PollingFutureImplWithPoll() 101 102 future.event.set() 103 result = future.result() 104 105 assert result == 42 106 assert future.poll_count == 1 107 # Repeated calls should not cause additional polling 108 assert future.result() == result 109 assert future.poll_count == 1 110 111 112class PollingFutureImplTimeout(PollingFutureImplWithPoll): 113 def done(self, retry=polling.DEFAULT_RETRY): 114 time.sleep(1) 115 return False 116 117 118def test_result_timeout(): 119 future = PollingFutureImplTimeout() 120 with pytest.raises(concurrent.futures.TimeoutError): 121 future.result(timeout=1) 122 123 124def test_exception_timeout(): 125 future = PollingFutureImplTimeout() 126 with pytest.raises(concurrent.futures.TimeoutError): 127 future.exception(timeout=1) 128 129 130class PollingFutureImplTransient(PollingFutureImplWithPoll): 131 def __init__(self, errors): 132 super(PollingFutureImplTransient, self).__init__() 133 self._errors = errors 134 135 def done(self, retry=polling.DEFAULT_RETRY): 136 if self._errors: 137 error, self._errors = self._errors[0], self._errors[1:] 138 raise error("testing") 139 self.poll_count += 1 140 self.set_result(42) 141 return True 142 143 144def test_result_transient_error(): 145 future = PollingFutureImplTransient( 146 ( 147 exceptions.TooManyRequests, 148 exceptions.InternalServerError, 149 exceptions.BadGateway, 150 ) 151 ) 152 result = future.result() 153 assert result == 42 154 assert future.poll_count == 1 155 # Repeated calls should not cause additional polling 156 assert future.result() == result 157 assert future.poll_count == 1 158 159 160def test_callback_background_thread(): 161 future = PollingFutureImplWithPoll() 162 callback = mock.Mock() 163 164 future.add_done_callback(callback) 165 166 assert future._polling_thread is not None 167 168 # Give the thread a second to poll 169 time.sleep(1) 170 assert future.poll_count == 1 171 172 future.event.set() 173 future._polling_thread.join() 174 175 callback.assert_called_once_with(future) 176 177 178def test_double_callback_background_thread(): 179 future = PollingFutureImplWithPoll() 180 callback = mock.Mock() 181 callback2 = mock.Mock() 182 183 future.add_done_callback(callback) 184 current_thread = future._polling_thread 185 assert current_thread is not None 186 187 # only one polling thread should be created. 188 future.add_done_callback(callback2) 189 assert future._polling_thread is current_thread 190 191 future.event.set() 192 future._polling_thread.join() 193 194 assert future.poll_count == 1 195 callback.assert_called_once_with(future) 196 callback2.assert_called_once_with(future) 197 198 199class PollingFutureImplWithoutRetry(PollingFutureImpl): 200 def done(self): 201 return True 202 203 def result(self): 204 return super(PollingFutureImplWithoutRetry, self).result() 205 206 def _blocking_poll(self, timeout): 207 return super(PollingFutureImplWithoutRetry, self)._blocking_poll( 208 timeout=timeout 209 ) 210 211 212class PollingFutureImplWith_done_or_raise(PollingFutureImpl): 213 def done(self): 214 return True 215 216 def _done_or_raise(self): 217 return super(PollingFutureImplWith_done_or_raise, self)._done_or_raise() 218 219 220def test_polling_future_without_retry(): 221 custom_retry = retry.Retry( 222 predicate=retry.if_exception_type(exceptions.TooManyRequests) 223 ) 224 future = PollingFutureImplWithoutRetry() 225 assert future.done() 226 assert future.running() 227 assert future.result() is None 228 229 with mock.patch.object(future, "done") as done_mock: 230 future._done_or_raise() 231 done_mock.assert_called_once_with() 232 233 with mock.patch.object(future, "done") as done_mock: 234 future._done_or_raise(retry=custom_retry) 235 done_mock.assert_called_once_with(retry=custom_retry) 236 237 238def test_polling_future_with__done_or_raise(): 239 future = PollingFutureImplWith_done_or_raise() 240 assert future.done() 241 assert future.running() 242 assert future.result() is None 243