# Copyright 2018, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for ``google.api_core.bidi``.

Covers ``_RequestQueueGenerator``, ``_Throttle``, ``BidiRpc``,
``ResumableBidiRpc`` and ``BackgroundConsumer``.
"""

import datetime
import logging
import queue
import threading
import time

import mock
import pytest

try:
    import grpc
except ImportError:
    pytest.skip("No GRPC", allow_module_level=True)

from google.api_core import bidi
from google.api_core import exceptions


# Upper bound (seconds) for any cross-thread wait in this module. A healthy
# run finishes in milliseconds; the bound only exists so that a regression
# fails the test instead of hanging the whole test session.
_WAIT_TIMEOUT = 10.0


def _wait_until_inactive(consumer, timeout=_WAIT_TIMEOUT):
    """Poll ``consumer.is_active`` until it is falsy or ``timeout`` elapses.

    Args:
        consumer: Any object exposing a truthy/falsy ``is_active`` attribute.
        timeout (float): Maximum number of seconds to keep polling.

    Returns:
        bool: ``True`` if ``is_active`` became falsy in time, else ``False``.
    """
    deadline = time.monotonic() + timeout
    while consumer.is_active:
        if time.monotonic() >= deadline:  # pragma: NO COVER
            return False
        # Sleep briefly instead of spinning so the consumer thread can run.
        time.sleep(0.001)  # pragma: NO COVER (race condition)
    return True


class Test_RequestQueueGenerator(object):
    """Tests for iterating ``bidi._RequestQueueGenerator``."""

    def test_bounded_consume(self):
        """Items are yielded until the call reports inactive."""
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = True

        def queue_generator(rpc):
            yield mock.sentinel.A
            yield queue.Empty()
            yield mock.sentinel.B
            # Flip the call to inactive right before the last item is served.
            rpc.is_active.return_value = False
            yield mock.sentinel.C

        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = queue_generator(call)

        generator = bidi._RequestQueueGenerator(q)
        generator.call = call

        items = list(generator)

        assert items == [mock.sentinel.A, mock.sentinel.B]

    def test_yield_initial_and_exit(self):
        """A plain ``initial_request`` is yielded even if the call is inactive."""
        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = queue.Empty()
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = False

        generator = bidi._RequestQueueGenerator(q, initial_request=mock.sentinel.A)
        generator.call = call

        items = list(generator)

        assert items == [mock.sentinel.A]

    def test_yield_initial_callable_and_exit(self):
        """A callable ``initial_request`` is invoked and its result yielded."""
        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = queue.Empty()
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = False

        generator = bidi._RequestQueueGenerator(
            q, initial_request=lambda: mock.sentinel.A
        )
        generator.call = call

        items = list(generator)

        assert items == [mock.sentinel.A]

    def test_exit_when_inactive_with_item(self):
        """An item fetched while the call is inactive is put back, not yielded."""
        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = [mock.sentinel.A, queue.Empty()]
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = False

        generator = bidi._RequestQueueGenerator(q)
        generator.call = call

        items = list(generator)

        assert items == []
        # Make sure it put the item back.
        q.put.assert_called_once_with(mock.sentinel.A)

    def test_exit_when_inactive_empty(self):
        """Nothing is yielded for an empty queue and an inactive call."""
        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = queue.Empty()
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = False

        generator = bidi._RequestQueueGenerator(q)
        generator.call = call

        items = list(generator)

        assert items == []

    def test_exit_with_stop(self):
        """A ``None`` item ends iteration even though the call is active."""
        q = mock.create_autospec(queue.Queue, instance=True)
        q.get.side_effect = [None, queue.Empty()]
        call = mock.create_autospec(grpc.Call, instance=True)
        call.is_active.return_value = True

        generator = bidi._RequestQueueGenerator(q)
        generator.call = call

        items = list(generator)

        assert items == []


class Test_Throttle(object):
    """Tests for ``bidi._Throttle``."""

    def test_repr(self):
        delta = datetime.timedelta(seconds=4.5)
        instance = bidi._Throttle(access_limit=42, time_window=delta)
        assert repr(instance) == "_Throttle(access_limit=42, time_window={})".format(
            repr(delta)
        )

    def test_raises_error_on_invalid_init_arguments(self):
        """Zero ``time_window`` and zero ``access_limit`` both raise ValueError."""
        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=10, time_window=datetime.timedelta(seconds=0.0))
        assert "time_window" in str(exc_info.value)
        assert "must be a positive timedelta" in str(exc_info.value)

        with pytest.raises(ValueError) as exc_info:
            bidi._Throttle(access_limit=0, time_window=datetime.timedelta(seconds=10))
        assert "access_limit" in str(exc_info.value)
        assert "must be positive" in str(exc_info.value)

    def test_does_not_delay_entry_attempts_under_threshold(self):
        throttle = bidi._Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )
        entries = []

        for _ in range(3):
            with throttle as time_waited:
                entry_info = {
                    "entered_at": datetime.datetime.now(),
                    "reported_wait": time_waited,
                }
                entries.append(entry_info)

        # check the reported wait times ...
        assert all(entry["reported_wait"] == 0.0 for entry in entries)

        # .. and the actual wait times
        delta = entries[1]["entered_at"] - entries[0]["entered_at"]
        assert delta.total_seconds() < 0.1
        delta = entries[2]["entered_at"] - entries[1]["entered_at"]
        assert delta.total_seconds() < 0.1

    def test_delays_entry_attempts_above_threshold(self):
        throttle = bidi._Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )
        entries = []

        for _ in range(6):
            with throttle as time_waited:
                entry_info = {
                    "entered_at": datetime.datetime.now(),
                    "reported_wait": time_waited,
                }
                entries.append(entry_info)

        # For each group of 4 consecutive entries the time difference between
        # the first and the last entry must have been greater than time_window,
        # because a maximum of 3 are allowed in each time_window.
        for i, entry in enumerate(entries[3:], start=3):
            first_entry = entries[i - 3]
            delta = entry["entered_at"] - first_entry["entered_at"]
            assert delta.total_seconds() > 1.0

        # check the reported wait times
        # (NOTE: not using assert all(...), b/c the coverage check would complain)
        for i, entry in enumerate(entries):
            if i != 3:
                assert entry["reported_wait"] == 0.0

        # The delayed entry is expected to have been delayed for a significant
        # chunk of the full second, and the actual and reported delay times
        # should reflect that.
        assert entries[3]["reported_wait"] > 0.7
        delta = entries[3]["entered_at"] - entries[2]["entered_at"]
        assert delta.total_seconds() > 0.7


class _CallAndFuture(grpc.Call, grpc.Future):
    """Combined ``grpc.Call``/``grpc.Future`` type used as an autospec target."""

    pass


def make_rpc():
    """Makes a mock RPC used to test Bidi classes.

    Returns:
        tuple: ``(rpc, call)``. Invoking ``rpc`` marks ``call`` as active,
        records the request and metadata on it, and returns it. Calling
        ``call.cancel()`` marks ``call`` as inactive.
    """
    call = mock.create_autospec(_CallAndFuture, instance=True)
    rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)

    def rpc_side_effect(request, metadata=None):
        call.is_active.return_value = True
        call.request = request
        call.metadata = metadata
        return call

    rpc.side_effect = rpc_side_effect

    def cancel_side_effect():
        call.is_active.return_value = False

    call.cancel.side_effect = cancel_side_effect

    return rpc, call


class ClosedCall(object):
    """Call stand-in that is never active and raises on ``next()``.

    Args:
        exception (Exception): The exception raised by ``__next__``.
    """

    def __init__(self, exception):
        self.exception = exception

    def __next__(self):
        raise self.exception

    def is_active(self):
        return False


class TestBidiRpc(object):
    """Tests for ``bidi.BidiRpc``."""

    def test_initial_state(self):
        bidi_rpc = bidi.BidiRpc(None)

        assert bidi_rpc.is_active is False

    def test_done_callbacks(self):
        """Registered callbacks receive the future passed to ``_on_call_done``."""
        bidi_rpc = bidi.BidiRpc(None)
        callback = mock.Mock(spec=["__call__"])

        bidi_rpc.add_done_callback(callback)
        bidi_rpc._on_call_done(mock.sentinel.future)

        callback.assert_called_once_with(mock.sentinel.future)

    def test_metadata(self):
        """Constructor metadata is forwarded to the RPC on ``open()``."""
        rpc, call = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc, metadata=mock.sentinel.A)
        assert bidi_rpc._rpc_metadata == mock.sentinel.A

        bidi_rpc.open()
        assert bidi_rpc.call == call
        assert bidi_rpc.call.metadata == mock.sentinel.A

    def test_open(self):
        rpc, call = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)

        bidi_rpc.open()

        assert bidi_rpc.call == call
        assert bidi_rpc.is_active
        call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done)

    def test_open_error_already_open(self):
        """Opening twice raises ``ValueError``."""
        rpc, _ = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)

        bidi_rpc.open()

        with pytest.raises(ValueError):
            bidi_rpc.open()

    def test_close(self):
        rpc, call = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)
        bidi_rpc.open()

        bidi_rpc.close()

        call.cancel.assert_called_once()
        assert bidi_rpc.call == call
        assert bidi_rpc.is_active is False
        # ensure the request queue was signaled to stop.
        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is None

    def test_close_no_rpc(self):
        """Closing a never-opened RPC must not raise."""
        bidi_rpc = bidi.BidiRpc(None)
        bidi_rpc.close()

    def test_send(self):
        rpc, _ = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)
        bidi_rpc.open()

        bidi_rpc.send(mock.sentinel.request)

        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is mock.sentinel.request

    def test_send_not_open(self):
        rpc, _ = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)

        with pytest.raises(ValueError):
            bidi_rpc.send(mock.sentinel.request)

    def test_send_dead_rpc(self):
        """Sending on an inactive call surfaces the call's own exception."""
        error = ValueError()
        bidi_rpc = bidi.BidiRpc(None)
        bidi_rpc.call = ClosedCall(error)

        with pytest.raises(ValueError) as exc_info:
            bidi_rpc.send(mock.sentinel.request)

        assert exc_info.value == error

    def test_recv(self):
        bidi_rpc = bidi.BidiRpc(None)
        bidi_rpc.call = iter([mock.sentinel.response])

        response = bidi_rpc.recv()

        assert response == mock.sentinel.response

    def test_recv_not_open(self):
        rpc, _ = make_rpc()
        bidi_rpc = bidi.BidiRpc(rpc)

        with pytest.raises(ValueError):
            bidi_rpc.recv()


