#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import datetime, timedelta
import logging
from threading import Timer
import time
import traceback

from mobly import asserts

from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import EventStream, FilteringEventStream
from cert.truth import assertThat
from cert.metadata import metadata
from cert.behavior import when, wait_until
from cert.behavior import IHasBehaviors
from cert.behavior import anything
from cert.behavior import SingleArgumentBehavior
from cert.behavior import ReplyStage


class BogusProto:
    """Minimal stand-in for a protobuf message that wraps a single value.

    It exposes ``DESCRIPTOR.full_name``, ``ListFields()`` and ``__str__``.
    NOTE(review): presumably this is the subset of the protobuf message API
    that the event stream machinery touches -- confirm against
    cert.event_stream.
    """

    class BogusType:
        """Fake field descriptor returned as the first item by ``ListFields``."""

        def __init__(self):
            self.name = "BogusProto"
            self.is_extension = False
            self.cpp_type = False

        def type(self):
            """Return the fixed type name of this fake field."""
            return 'BogusRpc'

        def label(self):
            """Return the fixed label of this fake field."""
            return "label"

    class BogusDescriptor:
        """Fake message descriptor carrying only ``full_name``."""

        def __init__(self, name):
            self.full_name = name

    def __init__(self, value):
        """Wrap ``value``; its string form becomes the descriptor's full name."""
        self.value_ = value
        self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value))

    def __str__(self):
        return "BogusRpc value = " + str(self.value_)

    def ListFields(self):
        """Yield one ``[field, value]`` pair describing the wrapped value."""
        yield [BogusProto.BogusType(), self.value_]


class FetchEvents:
    """Fake event source used to feed an EventStream in these tests.

    Iterating it sleeps ``delay_ms`` milliseconds before each event and then
    yields the event wrapped in a BogusProto. Iteration ends early once
    ``cancel()`` has been called.
    """

    def __init__(self, events, delay_ms):
        """Store the events to replay and the per-event delay.

        Args:
            events: iterable of values to emit; they are logged with ``%d``,
                so they are expected to be integers.
            delay_ms: delay before each event, in milliseconds.
        """
        self.events_ = events
        self.sleep_time_ = delay_ms / 1000.0
        # index_ and then_ are not read anywhere in this class; they are kept
        # because code outside this class may still access them.
        self.index_ = 0
        self.done_ = False
        self.then_ = datetime.now()

    def __iter__(self):
        for event in self.events_:
            time.sleep(self.sleep_time_)
            # Check after sleeping so that a cancel() issued during the delay
            # suppresses the pending event.
            if self.done_:
                return
            # Let logging do the formatting, only when DEBUG is enabled.
            logging.debug("yielding %d", event)
            yield BogusProto(event)

    def done(self):
        """Return True once ``cancel()`` has been called."""
        return self.done_

    def cancel(self):
        """Mark the source as finished so iteration stops; returns None."""
        logging.debug("cancel")
        self.done_ = True
        return None


class TestBehaviors(object):
    """Behavior container handed out by ObjectWithBehaviors.get_behaviors().

    Holds one SingleArgumentBehavior whose reply stages are
    TestRequestReplyStage instances bound to ``parent``.
    """

    def __init__(self, parent):
        self.test_request_behavior = SingleArgumentBehavior(lambda: TestBehaviors.TestRequestReplyStage(parent))

    def test_request(self, matcher):
        """Start describing a reaction to requests accepted by ``matcher``."""
        return self.test_request_behavior.begin(matcher)

    class TestRequestReplyStage(ReplyStage):
        """Reply stage whose only action is to record the request on the parent."""

        def __init__(self, parent):
            self._parent = parent

        def increment_count(self):
            """Commit the counting action and return self to allow chaining."""
            self._commit(self._increment_count)
            return self

        def _increment_count(self, obj):
            # Record both how many requests were handled and which ones.
            self._parent.count += 1
            self._parent.captured.append(obj)


class ObjectWithBehaviors(IHasBehaviors):
    """Test subject exposing TestBehaviors plus counters the tests inspect."""

    def __init__(self):
        self.behaviors = TestBehaviors(self)
        self.count = 0  # incremented by TestRequestReplyStage
        self.captured = []  # requests seen by TestRequestReplyStage, in order
        self.unhandled_count = 0  # incremented by increment_unhandled()

    def get_behaviors(self):
        """Return the TestBehaviors instance owned by this object."""
        return self.behaviors

    def increment_unhandled(self):
        """Count one more unhandled request."""
        self.unhandled_count += 1


