# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import threading
import time

import mock
import pytest

from google.api_core import exceptions, retry
from google.api_core.future import polling


class PollingFutureImpl(polling.PollingFuture):
    """Minimal concrete ``PollingFuture`` with fixed state answers.

    The stub is never done, never cancelled, always running, and reports
    every cancel request as successful.
    """

    def done(self):
        return False

    def cancel(self):
        return True

    def cancelled(self):
        return False

    def running(self):
        return True


def test_polling_future_constructor():
    """Check the stub's fixed answers, then resolve with ``done`` patched."""
    fut = PollingFutureImpl()

    assert not fut.done()
    assert not fut.cancelled()
    assert fut.running()
    assert fut.cancel()

    # With done() patched to report completion, result() is able to return.
    with mock.patch.object(fut, "done", return_value=True):
        fut.result()


def test_set_result():
    """A result set directly is returned and triggers a late callback."""
    fut = PollingFutureImpl()
    on_done = mock.Mock()

    fut.set_result(1)
    assert fut.result() == 1

    # The callback is registered only after the result has been set.
    fut.add_done_callback(on_done)
    on_done.assert_called_once_with(fut)


def test_set_exception():
    """An exception set directly is reported, raised, and fires callbacks."""
    fut = PollingFutureImpl()
    error = ValueError("meep")

    fut.set_exception(error)
    assert fut.exception() == error

    with pytest.raises(ValueError):
        fut.result()

    on_done = mock.Mock()
    fut.add_done_callback(on_done)
    on_done.assert_called_once_with(fut)


def test_invoke_callback_exception():
    """A callback that raises must not break ``add_done_callback``."""
    fut = PollingFutureImplWithPoll()
    fut.set_result(42)

    # Registering must not propagate the ValueError raised by the callback.
    failing_callback = mock.Mock(side_effect=ValueError)
    fut.add_done_callback(failing_callback)
    failing_callback.assert_called_once_with(fut)


class PollingFutureImplWithPoll(PollingFutureImpl):
    """Stub whose ``done`` counts polls and blocks on an event.

    Each ``done`` call increments ``poll_count``, waits for ``event`` to be
    set, then stores ``42`` as the result and reports completion.
    """

    def __init__(self):
        super(PollingFutureImplWithPoll, self).__init__()
        self.event = threading.Event()
        self.poll_count = 0

    def done(self, retry=polling.DEFAULT_RETRY):
        self.poll_count += 1
        # Block until the test releases the poll.
        self.event.wait()
        self.set_result(42)
        return True


def test_result_with_polling():
    """``result`` polls exactly once and later calls reuse the outcome."""
    fut = PollingFutureImplWithPoll()
    fut.event.set()

    first_result = fut.result()
    assert first_result == 42
    assert fut.poll_count == 1

    # Asking again must not trigger another poll.
    assert fut.result() == first_result
    assert fut.poll_count == 1


class PollingFutureImplTimeout(PollingFutureImplWithPoll):
    """Stub whose ``done`` sleeps for one second and never completes."""

    def done(self, retry=polling.DEFAULT_RETRY):
        time.sleep(1)
        return False


def test_result_timeout():
    """``result`` raises ``TimeoutError`` when the future never finishes."""
    fut = PollingFutureImplTimeout()
    with pytest.raises(concurrent.futures.TimeoutError):
        fut.result(timeout=1)


def test_exception_timeout():
    """``exception`` raises ``TimeoutError`` when the future never finishes."""
    fut = PollingFutureImplTimeout()
    with pytest.raises(concurrent.futures.TimeoutError):
        fut.exception(timeout=1)