class CallStub(object):
    """Minimal hand-written call used by the ``ResumableBidiRpc`` tests.

    Iterating yields the entries of ``values`` in order. When the next entry
    is an ``Exception`` instance the stub marks itself inactive and raises it.

    Args:
        values (Iterable): Responses and/or exceptions to produce.
        active (bool): Initial value reported by ``is_active()``.
    """

    def __init__(self, values, active=True):
        self.values = iter(values)
        self._is_active = active
        self.cancelled = False

    def __next__(self):
        item = next(self.values)
        if isinstance(item, Exception):
            self._is_active = False
            raise item
        return item

    def is_active(self):
        return self._is_active

    def add_done_callback(self, callback):
        # Deliberately a no-op: the stub never invokes done callbacks.
        pass

    def cancel(self):
        # Only records the request; does not change ``is_active()``.
        self.cancelled = True


class TestResumableBidiRpc(object):
    """Tests for ``bidi.ResumableBidiRpc``."""

    def test_ctor_defaults(self):
        start_rpc = mock.Mock()
        should_recover = mock.Mock()
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

        assert bidi_rpc.is_active is False
        assert bidi_rpc._finalized is False
        assert bidi_rpc._start_rpc is start_rpc
        assert bidi_rpc._should_recover is should_recover
        assert bidi_rpc._should_terminate is bidi._never_terminate
        assert bidi_rpc._initial_request is None
        assert bidi_rpc._rpc_metadata is None
        assert bidi_rpc._reopen_throttle is None

    def test_ctor_explicit(self):
        start_rpc = mock.Mock()
        should_recover = mock.Mock()
        should_terminate = mock.Mock()
        initial_request = mock.Mock()
        metadata = {"x-foo": "bar"}
        bidi_rpc = bidi.ResumableBidiRpc(
            start_rpc,
            should_recover,
            should_terminate=should_terminate,
            initial_request=initial_request,
            metadata=metadata,
            throttle_reopen=True,
        )

        assert bidi_rpc.is_active is False
        assert bidi_rpc._finalized is False
        assert bidi_rpc._should_recover is should_recover
        assert bidi_rpc._should_terminate is should_terminate
        assert bidi_rpc._initial_request is initial_request
        assert bidi_rpc._rpc_metadata == metadata
        assert isinstance(bidi_rpc._reopen_throttle, bidi._Throttle)

    def test_done_callbacks_terminate(self):
        """When ``should_terminate`` is true, recovery is not consulted."""
        cancellation = mock.Mock()
        start_rpc = mock.Mock()
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        should_terminate = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )
        callback = mock.Mock(spec=["__call__"])

        bidi_rpc.add_done_callback(callback)
        bidi_rpc._on_call_done(cancellation)

        should_terminate.assert_called_once_with(cancellation)
        should_recover.assert_not_called()
        callback.assert_called_once_with(cancellation)
        assert not bidi_rpc.is_active

    def test_done_callbacks_recoverable(self):
        """A recoverable failure restarts the RPC without firing callbacks."""
        start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
        callback = mock.Mock(spec=["__call__"])

        bidi_rpc.add_done_callback(callback)
        bidi_rpc._on_call_done(mock.sentinel.future)

        callback.assert_not_called()
        start_rpc.assert_called_once()
        should_recover.assert_called_once_with(mock.sentinel.future)
        assert bidi_rpc.is_active

    def test_done_callbacks_non_recoverable(self):
        """A non-recoverable failure fires callbacks and leaves the RPC inactive."""
        start_rpc = mock.create_autospec(grpc.StreamStreamMultiCallable, instance=True)
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
        callback = mock.Mock(spec=["__call__"])

        bidi_rpc.add_done_callback(callback)
        bidi_rpc._on_call_done(mock.sentinel.future)

        callback.assert_called_once_with(mock.sentinel.future)
        should_recover.assert_called_once_with(mock.sentinel.future)
        assert not bidi_rpc.is_active

    def test_send_terminate(self):
        cancellation = ValueError()
        call_1 = CallStub([cancellation], active=False)
        call_2 = CallStub([])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        should_terminate = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )

        bidi_rpc.open()

        bidi_rpc.send(mock.sentinel.request)

        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is None

        should_recover.assert_not_called()
        should_terminate.assert_called_once_with(cancellation)
        assert bidi_rpc.call == call_1
        assert bidi_rpc.is_active is False
        assert call_1.cancelled is True

    def test_send_recover(self):
        error = ValueError()
        call_1 = CallStub([error], active=False)
        call_2 = CallStub([])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

        bidi_rpc.open()

        bidi_rpc.send(mock.sentinel.request)

        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is mock.sentinel.request

        should_recover.assert_called_once_with(error)
        assert bidi_rpc.call == call_2
        assert bidi_rpc.is_active is True

    def test_send_failure(self):
        error = ValueError()
        call = CallStub([error], active=False)
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, return_value=call
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

        bidi_rpc.open()

        with pytest.raises(ValueError) as exc_info:
            bidi_rpc.send(mock.sentinel.request)

        assert exc_info.value == error
        should_recover.assert_called_once_with(error)
        assert bidi_rpc.call == call
        assert bidi_rpc.is_active is False
        assert call.cancelled is True
        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is None

    def test_recv_terminate(self):
        cancellation = ValueError()
        call = CallStub([cancellation])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, return_value=call
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        should_terminate = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(
            start_rpc, should_recover, should_terminate=should_terminate
        )

        bidi_rpc.open()

        bidi_rpc.recv()

        should_recover.assert_not_called()
        should_terminate.assert_called_once_with(cancellation)
        assert bidi_rpc.call == call
        assert bidi_rpc.is_active is False
        assert call.cancelled is True

    def test_recv_recover(self):
        """``recv`` continues on the second call after a recoverable error."""
        error = ValueError()
        call_1 = CallStub([1, error])
        call_2 = CallStub([2, 3])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

        bidi_rpc.open()

        values = [bidi_rpc.recv() for _ in range(3)]

        assert values == [1, 2, 3]
        should_recover.assert_called_once_with(error)
        assert bidi_rpc.call == call_2
        assert bidi_rpc.is_active is True

    def test_recv_recover_already_recovered(self):
        """``_reopen`` keeps the current call when it is still active."""
        call_1 = CallStub([])
        call_2 = CallStub([])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]
        )
        callback = mock.Mock()
        callback.return_value = True
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, callback)

        bidi_rpc.open()

        bidi_rpc._reopen()

        assert bidi_rpc.call is call_1
        assert bidi_rpc.is_active is True

    def test_recv_failure(self):
        error = ValueError()
        call = CallStub([error])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, return_value=call
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)

        bidi_rpc.open()

        with pytest.raises(ValueError) as exc_info:
            bidi_rpc.recv()

        assert exc_info.value == error
        should_recover.assert_called_once_with(error)
        assert bidi_rpc.call == call
        assert bidi_rpc.is_active is False
        assert call.cancelled is True

    def test_close(self):
        call = mock.create_autospec(_CallAndFuture, instance=True)

        def cancel_side_effect():
            call.is_active.return_value = False

        call.cancel.side_effect = cancel_side_effect
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, return_value=call
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=False)
        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
        bidi_rpc.open()

        bidi_rpc.close()

        should_recover.assert_not_called()
        call.cancel.assert_called_once()
        assert bidi_rpc.call == call
        assert bidi_rpc.is_active is False
        # ensure the request queue was signaled to stop.
        assert bidi_rpc.pending_requests == 1
        assert bidi_rpc._request_queue.get() is None
        assert bidi_rpc._finalized

    def test_reopen_failure_on_rpc_restart(self):
        error1 = ValueError("1")
        error2 = ValueError("2")
        call = CallStub([error1])
        # Invoking start RPC a second time will trigger an error.
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, side_effect=[call, error2]
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        callback = mock.Mock(spec=["__call__"])

        bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover)
        bidi_rpc.add_done_callback(callback)

        bidi_rpc.open()

        with pytest.raises(ValueError) as exc_info:
            bidi_rpc.recv()

        assert exc_info.value == error2
        should_recover.assert_called_once_with(error1)
        assert bidi_rpc.call is None
        assert bidi_rpc.is_active is False
        callback.assert_called_once_with(error2)

    def test_using_throttle_on_reopen_requests(self):
        """With ``throttle_reopen=True``, ``_reopen`` enters the throttle once."""
        call = CallStub([])
        start_rpc = mock.create_autospec(
            grpc.StreamStreamMultiCallable, instance=True, return_value=call
        )
        should_recover = mock.Mock(spec=["__call__"], return_value=True)
        bidi_rpc = bidi.ResumableBidiRpc(
            start_rpc, should_recover, throttle_reopen=True
        )

        # ``with`` looks up ``__enter__`` on the type, so patch the class.
        patcher = mock.patch.object(bidi_rpc._reopen_throttle.__class__, "__enter__")
        with patcher as mock_enter:
            bidi_rpc._reopen()

        mock_enter.assert_called_once()

    def test_send_not_open(self):
        bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)

        with pytest.raises(ValueError):
            bidi_rpc.send(mock.sentinel.request)

    def test_recv_not_open(self):
        bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False)

        with pytest.raises(ValueError):
            bidi_rpc.recv()

    def test_finalize_idempotent(self):
        """A second ``_on_call_done`` does not fire the callbacks again."""
        error1 = ValueError("1")
        error2 = ValueError("2")
        callback = mock.Mock(spec=["__call__"])
        should_recover = mock.Mock(spec=["__call__"], return_value=False)

        bidi_rpc = bidi.ResumableBidiRpc(mock.sentinel.start_rpc, should_recover)

        bidi_rpc.add_done_callback(callback)

        bidi_rpc._on_call_done(error1)
        bidi_rpc._on_call_done(error2)

        callback.assert_called_once_with(error1)


