"""WiFi P2P library for multi-devices tests."""
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3

from collections.abc import Sequence
import datetime
import time

from direct import constants
from direct import p2p_utils
from mobly import asserts
from mobly.controllers import android_device
from mobly.controllers.android_device_lib import adb
from mobly.controllers.android_device_lib import callback_handler_v2
from mobly.snippet import errors

# Default timeout used when waiting for callback events in this module.
_DEFAULT_TIMEOUT = datetime.timedelta(seconds=30)
# The three values below are passed to `time.sleep`, i.e. they are seconds.
_DEFAULT_SLEEPTIME = 5
_DEFAULT_FUNCTION_SWITCH_TIME = 10
_DEFAULT_SERVICE_WAITING_TIME = 20
# Timeout used for the DNS SD response / TXT record checks.
_NORMAL_TIMEOUT = datetime.timedelta(seconds=20)

# Connection types accepted by `p2p_connect(p2p_connect_type=...)`.
P2P_CONNECT_NEGOTIATION = 0
P2P_CONNECT_JOIN = 1
P2P_CONNECT_INVITATION = 2

######################################################
# Wifi P2p local service type
####################################################
# Service categories accepted by `gen_test_data` and
# `create_p2p_local_service`.
P2P_LOCAL_SERVICE_UPNP = 0
P2P_LOCAL_SERVICE_IPP = 1
P2P_LOCAL_SERVICE_AFP = 2

######################################################
# Wifi P2p local service event
####################################################

# Event names (not referenced elsewhere in this module).
DNSSD_EVENT = 'WifiP2pOnDnsSdServiceAvailable'
DNSSD_TXRECORD_EVENT = 'WifiP2pOnDnsSdTxtRecordAvailable'
UPNP_EVENT = 'WifiP2pOnUpnpServiceAvailable'

# Keys of the event payloads above (not referenced elsewhere in this module).
DNSSD_EVENT_INSTANCENAME_KEY = 'InstanceName'
DNSSD_EVENT_REGISTRATIONTYPE_KEY = 'RegistrationType'
DNSSD_TXRECORD_EVENT_FULLDOMAINNAME_KEY = 'FullDomainName'
DNSSD_TXRECORD_EVENT_TXRECORDMAP_KEY = 'TxtRecordMap'
UPNP_EVENT_SERVICELIST_KEY = 'ServiceList'


######################################################
# Wifi P2p UPnP MediaRenderer local service
######################################################
class UpnpTestData():
  """Fixed test values describing the UPnP MediaRenderer local service.

  `gen_test_data` uses `uuid`, `service_type` and the two service URNs to
  register the local service; `gen_expect_test_data` combines `uuid` with the
  other attributes to build the expected `uuid:<uuid>::<name>` entries.
  """

  av_transport = 'urn:schemas-upnp-org:service:AVTransport:1'
  connection_manager = 'urn:schemas-upnp-org:service:ConnectionManager:1'
  service_type = 'urn:schemas-upnp-org:device:MediaRenderer:1'
  uuid = '6859dede-8574-59ab-9332-123456789011'
  rootdevice = 'upnp:rootdevice'

######################################################
# Wifi P2p Bonjour IPP & AFP local service
######################################################
class IppTestData():
  """Fixed test values describing the Bonjour IPP (printer) local service."""

  ipp_instance_name = 'MyPrinter'
  ipp_registration_type = '_ipp._tcp'
  ipp_domain_name = 'myprinter._ipp._tcp.local.'
  ipp_txt_record = {'txtvers': '1', 'pdl': 'application/postscript'}


class AfpTestData():
  """Fixed test values describing the Bonjour AFP local service."""

  afp_instance_name = 'Example'
  afp_registration_type = '_afpovertcp._tcp'
  afp_domain_name = 'example._afpovertcp._tcp.local.'
  afp_txt_record = {}


