• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from blueberry.tests.gd.cert.capture import Capture
20from blueberry.tests.gd.cert.closable import Closable
21from blueberry.tests.gd.cert.closable import safeClose
22from blueberry.tests.gd.cert.event_stream import EventStream, IEventStream
23from blueberry.tests.gd.cert.event_stream import FilteringEventStream
24from blueberry.tests.gd.cert.matchers import IsoMatchers
25from blueberry.tests.gd.cert.truth import assertThat
26from datetime import timedelta
27from google.protobuf import empty_pb2 as empty_proto
28from blueberry.facade.iso import facade_pb2 as iso_facade_pb2
29
30
class CisTestParameters:
    """
        Plain value holder for one CIS entry of a CIG "test" configuration.

        Every constructor argument is stored unchanged on an attribute of the
        same name. PyLeIso.le_set_cig_parameters_test reads these attributes to
        build one LeCisParametersTestConfig message per instance.
    """

    def __init__(self, cis_id, nse, max_sdu_m_to_s, max_sdu_s_to_m, max_pdu_m_to_s, max_pdu_s_to_m, phy_m_to_s,
                 phy_s_to_m, bn_m_to_s, bn_s_to_m):
        # Identifier and nse value of this CIS entry.
        self.cis_id, self.nse = cis_id, nse
        # The remaining fields come in (m_to_s, s_to_m) pairs, one per direction.
        self.max_sdu_m_to_s, self.max_sdu_s_to_m = max_sdu_m_to_s, max_sdu_s_to_m
        self.max_pdu_m_to_s, self.max_pdu_s_to_m = max_pdu_m_to_s, max_pdu_s_to_m
        self.phy_m_to_s, self.phy_s_to_m = phy_m_to_s, phy_s_to_m
        self.bn_m_to_s, self.bn_s_to_m = bn_m_to_s, bn_s_to_m
45
46
class PyLeIsoStream(IEventStream):
    """
        Event stream view of a single CIS on top of a shared ISO data stream.

        Wraps the shared stream in a FilteringEventStream that uses
        IsoMatchers.PacketPayloadWithMatchingCisHandle for this CIS handle, and
        offers send() to push a payload out on the same handle.
    """

    def __init__(self, device, cis_handle, iso_data_stream):
        """
        :param device: device object whose iso facade is used by send()
        :param cis_handle: handle of the CIS this stream represents
        :param iso_data_stream: shared ISO data event stream to filter
        """
        self._device = device
        self._cis_handle = cis_handle
        self._le_iso_data_stream = iso_data_stream

        # Build the per-handle matcher first, then the filtered view over the shared stream.
        handle_matcher = IsoMatchers.PacketPayloadWithMatchingCisHandle(cis_handle)
        self._our_le_iso_cis_view = FilteringEventStream(iso_data_stream, handle_matcher)

    def get_event_queue(self):
        """Return the event queue of the filtered (per-CIS) view."""
        filtered_view = self._our_le_iso_cis_view
        return filtered_view.get_event_queue()

    def send(self, payload):
        """Send payload as an IsoPacket addressed to this stream's CIS handle."""
        packet = iso_facade_pb2.IsoPacket(handle=self._cis_handle, payload=payload)
        self._device.iso.SendIsoPacket(packet)
