• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import concurrent.futures
16import threading
17import time
18
19import mock
20import pytest
21
22from google.api_core import exceptions, retry
23from google.api_core.future import polling
24
25
26class PollingFutureImpl(polling.PollingFuture):
27    def done(self):
28        return False
29
30    def cancel(self):
31        return True
32
33    def cancelled(self):
34        return False
35
36    def running(self):
37        return True
38
39
40def test_polling_future_constructor():
41    future = PollingFutureImpl()
42    assert not future.done()
43    assert not future.cancelled()
44    assert future.running()
45    assert future.cancel()
46    with mock.patch.object(future, "done", return_value=True):
47        future.result()
48
49
50def test_set_result():
51    future = PollingFutureImpl()
52    callback = mock.Mock()
53
54    future.set_result(1)
55
56    assert future.result() == 1
57    future.add_done_callback(callback)
58    callback.assert_called_once_with(future)
59
60
61def test_set_exception():
62    future = PollingFutureImpl()
63    exception = ValueError("meep")
64
65    future.set_exception(exception)
66
67    assert future.exception() == exception
68    with pytest.raises(ValueError):
69        future.result()
70
71    callback = mock.Mock()
72    future.add_done_callback(callback)
73    callback.assert_called_once_with(future)
74
75
76def test_invoke_callback_exception():
77    future = PollingFutureImplWithPoll()
78    future.set_result(42)
79
80    # This should not raise, despite the callback causing an exception.
81    callback = mock.Mock(side_effect=ValueError)
82    future.add_done_callback(callback)
83    callback.assert_called_once_with(future)
84
85
86class PollingFutureImplWithPoll(PollingFutureImpl):
87    def __init__(self):
88        super(PollingFutureImplWithPoll, self).__init__()
89        self.poll_count = 0
90        self.event = threading.Event()
91
92    def done(self, retry=polling.DEFAULT_RETRY):
93        self.poll_count += 1
94        self.event.wait()
95        self.set_result(42)
96        return True
97
98
99def test_result_with_polling():
100    future = PollingFutureImplWithPoll()
101
102    future.event.set()
103    result = future.result()
104
105    assert result == 42
106    assert future.poll_count == 1
107    # Repeated calls should not cause additional polling
108    assert future.result() == result
109    assert future.poll_count == 1
110
111
112class PollingFutureImplTimeout(PollingFutureImplWithPoll):
113    def done(self, retry=polling.DEFAULT_RETRY):
114        time.sleep(1)
115        return False
116
117
118def test_result_timeout():
119    future = PollingFutureImplTimeout()
120    with pytest.raises(concurrent.futures.TimeoutError):
121        future.result(timeout=1)
122
123
124def test_exception_timeout():
125    future = PollingFutureImplTimeout()
126    with pytest.raises(concurrent.futures.TimeoutError):
127        future.exception(timeout=1)
128
129
130class PollingFutureImplTransient(PollingFutureImplWithPoll):
131    def __init__(self, errors):
132        super(PollingFutureImplTransient, self).__init__()
133        self._errors = errors
134
135    def done(self, retry=polling.DEFAULT_RETRY):
136        if self._errors:
137            error, self._errors = self._errors[0], self._errors[1:]
138            raise error("testing")
139        self.poll_count += 1
140        self.set_result(42)
141        return True
142
143
144def test_result_transient_error():
145    future = PollingFutureImplTransient(
146        (
147            exceptions.TooManyRequests,
148            exceptions.InternalServerError,
149            exceptions.BadGateway,
150        )
151    )
152    result = future.result()
153    assert result == 42
154    assert future.poll_count == 1
155    # Repeated calls should not cause additional polling
156    assert future.result() == result
157    assert future.poll_count == 1
158
159
160def test_callback_background_thread():
161    future = PollingFutureImplWithPoll()
162    callback = mock.Mock()
163
164    future.add_done_callback(callback)
165
166    assert future._polling_thread is not None
167
168    # Give the thread a second to poll
169    time.sleep(1)
170    assert future.poll_count == 1
171
172    future.event.set()
173    future._polling_thread.join()
174
175    callback.assert_called_once_with(future)
176
177
178def test_double_callback_background_thread():
179    future = PollingFutureImplWithPoll()
180    callback = mock.Mock()
181    callback2 = mock.Mock()
182
183    future.add_done_callback(callback)
184    current_thread = future._polling_thread
185    assert current_thread is not None
186
187    # only one polling thread should be created.
188    future.add_done_callback(callback2)
189    assert future._polling_thread is current_thread
190
191    future.event.set()
192    future._polling_thread.join()
193
194    assert future.poll_count == 1
195    callback.assert_called_once_with(future)
196    callback2.assert_called_once_with(future)
197
198
199class PollingFutureImplWithoutRetry(PollingFutureImpl):
200    def done(self):
201        return True
202
203    def result(self):
204        return super(PollingFutureImplWithoutRetry, self).result()
205
206    def _blocking_poll(self, timeout):
207        return super(PollingFutureImplWithoutRetry, self)._blocking_poll(
208            timeout=timeout
209        )
210
211
212class PollingFutureImplWith_done_or_raise(PollingFutureImpl):
213    def done(self):
214        return True
215
216    def _done_or_raise(self):
217        return super(PollingFutureImplWith_done_or_raise, self)._done_or_raise()
218
219
220def test_polling_future_without_retry():
221    custom_retry = retry.Retry(
222        predicate=retry.if_exception_type(exceptions.TooManyRequests)
223    )
224    future = PollingFutureImplWithoutRetry()
225    assert future.done()
226    assert future.running()
227    assert future.result() is None
228
229    with mock.patch.object(future, "done") as done_mock:
230        future._done_or_raise()
231        done_mock.assert_called_once_with()
232
233    with mock.patch.object(future, "done") as done_mock:
234        future._done_or_raise(retry=custom_retry)
235        done_mock.assert_called_once_with(retry=custom_retry)
236
237
238def test_polling_future_with__done_or_raise():
239    future = PollingFutureImplWith_done_or_raise()
240    assert future.done()
241    assert future.running()
242    assert future.result() is None
243