• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import time
19
20from mobly import asserts
21from datetime import datetime, timedelta
22from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
23from cert.event_callback_stream import EventCallbackStream
24from cert.event_asserts import EventAsserts
25
26# Test packet nesting
27from bluetooth_packets_python3 import hci_packets
28from bluetooth_packets_python3 import l2cap_packets
29
30
class BogusProto:
    """Fake message object wrapping a single value.

    It offers a ``DESCRIPTOR`` carrying a ``full_name`` and a ``ListFields()``
    generator -- presumably just enough of a protobuf-style message surface
    for the event helpers used in this file (not confirmed from here).
    """

    class BogusDescriptor:
        """Carries the ``full_name`` reported for a BogusProto."""

        def __init__(self, name):
            self.full_name = name

    class BogusType:
        """Fake field description handed out by ``ListFields()``."""

        def __init__(self):
            self.cpp_type = False
            self.is_extension = False
            self.name = "BogusProto"

        def label(self):
            """Return the constant field label."""
            return "label"

        def type(self):
            """Return the constant field type name."""
            return 'BogusRpc'

    def __init__(self, value):
        """Wrap ``value``; its string form becomes the descriptor name."""
        self.value_ = value
        descriptor_name = str(value)
        self.DESCRIPTOR = BogusProto.BogusDescriptor(descriptor_name)

    def __str__(self):
        rendered_value = str(self.value_)
        return "BogusRpc value = " + rendered_value

    def ListFields(self):
        """Yield the one ``[field, value]`` pair this fake message has."""
        yield [BogusProto.BogusType(), self.value_]
61
62
class FetchEvents:
    """Iterable fake event source that produces values after a fixed delay.

    Iterating sleeps ``delay_ms`` milliseconds before each value and then
    yields it wrapped in a BogusProto. Once cancel() has been called, no
    further values are produced.
    """

    def __init__(self, events, delay_ms):
        """Remember the values to emit and the per-value delay.

        Args:
            events: iterable of values to emit, in order.
            delay_ms: delay before each value, in milliseconds.
        """
        self.then_ = datetime.now()
        self.done_ = False
        self.index_ = 0
        # Kept in seconds, the unit time.sleep() takes.
        self.sleep_time_ = delay_ms / 1000.0
        self.events_ = events

    def __iter__(self):
        for value in self.events_:
            # Sleep before checking the flag, so a cancel() issued during the
            # delay suppresses the value that was about to be produced.
            time.sleep(self.sleep_time_)
            if self.done_:
                break
            message = "yielding %d" % value
            logging.debug(message)
            yield BogusProto(value)

    def done(self):
        """Return True once cancel() has been called."""
        return self.done_

    def cancel(self):
        """Stop the source; iteration ends at its next wake-up."""
        logging.debug("cancel")
        self.done_ = True