# Trigger p2p connect to device_go from device_gc.
def p2p_connect(
    device_gc: p2p_utils.DeviceState,
    device_go: p2p_utils.DeviceState,
    is_reconnect,
    wps_setup,
    p2p_connect_type=P2P_CONNECT_NEGOTIATION,
    go_ad=None,
):
  """Trigger a p2p connection to device_go from device_gc.

  The peer (or group owner) is discovered first, in a way that depends on
  `p2p_connect_type`; then, after a fixed sleep, the connection is started
  from `device_gc`.

  Args:
    device_gc: The android device (Client)
    device_go: The android device (GO)
    is_reconnect: boolean, True if a persistent group already exists (uses
      `p2p_utils.p2p_reconnect`), otherwise False (uses
      `p2p_utils.p2p_connect`).
    wps_setup: which wps connection would like to use
    p2p_connect_type: enumeration, which type this p2p connection is; one of
      P2P_CONNECT_NEGOTIATION, P2P_CONNECT_JOIN or P2P_CONNECT_INVITATION.
    go_ad: The group owner android device which is used for the invitation
      connection. Defaults to `device_gc` when omitted.
  """
  device_gc.ad.log.info(
      'Create p2p connection from %s to %s via wps: %s type %d',
      device_gc.ad.serial,
      device_go.ad.serial,
      wps_setup,
      p2p_connect_type,
  )

  if p2p_connect_type == P2P_CONNECT_INVITATION:
    if go_ad is None:
      go_ad = device_gc
    p2p_utils.discover_p2p_peer(device_gc, device_go)
    # GO might be another peer, so device_go needs to find it first.
    p2p_utils.discover_group_owner(
        client=device_go, group_owner_address=go_ad.p2p_device.device_address
    )
  elif p2p_connect_type == P2P_CONNECT_JOIN:
    peer_p2p_device = p2p_utils.discover_group_owner(
        client=device_gc,
        group_owner_address=device_go.p2p_device.device_address,
    )
    asserts.assert_true(
        peer_p2p_device.is_group_owner,
        f'P2p device {peer_p2p_device} should be group owner.',
    )
  else:
    p2p_utils.discover_p2p_peer(device_gc, device_go)
  time.sleep(_DEFAULT_SLEEPTIME)
  device_gc.ad.log.info(
      'from device1: %s -> device2: %s',
      device_gc.p2p_device.device_address,
      device_go.p2p_device.device_address,
  )
  p2p_config = constants.WifiP2pConfig(
      device_address=device_go.p2p_device.device_address,
      wps_setup=wps_setup,
  )
  if is_reconnect:
    p2p_utils.p2p_reconnect(device_gc, device_go, p2p_config)
  else:
    p2p_utils.p2p_connect(device_gc, device_go, p2p_config)


def is_go(ad) -> bool:
  """Check whether an Android device's p2p role is group owner.

  Requests the connection info and waits up to `_DEFAULT_TIMEOUT` for the
  `ON_CONNECTION_INFO_AVAILABLE` event.

  Args:
    ad: The android device

  Returns:
    True: An Android device is p2p go
    False: An Android device is p2p gc
  """
  callback_handler = ad.wifi.wifiP2pRequestConnectionInfo()
  event = callback_handler.waitAndGet(
      event_name=constants.ON_CONNECTION_INFO_AVAILABLE,
      timeout=_DEFAULT_TIMEOUT.total_seconds(),
  )
  return bool(event.data['isGroupOwner'])


def p2p_go_ip(ad):
  """Get Group Owner IP address.

  Args:
    ad: The android device

  Returns:
    GO IP address, i.e. the reported `groupOwnerHostAddress` with every '/'
    removed.
  """
  event_handler = ad.wifi.wifiP2pRequestConnectionInfo()
  result = event_handler.waitAndGet(
      event_name=constants.ON_CONNECTION_INFO_AVAILABLE,
      timeout=_DEFAULT_TIMEOUT.total_seconds(),
  )
  go_flag = result.data['isGroupOwner']
  ip = result.data['groupOwnerHostAddress'].replace('/', '')
  ad.log.info('is_go:%s, p2p ip: %s', go_flag, ip)
  return ip


def p2p_disconnect(ad):
  """Invoke an Android device removeGroup to trigger p2p disconnect.

  Stops peer discovery, cancels any ongoing connect and removes the group.
  `p2pClose` is always called afterwards, even if one of those steps raises.

  Args:
    ad: The android device
  """
  ad.log.debug('P2p Disconnect')
  try:
    ad.wifi.wifiP2pStopPeerDiscovery()
    ad.wifi.wifiP2pCancelConnect()
    ad.wifi.wifiP2pRemoveGroup()
  finally:
    # Make sure to call `p2pClose`, otherwise `_setup_wifi_p2p` won't be
    # able to run again.
    ad.wifi.p2pClose()


