1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from blueberry.tests.gd.cert.capture import Capture 20from blueberry.tests.gd.cert.closable import Closable 21from blueberry.tests.gd.cert.closable import safeClose 22from blueberry.tests.gd.cert.event_stream import EventStream, IEventStream 23from blueberry.tests.gd.cert.event_stream import FilteringEventStream 24from blueberry.tests.gd.cert.matchers import IsoMatchers 25from blueberry.tests.gd.cert.truth import assertThat 26from datetime import timedelta 27from google.protobuf import empty_pb2 as empty_proto 28from blueberry.facade.iso import facade_pb2 as iso_facade_pb2 29 30 31class CisTestParameters(): 32 33 def __init__(self, cis_id, nse, max_sdu_m_to_s, max_sdu_s_to_m, max_pdu_m_to_s, max_pdu_s_to_m, phy_m_to_s, 34 phy_s_to_m, bn_m_to_s, bn_s_to_m): 35 self.cis_id = cis_id 36 self.nse = nse 37 self.max_sdu_m_to_s = max_sdu_m_to_s 38 self.max_sdu_s_to_m = max_sdu_s_to_m 39 self.max_pdu_m_to_s = max_pdu_m_to_s 40 self.max_pdu_s_to_m = max_pdu_s_to_m 41 self.phy_m_to_s = phy_m_to_s 42 self.phy_s_to_m = phy_s_to_m 43 self.bn_m_to_s = bn_m_to_s 44 self.bn_s_to_m = bn_s_to_m 45 46 47class PyLeIsoStream(IEventStream): 48 49 def __init__(self, device, cis_handle, iso_data_stream): 50 self._device = device 51 self._cis_handle = cis_handle 52 self._le_iso_data_stream = iso_data_stream 53 self._our_le_iso_cis_view = FilteringEventStream( 54 self._le_iso_data_stream, IsoMatchers.PacketPayloadWithMatchingCisHandle(self._cis_handle)) 55 56 def get_event_queue(self): 57 return self._our_le_iso_cis_view.get_event_queue() 58 59 def send(self, payload): 60 self._device.iso.SendIsoPacket(iso_facade_pb2.IsoPacket(handle=self._cis_handle, payload=payload)) 61 62 63class PyLeIso(Closable): 64 """ 65 Abstraction for iso tasks and GRPC calls 66 """ 67 68 _iso_event_stream = None 69 70 def __init__(self, device): 71 logging.info("DUT: Init") 72 self._device = device 73 self._device.wait_channel_ready() 74 self._iso_event_stream = EventStream(self._device.iso.FetchIsoEvents(empty_proto.Empty())) 75 self._iso_data_stream = EventStream(self._device.iso.FetchIsoData(empty_proto.Empty())) 76 77 def close(self): 78 if self._iso_event_stream is not None: 79 safeClose(self._iso_event_stream) 80 else: 81 logging.info("DUT: ISO Event Stream is None!") 82 if self._iso_data_stream is not None: 83 safeClose(self._iso_data_stream) 84 else: 85 logging.info("DUT: ISO Data Stream is None!") 86 87 logging.info("DUT: close") 88 89 def le_set_cig_parameters(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy, 90 packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id, 91 max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, rtn_m_to_s, rtn_s_to_m): 92 93 resp = self._device.iso.LeSetCigParameters( 94 iso_facade_pb2.LeSetCigParametersRequest( 95 cig_id=cig_id, 96 sdu_interval_m_to_s=sdu_interval_m_to_s, 97 sdu_interval_s_to_m=sdu_interval_s_to_m, 98 peripherals_clock_accuracy=peripherals_clock_accuracy, 99 packing=packing, 100 framing=framing, 101 max_transport_latency_m_to_s=max_transport_latency_m_to_s, 102 max_transport_latency_s_to_m=max_transport_latency_s_to_m, 103 cis_id=cis_id, 104 max_sdu_m_to_s=max_sdu_m_to_s, 105 max_sdu_s_to_m=max_sdu_s_to_m, 106 phy_m_to_s=phy_m_to_s, 107 phy_s_to_m=phy_s_to_m, 108 rtn_m_to_s=rtn_m_to_s, 109 rtn_s_to_m=rtn_s_to_m)) 110 111 def le_set_cig_parameters_test(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, ft_m_to_s, ft_s_to_m, 112 iso_interval, peripherals_clock_accuracy, packing, framing, 113 max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_configs): 114 configs = [] 115 for cc in cis_configs: 116 configs.append( 117 iso_facade_pb2.LeSetCigParametersTestRequest.LeCisParametersTestConfig( 118 cis_id=cc.cis_id, 119 nse=cc.nse, 120 max_sdu_m_to_s=cc.max_sdu_m_to_s, 121 max_sdu_s_to_m=cc.max_sdu_s_to_m, 122 max_pdu_m_to_s=cc.max_pdu_m_to_s, 123 max_pdu_s_to_m=cc.max_pdu_s_to_m, 124 phy_m_to_s=cc.phy_m_to_s, 125 phy_s_to_m=cc.phy_s_to_m, 126 bn_m_to_s=cc.bn_m_to_s, 127 bn_s_to_m=cc.bn_s_to_m, 128 )) 129 130 resp = self._device.iso.LeSetCigParametersTest( 131 iso_facade_pb2.LeSetCigParametersTestRequest( 132 cig_id=cig_id, 133 sdu_interval_m_to_s=sdu_interval_m_to_s, 134 sdu_interval_s_to_m=sdu_interval_s_to_m, 135 ft_m_to_s=ft_m_to_s, 136 ft_s_to_m=ft_s_to_m, 137 iso_interval=iso_interval, 138 peripherals_clock_accuracy=peripherals_clock_accuracy, 139 packing=packing, 140 framing=framing, 141 max_transport_latency_m_to_s=max_transport_latency_m_to_s, 142 max_transport_latency_s_to_m=max_transport_latency_s_to_m, 143 cis_configs=configs)) 144 145 def wait_le_set_cig_parameters_complete(self): 146 set_cig_params_complete_capture = PyLeIso.IsoCigComplete(iso_facade_pb2.IsoMsgType.ISO_PARAMETERS_SET_COMPLETE) 147 148 assertThat(self._iso_event_stream).emits(set_cig_params_complete_capture, timeout=timedelta(seconds=5)) 149 return set_cig_params_complete_capture.get() 150 151 @staticmethod 152 def IsoCigComplete(type=None): 153 return Capture(lambda event: True if event.message_type == type else False, PyLeIso._extract_cis_handles) 154 155 @staticmethod 156 def _extract_cis_handles(event): 157 if event is None: 158 return None 159 return event.cis_handle 160 161 def le_create_cis(self, cis_and_acl_handle_array): 162 handles_pairs = [] 163 for hp_tmp in cis_and_acl_handle_array: 164 handles_pairs.append( 165 iso_facade_pb2.LeCreateCisRequest.HandlePair(cis_handle=hp_tmp[0], acl_handle=hp_tmp[1])) 166 167 self._device.iso.LeCreateCis(iso_facade_pb2.LeCreateCisRequest(handle_pair=handles_pairs)) 168 169 def wait_le_cis_established(self): 170 cis_establshed_capture = PyLeIso.IsoCigEstablished(iso_facade_pb2.IsoMsgType.ISO_CIS_ESTABLISHED) 171 assertThat(self._iso_event_stream).emits(cis_establshed_capture, timeout=timedelta(seconds=5)) 172 cis_handle = cis_establshed_capture.get()[0] 173 return PyLeIsoStream(self._device, cis_handle, self._iso_data_stream) 174 175 @staticmethod 176 def IsoCigEstablished(type): 177 return Capture(lambda event: True if event.message_type == type else False, PyLeIso._extract_cis_handles) 178