• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import logging
17import queue
18import threading
19
20import mock
21import pytest
22
23try:
24    import grpc
25except ImportError:
26    pytest.skip("No GRPC", allow_module_level=True)
27
28from google.api_core import bidi
29from google.api_core import exceptions
30
31
class Test_RequestQueueGenerator(object):
    """Exercises ``bidi._RequestQueueGenerator`` with a mocked queue and call."""

    def test_bounded_consume(self):
        """Items are produced until the call reports itself inactive."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = True

        def fake_gets(rpc):
            # Each yielded value is one outcome of ``get``; the mock raises
            # exception instances instead of returning them.
            yield mock.sentinel.A
            yield queue.Empty()
            yield mock.sentinel.B
            # Flip the call to inactive right before the last item is fetched.
            rpc.is_active.return_value = False
            yield mock.sentinel.C

        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = fake_gets(rpc_call)

        generator = bidi._RequestQueueGenerator(request_queue)
        generator.call = rpc_call

        assert list(generator) == [mock.sentinel.A, mock.sentinel.B]

    def test_yield_initial_and_exit(self):
        """A plain initial request is yielded even though the call is inactive."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = False
        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = queue.Empty()

        generator = bidi._RequestQueueGenerator(
            request_queue, initial_request=mock.sentinel.A
        )
        generator.call = rpc_call

        assert list(generator) == [mock.sentinel.A]

    def test_yield_initial_callable_and_exit(self):
        """A callable initial request is invoked and its result is yielded."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = False
        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = queue.Empty()

        def build_initial_request():
            return mock.sentinel.A

        generator = bidi._RequestQueueGenerator(
            request_queue, initial_request=build_initial_request
        )
        generator.call = rpc_call

        assert list(generator) == [mock.sentinel.A]

    def test_exit_when_inactive_with_item(self):
        """An item fetched while the call is inactive is put back, not yielded."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = False
        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = [mock.sentinel.A, queue.Empty()]

        generator = bidi._RequestQueueGenerator(request_queue)
        generator.call = rpc_call

        assert list(generator) == []
        # The fetched item must have been returned to the queue.
        request_queue.put.assert_called_once_with(mock.sentinel.A)

    def test_exit_when_inactive_empty(self):
        """Nothing is yielded when the queue is empty and the call is inactive."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = False
        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = queue.Empty()

        generator = bidi._RequestQueueGenerator(request_queue)
        generator.call = rpc_call

        assert list(generator) == []

    def test_exit_with_stop(self):
        """A ``None`` item ends iteration even while the call is active."""
        rpc_call = mock.create_autospec(grpc.Call, instance=True)
        rpc_call.is_active.return_value = True
        request_queue = mock.create_autospec(queue.Queue, instance=True)
        request_queue.get.side_effect = [None, queue.Empty()]

        generator = bidi._RequestQueueGenerator(request_queue)
        generator.call = rpc_call

        assert list(generator) == []
122
123
class Test_Throttle(object):
    """Tests for ``bidi._Throttle`` construction, repr and entry pacing."""

    def test_repr(self):
        """``repr`` shows the access limit and the repr of the time window."""
        window = datetime.timedelta(seconds=4.5)
        throttle = bidi._Throttle(access_limit=42, time_window=window)

        expected = "_Throttle(access_limit=42, time_window={})".format(repr(window))
        assert repr(throttle) == expected

    def test_raises_error_on_invalid_init_arguments(self):
        """Non-positive ``time_window`` or ``access_limit`` raises ``ValueError``."""
        zero_window = datetime.timedelta(seconds=0.0)
        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=10, time_window=zero_window)
        message = str(exc_info.value)
        assert "time_window" in message
        assert "must be a positive timedelta" in message

        valid_window = datetime.timedelta(seconds=10)
        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=0, time_window=valid_window)
        message = str(exc_info.value)
        assert "access_limit" in message
        assert "must be positive" in message

    def test_does_not_delay_entry_attempts_under_threshold(self):
        """Three entries with a limit of three per second are not delayed."""
        throttle = bidi._Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )
        entries = []

        for _ in range(3):
            with throttle as time_waited:
                entries.append(
                    {
                        "entered_at": datetime.datetime.now(),
                        "reported_wait": time_waited,
                    }
                )

        # check the reported wait times ...
        assert all(entry["reported_wait"] == 0.0 for entry in entries)

        # .. and the actual wait times between consecutive entries
        for earlier, later in zip(entries, entries[1:]):
            gap = later["entered_at"] - earlier["entered_at"]
            assert gap.total_seconds() < 0.1

    def test_delays_entry_attempts_above_threshold(self):
        """The fourth of six entries is held back until the window has passed."""
        throttle = bidi._Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )
        entries = []

        for _ in range(6):
            with throttle as time_waited:
                entries.append(
                    {
                        "entered_at": datetime.datetime.now(),
                        "reported_wait": time_waited,
                    }
                )

        # For each group of 4 consecutive entries the time difference between
        # the first and the last entry must have been greater than time_window,
        # because a maximum of 3 are allowed in each time_window.
        for first_entry, fourth_entry in zip(entries, entries[3:]):
            span = fourth_entry["entered_at"] - first_entry["entered_at"]
            assert span.total_seconds() > 1.0

        # check the reported wait times
        # (NOTE: not using assert all(...), b/c the coverage check would complain)
        for index, entry in enumerate(entries):
            if index == 3:
                continue
            assert entry["reported_wait"] == 0.0

        # The delayed entry is expected to have been delayed for a significant
        # chunk of the full second, and the actual and reported delay times
        # should reflect that.
        delayed, previous = entries[3], entries[2]
        assert delayed["reported_wait"] > 0.7
        gap = delayed["entered_at"] - previous["entered_at"]
        assert gap.total_seconds() > 0.7
200
201
class _CallAndFuture(grpc.Call, grpc.Future):
    """Spec type combining ``grpc.Call`` and ``grpc.Future`` for autospec'd mocks."""