def p2p_connection_ping_test(dut: android_device.AndroidDevice, peer_ip: str):
  """Run a ping over the specified device/link.

  The command is retried once, after a one second pause, if the first attempt
  raises `adb.AdbError`. A failure of the retry propagates to the caller.

  Args:
    dut: Device on which to execute ping.
    peer_ip: IP address of the peer to ping.
  """
  cmd = 'ping -c 3 -W 1 %s' % peer_ip
  try:
    dut.log.info(cmd)
    results = dut.adb.shell(cmd)
  except adb.AdbError:
    time.sleep(1)
    dut.log.info('CMD RETRY: %s', cmd)
    results = dut.adb.shell(cmd)

  dut.log.info(results)


def gen_test_data(service_category):
  """Generate the test data for a local service category.

  Args:
    service_category: P2p local service type, one of P2P_LOCAL_SERVICE_UPNP,
      P2P_LOCAL_SERVICE_IPP or P2P_LOCAL_SERVICE_AFP.

  Returns:
    A new three-element list: [uuid, device, [services]] for UPnP, or
    [instance name, registration type, txt record] for IPP / AFP. An empty
    list is returned for any other category.
  """
  if service_category == P2P_LOCAL_SERVICE_UPNP:
    return [
        UpnpTestData.uuid,
        UpnpTestData.service_type,
        [UpnpTestData.av_transport, UpnpTestData.connection_manager],
    ]
  if service_category == P2P_LOCAL_SERVICE_IPP:
    return [
        IppTestData.ipp_instance_name,
        IppTestData.ipp_registration_type,
        IppTestData.ipp_txt_record,
    ]
  if service_category == P2P_LOCAL_SERVICE_AFP:
    return [
        AfpTestData.afp_instance_name,
        AfpTestData.afp_registration_type,
        AfpTestData.afp_txt_record,
    ]
  return []


def create_p2p_local_service(ad, service_category):
  """Create the p2p local service for `service_category` on device `ad`.

  Args:
    ad: The android device
    service_category: p2p local service type, UPNP / IPP / AFP. Any other
      value raises IndexError, because `gen_test_data` returns an empty list
      for it.
  """
  test_data = gen_test_data(service_category)
  ad.log.info(
      'LocalService = %s, %s, %s', test_data[0], test_data[1], test_data[2]
  )
  if service_category == P2P_LOCAL_SERVICE_UPNP:
    ad.wifi.wifiP2pAddUpnpLocalService(test_data[0], test_data[1], test_data[2])
  elif service_category in (P2P_LOCAL_SERVICE_IPP, P2P_LOCAL_SERVICE_AFP):
    ad.wifi.wifiP2pAddBonjourLocalService(
        test_data[0], test_data[1], test_data[2]
    )