class TestBackgroundConsumer(object):
    """Tests for ``bidi.BackgroundConsumer``.

    Every cross-thread wait is bounded by ``_WAIT_TIMEOUT`` so that a broken
    consumer makes the test fail rather than block forever.
    """

    def test_consume_once_then_exit(self):
        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = [mock.sentinel.response_1]
        recved = threading.Event()

        def on_response(response):
            assert response == mock.sentinel.response_1
            bidi_rpc.is_active = False
            recved.set()

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        assert recved.wait(timeout=_WAIT_TIMEOUT)

        bidi_rpc.recv.assert_called_once()
        assert bidi_rpc.is_active is False

        consumer.stop()

        bidi_rpc.close.assert_called_once()
        assert consumer.is_active is False

    def test_pause_resume_and_close(self):
        # This test is relatively complex. It attempts to start the consumer,
        # consume one item, pause the consumer, check the state of the world,
        # then resume the consumer. Doing this in a deterministic fashion
        # requires a bit more mocking and patching than usual.

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True

        def close_side_effect():
            bidi_rpc.is_active = False

        bidi_rpc.close.side_effect = close_side_effect

        # These are used to coordinate the two threads to ensure deterministic
        # execution.
        should_continue = threading.Event()
        responses_and_events = {
            mock.sentinel.response_1: threading.Event(),
            mock.sentinel.response_2: threading.Event(),
        }
        bidi_rpc.recv.side_effect = [mock.sentinel.response_1, mock.sentinel.response_2]

        recved_responses = []
        consumer = None

        def on_response(response):
            if response == mock.sentinel.response_1:
                consumer.pause()

            recved_responses.append(response)
            responses_and_events[response].set()
            # Bounded so the consumer thread cannot stay blocked forever if
            # the main thread fails before setting the event.
            should_continue.wait(timeout=_WAIT_TIMEOUT)

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait for the first response to be recved.
        assert responses_and_events[mock.sentinel.response_1].wait(
            timeout=_WAIT_TIMEOUT
        )

        # Ensure only one item has been recved and that the consumer is paused.
        assert recved_responses == [mock.sentinel.response_1]
        assert consumer.is_paused is True
        assert consumer.is_active is True

        # Unpause the consumer, wait for the second item, then close the
        # consumer.
        should_continue.set()
        consumer.resume()

        assert responses_and_events[mock.sentinel.response_2].wait(
            timeout=_WAIT_TIMEOUT
        )

        assert recved_responses == [mock.sentinel.response_1, mock.sentinel.response_2]

        consumer.stop()

        assert consumer.is_active is False

    def test_wake_on_error(self):
        should_continue = threading.Event()

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.add_done_callback.side_effect = lambda _: should_continue.set()

        consumer = bidi.BackgroundConsumer(bidi_rpc, mock.sentinel.on_response)

        # Start the consumer paused, which should immediately put it into wait
        # state.
        consumer.pause()
        consumer.start()

        # Wait for add_done_callback to be called
        assert should_continue.wait(timeout=_WAIT_TIMEOUT)
        bidi_rpc.add_done_callback.assert_called_once_with(consumer._on_call_done)

        # The consumer should now be blocked on waiting to be unpaused.
        assert consumer.is_active
        assert consumer.is_paused

        # Trigger the done callback, it should unpause the consumer and cause
        # it to exit.
        bidi_rpc.is_active = False
        consumer._on_call_done(bidi_rpc)

        # It may take a few cycles for the thread to exit.
        assert _wait_until_inactive(consumer)

    def test_consumer_expected_error(self, caplog):
        caplog.set_level(logging.DEBUG)

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = exceptions.ServiceUnavailable("Gone away")

        on_response = mock.Mock(spec=["__call__"])

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait for the consumer's thread to exit.
        assert _wait_until_inactive(consumer)

        on_response.assert_not_called()
        bidi_rpc.recv.assert_called_once()
        assert "caught error" in caplog.text

    def test_consumer_unexpected_error(self, caplog):
        caplog.set_level(logging.DEBUG)

        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        bidi_rpc.recv.side_effect = ValueError()

        on_response = mock.Mock(spec=["__call__"])

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()

        # Wait for the consumer's thread to exit.
        assert _wait_until_inactive(consumer)

        on_response.assert_not_called()
        bidi_rpc.recv.assert_called_once()
        assert "caught unexpected exception" in caplog.text

    def test_double_stop(self, caplog):
        caplog.set_level(logging.DEBUG)
        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
        bidi_rpc.is_active = True
        on_response = mock.Mock(spec=["__call__"])

        def close_side_effect():
            bidi_rpc.is_active = False

        bidi_rpc.close.side_effect = close_side_effect

        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)

        consumer.start()
        assert consumer.is_active is True

        consumer.stop()
        assert consumer.is_active is False

        # calling stop twice should not result in an error.
        consumer.stop()