def test_assert_occurs_at_least_passes_core():
    """Assert that value 1 occurs at least twice within 300 ms.

    The source replays 1, 2, 3, 1, 2, 3 with 40 ms before each event.
    """
    with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream:
        event_stream.assert_event_occurs(
            lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2)


def test_assert_occurs_passes_core():
    """Assert that value 1 occurs within one second on a 1, 2, 3 stream."""
    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
        event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1))


def test_assert_occurs_fails_core():
    """Check that asserting on a value that is never emitted (4) raises.

    Returns:
        True if an exception was raised (the expected outcome), else False.
    """
    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
            event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
    except Exception as e:
        logging.debug(e)
        return True  # Failed as expected
    return False


def test_assert_occurs_at_most_passes_core():
    """Assert that values below 4 occur at most three times on 1, 2, 3, 4."""
    with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
        event_stream.assert_event_occurs_at_most(
            lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3)


def test_assert_occurs_at_most_fails_core():
    """Check that an at-most-2 assertion raises when three events match.

    Values greater than 1 appear three times in 1, 2, 3, 4.

    Returns:
        True if an exception was raised (the expected outcome), else False.
    """
    try:
        with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
            event_stream.assert_event_occurs_at_most(
                lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2)
    except Exception as e:
        logging.debug(e)
        return True  # Failed as expected
    return False


def test_skip_a_test_core():
    """Call ``asserts.skip`` and fail loudly if it ever returns normally."""
    asserts.skip("Skipping this test because it's blocked by b/xyz")
    # Explicit raise instead of ``assert False`` so the guard is not stripped
    # when Python runs with -O.
    raise AssertionError("asserts.skip() returned instead of aborting the test")


def test_nested_packets_core():
    """Build a ReadScanEnable packet nested in an ACL packet and log both.

    There are no assertions; the test passes when building and serializing
    complete without raising.
    """
    handle = 123
    inside = hci_packets.ReadScanEnableBuilder()
    logging.debug(inside.Serialize())
    logging.debug("building outside")
    outside = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                     hci_packets.BroadcastFlag.POINT_TO_POINT, inside)
    logging.debug(outside.Serialize())
    logging.debug("Done!")


def test_l2cap_config_options_core():
    """Serialize an L2CAP configuration request inside ACL and check its size."""
    mtu_opt = l2cap_packets.MtuConfigurationOption()
    mtu_opt.mtu = 123
    fcs_opt = l2cap_packets.FrameCheckSequenceOption()
    fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
    request = l2cap_packets.ConfigurationRequestBuilder(
        0x1d,  # Command ID
        0xc1d,  # Channel ID
        l2cap_packets.Continuation.END,
        [mtu_opt, fcs_opt])
    request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request)
    handle = 123
    wrapped = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                     hci_packets.BroadcastFlag.POINT_TO_POINT, request_b_frame)
    # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3)
    # assert_equal reports the actual length when the check fails.
    asserts.assert_equal(len(wrapped.Serialize()), 23, "Packet serialized incorrectly")


def test_assertThat_boolean_success_core():
    """isTrue()/isFalse() accept the matching boolean without raising."""
    assertThat(True).isTrue()
    assertThat(False).isFalse()


def test_assertThat_boolean_falseIsTrue_core():
    """Return True if ``assertThat(False).isTrue()`` raises, else False."""
    try:
        assertThat(False).isTrue()
    except Exception:
        return True
    return False


def test_assertThat_boolean_trueIsFalse_core():
    """Return True if ``assertThat(True).isFalse()`` raises, else False."""
    try:
        assertThat(True).isFalse()
    except Exception:
        return True
    return False


def test_assertThat_object_success_core():
    """Equality and None checks accept matching subjects without raising."""
    assertThat("this").isEqualTo("this")
    assertThat("this").isNotEqualTo("that")
    assertThat(None).isNone()
    assertThat("this").isNotNone()


def test_assertThat_object_isEqualToFails_core():
    """Return True if isEqualTo() raises for unequal strings, else False."""
    try:
        assertThat("this").isEqualTo("that")
    except Exception:
        return True
    return False