def gen_expect_test_data(service_type, query_string1, query_string2):
  """Generate the expected service list for a service request.

  Every expected entry is a key of the returned dict with the value 1. The
  DNS SD checks in this module set an entry to 0 once it has been received.

  Args:
    service_type: P2p local service type, Upnp or Bonjour
      (`WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_*`).
    query_string1: Query String, NonNull
    query_string2: Query String, used for Bonjour, Nullable

  Returns:
    expect_service_list: expect data generated. Empty if `service_type` is
    neither Bonjour nor UPnP.
  """
  expect_service_list = {}
  if (
      service_type
      == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR
  ):
    ipp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse()
    afp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse()
    if query_string1 == IppTestData.ipp_registration_type:
      if query_string2 == IppTestData.ipp_instance_name:
        # Type and instance name given: only the IPP TXT record is expected.
        ipp_service.instance_name = ''
        ipp_service.registration_type = ''
        ipp_service.full_domain_name = IppTestData.ipp_domain_name
        ipp_service.txt_record_map = IppTestData.ipp_txt_record
        expect_service_list[ipp_service.to_string()] = 1
        return expect_service_list
      # Only the type given: only the IPP service response is expected.
      ipp_service.instance_name = IppTestData.ipp_instance_name
      ipp_service.registration_type = (
          IppTestData.ipp_registration_type + '.local.'
      )
      ipp_service.full_domain_name = ''
      ipp_service.txt_record_map = ''
      expect_service_list[ipp_service.to_string()] = 1
      return expect_service_list
    elif query_string1 == AfpTestData.afp_registration_type:
      if query_string2 == AfpTestData.afp_instance_name:
        # Type and instance name given: only the AFP TXT record is expected.
        afp_service.instance_name = ''
        afp_service.registration_type = ''
        afp_service.full_domain_name = AfpTestData.afp_domain_name
        afp_service.txt_record_map = AfpTestData.afp_txt_record
        expect_service_list[afp_service.to_string()] = 1
        return expect_service_list
    # Every other Bonjour query (including an AFP type query without the AFP
    # instance name) expects the response and the TXT record of both services.
    # The two objects are reused; each key is computed before the next change.
    ipp_service.instance_name = IppTestData.ipp_instance_name
    ipp_service.registration_type = (
        IppTestData.ipp_registration_type + '.local.'
    )
    ipp_service.full_domain_name = ''
    ipp_service.txt_record_map = ''
    expect_service_list[ipp_service.to_string()] = 1

    ipp_service.instance_name = ''
    ipp_service.registration_type = ''
    ipp_service.full_domain_name = IppTestData.ipp_domain_name
    ipp_service.txt_record_map = IppTestData.ipp_txt_record
    expect_service_list[ipp_service.to_string()] = 1

    afp_service.instance_name = AfpTestData.afp_instance_name
    afp_service.registration_type = (
        AfpTestData.afp_registration_type + '.local.'
    )
    afp_service.full_domain_name = ''
    afp_service.txt_record_map = ''
    expect_service_list[afp_service.to_string()] = 1

    afp_service.instance_name = ''
    afp_service.registration_type = ''
    afp_service.full_domain_name = AfpTestData.afp_domain_name
    afp_service.txt_record_map = AfpTestData.afp_txt_record
    expect_service_list[afp_service.to_string()] = 1

    return expect_service_list
  elif (
      service_type == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP
  ):
    uuid_prefix = 'uuid:' + UpnpTestData.uuid
    # The root device entry is expected for every UPnP query.
    expect_service_list[uuid_prefix + '::' + UpnpTestData.rootdevice] = 1
    if query_string1 != 'upnp:rootdevice':
      expect_service_list[uuid_prefix + '::' + UpnpTestData.av_transport] = 1
      expect_service_list[
          uuid_prefix + '::' + UpnpTestData.connection_manager
      ] = 1
      expect_service_list[uuid_prefix + '::' + UpnpTestData.service_type] = 1
      expect_service_list[uuid_prefix] = 1

  return expect_service_list


def check_service_query_result(service_list, expect_service_list):
  """Check whether service_list has the same entries as expect_service_list.

  Only the keys are compared; the values are ignored. Neither argument is
  modified.

  Args:
    service_list: ServiceList which get from query result
    expect_service_list: ServiceList which hardcode in gen_expect_test_data

  Returns:
    True: service_list same as expect_service_list
    False: Exist discrepancy between service_list and expect_service_list
  """
  return set(service_list.keys()) == set(expect_service_list.keys())


def _check_all_expect_data(expect_data: dict[str, int]) -> bool:
  """Returns True when no entry of `expect_data` is still marked 1 (pending)."""
  return all(state != 1 for state in expect_data.values())