61
62
class PyLeIso(Closable):
    """
        Abstraction for iso tasks and GRPC calls

        Opens two event streams on the device's iso facade (FetchIsoEvents and
        FetchIsoData) at construction time and closes both in close().
    """

    # Class-level defaults: close() checks both attributes for None, so both
    # must exist even when __init__ raised before the streams were created.
    _iso_event_stream = None
    _iso_data_stream = None

    def __init__(self, device):
        """
        Wait for the device channel to be ready, then open the ISO event and
        ISO data streams.

        :param device: device object exposing wait_channel_ready() and an iso facade
        """
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._iso_event_stream = EventStream(self._device.iso.FetchIsoEvents(empty_proto.Empty()))
        self._iso_data_stream = EventStream(self._device.iso.FetchIsoData(empty_proto.Empty()))

    def close(self):
        """
        Close whichever of the two streams exist; log the ones that are None.
        """
        if self._iso_event_stream is not None:
            safeClose(self._iso_event_stream)
        else:
            logging.info("DUT: ISO Event Stream is None!")
        if self._iso_data_stream is not None:
            safeClose(self._iso_data_stream)
        else:
            logging.info("DUT: ISO Data Stream is None!")

        logging.info("DUT: close")

    def le_set_cig_parameters(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy,
                              packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id,
                              max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, rtn_m_to_s, rtn_s_to_m):
        """
        Send a LeSetCigParametersRequest built from the given arguments.

        Each argument is copied into the request field of the same name. The
        RPC response is not used; call wait_le_set_cig_parameters_complete()
        to wait for the completion event.
        """
        request = iso_facade_pb2.LeSetCigParametersRequest(
            cig_id=cig_id,
            sdu_interval_m_to_s=sdu_interval_m_to_s,
            sdu_interval_s_to_m=sdu_interval_s_to_m,
            peripherals_clock_accuracy=peripherals_clock_accuracy,
            packing=packing,
            framing=framing,
            max_transport_latency_m_to_s=max_transport_latency_m_to_s,
            max_transport_latency_s_to_m=max_transport_latency_s_to_m,
            cis_id=cis_id,
            max_sdu_m_to_s=max_sdu_m_to_s,
            max_sdu_s_to_m=max_sdu_s_to_m,
            phy_m_to_s=phy_m_to_s,
            phy_s_to_m=phy_s_to_m,
            rtn_m_to_s=rtn_m_to_s,
            rtn_s_to_m=rtn_s_to_m)
        self._device.iso.LeSetCigParameters(request)

    def le_set_cig_parameters_test(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m,
                                   iso_interval, peripherals_clock_accuracy, packing, framing,
                                   max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_configs):
        """
        Send a LeSetCigParametersTestRequest built from the given arguments.

        :param cis_configs: iterable of objects exposing the attributes of
                            CisTestParameters; each one is converted into a
                            LeCisParametersTestConfig message

        The remaining arguments are copied into the request fields of the same
        name. The RPC response is not used.
        """
        configs = [
            iso_facade_pb2.LeSetCigParametersTestRequest.LeCisParametersTestConfig(
                cis_id=cc.cis_id,
                nse=cc.nse,
                max_sdu_m_to_s=cc.max_sdu_m_to_s,
                max_sdu_s_to_m=cc.max_sdu_s_to_m,
                max_pdu_m_to_s=cc.max_pdu_m_to_s,
                max_pdu_s_to_m=cc.max_pdu_s_to_m,
                phy_m_to_s=cc.phy_m_to_s,
                phy_s_to_m=cc.phy_s_to_m,
                bn_m_to_s=cc.bn_m_to_s,
                bn_s_to_m=cc.bn_s_to_m,
            ) for cc in cis_configs
        ]

        request = iso_facade_pb2.LeSetCigParametersTestRequest(
            cig_id=cig_id,
            sdu_interval_m_to_s=sdu_interval_m_to_s,
            sdu_interval_s_to_m=sdu_interval_s_to_m,
            ft_m_to_s=ft_m_to_s,
            ft_s_to_m=ft_s_to_m,
            iso_interval=iso_interval,
            peripherals_clock_accuracy=peripherals_clock_accuracy,
            packing=packing,
            framing=framing,
            max_transport_latency_m_to_s=max_transport_latency_m_to_s,
            max_transport_latency_s_to_m=max_transport_latency_s_to_m,
            cis_configs=configs)
        self._device.iso.LeSetCigParametersTest(request)

    def wait_le_set_cig_parameters_complete(self, timeout=timedelta(seconds=5)):
        """
        Assert that an ISO_PARAMETERS_SET_COMPLETE event is emitted on the ISO
        event stream within timeout.

        :param timeout: how long to wait for the event (defaults to 5 seconds)
        :return: the value held by the capture, which is extracted from the
                 matching event's cis_handle field
        """
        set_cig_params_complete_capture = PyLeIso.IsoCigComplete(iso_facade_pb2.IsoMsgType.ISO_PARAMETERS_SET_COMPLETE)

        assertThat(self._iso_event_stream).emits(set_cig_params_complete_capture, timeout=timeout)
        return set_cig_params_complete_capture.get()

    @staticmethod
    def _message_type_capture(message_type):
        """Build a Capture matching events of message_type and extracting their cis_handle field."""
        return Capture(lambda event: event.message_type == message_type, PyLeIso._extract_cis_handles)

    @staticmethod
    def IsoCigComplete(type=None):
        """
        Return a Capture that matches events whose message_type equals type.

        The parameter name shadows the builtin but is kept so that keyword
        callers continue to work.
        """
        return PyLeIso._message_type_capture(type)

    @staticmethod
    def _extract_cis_handles(event):
        """Return the cis_handle field of event, or None when event is None."""
        if event is None:
            return None
        return event.cis_handle

    def le_create_cis(self, cis_and_acl_handle_array):
        """
        Send a LeCreateCisRequest for the given handle pairs.

        :param cis_and_acl_handle_array: iterable of indexable pairs where
                                         index 0 is the CIS handle and index 1
                                         is the ACL handle
        """
        handles_pairs = [
            iso_facade_pb2.LeCreateCisRequest.HandlePair(cis_handle=pair[0], acl_handle=pair[1])
            for pair in cis_and_acl_handle_array
        ]

        self._device.iso.LeCreateCis(iso_facade_pb2.LeCreateCisRequest(handle_pair=handles_pairs))

    def wait_le_cis_established(self, timeout=timedelta(seconds=5)):
        """
        Assert that an ISO_CIS_ESTABLISHED event is emitted on the ISO event
        stream within timeout.

        :param timeout: how long to wait for the event (defaults to 5 seconds)
        :return: a PyLeIsoStream for the first handle held by the capture,
                 backed by this object's ISO data stream
        """
        cis_established_capture = PyLeIso.IsoCigEstablished(iso_facade_pb2.IsoMsgType.ISO_CIS_ESTABLISHED)
        assertThat(self._iso_event_stream).emits(cis_established_capture, timeout=timeout)
        # Only the first captured handle is used to build the stream.
        cis_handle = cis_established_capture.get()[0]
        return PyLeIsoStream(self._device, cis_handle, self._iso_data_stream)

    @staticmethod
    def IsoCigEstablished(type):
        """
        Return a Capture that matches events whose message_type equals type.

        The parameter name shadows the builtin but is kept so that keyword
        callers continue to work.
        """
        return PyLeIso._message_type_capture(type)
178