#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import time
from datetime import datetime, timedelta

from mobly import asserts

from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts

# Test packet nesting
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import l2cap_packets


class BogusProto:
    """Fake event object wrapping a single value.

    It exposes ``DESCRIPTOR.full_name``, ``ListFields()`` and ``__str__``,
    i.e. it imitates part of a protobuf message's surface.
    NOTE(review): presumably this is the subset the cert event helpers
    inspect; that code is not visible from this file -- confirm.
    """

    class BogusType:
        """Fake field descriptor handed out by ``BogusProto.ListFields``."""

        def __init__(self):
            self.name = "BogusProto"
            self.is_extension = False
            self.cpp_type = False

        def type(self):
            """Return the fixed type name of the fake field."""
            return 'BogusRpc'

        def label(self):
            """Return the fixed label of the fake field."""
            return "label"

    class BogusDescriptor:
        """Fake message descriptor that only carries ``full_name``."""

        def __init__(self, name):
            self.full_name = name

    def __init__(self, value):
        """Wrap *value*; its string form doubles as the descriptor name."""
        self.value_ = value
        self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value))

    def __str__(self):
        return "BogusRpc value = " + str(self.value_)

    def ListFields(self):
        """Yield one ``[field_descriptor, value]`` pair for the value."""
        yield [BogusProto.BogusType(), self.value_]


class FetchEvents:
    """Cancellable iterable that emits ``BogusProto`` events with a delay.

    Iterating sleeps ``delay_ms`` milliseconds before each event, so the
    k-th event (1-based) becomes available roughly ``k * delay_ms`` ms after
    iteration starts.  ``done()`` and ``cancel()`` let a consumer stop it.
    NOTE(review): presumably this mirrors the call object that
    EventCallbackStream normally wraps -- confirm against that class.
    """

    def __init__(self, events, delay_ms):
        """Store the values to emit and the per-event delay.

        Args:
            events: iterable of integers; each is wrapped in a BogusProto.
            delay_ms: delay before every event, in milliseconds.
        """
        self.events_ = events
        self.sleep_time_ = (delay_ms * 1.0) / 1000  # seconds for time.sleep
        self.index_ = 0
        self.done_ = False
        self.then_ = datetime.now()

    def __iter__(self):
        for event in self.events_:
            time.sleep(self.sleep_time_)
            # cancel() may have been called while we slept; stop silently.
            if self.done_:
                return
            logging.debug("yielding %d", event)
            yield BogusProto(event)

    def done(self):
        """Return True once ``cancel()`` has been called."""
        return self.done_

    def cancel(self):
        """Ask the iterator to stop before emitting its next event."""
        logging.debug("cancel")
        self.done_ = True
        return None


class CertSelfTest(GdFacadeOnlyBaseTestClass):
    """Self-tests for the cert event helpers and the packet bindings.

    The ``*_passes`` tests expect the EventAsserts call to return normally.
    The ``*_fails`` tests expect it to raise; if it does not, they fail
    explicitly via ``asserts.fail`` instead of depending on how the harness
    interprets a test method's return value.
    """

    def setup_test(self):
        # NOTE(review): presumably a truthy return tells the harness that
        # setup succeeded -- confirm against GdFacadeOnlyBaseTestClass.
        return True

    def teardown_test(self):
        return True

    def test_assert_none_passes(self):
        """No events at all: assert_none must pass."""
        with EventCallbackStream(FetchEvents(events=[],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(milliseconds=10))

    def test_assert_none_passes_after_one_second(self):
        """The only event is delayed 1.5 s, past the 1 s timeout."""
        with EventCallbackStream(FetchEvents([1],
                                             delay_ms=1500)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(seconds=1.0))

    def test_assert_none_fails(self):
        """An event arrives after 50 ms, so assert_none must raise."""
        try:
            with EventCallbackStream(FetchEvents(events=[17],
                                                 delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none(timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # asserts.fail() raises, so it must stay outside the broad try above
        # or it would be swallowed as an "expected" failure.
        asserts.fail("assert_none passed although an event was delivered")

    def test_assert_none_matching_passes(self):
        """Events 1, 2, 3 never match ``== 4``."""
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15))

    def test_assert_none_matching_passes_after_1_second(self):
        """The matching event (4) is due at ~1.6 s, past the 1 s timeout."""
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=1))

    def test_assert_none_matching_fails(self):
        """Event 2 matches within the timeout, so the assert must raise."""
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none_matching(
                    lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try on purpose: asserts.fail() raises.
        asserts.fail(
            "assert_none_matching passed although a matching event was "
            "delivered")

    def test_assert_occurs_at_least_passes(self):
        """Value 1 is emitted twice (at ~40 ms and ~160 ms) within 300 ms."""
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 1, 2, 3],
                            delay_ms=40)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1,
                timeout=timedelta(milliseconds=300),
                at_least_times=2)

    def test_assert_occurs_passes(self):
        """Value 1 is emitted once, well inside the 1 s timeout."""
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1, timeout=timedelta(seconds=1))

    def test_assert_occurs_fails(self):
        """Value 4 is never emitted, so assert_event_occurs must raise."""
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs(
                    lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try on purpose: asserts.fail() raises.
        asserts.fail(
            "assert_event_occurs passed although no matching event was "
            "delivered")

    def test_assert_occurs_at_most_passes(self):
        """Exactly three events (1, 2, 3) satisfy ``< 4``; limit is three."""
        with EventCallbackStream(FetchEvents(events=[1, 2, 3, 4],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs_at_most(
                lambda data: data.value_ < 4,
                timeout=timedelta(seconds=1),
                at_most_times=3)

    def test_assert_occurs_at_most_fails(self):
        """Three events (2, 3, 4) satisfy ``> 1`` but the limit is two."""
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3, 4],
                                delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs_at_most(
                    lambda data: data.value_ > 1,
                    timeout=timedelta(seconds=1),
                    at_most_times=2)
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try on purpose: asserts.fail() raises.
        asserts.fail(
            "assert_event_occurs_at_most passed although the limit was "
            "exceeded")

    def test_skip_a_test(self):
        """asserts.skip must abort the test before the next statement."""
        asserts.skip("Skipping this test because it's blocked by b/xyz")
        # Only reached if skip() failed to abort.  A bare `assert` would be
        # stripped under `python -O`, so fail explicitly instead.
        asserts.fail("asserts.skip() did not abort the test")

    def test_nested_packets(self):
        """Build an ACL packet around an HCI command builder and serialize."""
        handle = 123
        inside = hci_packets.ReadScanEnableBuilder()
        logging.debug(inside.Serialize())
        logging.debug("building outside")
        outside = hci_packets.AclPacketBuilder(
            handle,
            hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
            hci_packets.BroadcastFlag.POINT_TO_POINT, inside)
        logging.debug(outside.Serialize())
        logging.debug("Done!")

    def test_l2cap_config_options(self):
        """Wrap an L2CAP configuration request in ACL and check its size."""
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
        request = l2cap_packets.ConfigurationRequestBuilder(
            0x1d,  # Command ID
            0xc1d,  # Channel ID
            l2cap_packets.Continuation.END,
            [mtu_opt, fcs_opt])
        request.Serialize()
        handle = 123
        wrapped = hci_packets.AclPacketBuilder(
            handle,
            hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
            hci_packets.BroadcastFlag.POINT_TO_POINT, request)
        # assert_equal reports the actual length when the check fails.
        asserts.assert_equal(
            len(wrapped.Serialize()), 16, "Packet serialized incorrectly")