def request_service_and_check_result(
    ad_service_provider: p2p_utils.DeviceState,
    ad_service_receiver: p2p_utils.DeviceState,
    service_type: int,
    query_string1,
    query_string2,
):
  """Based on service type and query info, check service request result.

  Check same as expect or not on an Android device ad_service_receiver.
  And remove p2p service request after result check.

  For Bonjour, the check fails through `asserts.assert_true` if any expected
  entry has not been received. For UPnP, the check is delegated to
  `p2p_utils.check_discovered_services`.

  Args:
    ad_service_provider: The android device which provide p2p local service
    ad_service_receiver: The android device which query p2p local service
    service_type: P2p local service type, Upnp or Bonjour
    query_string1: Query String, NonNull
    query_string2: Query String, used for Bonjour, Nullable

  Returns:
    0: if service request result is as expected.
  """
  expect_data = gen_expect_test_data(service_type, query_string1, query_string2)
  receiver_ad = ad_service_receiver.ad
  provider_address = ad_service_provider.p2p_device.device_address

  p2p_utils.discover_p2p_peer(ad_service_receiver, ad_service_provider)
  receiver_ad.wifi.wifiP2pStopPeerDiscovery()
  receiver_ad.wifi.wifiP2pClearServiceRequests()
  time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)

  service_id = 0
  if (
      service_type
      == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR
  ):
    receiver_ad.log.info(
        'Request bonjour service in %s with Query String %s and %s ',
        receiver_ad.serial,
        query_string1,
        query_string2,
    )
    receiver_ad.log.info('expectData 1st %s', expect_data)
    if query_string1:
      service_id = receiver_ad.wifi.wifiP2pAddBonjourServiceRequest(
          query_string2,  # instanceName
          query_string1,  # serviceType
      )
    else:
      service_id = receiver_ad.wifi.wifiP2pAddServiceRequest(service_type)
    time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)
    receiver_ad.log.info('service request id %s', service_id)
    p2p_utils.set_dns_sd_response_listeners(ad_service_receiver)
    receiver_ad.wifi.wifiP2pDiscoverServices()
    receiver_ad.log.info('Check Service Listener')
    time.sleep(_DEFAULT_SERVICE_WAITING_TIME)
    # Both checks share `expect_data` and set received entries to 0 in place.
    check_discovered_dns_sd_response(
        ad_service_receiver,
        expected_responses=expect_data,
        expected_src_device_address=provider_address,
        channel_id=ad_service_receiver.channel_ids[0],
        timeout=_NORMAL_TIMEOUT,
    )
    receiver_ad.log.info('expectData 2nd %s', expect_data)
    check_discovered_dns_sd_txt_record(
        ad_service_receiver,
        expected_records=expect_data,
        expected_src_device_address=provider_address,
        channel_id=ad_service_receiver.channel_ids[0],
        timeout=_NORMAL_TIMEOUT,
    )
    got_all_expects = _check_all_expect_data(expect_data)
    receiver_ad.log.info('Got all the expect data : %s', got_all_expects)
    asserts.assert_true(
        got_all_expects,
        "Don't got all the expect data.",
    )
  elif (
      service_type == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP
  ):
    receiver_ad.log.info(
        'Request upnp service in %s with Query String %s ',
        receiver_ad.serial,
        query_string1,
    )
    receiver_ad.log.info('expectData %s', expect_data)
    if query_string1:
      service_id = receiver_ad.wifi.wifiP2pAddUpnpServiceRequest(query_string1)
    else:
      service_id = receiver_ad.wifi.wifiP2pAddServiceRequest(service_type)
    p2p_utils.set_upnp_response_listener(ad_service_receiver)
    receiver_ad.wifi.wifiP2pDiscoverServices()
    receiver_ad.log.info('Check Service Listener')
    time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)
    p2p_utils.check_discovered_services(
        ad_service_receiver,
        provider_address,
        expected_dns_sd_sequence=None,
        expected_dns_txt_sequence=None,
        expected_upnp_sequence=expect_data,
    )
  receiver_ad.wifi.wifiP2pRemoveServiceRequest(service_id)
  return 0


def request_service_and_check_result_with_retry(
    ad_service_provider,
    ad_service_receiver,
    service_type,
    query_string1,
    query_string2,
    retry_count=3,
):
  """Allow failures for request_service_and_check_result.

  Service discovery might fail unexpectedly because the request packet might
  not be received by the service responder due to p2p state switch.

  NOTE(review): `request_service_and_check_result` only ever returns 0 and
  reports mismatches through `asserts`, so as written this loop stops after
  the first attempt; also, a `retry_count` of 0 or less skips the check
  entirely. Confirm whether assertion failures are meant to be retried.

  Args:
    ad_service_provider: The android device which provide p2p local service
    ad_service_receiver: The android device which query p2p local service
    service_type: P2p local service type, Upnp or Bonjour
    query_string1: Query String, NonNull
    query_string2: Query String, used for Bonjour, Nullable
    retry_count: maximum retry count, default is 3
  """
  ret = 0
  while retry_count > 0:
    ret = request_service_and_check_result(
        ad_service_provider,
        ad_service_receiver,
        service_type,
        query_string1,
        query_string2,
    )
    if ret == 0:
      break
    retry_count -= 1

  asserts.assert_equal(0, ret, 'cannot find any services with retries.')