def test_assertThat_object_isNotEqualToFails_core():
    """Return True if isNotEqualTo() raises for equal strings, else False."""
    try:
        assertThat("this").isNotEqualTo("this")
    except Exception:
        return True
    return False


def test_assertThat_object_isNoneFails_core():
    """Return True if isNone() raises for a non-None subject, else False."""
    try:
        assertThat("this").isNone()
    except Exception:
        return True
    return False


def test_assertThat_object_isNotNoneFails_core():
    """Return True if isNotNone() raises for a None subject, else False."""
    try:
        assertThat(None).isNotNone()
    except Exception:
        return True
    return False


def test_assertThat_eventStream_emits_passes_core():
    """emits() is satisfied by value 1 on a 1, 2, 3 stream."""
    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
        assertThat(event_stream).emits(lambda data: data.value_ == 1)


def test_assertThat_eventStream_emits_then_passes_core():
    """emits(1) chained with then(3) is satisfied on a 1, 2, 3 stream."""
    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
        assertThat(event_stream).emits(lambda data: data.value_ == 1).then(lambda data: data.value_ == 3)


def test_assertThat_eventStream_emits_fails_core():
    """Expect emits() to raise for value 4, which a 1, 2, 3 stream never yields.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """

    def is_four(event):
        return event.value_ == 4

    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
            assertThat(stream).emits(is_four)
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_assertThat_eventStream_emits_then_fails_core():
    """Expect emits(1) followed by emits(4) to raise on a 1, 2, 3 stream.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """

    def is_one(event):
        return event.value_ == 1

    def is_four(event):
        return event.value_ == 4

    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
            first_match = assertThat(stream).emits(is_one)
            first_match.emits(is_four)
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_assertThat_eventStream_emitsInOrder_passes_core():
    """emits(1, 2).inOrder() is satisfied on a 1, 2, 3 stream."""

    def is_one(event):
        return event.value_ == 1

    def is_two(event):
        return event.value_ == 2

    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
        assertThat(stream).emits(is_one, is_two).inOrder()


def test_assertThat_eventStream_emitsInAnyOrder_passes_core():
    """emits(2, 1).inAnyOrder() then then(3) is satisfied on a 1, 2, 3 stream."""

    def is_one(event):
        return event.value_ == 1

    def is_two(event):
        return event.value_ == 2

    def is_three(event):
        return event.value_ == 3

    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
        unordered = assertThat(stream).emits(is_two, is_one).inAnyOrder()
        unordered.then(is_three)


def test_assertThat_eventStream_emitsInOrder_fails_core():
    """Expect emits(2, 1).inOrder() to raise on a stream that yields 1 before 2.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """

    def is_one(event):
        return event.value_ == 1

    def is_two(event):
        return event.value_ == 2

    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
            assertThat(stream).emits(is_two, is_one).inOrder()
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_assertThat_eventStream_emitsInAnyOrder_fails_core():
    """Expect emits(4, 1).inAnyOrder() to raise because 4 is never yielded.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """

    def is_one(event):
        return event.value_ == 1

    def is_four(event):
        return event.value_ == 4

    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
            assertThat(stream).emits(is_four, is_one).inAnyOrder()
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_assertThat_emitsNone_passes_core():
    """emitsNone(4) then thenNone(5), each with a 0.15 s timeout, both hold.

    Neither 4 nor 5 is part of the 1, 2, 3 stream.
    """
    window = timedelta(seconds=0.15)
    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
        no_four = assertThat(stream).emitsNone(lambda event: event.value_ == 4, timeout=window)
        no_four.thenNone(lambda event: event.value_ == 5, timeout=window)


def test_assertThat_emitsNone_passes_after_1_second_core():
    """emitsNone(4) with a 1 s timeout on a stream with 400 ms before each event.

    Value 4 is the fourth event, so the source does not yield it until
    roughly 1.6 s after iteration starts.
    """
    with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as stream:
        assertThat(stream).emitsNone(lambda event: event.value_ == 4, timeout=timedelta(seconds=1))