class PollingFutureImplTransient(PollingFutureImplWithPoll):
    """Stub whose ``done`` raises each queued error once, then succeeds.

    ``errors`` is a sequence of exception classes. Every ``done`` call
    consumes the first remaining class and raises it with the message
    ``"testing"``. Once none remain, the poll is counted and the result is
    set to ``42``.
    """

    def __init__(self, errors):
        super(PollingFutureImplTransient, self).__init__()
        self._errors = errors

    def done(self, retry=polling.DEFAULT_RETRY):
        if not self._errors:
            # Only polls that do not raise are counted.
            self.poll_count += 1
            self.set_result(42)
            return True

        error_cls = self._errors[0]
        self._errors = self._errors[1:]
        raise error_cls("testing")


def test_result_transient_error():
    """``result`` gets past the queued errors and polls successfully once."""
    queued_errors = (
        exceptions.TooManyRequests,
        exceptions.InternalServerError,
        exceptions.BadGateway,
    )
    fut = PollingFutureImplTransient(queued_errors)

    first_result = fut.result()
    assert first_result == 42
    assert fut.poll_count == 1

    # Asking again must not trigger another poll.
    assert fut.result() == first_result
    assert fut.poll_count == 1


def test_callback_background_thread():
    """Registering a callback polls on a background thread."""
    fut = PollingFutureImplWithPoll()
    on_done = mock.Mock()

    fut.add_done_callback(on_done)
    assert fut._polling_thread is not None

    # Allow the background thread a second to make its first done() call,
    # which then blocks on the event.
    time.sleep(1)
    assert fut.poll_count == 1

    fut.event.set()
    fut._polling_thread.join()

    on_done.assert_called_once_with(fut)


def test_double_callback_background_thread():
    """Two callbacks share one polling thread and are each called once."""
    fut = PollingFutureImplWithPoll()
    first_callback = mock.Mock()
    second_callback = mock.Mock()

    fut.add_done_callback(first_callback)
    poller = fut._polling_thread
    assert poller is not None

    # A second registration must reuse the existing polling thread.
    fut.add_done_callback(second_callback)
    assert fut._polling_thread is poller

    fut.event.set()
    fut._polling_thread.join()

    assert fut.poll_count == 1
    first_callback.assert_called_once_with(fut)
    second_callback.assert_called_once_with(fut)


class PollingFutureImplWithoutRetry(PollingFutureImpl):
    """Stub overriding ``done``, ``result`` and ``_blocking_poll``.

    None of the overrides accepts a ``retry`` argument. ``result`` and
    ``_blocking_poll`` simply delegate to the parent implementation.
    """

    def done(self):
        return True

    def result(self):
        return super(PollingFutureImplWithoutRetry, self).result()

    def _blocking_poll(self, timeout):
        return super(PollingFutureImplWithoutRetry, self)._blocking_poll(
            timeout=timeout
        )


class PollingFutureImplWith_done_or_raise(PollingFutureImpl):
    """Stub overriding ``_done_or_raise`` without a ``retry`` argument."""

    def done(self):
        return True

    def _done_or_raise(self):
        return super(PollingFutureImplWith_done_or_raise, self)._done_or_raise()


def test_polling_future_without_retry():
    """Overrides lacking ``retry`` work; ``retry`` is forwarded if given."""
    custom_retry = retry.Retry(
        predicate=retry.if_exception_type(exceptions.TooManyRequests)
    )
    fut = PollingFutureImplWithoutRetry()

    assert fut.done()
    assert fut.running()
    assert fut.result() is None

    # Without an explicit retry, done() must be called with no arguments.
    with mock.patch.object(fut, "done") as patched_done:
        fut._done_or_raise()
        patched_done.assert_called_once_with()

    # An explicit retry must be passed through to done() as a keyword.
    with mock.patch.object(fut, "done") as patched_done:
        fut._done_or_raise(retry=custom_retry)
        patched_done.assert_called_once_with(retry=custom_retry)


def test_polling_future_with__done_or_raise():
    """A ``_done_or_raise`` override lacking ``retry`` still resolves."""
    fut = PollingFutureImplWith_done_or_raise()

    assert fut.done()
    assert fut.running()
    assert fut.result() is None