204
205
def make_rpc():
    """Makes a mock RPC used to test Bidi classes.

    Returns:
        tuple: ``(rpc, call)``. ``rpc`` is an autospec'd
        ``grpc.StreamStreamMultiCallable``; invoking it marks ``call`` active,
        records the request and metadata on it and returns it. ``call`` is an
        autospec'd ``_CallAndFuture`` whose ``cancel()`` marks it inactive.
    """
    rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
    call = mock.create_autospec(_CallAndFuture, instance=True)

    def start_call(request, metadata=None):
        # Invoking the RPC "opens" the call and remembers what it was given.
        call.is_active.return_value = True
        call.request = request
        call.metadata = metadata
        return call

    def mark_cancelled():
        # Cancelling flips the call to inactive.
        call.is_active.return_value = False

    rpc.side_effect = start_call
    call.cancel.side_effect = mark_cancelled

    return rpc, call
225
226
class ClosedCall(object):
    """Stand-in for a finished call: never active and raises when advanced.

    Args:
        exception: The exception raised on every ``next()`` of this object.
    """

    def __init__(self, exception):
        self.exception = exception

    def is_active(self):
        """Always report the call as inactive."""
        return False

    def __next__(self):
        """Raise the stored exception instead of producing a response."""
        raise self.exception