def _check_no_discovered_service(
    ad: android_device.AndroidDevice,
    callback_handler: callback_handler_v2.CallbackHandlerV2,
    event_name: str,
    expected_src_device_address: str,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
  """Checks that no service is received from the specified source device.

  Args:
    ad: The device under check; only used in the failure message.
    callback_handler: The handler to pull `event_name` events from.
    event_name: Name of the event that must not arrive.
    expected_src_device_address: Only events whose source device has this
      address are considered.
    timeout: The wait timeout; values of one second or less are raised to one
      second.
  """

  def _is_expected_event(event):
    src_device = constants.WifiP2pDevice.from_dict(
        event.data['sourceDevice']
    )
    return src_device.device_address == expected_src_device_address

  # Set to a small timeout to allow pulling all received events
  if timeout.total_seconds() <= 1:
    timeout = datetime.timedelta(seconds=1)
  try:
    event = callback_handler.waitForEvent(
        event_name=event_name,
        predicate=_is_expected_event,
        timeout=timeout.total_seconds(),
    )
  except errors.CallbackHandlerTimeoutError:
    # Timeout error is expected as there should not be any qualified service
    return
  asserts.assert_is_none(
      event,
      f'{ad} should not discover p2p service. Discovered: {event}',
  )


def check_discovered_dns_sd_response(
    device: p2p_utils.DeviceState,
    expected_responses: dict[str, int],
    expected_src_device_address: str,
    channel_id: int | None = None,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
  """Check discovered DNS SD responses.

  Pulls `ON_DNS_SD_SERVICE_AVAILABLE` events until every entry of
  `expected_responses` has been marked as received or `timeout` elapses. A
  timeout is only logged; the caller decides from `expected_responses`
  whether anything is missing.

  This assumes that Bonjour service listener is set by
  `set_dns_sd_response_listeners`.

  Args:
    device: The device that is discovering DNS SD responses.
    expected_responses: The expected DNS SD responses, keyed by instance name
      + registration type, with the value 1 for entries still pending. It is
      modified in place: an entry is set to 0 once it has been received.
    expected_src_device_address: This only checks services that are from the
      expected source device.
    channel_id: The channel to check for expected responses. Defaults to the
      first channel of `device` (also when 0 is passed, because of `or`).
    timeout: The wait timeout; values of one second or less are raised to one
      second.
  """
  channel_id = channel_id or device.channel_ids[0]
  # NOTE(review): looked up by channel id here, but by list index in
  # `check_discovered_dns_sd_txt_record` - confirm which one matches the type
  # of `dns_sd_response_listeners`.
  callback_handler = device.dns_sd_response_listeners[channel_id]

  def _all_service_received(event):
    src_device = constants.WifiP2pDevice.from_dict(
        event.data['sourceDevice']
    )
    if src_device.device_address != expected_src_device_address:
      return False
    registration_type = event.data['registrationType']
    instance_name = event.data['instanceName']
    service_item = instance_name + registration_type
    device.ad.log.info('Received DNS SD response: %s', service_item)
    if service_item in expected_responses:
      expected_responses[service_item] = 0
    # Return the result so that the wait ends as soon as nothing is pending;
    # without it the predicate is always falsy and the wait always times out.
    return _check_all_expect_data(expected_responses)

  device.ad.log.info('Waiting for DNS SD services: %s', expected_responses)
  # Set to a small timeout to allow pulling all received events
  if timeout.total_seconds() <= 1:
    timeout = datetime.timedelta(seconds=1)
  try:
    callback_handler.waitForEvent(
        event_name=constants.ON_DNS_SD_SERVICE_AVAILABLE,
        predicate=_all_service_received,
        timeout=timeout.total_seconds(),
    )
  except errors.CallbackHandlerTimeoutError:
    device.ad.log.info(f'need to wait for services: {expected_responses}')


def check_discovered_dns_sd_txt_record(
    device: p2p_utils.DeviceState,
    expected_records: dict[str, int],
    expected_src_device_address: str,
    channel_id: int | None = None,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
  """Check discovered DNS SD TXT records.

  Pulls `ON_DNS_SD_TXT_RECORD_AVAILABLE` events until every entry of
  `expected_records` has been marked as received or `timeout` elapses. A
  timeout is only logged; the caller decides from `expected_records` whether
  anything is missing.

  This assumes that Bonjour service listener is set by
  `set_dns_sd_response_listeners`.

  Args:
    device: The device that is discovering DNS SD TXT records.
    expected_records: The expected DNS SD TXT records, keyed by full domain
      name + str(TXT record map), with the value 1 for entries still pending.
      It is modified in place: an entry is set to 0 once it has been received.
    expected_src_device_address: This only checks services that are from the
      expected source device.
    channel_id: The channel to check for expected records. Defaults to the
      first channel of `device` (also when 0 is passed, because of `or`).
    timeout: The wait timeout; values of one second or less are raised to one
      second.
  """
  channel_id = channel_id or device.channel_ids[0]
  # NOTE(review): looked up by list index here, but by channel id in
  # `check_discovered_dns_sd_response` - confirm which one matches the type
  # of `dns_sd_response_listeners`.
  idx = device.channel_ids.index(channel_id)
  callback_handler = device.dns_sd_response_listeners[idx]

  device.ad.log.info('Expected DNS SD TXT records: %s', expected_records)

  def _all_service_received(event):
    src_device = constants.WifiP2pDevice.from_dict(
        event.data['sourceDevice']
    )
    if src_device.device_address != expected_src_device_address:
      return False
    full_domain_name = event.data['fullDomainName']
    txt_record_map = event.data['txtRecordMap']
    record_to_remove = full_domain_name + str(txt_record_map)
    device.ad.log.info('Received DNS SD TXT record: %s', record_to_remove)
    if record_to_remove in expected_records:
      expected_records[record_to_remove] = 0
    # Return the result so that the wait ends as soon as nothing is pending;
    # without it the predicate is always falsy and the wait always times out.
    return _check_all_expect_data(expected_records)

  device.ad.log.info('Waiting for DNS SD TXT records: %s', expected_records)
  # Set to a small timeout to allow pulling all received events
  if timeout.total_seconds() <= 1:
    timeout = datetime.timedelta(seconds=1)
  try:
    callback_handler.waitForEvent(
        event_name=constants.ON_DNS_SD_TXT_RECORD_AVAILABLE,
        predicate=_all_service_received,
        timeout=timeout.total_seconds(),
    )
  except errors.CallbackHandlerTimeoutError:
    device.ad.log.info(f'need to wait for services: {expected_records}')


class WifiP2PEnums:
  """Enums for WifiP2p."""

  class WifiP2pConfig:
    """Key names of a p2p config."""

    DEVICEADDRESS_KEY = 'deviceAddress'
    WPSINFO_KEY = 'wpsInfo'
    GO_INTENT_KEY = 'groupOwnerIntent'
    NETID_KEY = 'netId'
    NETWORK_NAME = 'networkName'
    PASSPHRASE = 'passphrase'
    GROUP_BAND = 'groupOwnerBand'

  class WpsInfo:
    """Key names and setup values of a WPS info."""

    WPS_SETUP_KEY = 'setup'
    BSSID_KEY = 'BSSID'
    WPS_PIN_KEY = 'pin'
    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

  class WifiP2pServiceInfo:
    """Service type values used for p2p service requests."""

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

  class WifiP2pDnsSdServiceResponse:
    """Holds the fields of one expected DNS SD response or TXT record.

    `to_string` concatenates the fields into the key used in the expected
    service list built by `gen_expect_test_data`.
    """

    # Class-level defaults, kept so that the attributes stay readable on the
    # class itself.
    instance_name = ''
    registration_type = ''
    full_domain_name = ''
    txt_record_map = {}

    def __init__(self):
      # Give every instance its own map instead of sharing the class-level
      # dict between instances.
      self.txt_record_map = {}

    def to_string(self):
      """Returns the four fields concatenated, the map rendered via str()."""
      return (
          self.instance_name
          + self.registration_type
          + (self.full_domain_name + str(self.txt_record_map))
      )