1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import time 19import traceback 20from datetime import datetime, timedelta 21from threading import Timer 22 23from blueberry.tests.gd.cert.behavior import when, wait_until 24from blueberry.tests.gd.cert.behavior import IHasBehaviors 25from blueberry.tests.gd.cert.behavior import anything 26from blueberry.tests.gd.cert.behavior import SingleArgumentBehavior 27from blueberry.tests.gd.cert.behavior import ReplyStage 28from blueberry.tests.gd.cert.event_stream import EventStream, FilteringEventStream 29from blueberry.tests.gd.cert.metadata import metadata 30from blueberry.tests.gd.cert.truth import assertThat 31from bluetooth_packets_python3 import l2cap_packets 32import hci_packets as hci 33 34from mobly import asserts 35from mobly import signals 36from mobly import test_runner 37from mobly import base_test 38 39 40class BogusProto: 41 42 class BogusType: 43 44 def __init__(self): 45 self.name = "BogusProto" 46 self.is_extension = False 47 self.cpp_type = False 48 49 def type(self): 50 return 'BogusRpc' 51 52 def label(self): 53 return "label" 54 55 class BogusDescriptor: 56 57 def __init__(self, name): 58 self.full_name = name 59 60 def __init__(self, value): 61 self.value_ = value 62 self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value)) 63 64 def __str__(self): 65 return "BogusRpc value = " + str(self.value_) 66 67 def ListFields(self): 68 for field in [BogusProto.BogusType()]: 69 yield [field, self.value_] 70 71 72class FetchEvents: 73 74 def __init__(self, events, delay_ms): 75 self.events_ = events 76 self.sleep_time_ = (delay_ms * 1.0) / 1000 77 self.index_ = 0 78 self.done_ = False 79 self.then_ = datetime.now() 80 81 def __iter__(self): 82 for event in self.events_: 83 time.sleep(self.sleep_time_) 84 if self.done_: 85 return 86 logging.debug("yielding %d" % event) 87 yield BogusProto(event) 88 89 def done(self): 90 return self.done_ 91 92 def cancel(self): 93 logging.debug("cancel") 94 self.done_ = True 95 return None 96 97 98class TestBehaviors(object): 99 100 def __init__(self, parent): 101 self.test_request_behavior = SingleArgumentBehavior(lambda: TestBehaviors.TestRequestReplyStage(parent)) 102 103 def test_request(self, matcher): 104 return self.test_request_behavior.begin(matcher) 105 106 class TestRequestReplyStage(ReplyStage): 107 108 def __init__(self, parent): 109 self._parent = parent 110 111 def increment_count(self): 112 self._commit(lambda obj: self._increment_count(obj)) 113 return self 114 115 def _increment_count(self, obj): 116 self._parent.count += 1 117 self._parent.captured.append(obj) 118 119 120class ObjectWithBehaviors(IHasBehaviors): 121 122 def __init__(self): 123 self.behaviors = TestBehaviors(self) 124 self.count = 0 125 self.captured = [] 126 self.unhandled_count = 0 127 128 def get_behaviors(self): 129 return self.behaviors 130 131 def increment_unhandled(self): 132 self.unhandled_count += 1 133 134 135class CertSelfTest(base_test.BaseTestClass): 136 137 def setup_test(self): 138 return True 139 140 def teardown_test(self): 141 return True 142 143 def test_assert_occurs_at_least_passes(self): 144 with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: 145 event_stream.assert_event_occurs(lambda data: data.value_ == 1, 146 timeout=timedelta(milliseconds=300), 147 at_least_times=2) 148 149 def test_assert_occurs_passes(self): 150 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 151 event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1)) 152 153 def test_assert_occurs_fails(self): 154 try: 155 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 156 event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) 157 except Exception as e: 158 logging.debug(e) 159 return True # Failed as expected 160 return False 161 162 def test_assert_occurs_at_most_passes(self): 163 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: 164 event_stream.assert_event_occurs_at_most(lambda data: data.value_ < 4, 165 timeout=timedelta(seconds=1), 166 at_most_times=3) 167 168 def test_assert_occurs_at_most_fails(self): 169 try: 170 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: 171 event_stream.assert_event_occurs_at_most(lambda data: data.value_ > 1, 172 timeout=timedelta(seconds=1), 173 at_most_times=2) 174 except Exception as e: 175 logging.debug(e) 176 return True # Failed as expected 177 return False 178 179 def test_skip_a_test(self): 180 asserts.skip("Skipping this test because it's blocked by b/xyz") 181 assert False 182 183 def test_nested_packets(self): 184 handle = 123 185 inside = hci.ReadScanEnable() 186 logging.debug(inside.serialize()) 187 logging.debug("building outside") 188 outside = hci.Acl(handle=handle, 189 packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 190 broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, 191 payload=inside.serialize()) 192 logging.debug(outside.serialize()) 193 logging.debug("Done!") 194 195 def test_l2cap_config_options(self): 196 mtu_opt = l2cap_packets.MtuConfigurationOption() 197 mtu_opt.mtu = 123 198 fcs_opt = l2cap_packets.FrameCheckSequenceOption() 199 fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT 200 request = l2cap_packets.ConfigurationRequestBuilder( 201 0x1d, # Command ID 202 0xc1d, # Channel ID 203 l2cap_packets.Continuation.END, 204 [mtu_opt, fcs_opt]) 205 request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request) 206 handle = 123 207 wrapped = hci.Acl(handle=handle, 208 packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 209 broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT, 210 payload=bytes(request_b_frame.Serialize())) 211 # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3) 212 asserts.assert_true(len(wrapped.serialize()) == 23, "Packet serialized incorrectly") 213 214 def test_assertThat_boolean_success(self): 215 assertThat(True).isTrue() 216 assertThat(False).isFalse() 217 218 def test_assertThat_boolean_falseIsTrue(self): 219 try: 220 assertThat(False).isTrue() 221 except Exception as e: 222 return True 223 return False 224 225 def test_assertThat_boolean_trueIsFalse(self): 226 try: 227 assertThat(True).isFalse() 228 except Exception as e: 229 return True 230 return False 231 232 def test_assertThat_object_success(self): 233 assertThat("this").isEqualTo("this") 234 assertThat("this").isNotEqualTo("that") 235 assertThat(None).isNone() 236 assertThat("this").isNotNone() 237 238 def test_assertThat_object_isEqualToFails(self): 239 try: 240 assertThat("this").isEqualTo("that") 241 except Exception as e: 242 return True 243 return False 244 245 def test_assertThat_object_isNotEqualToFails(self): 246 try: 247 assertThat("this").isNotEqualTo("this") 248 except Exception as e: 249 return True 250 return False 251 252 def test_assertThat_object_isNoneFails(self): 253 try: 254 assertThat("this").isNone() 255 except Exception as e: 256 return True 257 return False 258 259 def test_assertThat_object_isNotNoneFails(self): 260 try: 261 assertThat(None).isNotNone() 262 except Exception as e: 263 return True 264 return False 265 266 def test_assertThat_eventStream_emits_passes(self): 267 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 268 assertThat(event_stream).emits(lambda data: data.value_ == 1) 269 270 def test_assertThat_eventStream_emits_then_passes(self): 271 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 272 assertThat(event_stream).emits(lambda data: data.value_ == 1).then(lambda data: data.value_ == 3) 273 274 def test_assertThat_eventStream_emits_fails(self): 275 try: 276 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 277 assertThat(event_stream).emits(lambda data: data.value_ == 4) 278 except Exception as e: 279 logging.debug(e) 280 return True # Failed as expected 281 return False 282 283 def test_assertThat_eventStream_emits_then_fails(self): 284 try: 285 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 286 assertThat(event_stream).emits(lambda data: data.value_ == 1).emits(lambda data: data.value_ == 4) 287 except Exception as e: 288 logging.debug(e) 289 return True # Failed as expected 290 return False 291 292 def test_assertThat_eventStream_emitsInOrder_passes(self): 293 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 294 assertThat(event_stream).emits(lambda data: data.value_ == 1, lambda data: data.value_ == 2).inOrder() 295 296 def test_assertThat_eventStream_emitsInAnyOrder_passes(self): 297 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 298 assertThat(event_stream).emits( 299 lambda data: data.value_ == 2, 300 lambda data: data.value_ == 1).inAnyOrder().then(lambda data: data.value_ == 3) 301 302 def test_assertThat_eventStream_emitsInOrder_fails(self): 303 try: 304 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 305 assertThat(event_stream).emits(lambda data: data.value_ == 2, lambda data: data.value_ == 1).inOrder() 306 except Exception as e: 307 logging.debug(e) 308 return True # Failed as expected 309 return False 310 311 def test_assertThat_eventStream_emitsInAnyOrder_fails(self): 312 try: 313 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 314 assertThat(event_stream).emits(lambda data: data.value_ == 4, 315 lambda data: data.value_ == 1).inAnyOrder() 316 except Exception as e: 317 logging.debug(e) 318 return True # Failed as expected 319 return False 320 321 def test_assertThat_emitsNone_passes(self): 322 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 323 assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( 324 lambda data: data.value_ == 5, timeout=timedelta(seconds=0.15)) 325 326 def test_assertThat_emitsNone_passes_after_1_second(self): 327 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream: 328 assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) 329 330 def test_assertThat_emitsNone_fails(self): 331 try: 332 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 333 assertThat(event_stream).emitsNone(lambda data: data.value_ == 2, timeout=timedelta(seconds=1)) 334 except Exception as e: 335 logging.debug(e) 336 return True # Failed as expected 337 return False 338 339 def test_assertThat_emitsNone_zero_passes(self): 340 with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream: 341 assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone(timeout=timedelta( 342 milliseconds=10)) 343 344 def test_assertThat_emitsNone_zero_passes_after_one_second(self): 345 with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream: 346 assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1.0)) 347 348 def test_assertThat_emitsNone_zero_fails(self): 349 try: 350 with EventStream(FetchEvents(events=[17], delay_ms=50)) as event_stream: 351 assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1)) 352 except Exception as e: 353 logging.debug(e) 354 return True # Failed as expected 355 return False 356 357 def test_filtering_event_stream_none_filter_function(self): 358 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 359 filtered_event_stream = FilteringEventStream(event_stream, None) 360 assertThat(filtered_event_stream) \ 361 .emits(lambda data: data.value_ == 1) \ 362 .then(lambda data: data.value_ == 3) 363 364 def test_metadata_empty(self): 365 366 @metadata() 367 def simple_pass_test(arg): 368 pass 369 370 try: 371 simple_pass_test(1) 372 except signals.TestFailure: 373 pass 374 except Exception as e: 375 asserts.fail("@metadata() should only raise signals.TestFailure, " 376 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 377 else: 378 asserts.fail("@metadata() should not work") 379 380 def test_metadata_empty_no_function_call(self): 381 382 @metadata 383 def simple_pass_test(arg): 384 pass 385 386 try: 387 simple_pass_test(1) 388 except signals.TestFailure: 389 pass 390 except Exception as e: 391 asserts.fail("@metadata should only raise signals.TestFailure, " 392 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 393 else: 394 asserts.fail("@metadata should not work") 395 396 def test_metadata_pts_missing_id(self): 397 398 @metadata(pts_test_name="Hello world") 399 def simple_pass_test(arg): 400 pass 401 402 try: 403 simple_pass_test(1) 404 except signals.TestFailure: 405 pass 406 except Exception as e: 407 asserts.fail("should only raise signals.TestFailure, " 408 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 409 else: 410 asserts.fail("missing pts_test_id should not work") 411 412 def test_metadata_pts_missing_name(self): 413 414 @metadata(pts_test_id="A/B/C") 415 def simple_pass_test(arg): 416 pass 417 418 try: 419 simple_pass_test(1) 420 except signals.TestFailure: 421 pass 422 except Exception as e: 423 asserts.fail("should only raise signals.TestFailure, " 424 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 425 else: 426 asserts.fail("missing pts_test_name should not work") 427 428 def test_metadata_pts_test_id_and_description(self): 429 430 @metadata(pts_test_id="A/B/C", pts_test_name="Hello world") 431 def simple_pass_test(arg): 432 pass 433 434 try: 435 simple_pass_test(1) 436 except signals.TestPass as e: 437 asserts.assert_true("pts_test_id" in e.extras, msg=("pts_test_id not in extra: %s" % str(e.extras))) 438 asserts.assert_equal(e.extras["pts_test_id"], "A/B/C") 439 asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) 440 asserts.assert_equal(e.extras["pts_test_name"], "Hello world") 441 else: 442 asserts.fail("Must throw an exception using @metadata decorator") 443 444 def test_metadata_test_with_exception_stacktrace(self): 445 446 @metadata(pts_test_id="A/B/C", pts_test_name="Hello world") 447 def simple_fail_test(failure_argument): 448 raise ValueError(failure_argument) 449 450 try: 451 simple_fail_test("BEEFBEEF") 452 except signals.TestError as e: 453 asserts.assert_true("pts_test_id" in e.extras, msg=("pts_test_id not in extra: %s" % str(e.extras))) 454 asserts.assert_equal(e.extras["pts_test_id"], "A/B/C") 455 asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) 456 asserts.assert_equal(e.extras["pts_test_name"], "Hello world") 457 trace_str = traceback.format_exc() 458 asserts.assert_true("raise ValueError(failure_argument)" in trace_str, 459 msg="Failed test method not in error stack trace: %s" % trace_str) 460 else: 461 asserts.fail("Must throw an exception using @metadata decorator") 462 463 def test_fluent_behavior_simple(self): 464 thing = ObjectWithBehaviors() 465 466 when(thing).test_request(anything()).then().increment_count() 467 468 thing.behaviors.test_request_behavior.run("A") 469 470 assertThat(thing.count).isEqualTo(1) 471 assertThat(thing.captured).isEqualTo(["A"]) 472 473 def test_fluent_behavior__then_single__captures_one(self): 474 thing = ObjectWithBehaviors() 475 476 thing.behaviors.test_request_behavior.set_default_to_ignore() 477 478 when(thing).test_request(anything()).then().increment_count() 479 480 thing.behaviors.test_request_behavior.run("A") 481 thing.behaviors.test_request_behavior.run("A") 482 thing.behaviors.test_request_behavior.run("A") 483 484 assertThat(thing.count).isEqualTo(1) 485 assertThat(thing.captured).isEqualTo(["A"]) 486 487 def test_fluent_behavior__then_times__captures_all(self): 488 thing = ObjectWithBehaviors() 489 490 when(thing).test_request(anything()).then(times=3).increment_count() 491 492 thing.behaviors.test_request_behavior.run("A") 493 thing.behaviors.test_request_behavior.run("B") 494 thing.behaviors.test_request_behavior.run("C") 495 496 assertThat(thing.count).isEqualTo(3) 497 assertThat(thing.captured).isEqualTo(["A", "B", "C"]) 498 499 def test_fluent_behavior__always__captures_all(self): 500 thing = ObjectWithBehaviors() 501 502 when(thing).test_request(anything()).always().increment_count() 503 504 thing.behaviors.test_request_behavior.run("A") 505 thing.behaviors.test_request_behavior.run("B") 506 thing.behaviors.test_request_behavior.run("C") 507 508 assertThat(thing.count).isEqualTo(3) 509 assertThat(thing.captured).isEqualTo(["A", "B", "C"]) 510 511 def test_fluent_behavior__matcher__captures_relevant(self): 512 thing = ObjectWithBehaviors() 513 thing.behaviors.test_request_behavior.set_default_to_ignore() 514 515 when(thing).test_request(lambda obj: obj == "B").always().increment_count() 516 517 thing.behaviors.test_request_behavior.run("A") 518 thing.behaviors.test_request_behavior.run("B") 519 thing.behaviors.test_request_behavior.run("C") 520 521 assertThat(thing.count).isEqualTo(1) 522 assertThat(thing.captured).isEqualTo(["B"]) 523 524 def test_fluent_behavior__then_repeated__captures_relevant(self): 525 thing = ObjectWithBehaviors() 526 thing.behaviors.test_request_behavior.set_default_to_ignore() 527 528 when(thing).test_request(anything()).then().increment_count().increment_count() 529 530 thing.behaviors.test_request_behavior.run("A") 531 thing.behaviors.test_request_behavior.run("B") 532 thing.behaviors.test_request_behavior.run("A") 533 534 assertThat(thing.count).isEqualTo(2) 535 assertThat(thing.captured).isEqualTo(["A", "B"]) 536 537 def test_fluent_behavior__fallback__captures_relevant(self): 538 thing = ObjectWithBehaviors() 539 thing.behaviors.test_request_behavior.set_default_to_ignore() 540 541 when(thing).test_request(lambda obj: obj == "B").then(times=1).increment_count() 542 when(thing).test_request(lambda obj: obj == "C").always().increment_count() 543 544 thing.behaviors.test_request_behavior.run("A") 545 thing.behaviors.test_request_behavior.run("B") 546 thing.behaviors.test_request_behavior.run("A") 547 thing.behaviors.test_request_behavior.run("C") 548 thing.behaviors.test_request_behavior.run("B") 549 thing.behaviors.test_request_behavior.run("C") 550 551 assertThat(thing.count).isEqualTo(3) 552 assertThat(thing.captured).isEqualTo(["B", "C", "C"]) 553 554 def test_fluent_behavior__default_unhandled_crash(self): 555 thing = ObjectWithBehaviors() 556 557 when(thing).test_request(anything()).then().increment_count() 558 559 thing.behaviors.test_request_behavior.run("A") 560 try: 561 thing.behaviors.test_request_behavior.run("A") 562 except Exception as e: 563 logging.debug(e) 564 return True # Failed as expected 565 return False 566 567 def test_fluent_behavior__set_default_works(self): 568 thing = ObjectWithBehaviors() 569 thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled()) 570 571 when(thing).test_request(anything()).then().increment_count() 572 573 thing.behaviors.test_request_behavior.run("A") 574 thing.behaviors.test_request_behavior.run("A") 575 assertThat(thing.unhandled_count).isEqualTo(1) 576 577 def test_fluent_behavior__wait_until_done(self): 578 thing = ObjectWithBehaviors() 579 is_a = lambda obj: obj == "A" 580 when(thing).test_request(is_a).then().increment_count() 581 582 closure = lambda: thing.behaviors.test_request_behavior.run("A") 583 t = Timer(0.5, closure) 584 t.start() 585 586 wait_until(thing).test_request(is_a).times(1) 587 assertThat(thing.count).isEqualTo(1) 588 assertThat(thing.captured).isEqualTo(["A"]) 589 590 def test_fluent_behavior__wait_until_done_different_lambda(self): 591 thing = ObjectWithBehaviors() 592 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 593 594 closure = lambda: thing.behaviors.test_request_behavior.run("A") 595 t = Timer(0.5, closure) 596 t.start() 597 598 wait_until(thing).test_request(lambda obj: obj == "A").times(1) 599 assertThat(thing.count).isEqualTo(1) 600 assertThat(thing.captured).isEqualTo(["A"]) 601 602 def test_fluent_behavior__wait_until_done_anything(self): 603 thing = ObjectWithBehaviors() 604 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 605 606 closure = lambda: thing.behaviors.test_request_behavior.run("A") 607 t = Timer(0.5, closure) 608 t.start() 609 610 wait_until(thing).test_request(anything()).times(1) 611 assertThat(thing.count).isEqualTo(1) 612 assertThat(thing.captured).isEqualTo(["A"]) 613 614 def test_fluent_behavior__wait_until_done_not_happened(self): 615 thing = ObjectWithBehaviors() 616 thing.behaviors.test_request_behavior.set_default_to_ignore() 617 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 618 619 closure = lambda: thing.behaviors.test_request_behavior.run("B") 620 t = Timer(0.5, closure) 621 t.start() 622 assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(1)).isFalse() 623 624 def test_fluent_behavior__wait_until_done_with_default(self): 625 thing = ObjectWithBehaviors() 626 thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled()) 627 628 closure = lambda: thing.behaviors.test_request_behavior.run("A") 629 t = Timer(0.5, closure) 630 t.start() 631 632 wait_until(thing).test_request(anything()).times(1) 633 assertThat(thing.unhandled_count).isEqualTo(1) 634 635 def test_fluent_behavior__wait_until_done_two_events_AA(self): 636 thing = ObjectWithBehaviors() 637 when(thing).test_request(lambda obj: obj == "A").then().increment_count().increment_count() 638 639 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 640 t1 = Timer(0.5, closure1) 641 t1.start() 642 closure2 = lambda: thing.behaviors.test_request_behavior.run("A") 643 t2 = Timer(0.5, closure2) 644 t2.start() 645 646 wait_until(thing).test_request(lambda obj: obj == "A").times(2) 647 assertThat(thing.count).isEqualTo(2) 648 assertThat(thing.captured).isEqualTo(["A", "A"]) 649 650 def test_fluent_behavior__wait_until_done_two_events_AB(self): 651 thing = ObjectWithBehaviors() 652 when(thing).test_request(anything()).always().increment_count() 653 654 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 655 t1 = Timer(0.5, closure1) 656 t1.start() 657 closure2 = lambda: thing.behaviors.test_request_behavior.run("B") 658 t2 = Timer(1, closure2) 659 t2.start() 660 661 wait_until(thing).test_request(anything()).times(2) 662 assertThat(thing.count).isEqualTo(2) 663 assertThat(thing.captured).isEqualTo(["A", "B"]) 664 665 def test_fluent_behavior__wait_until_done_only_one_event_is_done(self): 666 thing = ObjectWithBehaviors() 667 when(thing).test_request(anything()).always().increment_count() 668 669 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 670 t1 = Timer(1, closure1) 671 t1.start() 672 closure2 = lambda: thing.behaviors.test_request_behavior.run("B") 673 t2 = Timer(3, closure2) 674 t2.start() 675 assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(2)).isFalse() 676 677 678if __name__ == '__main__': 679 test_runner.main() 680