236
237
class TestBidiRpc(object):
    """Tests for ``bidi.BidiRpc`` open/close/send/recv and done callbacks."""

    def test_initial_state(self):
        """A freshly constructed stream is not active."""
        stream = bidi.BidiRpc(None)

        assert stream.is_active is False

    def test_done_callbacks(self):
        """A registered callback receives the future passed to ``_on_call_done``."""
        on_done = mock.Mock(spec=["__call__"])
        stream = bidi.BidiRpc(None)

        stream.add_done_callback(on_done)
        stream._on_call_done(mock.sentinel.future)

        on_done.assert_called_once_with(mock.sentinel.future)

    def test_metadata(self):
        """Metadata given to the constructor is stored and passed to the RPC."""
        rpc, grpc_call = make_rpc()
        stream = bidi.BidiRpc(rpc, metadata=mock.sentinel.A)
        assert stream._rpc_metadata == mock.sentinel.A

        stream.open()
        assert stream.call == grpc_call
        assert stream.call.metadata == mock.sentinel.A

    def test_open(self):
        """``open`` starts the call and registers the done callback on it."""
        rpc, grpc_call = make_rpc()
        stream = bidi.BidiRpc(rpc)

        stream.open()

        assert stream.call == grpc_call
        assert stream.is_active
        grpc_call.add_done_callback.assert_called_once_with(stream._on_call_done)

    def test_open_error_already_open(self):
        """Opening an already open stream raises ``ValueError``."""
        rpc, _ = make_rpc()
        stream = bidi.BidiRpc(rpc)
        stream.open()

        with pytest.raises(ValueError):
            stream.open()

    def test_close(self):
        """``close`` cancels the call and queues the ``None`` stop signal."""
        rpc, grpc_call = make_rpc()
        stream = bidi.BidiRpc(rpc)
        stream.open()

        stream.close()

        grpc_call.cancel.assert_called_once()
        assert stream.call == grpc_call
        assert stream.is_active is False
        # ensure the request queue was signaled to stop.
        assert stream.pending_requests == 1
        assert stream._request_queue.get() is None

    def test_close_no_rpc(self):
        """Closing a stream that was never opened does not raise."""
        stream = bidi.BidiRpc(None)
        stream.close()

    def test_send(self):
        """``send`` on an open stream enqueues the request."""
        rpc, _ = make_rpc()
        stream = bidi.BidiRpc(rpc)
        stream.open()

        stream.send(mock.sentinel.request)

        assert stream.pending_requests == 1
        assert stream._request_queue.get() is mock.sentinel.request

    def test_send_not_open(self):
        """``send`` before ``open`` raises ``ValueError``."""
        rpc, _ = make_rpc()
        stream = bidi.BidiRpc(rpc)

        with pytest.raises(ValueError):
            stream.send(mock.sentinel.request)

    def test_send_dead_rpc(self):
        """``send`` on an inactive call surfaces the call's own error."""
        failure = ValueError()
        stream = bidi.BidiRpc(None)
        stream.call = ClosedCall(failure)

        with pytest.raises(ValueError) as exc_info:
            stream.send(mock.sentinel.request)

        assert exc_info.value == failure

    def test_recv(self):
        """``recv`` returns the next response produced by the call."""
        stream = bidi.BidiRpc(None)
        stream.call = iter([mock.sentinel.response])

        assert stream.recv() == mock.sentinel.response

    def test_recv_not_open(self):
        """``recv`` before ``open`` raises ``ValueError``."""
        rpc, _ = make_rpc()
        stream = bidi.BidiRpc(rpc)

        with pytest.raises(ValueError):
            stream.recv()
340
341
class CallStub(object):
    """Minimal call double that replays ``values`` and records cancellation.

    Args:
        values: Iterable of items handed out by ``next()``; an item that is an
            ``Exception`` instance is raised instead of returned.
        active (bool): Initial result of :meth:`is_active`.
    """

    def __init__(self, values, active=True):
        self.values = iter(values)
        self.cancelled = False
        self._is_active = active

    def __next__(self):
        """Return the next value, or raise it (and go inactive) if it is an error."""
        upcoming = next(self.values)
        if not isinstance(upcoming, Exception):
            return upcoming
        # An error ends the call: mark it inactive before propagating.
        self._is_active = False
        raise upcoming

    def is_active(self):
        """Report whether the stubbed call is still considered active."""
        return self._is_active

    def add_done_callback(self, callback):
        """Accept and ignore the callback; the stub never invokes it."""
        return None

    def cancel(self):
        """Record that cancellation was requested."""
        self.cancelled = True
