#!/usr/bin/env python3
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
This test script exercises set PHY and read PHY procedures.
"""

from queue import Empty

from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts_contrib.test_utils.bt.GattConnectedBaseTest import GattConnectedBaseTest
from acts_contrib.test_utils.bt.bt_constants import gatt_connection_priority
from acts_contrib.test_utils.bt.bt_constants import gatt_event
from acts_contrib.test_utils.bt.bt_constants import gatt_phy
from acts import signals

CONNECTION_PRIORITY_HIGH = gatt_connection_priority['high']
PHY_LE_1M = gatt_phy['1m']
PHY_LE_2M = gatt_phy['2m']


def _phy_name(phy):
    """Return the first key of gatt_phy whose value equals phy.

    Falls back to a readable placeholder instead of raising, so that a log
    statement can never mask the assertion that follows it.
    """
    for name, value in gatt_phy.items():
        if value == phy:
            return name
    return 'unknown({})'.format(phy)


def _toggle_phy(phy):
    """Return PHY_LE_2M when phy is PHY_LE_1M, otherwise PHY_LE_1M."""
    return PHY_LE_2M if phy == PHY_LE_1M else PHY_LE_1M


def lfmt(txPhy, rxPhy):
    """Format a (tx, rx) PHY pair for logging, e.g. '(1m, 2m)'.

    Args:
        txPhy: TX PHY value, looked up among the values of gatt_phy.
        rxPhy: RX PHY value, looked up among the values of gatt_phy.

    Returns:
        A string '(<tx name>, <rx name>)' using the gatt_phy key names.
    """
    return '(' + _phy_name(txPhy) + ', ' + _phy_name(rxPhy) + ')'


class PhyTest(GattConnectedBaseTest):
    """Tests for the LE set PHY and read PHY procedures over GATT."""

    def setup_class(self):
        """Run the base class setup and abort unless both devices do 2M PHY.

        Raises:
            signals.TestAbortClass: if the central or the peripheral reports
                that LE 2M PHY is not supported.
        """
        super().setup_class()
        if not self.cen_ad.droid.bluetoothIsLe2MPhySupported():
            raise signals.TestAbortClass(
                "Central device does not support LE 2M PHY")

        if not self.per_ad.droid.bluetoothIsLe2MPhySupported():
            raise signals.TestAbortClass(
                "Peripheral device does not support LE 2M PHY")

    def pop_initial_phy_update(self):
        """Discard a PHY update event that may already be queued.

        Some controllers auto-update PHY to 2M, and both client and server
        might receive PHY Update event right after connection, if the
        connection was established over 1M PHY. We will ignore this event, but
        must pop it from queue.
        """
        client_event = gatt_event['phy_update']['evt'].format(
            self.gatt_callback)
        try:
            # Zero timeout: only drop an event that is already there.
            self.cen_ad.ed.pop_event(client_event, 0)
        except Empty:
            pass

        server_event = gatt_event['serv_phy_update']['evt'].format(
            self.gatt_server_callback)
        try:
            self.per_ad.ed.pop_event(server_event, 0)
        except Empty:
            pass

    def ensure_both_updated_phy(self, clientTxPhy, clientRxPhy):
        """Check that client and server both received the PHY update event.

        The server side is expected to report the mirrored pair: its TxPhy
        must equal the client's RxPhy and its RxPhy the client's TxPhy.

        Args:
            clientTxPhy: TX PHY the client is expected to report.
            clientRxPhy: RX PHY the client is expected to report.
        """
        event = self._client_wait(gatt_event['phy_update'])
        txPhy = event['data']['TxPhy']
        rxPhy = event['data']['RxPhy']
        self.log.info("\tClient PHY updated: " + lfmt(txPhy, rxPhy))
        self.assertEqual(0, event['data']['Status'], "Status should be 0")
        self.assertEqual(clientTxPhy, txPhy)
        self.assertEqual(clientRxPhy, rxPhy)

        event = self._server_wait(gatt_event['serv_phy_update'])
        txPhy = event['data']['TxPhy']
        rxPhy = event['data']['RxPhy']
        self.log.info("\tServer PHY updated: " + lfmt(txPhy, rxPhy))
        self.assertEqual(0, event['data']['Status'], "Status should be 0")
        # Directions are swapped when seen from the server.
        self.assertEqual(clientRxPhy, txPhy)
        self.assertEqual(clientTxPhy, rxPhy)

    def read_client_phy(self):
        """Read the PHY on the client.

        Returns:
            Tuple (txPhy, rxPhy) taken from the 'phy_read' event.
        """
        self.cen_ad.droid.gattClientReadPhy(self.bluetooth_gatt)
        event = self._client_wait(gatt_event['phy_read'])
        self.assertEqual(0, event['data']['Status'], "Status should be 0")
        return (event['data']['TxPhy'], event['data']['RxPhy'])

    def read_server_phy(self):
        """Read the PHY on the server.

        Returns:
            Tuple (txPhy, rxPhy) taken from the 'serv_phy_read' event.
        """
        # NOTE(review): presumably the index of the connected device on the
        # GATT server - confirm against the gattServerReadPhy facade.
        bt_device_id = 0
        self.per_ad.droid.gattServerReadPhy(self.gatt_server, bt_device_id)
        event = self._server_wait(gatt_event['serv_phy_read'])
        self.assertEqual(0, event['data']['Status'], "Status should be 0")
        return (event['data']['TxPhy'], event['data']['RxPhy'])

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='edb95ae1-97e5-4337-9a60-1e113aa43a4d')
    def test_phy_read(self):
        """Test LE read PHY.

        Test LE read PHY.

        Steps:
        1. Central, Peripheral : read PHY, make sure values are same.
        2. Central: update PHY.
        3. Ensure both Central and Peripheral received PHY update event.
        4. Central, Peripheral: read PHY, make sure values are same.

        Expected Result:
        Verify that read PHY works properly.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        # read phy from client and server, make sure they're same
        cTxPhy, cRxPhy = self.read_client_phy()
        sTxPhy, sRxPhy = self.read_server_phy()
        self.assertEqual(cTxPhy, sTxPhy)
        self.assertEqual(cRxPhy, sRxPhy)

        self.log.info("Initial connection PHY was: " + lfmt(cTxPhy, cRxPhy))

        # Request the opposite of whatever each direction currently uses.
        nextTxPhy = _toggle_phy(cTxPhy)
        nextRxPhy = _toggle_phy(cRxPhy)

        # try to update PHY from Client
        self.log.info("Will try to set PHY to: " + lfmt(nextTxPhy, nextRxPhy))
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    nextTxPhy, nextRxPhy, 0)
        self.ensure_both_updated_phy(nextTxPhy, nextRxPhy)

        # read phy on client and server, make sure values are same and equal
        # the newly set value
        cTxPhy, cRxPhy = self.read_client_phy()
        sTxPhy, sRxPhy = self.read_server_phy()
        self.assertEqual(cTxPhy, sTxPhy)
        self.assertEqual(cRxPhy, sRxPhy)

        self.assertEqual(nextTxPhy, cTxPhy)
        self.assertEqual(nextRxPhy, cRxPhy)
        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6b66af0a-35eb-42af-acd5-9634684f275d')
    def test_phy_change_20_times(self):
        """Test PHY update.

        Test LE PHY update.

        Steps:
        1. Central: read PHY.
        2. Central: update PHY to 1M, 2M, 1M... 20 times, each time ensuring
                    both client and server received PHY update event.

        Expected Result:
        Verify that read update PHY worked properly each time.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        # "B" is the PHY pair read at the start, "A" is its toggled pair.
        txPhyB, rxPhyB = self.read_client_phy()
        txPhyA = _toggle_phy(txPhyB)
        rxPhyA = _toggle_phy(rxPhyB)

        self.log.info("Initial connection PHY was: " + lfmt(txPhyB, rxPhyB))

        for i in range(20):
            # swap values between iterations: even -> A, odd -> B
            txPhy = txPhyB if i & 1 else txPhyA
            rxPhy = rxPhyB if i & 1 else rxPhyA

            self.log.info("Will try to set PHY to: " + lfmt(txPhy, rxPhy))
            self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                        txPhy, rxPhy, 0)
            self.ensure_both_updated_phy(txPhy, rxPhy)
        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='13f28de4-07f4-458c-a3e5-3ba95318616f')
    def test_phy_change_asym(self):
        """Test PHY update with asymmetric rx and tx PHY.

        Test PHY update with asymmetric rx and tx PHY.

        Steps:
        1. Central: read PHY.
        2. Central: update PHY to tx: 1M, rx: 2M, ensure both devices received
                    the asymmetric update.
        3. Central: update PHY to tx: 2M, rx: 1M, ensure both devices received
                    the asymmetric update.

        Expected Result:
        Verify that read update PHY worked properly each time.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        txPhy, rxPhy = self.read_client_phy()

        self.log.info("Initial connection PHY was: " + lfmt(txPhy, rxPhy))
        self.log.info("will try to set PHY to: PHY_LE_1M, PHY_LE_2M")

        # try to update PHY to tx 1M, rx 2M from Client
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    PHY_LE_1M, PHY_LE_2M, 0)
        self.ensure_both_updated_phy(PHY_LE_1M, PHY_LE_2M)

        # try to update PHY to TX 2M, RX 1M from Client
        self.log.info("will try to set PHY to: PHY_LE_2M, PHY_LE_1M")
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    PHY_LE_2M, PHY_LE_1M, 0)
        self.ensure_both_updated_phy(PHY_LE_2M, PHY_LE_1M)

        return True