87
88
class CertSelfTest(GdFacadeOnlyBaseTestClass):
    """Self-tests for the cert helpers: EventAsserts, skipping, packet builders.

    The event tests feed EventAsserts through an EventCallbackStream backed by
    FetchEvents, which sleeps ``delay_ms`` before producing each value. Whether
    a value shows up inside an assertion's timeout therefore follows from the
    delay/timeout pair chosen in each test (assuming the stream starts
    consuming the source when the ``with`` block is entered -- not visible
    from this file).

    Tests that expect an assertion to fail catch the resulting exception and
    otherwise fail explicitly through ``asserts.fail``, so the outcome does
    not depend on the runner interpreting a test method's return value.
    """

    def setup_test(self):
        return True

    def teardown_test(self):
        return True

    def test_assert_none_passes(self):
        """assert_none passes when the source has no events at all."""
        with EventCallbackStream(FetchEvents(events=[],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(milliseconds=10))

    def test_assert_none_passes_after_one_second(self):
        """assert_none passes when the only event is delayed past the timeout.

        The event is produced after 1500 ms, the assertion waits for 1 s.
        """
        with EventCallbackStream(FetchEvents([1],
                                             delay_ms=1500)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none(timeout=timedelta(seconds=1.0))

    def test_assert_none_fails(self):
        """assert_none must raise when an event is produced within the timeout."""
        try:
            with EventCallbackStream(FetchEvents(events=[17],
                                                 delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none(timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try block so the broad except above cannot swallow it.
        asserts.fail("assert_none did not fail although an event was produced")

    def test_assert_none_matching_passes(self):
        """assert_none_matching passes when no produced value matches (no 4)."""
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15))

    def test_assert_none_matching_passes_after_1_second(self):
        """assert_none_matching passes when the match is produced too late.

        With 400 ms per value, the matching 4 is the fourth value and is
        produced well after the 1 s timeout.
        """
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_none_matching(
                lambda data: data.value_ == 4, timeout=timedelta(seconds=1))

    def test_assert_none_matching_fails(self):
        """assert_none_matching must raise when a matching value (2) is produced."""
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_none_matching(
                    lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try block so the broad except above cannot swallow it.
        asserts.fail(
            "assert_none_matching did not fail although a matching event was "
            "produced")

    def test_assert_occurs_at_least_passes(self):
        """assert_event_occurs passes when the value 1 is produced twice in time.

        With 40 ms per value, 1 is the first and fourth value, both produced
        before the 300 ms timeout.
        """
        with EventCallbackStream(
                FetchEvents(events=[1, 2, 3, 1, 2, 3],
                            delay_ms=40)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1,
                timeout=timedelta(milliseconds=300),
                at_least_times=2)

    def test_assert_occurs_passes(self):
        """assert_event_occurs passes when a matching value (1) is produced."""
        with EventCallbackStream(FetchEvents(events=[1, 2, 3],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs(
                lambda data: data.value_ == 1, timeout=timedelta(seconds=1))

    def test_assert_occurs_fails(self):
        """assert_event_occurs must raise when no value matches (no 4)."""
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs(
                    lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try block so the broad except above cannot swallow it.
        asserts.fail(
            "assert_event_occurs did not fail although no matching event was "
            "produced")

    def test_assert_occurs_at_most_passes(self):
        """assert_event_occurs_at_most passes at the limit.

        Three values (1, 2, 3) satisfy ``< 4`` and three are allowed.
        """
        with EventCallbackStream(FetchEvents(events=[1, 2, 3, 4],
                                             delay_ms=50)) as event_stream:
            event_asserts = EventAsserts(event_stream)
            event_asserts.assert_event_occurs_at_most(
                lambda data: data.value_ < 4,
                timeout=timedelta(seconds=1),
                at_most_times=3)

    def test_assert_occurs_at_most_fails(self):
        """assert_event_occurs_at_most must raise above the limit.

        Three values (2, 3, 4) satisfy ``> 1`` but only two are allowed.
        """
        try:
            with EventCallbackStream(
                    FetchEvents(events=[1, 2, 3, 4],
                                delay_ms=50)) as event_stream:
                event_asserts = EventAsserts(event_stream)
                event_asserts.assert_event_occurs_at_most(
                    lambda data: data.value_ > 1,
                    timeout=timedelta(seconds=1),
                    at_most_times=2)
        except Exception as e:
            logging.debug(e)
            return True  # Failed as expected
        # Outside the try block so the broad except above cannot swallow it.
        asserts.fail(
            "assert_event_occurs_at_most did not fail although more than "
            "at_most_times matching events were produced")

    def test_skip_a_test(self):
        """Check that asserts.skip aborts the test body."""
        asserts.skip("Skipping this test because it's blocked by b/xyz")
        # Only reached if skip() failed to interrupt the test.
        assert False

    def test_nested_packets(self):
        """Build an ACL packet wrapping an HCI command builder and serialize it.

        There is no explicit check on the bytes; the test only requires that
        building and serializing the nested packet does not raise.
        """
        handle = 123
        inside = hci_packets.ReadScanEnableBuilder()
        logging.debug(inside.Serialize())
        logging.debug("building outside")
        outside = hci_packets.AclPacketBuilder(
            handle,
            hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
            hci_packets.BroadcastFlag.POINT_TO_POINT, inside)
        logging.debug(outside.Serialize())
        logging.debug("Done!")

    def test_l2cap_config_options(self):
        """Serialize an L2CAP configuration request nested in an ACL packet.

        The request carries an MTU option and an FCS option; the wrapped
        packet is expected to serialize to 16 bytes.
        """
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
        request = l2cap_packets.ConfigurationRequestBuilder(
            0x1d,  # Command ID
            0xc1d,  # Channel ID
            l2cap_packets.Continuation.END,
            [mtu_opt, fcs_opt])
        # Result is discarded; presumably this only verifies that the bare
        # request serializes without raising.
        request.Serialize()
        handle = 123
        wrapped = hci_packets.AclPacketBuilder(
            handle,
            hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
            hci_packets.BroadcastFlag.POINT_TO_POINT, request)
        # assert_equal reports the actual length on failure.
        asserts.assert_equal(
            len(wrapped.Serialize()), 16, "Packet serialized incorrectly")
233