363
364
class TestResumableBidiRpc(object):
    """Tests for ``bidi.ResumableBidiRpc`` recovery, termination and finalization."""

    @staticmethod
    def _decider(answer):
        """Build a call-only mock predicate that always returns ``answer``."""
        return mock.Mock(spec=["__call__"], return_value=answer)

    @staticmethod
    def _rpc_starter(**behaviour):
        """Build an autospec'd stream-stream multicallable.

        ``behaviour`` is forwarded to ``mock.create_autospec`` (for example
        ``return_value=...`` or ``side_effect=[...]``).
        """
        return mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, **behaviour
        )

    def test_ctor_defaults(self):
        """Only the required arguments: optional settings take their defaults."""
        start_rpc = mock.Mock()
        should_recover = mock.Mock()

        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)

        assert stream.is_active is False
        assert stream._finalized is False
        assert stream._start_rpc is start_rpc
        assert stream._should_recover is should_recover
        assert stream._should_terminate is bidi._never_terminate
        assert stream._initial_request is None
        assert stream._rpc_metadata is None
        assert stream._reopen_throttle is None

    def test_ctor_explicit(self):
        """Explicit optional arguments are stored on the instance."""
        start_rpc = mock.Mock()
        should_recover = mock.Mock()
        should_terminate = mock.Mock()
        initial_request = mock.Mock()
        metadata = {"x-foo": "bar"}

        stream = bidi.ResumableBidiRpc(
            start_rpc,
            should_recover,
            should_terminate=should_terminate,
            initial_request=initial_request,
            metadata=metadata,
            throttle_reopen=True,
        )

        assert stream.is_active is False
        assert stream._finalized is False
        assert stream._should_recover is should_recover
        assert stream._should_terminate is should_terminate
        assert stream._initial_request is initial_request
        assert stream._rpc_metadata == metadata
        assert isinstance(stream._reopen_throttle, bidi._Throttle)

    def test_done_callbacks_terminate(self):
        """A terminating result reaches the callbacks without consulting recovery."""
        cancellation = mock.Mock()
        start_rpc = mock.Mock()
        should_recover = self._decider(True)
        should_terminate = self._decider(True)
        stream = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )
        on_done = mock.Mock(spec=["__call__"])

        stream.add_done_callback(on_done)
        stream._on_call_done(cancellation)

        should_terminate.assert_called_once_with(cancellation)
        should_recover.assert_not_called()
        on_done.assert_called_once_with(cancellation)
        assert not stream.is_active

    def test_done_callbacks_recoverable(self):
        """A recoverable result restarts the RPC and skips the done callbacks."""
        start_rpc = self._rpc_starter()
        should_recover = self._decider(True)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        on_done = mock.Mock(spec=["__call__"])

        stream.add_done_callback(on_done)
        stream._on_call_done(mock.sentinel.future)

        on_done.assert_not_called()
        start_rpc.assert_called_once()
        should_recover.assert_called_once_with(mock.sentinel.future)
        assert stream.is_active

    def test_done_callbacks_non_recoverable(self):
        """A non-recoverable result is passed on to the done callbacks."""
        start_rpc = self._rpc_starter()
        should_recover = self._decider(False)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        on_done = mock.Mock(spec=["__call__"])

        stream.add_done_callback(on_done)
        stream._on_call_done(mock.sentinel.future)

        on_done.assert_called_once_with(mock.sentinel.future)
        should_recover.assert_called_once_with(mock.sentinel.future)
        assert not stream.is_active

    def test_send_terminate(self):
        """``send`` hitting a terminating error closes the stream without raising."""
        cancellation = ValueError()
        first_call = CallStub([cancellation], active=False)
        second_call = CallStub([])
        start_rpc = self._rpc_starter(side_effect=[first_call, second_call])
        should_recover = self._decider(False)
        should_terminate = self._decider(True)
        stream = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )
        stream.open()

        stream.send(mock.sentinel.request)

        assert stream.pending_requests == 1
        assert stream._request_queue.get() is None

        should_recover.assert_not_called()
        should_terminate.assert_called_once_with(cancellation)
        assert stream.call == first_call
        assert stream.is_active is False
        assert first_call.cancelled is True

    def test_send_recover(self):
        """``send`` hitting a recoverable error reopens and enqueues the request."""
        failure = ValueError()
        first_call = CallStub([failure], active=False)
        second_call = CallStub([])
        start_rpc = self._rpc_starter(side_effect=[first_call, second_call])
        should_recover = self._decider(True)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        stream.send(mock.sentinel.request)

        assert stream.pending_requests == 1
        assert stream._request_queue.get() is mock.sentinel.request

        should_recover.assert_called_once_with(failure)
        assert stream.call == second_call
        assert stream.is_active is True

    def test_send_failure(self):
        """``send`` hitting a non-recoverable error raises it and closes the stream."""
        failure = ValueError()
        stub = CallStub([failure], active=False)
        start_rpc = self._rpc_starter(return_value=stub)
        should_recover = self._decider(False)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        with pytest.raises(ValueError) as exc_info:
            stream.send(mock.sentinel.request)

        assert exc_info.value == failure
        should_recover.assert_called_once_with(failure)
        assert stream.call == stub
        assert stream.is_active is False
        assert stub.cancelled is True
        assert stream.pending_requests == 1
        assert stream._request_queue.get() is None

    def test_recv_terminate(self):
        """``recv`` hitting a terminating error closes the stream without raising."""
        cancellation = ValueError()
        stub = CallStub([cancellation])
        start_rpc = self._rpc_starter(return_value=stub)
        should_recover = self._decider(False)
        should_terminate = self._decider(True)
        stream = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )
        stream.open()

        stream.recv()

        should_recover.assert_not_called()
        should_terminate.assert_called_once_with(cancellation)
        assert stream.call == stub
        assert stream.is_active is False
        assert stub.cancelled is True

    def test_recv_recover(self):
        """``recv`` continues on a fresh call after a recoverable error."""
        failure = ValueError()
        first_call = CallStub([1, failure])
        second_call = CallStub([2, 3])
        start_rpc = self._rpc_starter(side_effect=[first_call, second_call])
        should_recover = self._decider(True)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        received = [stream.recv() for _ in range(3)]

        assert received == [1, 2, 3]
        should_recover.assert_called_once_with(failure)
        assert stream.call == second_call
        assert stream.is_active is True

    def test_recv_recover_already_recovered(self):
        """``_reopen`` on a stream whose call is active keeps the current call."""
        first_call = CallStub([])
        second_call = CallStub([])
        start_rpc = self._rpc_starter(side_effect=[first_call, second_call])
        should_recover = mock.Mock()
        should_recover.return_value = True
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        stream._reopen()

        assert stream.call is first_call
        assert stream.is_active is True

    def test_recv_failure(self):
        """``recv`` hitting a non-recoverable error raises it and cancels the call."""
        failure = ValueError()
        stub = CallStub([failure])
        start_rpc = self._rpc_starter(return_value=stub)
        should_recover = self._decider(False)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        with pytest.raises(ValueError) as exc_info:
            stream.recv()

        assert exc_info.value == failure
        should_recover.assert_called_once_with(failure)
        assert stream.call == stub
        assert stream.is_active is False
        assert stub.cancelled is True

    def test_close(self):
        """``close`` cancels the call, queues the stop signal and finalizes."""
        grpc_call = mock.create_autospec(_CallAndFuture, instance=True)

        def mark_cancelled():
            grpc_call.is_active.return_value = False

        grpc_call.cancel.side_effect = mark_cancelled
        start_rpc = self._rpc_starter(return_value=grpc_call)
        should_recover = self._decider(False)
        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.open()

        stream.close()

        should_recover.assert_not_called()
        grpc_call.cancel.assert_called_once()
        assert stream.call == grpc_call
        assert stream.is_active is False
        # ensure the request queue was signaled to stop.
        assert stream.pending_requests == 1
        assert stream._request_queue.get() is None
        assert stream._finalized

    def test_reopen_failure_on_rpc_restart(self):
        """An error raised while restarting the RPC is raised and reported."""
        first_error = ValueError("1")
        restart_error = ValueError("2")
        stub = CallStub([first_error])
        # Invoking start RPC a second time will trigger an error.
        start_rpc = self._rpc_starter(side_effect=[stub, restart_error])
        should_recover = self._decider(True)
        on_done = mock.Mock(spec=["__call__"])

        stream = bidi.ResumableBidiRpc(start_rpc, should_recover)
        stream.add_done_callback(on_done)
        stream.open()

        with pytest.raises(ValueError) as exc_info:
            stream.recv()

        assert exc_info.value == restart_error
        should_recover.assert_called_once_with(first_error)
        assert stream.call is None
        assert stream.is_active is False
        on_done.assert_called_once_with(restart_error)

    def test_using_throttle_on_reopen_requests(self):
        """With ``throttle_reopen=True`` a reopen enters the throttle once."""
        stub = CallStub([])
        start_rpc = self._rpc_starter(return_value=stub)
        should_recover = self._decider(True)
        stream = bidi.ResumableBidiRpc(
            start_rpc, should_recover, throttle_reopen=True
        )

        throttle_cls = stream._reopen_throttle.__class__
        with mock.patch.object(throttle_cls, "__enter__") as mock_enter:
            stream._reopen()

        mock_enter.assert_called_once()

    def test_send_not_open(self):
        """``send`` before ``open`` raises ``ValueError``."""
        stream = bidi.ResumableBidiRpc(None, lambda _: False)

        with pytest.raises(ValueError):
            stream.send(mock.sentinel.request)

    def test_recv_not_open(self):
        """``recv`` before ``open`` raises ``ValueError``."""
        stream = bidi.ResumableBidiRpc(None, lambda _: False)

        with pytest.raises(ValueError):
            stream.recv()

    def test_finalize_idempotent(self):
        """A second non-recoverable result does not re-run the done callbacks."""
        first_error = ValueError("1")
        second_error = ValueError("2")
        on_done = mock.Mock(spec=["__call__"])
        should_recover = self._decider(False)

        stream = bidi.ResumableBidiRpc(mock.sentinel.start_rpc, should_recover)
        stream.add_done_callback(on_done)

        stream._on_call_done(first_error)
        stream._on_call_done(second_error)

        on_done.assert_called_once_with(first_error)