def test_assertThat_emitsNone_fails_core():
    """Expect emitsNone(2) to raise because 2 is part of the 1, 2, 3 stream.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """
    try:
        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
            assertThat(stream).emitsNone(lambda event: event.value_ == 2, timeout=timedelta(seconds=1))
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_assertThat_emitsNone_zero_passes_core():
    """emitsNone()/thenNone() without matchers hold on an empty stream."""
    with EventStream(FetchEvents(events=[], delay_ms=50)) as stream:
        quiet = assertThat(stream).emitsNone(timeout=timedelta(milliseconds=10))
        quiet.thenNone(timeout=timedelta(milliseconds=10))


def test_assertThat_emitsNone_zero_passes_after_one_second_core():
    """emitsNone() with a 1 s timeout while the only event is delayed 1.5 s."""
    with EventStream(FetchEvents([1], delay_ms=1500)) as stream:
        assertThat(stream).emitsNone(timeout=timedelta(seconds=1.0))


def test_assertThat_emitsNone_zero_fails_core():
    """Expect matcher-less emitsNone() to raise when an event arrives at 50 ms.

    Returns:
        True when an exception was raised (expected), False otherwise.
    """
    try:
        with EventStream(FetchEvents(events=[17], delay_ms=50)) as stream:
            assertThat(stream).emitsNone(timeout=timedelta(seconds=1))
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_filtering_event_stream_none_filter_function_core():
    """A FilteringEventStream built with a None filter still emits 1, then 3."""
    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as stream:
        filtered = FilteringEventStream(stream, None)
        saw_one = assertThat(filtered).emits(lambda event: event.value_ == 1)
        saw_one.then(lambda event: event.value_ == 3)


def test_fluent_behavior_simple_core():
    """A single then() reaction handles one request and captures it."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    when(thing).test_request(anything()).then().increment_count()

    behavior.run("A")

    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["A"])


def test_fluent_behavior__then_single__captures_one_core():
    """With the default set to ignore, then() captures one of three requests."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    behavior.set_default_to_ignore()

    when(thing).test_request(anything()).then().increment_count()

    for _ in range(3):
        behavior.run("A")

    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["A"])


def test_fluent_behavior__then_times__captures_all_core():
    """then(times=3) captures three consecutive requests in order."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    when(thing).test_request(anything()).then(times=3).increment_count()

    for payload in ("A", "B", "C"):
        behavior.run(payload)

    assertThat(thing.count).isEqualTo(3)
    assertThat(thing.captured).isEqualTo(["A", "B", "C"])


def test_fluent_behavior__always__captures_all_core():
    """always() captures every one of three requests in order."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    when(thing).test_request(anything()).always().increment_count()

    for payload in ("A", "B", "C"):
        behavior.run(payload)

    assertThat(thing.count).isEqualTo(3)
    assertThat(thing.captured).isEqualTo(["A", "B", "C"])


def test_fluent_behavior__matcher__captures_relevant_core():
    """A matcher for "B" captures only "B" out of the requests A, B, C."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default_to_ignore()

    when(thing).test_request(lambda request: request == "B").always().increment_count()

    for payload in ("A", "B", "C"):
        behavior.run(payload)

    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["B"])


def test_fluent_behavior__then_repeated__captures_relevant_core():
    """Chaining increment_count() twice captures the first two of three requests."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default_to_ignore()

    when(thing).test_request(anything()).then().increment_count().increment_count()

    for payload in ("A", "B", "A"):
        behavior.run(payload)

    assertThat(thing.count).isEqualTo(2)
    assertThat(thing.captured).isEqualTo(["A", "B"])


def test_fluent_behavior__fallback__captures_relevant_core():
    """A one-shot "B" reaction plus an always-on "C" reaction capture B, C, C.

    The requests are A, B, A, C, B, C with the default set to ignore.
    """
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default_to_ignore()

    when(thing).test_request(lambda request: request == "B").then(times=1).increment_count()
    when(thing).test_request(lambda request: request == "C").always().increment_count()

    for payload in ("A", "B", "A", "C", "B", "C"):
        behavior.run(payload)

    assertThat(thing.count).isEqualTo(3)
    assertThat(thing.captured).isEqualTo(["B", "C", "C"])


def test_fluent_behavior__default_unhandled_crash_core():
    """Expect a second request to raise after the single then() reaction ran.

    Returns:
        True when the second run raised (expected), False otherwise.
    """
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    when(thing).test_request(anything()).then().increment_count()

    # The first request is run outside the try, so an error from it propagates.
    behavior.run("A")
    try:
        behavior.run("A")
    except Exception as error:
        logging.debug(error)
        outcome = True  # Failed as expected
    else:
        outcome = False
    return outcome


