• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""WiFi P2P library for multi-devices tests."""
2#  Copyright (C) 2025 The Android Open Source Project
3#
4#  Licensed under the Apache License, Version 2.0 (the "License");
5#  you may not use this file except in compliance with the License.
6#  You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#  Unless required by applicable law or agreed to in writing, software
11#  distributed under the License is distributed on an "AS IS" BASIS,
12#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#  See the License for the specific language governing permissions and
14#  limitations under the License.
15#
16#  Licensed under the Apache License, Version 2.0 (the "License");
17#  you may not use this file except in compliance with the License.
18#  You may obtain a copy of the License at
19#
20#       http://www.apache.org/licenses/LICENSE-2.0
21#
22#  Unless required by applicable law or agreed to in writing, software
23#  distributed under the License is distributed on an "AS IS" BASIS,
24#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25#  See the License for the specific language governing permissions and
26#  limitations under the License.
27
28# Lint as: python3
29
30from collections.abc import Sequence
31import datetime
32import time
33
34from direct import constants
35from direct import p2p_utils
36from mobly import asserts
37from mobly.controllers import android_device
38from mobly.controllers.android_device_lib import adb
39from mobly.controllers.android_device_lib import callback_handler_v2
40from mobly.snippet import errors
41
42_DEFAULT_TIMEOUT = datetime.timedelta(seconds=30)
43_DEFAULT_SLEEPTIME = 5
44_DEFAULT_FUNCTION_SWITCH_TIME = 10
45_DEFAULT_SERVICE_WAITING_TIME = 20
46_NORMAL_TIMEOUT = datetime.timedelta(seconds=20)
47
48P2P_CONNECT_NEGOTIATION = 0
49P2P_CONNECT_JOIN = 1
50P2P_CONNECT_INVITATION = 2
51
52######################################################
53# Wifi P2p local service type
54####################################################
55P2P_LOCAL_SERVICE_UPNP = 0
56P2P_LOCAL_SERVICE_IPP = 1
57P2P_LOCAL_SERVICE_AFP = 2
58
59######################################################
60# Wifi P2p local service event
61####################################################
62
63DNSSD_EVENT = 'WifiP2pOnDnsSdServiceAvailable'
64DNSSD_TXRECORD_EVENT = 'WifiP2pOnDnsSdTxtRecordAvailable'
65UPNP_EVENT = 'WifiP2pOnUpnpServiceAvailable'
66
67DNSSD_EVENT_INSTANCENAME_KEY = 'InstanceName'
68DNSSD_EVENT_REGISTRATIONTYPE_KEY = 'RegistrationType'
69DNSSD_TXRECORD_EVENT_FULLDOMAINNAME_KEY = 'FullDomainName'
70DNSSD_TXRECORD_EVENT_TXRECORDMAP_KEY = 'TxtRecordMap'
71UPNP_EVENT_SERVICELIST_KEY = 'ServiceList'
72
73
74######################################################
75# Wifi P2p UPnP MediaRenderer local service
76######################################################
77class UpnpTestData():
78    av_transport = 'urn:schemas-upnp-org:service:AVTransport:1'
79    connection_manager = 'urn:schemas-upnp-org:service:ConnectionManager:1'
80    service_type = 'urn:schemas-upnp-org:device:MediaRenderer:1'
81    uuid = '6859dede-8574-59ab-9332-123456789011'
82    rootdevice = 'upnp:rootdevice'
83
84
85######################################################
86# Wifi P2p Bonjour IPP & AFP local service
87######################################################
88class IppTestData():
89    ipp_instance_name = 'MyPrinter'
90    ipp_registration_type = '_ipp._tcp'
91    ipp_domain_name = 'myprinter._ipp._tcp.local.'
92    ipp_txt_record = {'txtvers': '1', 'pdl': 'application/postscript'}
93
94
95class AfpTestData():
96    afp_instance_name = 'Example'
97    afp_registration_type = '_afpovertcp._tcp'
98    afp_domain_name = 'example._afpovertcp._tcp.local.'
99    afp_txt_record = {}
100
101
102# Trigger p2p connect to device_go from device_gc.
103def p2p_connect(
104    device_gc: p2p_utils.DeviceState,
105    device_go: p2p_utils.DeviceState,
106    is_reconnect,
107    wps_setup,
108    p2p_connect_type=P2P_CONNECT_NEGOTIATION,
109    go_ad=None,
110):
111  """Trigger p2p connect to ad2 from ad1.
112
113  Args:
114      device_gc: The android device (Client)
115      device_go: The android device (GO)
116      is_reconnect: boolean, if persist group is exist, is_reconnect is true,
117        otherswise is false.
118      wps_setup: which wps connection would like to use
119      p2p_connect_type: enumeration, which type this p2p connection is
120      go_ad: The group owner android device which is used for the invitation
121        connection
122  """
123  device_gc.ad.log.info(
124      'Create p2p connection from %s to %s via wps: %s type %d',
125      device_gc.ad.serial,
126      device_go.ad.serial,
127      wps_setup,
128      p2p_connect_type,
129  )
130
131  if p2p_connect_type == P2P_CONNECT_INVITATION:
132    if go_ad is None:
133      go_ad = device_gc
134    p2p_utils.discover_p2p_peer(device_gc, device_go)
135    # GO might be another peer, so ad2 needs to find it first.
136    p2p_utils.discover_group_owner(
137        client=device_go, group_owner_address=go_ad.p2p_device.device_address
138    )
139  elif p2p_connect_type == P2P_CONNECT_JOIN:
140    peer_p2p_device = p2p_utils.discover_group_owner(
141        client=device_gc,
142        group_owner_address=device_go.p2p_device.device_address,
143    )
144    asserts.assert_true(
145        peer_p2p_device.is_group_owner,
146        f'P2p device {peer_p2p_device} should be group owner.',
147    )
148  else:
149    p2p_utils.discover_p2p_peer(device_gc, device_go)
150  time.sleep(_DEFAULT_SLEEPTIME)
151  device_gc.ad.log.info(
152      'from device1: %s -> device2: %s',
153      device_gc.p2p_device.device_address,
154      device_go.p2p_device.device_address,
155  )
156  p2p_config = constants.WifiP2pConfig(
157      device_address=device_go.p2p_device.device_address,
158      wps_setup=wps_setup,
159  )
160  if not is_reconnect:
161    p2p_utils.p2p_connect(device_gc, device_go, p2p_config)
162  else:
163    p2p_utils.p2p_reconnect(device_gc, device_go, p2p_config)
164
165
166def is_go(ad):
167  """Check an Android p2p role is Go or not.
168
169  Args:
170      ad: The android device
171
172  Returns:
173      True: An Android device is p2p go
174      False: An Android device is p2p gc
175  """
176  callback_handler = ad.wifi.wifiP2pRequestConnectionInfo()
177  event = callback_handler.waitAndGet(
178      event_name=constants.ON_CONNECTION_INFO_AVAILABLE,
179      timeout=_DEFAULT_TIMEOUT.total_seconds(),
180  )
181  if event.data['isGroupOwner']:
182    return True
183  return False
184
185
186def p2p_go_ip(ad):
187  """Get Group Owner IP address.
188
189  Args:
190      ad: The android device
191
192  Returns:
193      GO IP address
194  """
195  event_handler = ad.wifi.wifiP2pRequestConnectionInfo()
196  result = event_handler.waitAndGet(
197      event_name=constants.ON_CONNECTION_INFO_AVAILABLE,
198      timeout=_DEFAULT_TIMEOUT.total_seconds(),
199  )
200  go_flag = result.data['isGroupOwner']
201  ip = result.data['groupOwnerHostAddress'].replace('/', '')
202  ad.log.info('is_go:%s, p2p ip: %s', go_flag, ip)
203  return ip
204
205
206def p2p_disconnect(ad):
207  """Invoke an Android device removeGroup to trigger p2p disconnect.
208
209  Args:
210      ad: The android device
211  """
212  ad.log.debug('P2p Disconnect')
213  try:
214    ad.wifi.wifiP2pStopPeerDiscovery()
215    ad.wifi.wifiP2pCancelConnect()
216    ad.wifi.wifiP2pRemoveGroup()
217  finally:
218    # Make sure to call `p2pClose`, otherwise `_setup_wifi_p2p` won't be
219    # able to run again.
220    ad.wifi.p2pClose()
221
222
223def p2p_connection_ping_test(dut: android_device.AndroidDevice, peer_ip: str):
224  """Run a ping over the specified device/link.
225
226  Args:
227      dut: Device on which to execute ping6.
228      peer_ip: Scoped IPv4 address of the peer to ping.
229  """
230  cmd = 'ping -c 3 -W 1 %s' % peer_ip
231  try:
232    dut.log.info(cmd)
233    results = dut.adb.shell(cmd)
234  except adb.AdbError:
235    time.sleep(1)
236    dut.log.info('CMD RETRY: %s', cmd)
237    results = dut.adb.shell(cmd)
238
239  dut.log.info(results)
240
241
242def gen_test_data(service_category):
243  """Based on service category to generator Test Data.
244
245  Args:
246      service_category: P2p local service type, Upnp or Bonjour
247
248  Returns:
249      TestData
250  """
251  test_data = []
252  if service_category == P2P_LOCAL_SERVICE_UPNP:
253    test_data.append(UpnpTestData.uuid)
254    test_data.append(UpnpTestData.service_type)
255    test_data.append(
256        [UpnpTestData.av_transport, UpnpTestData.connection_manager]
257    )
258  elif service_category == P2P_LOCAL_SERVICE_IPP:
259    test_data.append(IppTestData.ipp_instance_name)
260    test_data.append(IppTestData.ipp_registration_type)
261    test_data.append(IppTestData.ipp_txt_record)
262  elif service_category == P2P_LOCAL_SERVICE_AFP:
263    test_data.append(AfpTestData.afp_instance_name)
264    test_data.append(AfpTestData.afp_registration_type)
265    test_data.append(AfpTestData.afp_txt_record)
266
267  return test_data
268
269
270def create_p2p_local_service(ad, service_category):
271  """Based on service_category to create p2p local service on an Android device ad.
272
273  Args:
274      ad: The android device
275      service_category: p2p local service type, UPNP / IPP / AFP,
276  """
277  test_data = gen_test_data(service_category)
278  ad.log.info(
279      'LocalService = %s, %s, %s', test_data[0], test_data[1], test_data[2]
280  )
281  if service_category == P2P_LOCAL_SERVICE_UPNP:
282    ad.wifi.wifiP2pAddUpnpLocalService(test_data[0], test_data[1], test_data[2])
283  elif (
284      service_category == P2P_LOCAL_SERVICE_IPP
285      or service_category == P2P_LOCAL_SERVICE_AFP
286  ):
287    ad.wifi.wifiP2pAddBonjourLocalService(
288        test_data[0], test_data[1], test_data[2]
289    )
290
291
292def gen_expect_test_data(service_type, query_string1, query_string2):
293  """Based on serviceCategory to generator expect serviceList.
294
295  Args:
296      service_type: P2p local service type, Upnp or Bonjour
297      query_string1: Query String, NonNull
298      query_string2: Query String, used for Bonjour, Nullable
299
300  Returns:
301      expect_service_list: expect data generated.
302  """
303  expect_service_list = {}
304  if (
305      service_type
306      == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR
307  ):
308    ipp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse()
309    afp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse()
310    if query_string1 == IppTestData.ipp_registration_type:
311      if query_string2 == IppTestData.ipp_instance_name:
312        ipp_service.instance_name = ''
313        ipp_service.registration_type = ''
314        ipp_service.full_domain_name = IppTestData.ipp_domain_name
315        ipp_service.txt_record_map = IppTestData.ipp_txt_record
316        expect_service_list[ipp_service.to_string()] = 1
317        return expect_service_list
318      ipp_service.instance_name = IppTestData.ipp_instance_name
319      ipp_service.registration_type = (
320          IppTestData.ipp_registration_type + '.local.'
321      )
322      ipp_service.full_domain_name = ''
323      ipp_service.txt_record_map = ''
324      expect_service_list[ipp_service.to_string()] = 1
325      return expect_service_list
326    elif query_string1 == AfpTestData.afp_registration_type:
327      if query_string2 == AfpTestData.afp_instance_name:
328        afp_service.instance_name = ''
329        afp_service.registration_type = ''
330        afp_service.full_domain_name = AfpTestData.afp_domain_name
331        afp_service.txt_record_map = AfpTestData.afp_txt_record
332        expect_service_list[afp_service.to_string()] = 1
333        return expect_service_list
334    ipp_service.instance_name = IppTestData.ipp_instance_name
335    ipp_service.registration_type = (
336        IppTestData.ipp_registration_type + '.local.'
337    )
338    ipp_service.full_domain_name = ''
339    ipp_service.txt_record_map = ''
340    expect_service_list[ipp_service.to_string()] = 1
341
342    ipp_service.instance_name = ''
343    ipp_service.registration_type = ''
344    ipp_service.full_domain_name = IppTestData.ipp_domain_name
345    ipp_service.txt_record_map = IppTestData.ipp_txt_record
346    expect_service_list[ipp_service.to_string()] = 1
347
348    afp_service.instance_name = AfpTestData.afp_instance_name
349    afp_service.registration_type = (
350        AfpTestData.afp_registration_type + '.local.'
351    )
352    afp_service.full_domain_name = ''
353    afp_service.txt_record_map = ''
354    expect_service_list[afp_service.to_string()] = 1
355
356    afp_service.instance_name = ''
357    afp_service.registration_type = ''
358    afp_service.full_domain_name = AfpTestData.afp_domain_name
359    afp_service.txt_record_map = AfpTestData.afp_txt_record
360    expect_service_list[afp_service.to_string()] = 1
361
362    return expect_service_list
363  elif (
364      service_type == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP
365  ):
366    upnp_service = (
367        'uuid:' + UpnpTestData.uuid + '::' + (UpnpTestData.rootdevice)
368    )
369    expect_service_list[upnp_service] = 1
370    if query_string1 != 'upnp:rootdevice':
371      upnp_service = (
372          'uuid:' + UpnpTestData.uuid + ('::' + UpnpTestData.av_transport)
373      )
374      expect_service_list[upnp_service] = 1
375      upnp_service = (
376          'uuid:' + UpnpTestData.uuid + ('::' + UpnpTestData.connection_manager)
377      )
378      expect_service_list[upnp_service] = 1
379      upnp_service = (
380          'uuid:' + UpnpTestData.uuid + ('::' + UpnpTestData.service_type)
381      )
382      expect_service_list[upnp_service] = 1
383      upnp_service = 'uuid:' + UpnpTestData.uuid
384      expect_service_list[upnp_service] = 1
385
386  return expect_service_list
387
388
389def check_service_query_result(service_list, expect_service_list):
390  """Check serviceList same as expectServiceList or not.
391
392  Args:
393      service_list: ServiceList which get from query result
394      expect_service_list: ServiceList which hardcode in genExpectTestData
395
396  Returns:
397      True: serviceList  same as expectServiceList
398      False:Exist discrepancy between serviceList and expectServiceList
399  """
400  temp_service_list = service_list.copy()
401  temp_expect_service_list = expect_service_list.copy()
402  for service in service_list.keys():
403    if service in expect_service_list:
404      del temp_service_list[service]
405      del temp_expect_service_list[service]
406  return not temp_expect_service_list and not temp_service_list
407
408
409def _check_all_expect_data(expect_data: dict[str, int]) -> bool:
410  for _, v in expect_data.items():
411    if v == 1:
412      return False
413  return True
414
415
416def request_service_and_check_result(
417    ad_service_provider: p2p_utils.DeviceState,
418    ad_service_receiver: p2p_utils.DeviceState,
419    service_type: int,
420    query_string1,
421    query_string2,
422):
423  """Based on service type and query info, check service request result.
424
425  Check same as expect or not on an Android device ad_service_receiver.
426  And remove p2p service request after result check.
427
428  Args:
429      ad_service_provider: The android device which provide p2p local service
430      ad_service_receiver: The android device which query p2p local service
431      service_type: P2p local service type, Upnp or Bonjour
432      query_string1: Query String, NonNull
433      query_string2: Query String, used for Bonjour, Nullable
434
435  Returns:
436      0: if service request result is as expected.
437  """
438  expect_data = gen_expect_test_data(service_type, query_string1, query_string2)
439  p2p_utils.discover_p2p_peer(ad_service_receiver, ad_service_provider)
440  ad_service_receiver.ad.wifi.wifiP2pStopPeerDiscovery()
441  ad_service_receiver.ad.wifi.wifiP2pClearServiceRequests()
442  time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)
443
444  service_id = 0
445  if (
446      service_type
447      == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR
448  ):
449    ad_service_receiver.ad.log.info(
450        'Request bonjour service in %s with Query String %s and %s '
451        % (ad_service_receiver.ad.serial, query_string1, query_string2)
452    )
453    ad_service_receiver.ad.log.info('expectData 1st %s' % expect_data)
454    if query_string1:
455      service_id = ad_service_receiver.ad.wifi.wifiP2pAddBonjourServiceRequest(
456          query_string2,  # instanceName
457          query_string1,  # serviceType
458      )
459    else:
460      service_id = ad_service_receiver.ad.wifi.wifiP2pAddServiceRequest(
461          service_type
462      )
463    time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)
464    ad_service_receiver.ad.log.info('service request id %s' % service_id)
465    p2p_utils.set_dns_sd_response_listeners(ad_service_receiver)
466    ad_service_receiver.ad.wifi.wifiP2pDiscoverServices()
467    ad_service_receiver.ad.log.info('Check Service Listener')
468    time.sleep(_DEFAULT_SERVICE_WAITING_TIME)
469    check_discovered_dns_sd_response(
470        ad_service_receiver,
471        expected_responses=expect_data,
472        expected_src_device_address=(
473            ad_service_provider.p2p_device.device_address
474        ),
475        channel_id=ad_service_receiver.channel_ids[0],
476        timeout=_NORMAL_TIMEOUT,
477    )
478    ad_service_receiver.ad.log.info('expectData 2nd %s' % expect_data)
479    check_discovered_dns_sd_txt_record(
480        ad_service_receiver,
481        expected_records=expect_data,
482        expected_src_device_address=(
483            ad_service_provider.p2p_device.device_address
484        ),
485        channel_id=ad_service_receiver.channel_ids[0],
486        timeout=_NORMAL_TIMEOUT,
487    )
488    got_all_expects = _check_all_expect_data(expect_data)
489    ad_service_receiver.ad.log.info(
490        'Got all the expect data : %s', got_all_expects
491    )
492    asserts.assert_true(
493        got_all_expects,
494        "Don't got all the expect data.",
495    )
496  elif (
497      service_type == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP
498  ):
499    ad_service_receiver.ad.log.info(
500        'Request upnp service in %s with Query String %s '
501        % (ad_service_receiver.ad.serial, query_string1)
502    )
503    ad_service_receiver.ad.log.info('expectData %s' % expect_data)
504    if query_string1:
505      service_id = ad_service_receiver.ad.wifi.wifiP2pAddUpnpServiceRequest(
506          query_string1
507      )
508    else:
509      service_id = ad_service_receiver.ad.wifi.wifiP2pAddServiceRequest(
510          service_type
511      )
512    p2p_utils.set_upnp_response_listener(ad_service_receiver)
513    ad_service_receiver.ad.wifi.wifiP2pDiscoverServices()
514    ad_service_receiver.ad.log.info('Check Service Listener')
515    time.sleep(_DEFAULT_FUNCTION_SWITCH_TIME)
516    p2p_utils.check_discovered_services(
517        ad_service_receiver,
518        ad_service_provider.p2p_device.device_address,
519        expected_dns_sd_sequence=None,
520        expected_dns_txt_sequence=None,
521        expected_upnp_sequence=expect_data,
522    )
523  ad_service_receiver.ad.wifi.wifiP2pRemoveServiceRequest(service_id)
524  return 0
525
526
527def request_service_and_check_result_with_retry(
528    ad_service_provider,
529    ad_service_receiver,
530    service_type,
531    query_string1,
532    query_string2,
533    retry_count=3,
534):
535  """allow failures for requestServiceAndCheckResult.
536
537  Service
538
539      discovery might fail unexpectedly because the request packet might not be
540      received by the service responder due to p2p state switch.
541
542  Args:
543      ad_service_provider: The android device which provide p2p local service
544      ad_service_receiver: The android device which query p2p local service
545      service_type: P2p local service type, Upnp or Bonjour
546      query_string1: Query String, NonNull
547      query_string2: Query String, used for Bonjour, Nullable
548      retry_count: maximum retry count, default is 3
549  """
550  ret = 0
551  while retry_count > 0:
552    ret = request_service_and_check_result(
553        ad_service_provider,
554        ad_service_receiver,
555        service_type,
556        query_string1,
557        query_string2,
558    )
559    if ret == 0:
560      break
561    retry_count -= 1
562
563  asserts.assert_equal(0, ret, 'cannot find any services with retries.')
564
565
566def _check_no_discovered_service(
567    ad: android_device.AndroidDevice,
568    callback_handler: callback_handler_v2.CallbackHandlerV2,
569    event_name: str,
570    expected_src_device_address: str,
571    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
572):
573    """Checks that no service is received from the specified source device."""
574    def _is_expected_event(event):
575        src_device = constants.WifiP2pDevice.from_dict(
576            event.data['sourceDevice']
577        )
578        return src_device.device_address == expected_src_device_address
579
580    # Set to a small timeout to allow pulling all received events
581    if timeout.total_seconds() <= 1:
582        timeout = datetime.timedelta(seconds=1)
583    try:
584        event = callback_handler.waitForEvent(
585            event_name=event_name,
586            predicate=_is_expected_event,
587            timeout=timeout.total_seconds(),
588        )
589    except errors.CallbackHandlerTimeoutError:
590        # Timeout error is expected as there should not be any qualified service
591        return
592    asserts.assert_is_none(
593        event,
594        f'{ad} should not discover p2p service. Discovered: {event}',
595    )
596
597
598def check_discovered_dns_sd_response(
599    device: p2p_utils.DeviceState,
600    expected_responses: Sequence[Sequence[str, str]],
601    expected_src_device_address: str,
602    channel_id: int | None = None,
603    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
604):
605    """Check discovered DNS SD responses.
606
607    If no responses are expected, check that no DNS SD response appear within
608    timeout. Otherwise, wait for all expected responses within timeout.
609
610    This assumes that Bonjour service listener is set by
611    `set_dns_sd_response_listeners`.
612
613    Args:
614        device: The device that is discovering DNS SD responses.
615        expected_responses: The expected DNS SD responses.
616        expected_src_device_address: This only checks services that are from the
617            expected source device.
618        channel_id: The channel to check for expected responses.
619        timeout: The wait timeout.
620    """
621    channel_id = channel_id or device.channel_ids[0]
622    callback_handler = device.dns_sd_response_listeners[channel_id]
623
624    def _all_service_received(event):
625        nonlocal expected_responses
626        src_device = constants.WifiP2pDevice.from_dict(
627            event.data['sourceDevice']
628        )
629        if src_device.device_address != expected_src_device_address:
630            return False
631        registration_type = event.data['registrationType']
632        instance_name = event.data['instanceName']
633        service_item = instance_name + registration_type
634        device.ad.log.info('Received DNS SD response: %s', service_item)
635        if service_item in expected_responses:
636            expected_responses[service_item] = 0
637        _check_all_expect_data(expected_responses)
638
639    device.ad.log.info('Waiting for DNS SD services: %s', expected_responses)
640    # Set to a small timeout to allow pulling all received events
641    if timeout.total_seconds() <= 1:
642        timeout = datetime.timedelta(seconds=1)
643    try:
644        callback_handler.waitForEvent(
645            event_name=constants.ON_DNS_SD_SERVICE_AVAILABLE,
646            predicate=_all_service_received,
647            timeout=timeout.total_seconds(),
648        )
649    except errors.CallbackHandlerTimeoutError:
650        device.ad.log.info(f'need to wait for services: {expected_responses}')
651
652
653def check_discovered_dns_sd_txt_record(
654    device: p2p_utils.DeviceState,
655    expected_records: Sequence[Sequence[str, dict[str, str]]],
656    expected_src_device_address: str,
657    channel_id: int | None = None,
658    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
659):
660    """Check discovered DNS SD TXT records.
661
662    If no records are expected, check that no DNS SD TXT record appear within
663    timeout. Otherwise, wait for all expected records within timeout.
664
665    This assumes that Bonjour service listener is set by
666    `set_dns_sd_response_listeners`.
667
668    Args:
669        device: The device that is discovering DNS SD TXT records.
670        expected_records: The expected DNS SD TXT records.
671        expected_src_device_address: This only checks services that are from the
672            expected source device.
673        channel_id: The channel to check for expected records.
674        timeout: The wait timeout.
675    """
676    channel_id = channel_id or device.channel_ids[0]
677    idx = device.channel_ids.index(channel_id)
678    callback_handler = device.dns_sd_response_listeners[idx]
679
680    device.ad.log.info('Expected DNS SD TXT records: %s', expected_records)
681    def _all_service_received(event):
682        nonlocal expected_records
683        src_device = constants.WifiP2pDevice.from_dict(
684            event.data['sourceDevice']
685        )
686        if src_device.device_address != expected_src_device_address:
687            return False
688        full_domain_name = event.data['fullDomainName']
689        txt_record_map = event.data['txtRecordMap']
690        record_to_remove = full_domain_name + str(txt_record_map)
691        device.ad.log.info('Received DNS SD TXT record: %s', record_to_remove)
692        if record_to_remove in expected_records:
693            expected_records[record_to_remove] = 0
694        _check_all_expect_data(expected_records)
695
696    device.ad.log.info('Waiting for DNS SD TXT records: %s', expected_records)
697    # Set to a small timeout to allow pulling all received events
698    if timeout.total_seconds() <= 1:
699        timeout = datetime.timedelta(seconds=1)
700    try:
701        callback_handler.waitForEvent(
702            event_name=constants.ON_DNS_SD_TXT_RECORD_AVAILABLE,
703            predicate=_all_service_received,
704            timeout=timeout.total_seconds(),
705        )
706    except errors.CallbackHandlerTimeoutError:
707        device.ad.log.info(f'need to wait for services: {expected_records}')
708
709
710class WifiP2PEnums:
711  """Enums for WifiP2p."""
712
713  class WifiP2pConfig:
714    DEVICEADDRESS_KEY = 'deviceAddress'
715    WPSINFO_KEY = 'wpsInfo'
716    GO_INTENT_KEY = 'groupOwnerIntent'
717    NETID_KEY = 'netId'
718    NETWORK_NAME = 'networkName'
719    PASSPHRASE = 'passphrase'
720    GROUP_BAND = 'groupOwnerBand'
721
722  class WpsInfo:
723    WPS_SETUP_KEY = 'setup'
724    BSSID_KEY = 'BSSID'
725    WPS_PIN_KEY = 'pin'
726    WIFI_WPS_INFO_PBC = 0
727    WIFI_WPS_INFO_DISPLAY = 1
728    WIFI_WPS_INFO_KEYPAD = 2
729    WIFI_WPS_INFO_LABEL = 3
730    WIFI_WPS_INFO_INVALID = 4
731
732  class WifiP2pServiceInfo:
733    # Macros for wifi p2p.
734    WIFI_P2P_SERVICE_TYPE_ALL = 0
735    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
736    WIFI_P2P_SERVICE_TYPE_UPNP = 2
737    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255
738
739  class WifiP2pDnsSdServiceResponse:
740    instance_name = ''
741    registration_type = ''
742    full_domain_name = ''
743    txt_record_map = {}
744
745    def __init__(self):
746      pass
747
748    def to_string(self):
749      return (
750          self.instance_name
751          + self.registration_type
752          + (self.full_domain_name + str(self.txt_record_map))
753      )
754