689
690
class TestBackgroundConsumer(object):
    """Tests for ``bidi.BackgroundConsumer`` running against a mocked ``BidiRpc``.

    Waits performed by the test thread on ``threading.Event`` objects are
    bounded by ``_WAIT_TIMEOUT`` and asserted, so a consumer thread that never
    signals makes the test fail instead of hanging the whole run.
    """

    # Upper bound, in seconds, for the test thread to wait on a signal coming
    # from the consumer side.
    _WAIT_TIMEOUT = 5.0

    def test_consume_once_then_exit(self):
        """One response is delivered, then the consumer can be stopped."""
        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = [mock.sentinel.response_1]
        recved = threading.Event()

        def on_response(response):
            assert response == mock.sentinel.response_1
            bidi_rpc.is_active = False
            recved.set()

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Bounded: if the callback never signals, fail rather than hang.
        assert recved.wait(timeout=self._WAIT_TIMEOUT), "response was not delivered"

        bidi_rpc.recv.assert_called_once()
        assert bidi_rpc.is_active is False

        consumer.stop()

        bidi_rpc.close.assert_called_once()
        assert consumer.is_active is False

    def test_pause_resume_and_close(self):
        """Pausing from the callback holds delivery until ``resume`` is called."""
        # This test is relatively complex. It attempts to start the consumer,
        # consume one item, pause the consumer, check the state of the world,
        # then resume the consumer. Doing this in a deterministic fashion
        # requires a bit more mocking and patching than usual.

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True

        def close_side_effect():
            bidi_rpc.is_active = False

        bidi_rpc.close.side_effect = close_side_effect

        # These are used to coordinate the two threads to ensure deterministic
        # execution.
        should_continue = threading.Event()
        responses_and_events = {
            mock.sentinel.response_1: threading.Event(),
            mock.sentinel.response_2: threading.Event(),
        }
        bidi_rpc.recv.side_effect = [mock.sentinel.response_1, mock.sentinel.response_2]

        recved_responses = []
        consumer = None

        def on_response(response):
            if response == mock.sentinel.response_1:
                consumer.pause()

            recved_responses.append(response)
            responses_and_events[response].set()
            # Blocks the callback until the test thread sets the event below.
            should_continue.wait()

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait (bounded) for the first response to be recved.
        first_seen = responses_and_events[mock.sentinel.response_1].wait(
            timeout=self._WAIT_TIMEOUT
        )
        assert first_seen, "first response was not delivered"

        # Ensure only one item has been recved and that the consumer is paused.
        assert recved_responses == [mock.sentinel.response_1]
        assert consumer.is_paused is True
        assert consumer.is_active is True

        # Unpause the consumer, wait for the second item, then close the
        # consumer.
        should_continue.set()
        consumer.resume()

        second_seen = responses_and_events[mock.sentinel.response_2].wait(
            timeout=self._WAIT_TIMEOUT
        )
        assert second_seen, "second response was not delivered"

        assert recved_responses == [mock.sentinel.response_1, mock.sentinel.response_2]

        consumer.stop()

        assert consumer.is_active is False

    def test_wake_on_error(self):
        """The done callback wakes a paused consumer so that it can exit."""
        should_continue = threading.Event()

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.add_done_callback.side_effect = lambda _: should_continue.set()

        consumer = bidi.BackgroundConsumer(bidi_rpc, mock.sentinel.on_response)

        # Start the consumer paused, which should immediately put it into wait
        # state.
        consumer.pause()
        consumer.start()

        # Wait (bounded) for add_done_callback to be called
        registered = should_continue.wait(timeout=self._WAIT_TIMEOUT)
        assert registered, "add_done_callback was not called"
        bidi_rpc.add_done_callback.assert_called_once_with(consumer._on_call_done)

        # The consumer should now be blocked on waiting to be unpaused.
        assert consumer.is_active
        assert consumer.is_paused

        # Trigger the done callback, it should unpause the consumer and cause
        # it to exit.
        bidi_rpc.is_active = False
        consumer._on_call_done(bidi_rpc)

        # It may take a few cycles for the thread to exit.
        while consumer.is_active:
            pass  # pragma: NO COVER (race condition)

    def test_consumer_expected_error(self, caplog):
        """A ``ServiceUnavailable`` from ``recv`` is logged and ends the consumer."""
        caplog.set_level(logging.DEBUG)

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = exceptions.ServiceUnavailable("Gone away")

        on_response = mock.Mock(spec=["__call__"])

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait for the consumer's thread to exit.
        while consumer.is_active:
            pass  # pragma: NO COVER (race condition)

        on_response.assert_not_called()
        bidi_rpc.recv.assert_called_once()
        assert "caught error" in caplog.text

    def test_consumer_unexpected_error(self, caplog):
        """A ``ValueError`` from ``recv`` is logged as unexpected and ends the consumer."""
        caplog.set_level(logging.DEBUG)

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = ValueError()

        on_response = mock.Mock(spec=["__call__"])

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait for the consumer's thread to exit.
        while consumer.is_active:
            pass  # pragma: NO COVER (race condition)

        on_response.assert_not_called()
        bidi_rpc.recv.assert_called_once()
        assert "caught unexpected exception" in caplog.text

    def test_double_stop(self, caplog):
        """Calling ``stop`` a second time does not raise."""
        caplog.set_level(logging.DEBUG)
        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        on_response = mock.Mock(spec=["__call__"])

        def close_side_effect():
            bidi_rpc.is_active = False

        bidi_rpc.close.side_effect = close_side_effect

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()
        assert consumer.is_active is True

        consumer.stop()
        assert consumer.is_active is False

        # calling stop twice should not result in an error.
        consumer.stop()
870