def test_fluent_behavior__set_default_works_core():
    """A custom default counts exactly one unhandled request out of two."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default(lambda request: thing.increment_unhandled())

    when(thing).test_request(anything()).then().increment_count()

    for _ in range(2):
        behavior.run("A")
    assertThat(thing.unhandled_count).isEqualTo(1)


def test_fluent_behavior__wait_until_done_core():
    """wait_until with the same matcher object used in when(); "A" runs at 0.5 s."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior

    # One matcher object is deliberately shared by when() and wait_until().
    def is_a(request):
        return request == "A"

    when(thing).test_request(is_a).then().increment_count()

    Timer(0.5, behavior.run, args=("A",)).start()

    wait_until(thing).test_request(is_a).times(1)
    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["A"])


def test_fluent_behavior__wait_until_done_different_lambda_core():
    """wait_until with a separate but equivalent matcher; "A" runs at 0.5 s."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    when(thing).test_request(lambda request: request == "A").then().increment_count()

    Timer(0.5, behavior.run, args=("A",)).start()

    # A distinct lambda object from the one registered above, on purpose.
    wait_until(thing).test_request(lambda request: request == "A").times(1)
    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["A"])


def test_fluent_behavior__wait_until_done_anything_core():
    """wait_until with anything() while the reaction matches only "A"."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    when(thing).test_request(lambda request: request == "A").then().increment_count()

    Timer(0.5, behavior.run, args=("A",)).start()

    wait_until(thing).test_request(anything()).times(1)
    assertThat(thing.count).isEqualTo(1)
    assertThat(thing.captured).isEqualTo(["A"])


def test_fluent_behavior__wait_until_done_not_happened_core():
    """wait_until for "A" is asserted False when only "B" runs (at 0.5 s)."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default_to_ignore()
    when(thing).test_request(lambda request: request == "A").then().increment_count()

    Timer(0.5, behavior.run, args=("B",)).start()
    happened = wait_until(thing).test_request(lambda request: request == "A").times(1)
    assertThat(happened).isFalse()


def test_fluent_behavior__wait_until_done_with_default_core():
    """wait_until with only a custom default installed; it counts one request."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    behavior.set_default(lambda request: thing.increment_unhandled())

    Timer(0.5, behavior.run, args=("A",)).start()

    wait_until(thing).test_request(anything()).times(1)
    assertThat(thing.unhandled_count).isEqualTo(1)


def test_fluent_behavior__wait_until_done_two_events_AA_core():
    """wait_until for two "A" requests, both scheduled at 0.5 s."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    when(thing).test_request(lambda request: request == "A").then().increment_count().increment_count()

    for _ in range(2):
        Timer(0.5, behavior.run, args=("A",)).start()

    wait_until(thing).test_request(lambda request: request == "A").times(2)
    assertThat(thing.count).isEqualTo(2)
    assertThat(thing.captured).isEqualTo(["A", "A"])


def test_fluent_behavior__wait_until_done_two_events_AB_core():
    """wait_until for two requests of any kind: "A" at 0.5 s, "B" at 1 s."""
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    when(thing).test_request(anything()).always().increment_count()

    for delay_s, payload in ((0.5, "A"), (1, "B")):
        Timer(delay_s, behavior.run, args=(payload,)).start()

    wait_until(thing).test_request(anything()).times(2)
    assertThat(thing.count).isEqualTo(2)
    assertThat(thing.captured).isEqualTo(["A", "B"])


def test_fluent_behavior__wait_until_done_only_one_event_is_done_core():
    """wait_until for two "A" requests is asserted False; only one "A" is sent.

    "A" is scheduled at 1 s and "B" at 3 s.
    """
    thing = ObjectWithBehaviors()
    behavior = thing.behaviors.test_request_behavior
    when(thing).test_request(anything()).always().increment_count()

    for delay_s, payload in ((1, "A"), (3, "B")):
        Timer(delay_s, behavior.run, args=(payload,)).start()
    happened = wait_until(thing).test_request(lambda request: request == "A").times(2)
    assertThat(happened).isFalse()