• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14# Lint as: python3
15
16from collections.abc import Sequence
17import dataclasses
18import datetime
19import logging
20import time
21
22from mobly import asserts
23from mobly.snippet import errors
24from mobly.controllers.android_device_lib import callback_handler_v2
25from mobly.controllers import android_device
26
27from direct import constants
28
# Timeout passed to the snippet `waitForEvent`/`waitAndGet` calls in this
# module unless a more specific timeout applies.
_DEFAULT_TIMEOUT = datetime.timedelta(seconds=45)
# Timeout used by `discover_group_owner` while waiting for the expected group
# owner to show up in a peers-changed event.
_GROUP_OWNER_DISCOVERY_TIMEOUT = datetime.timedelta(seconds=60)
# Pause inserted by `p2p_connect` before each WPS-related UI interaction
# (accepting an invitation, reading a PIN, entering a PIN).
_DEFAULT_UI_RESPONSE_TIME = datetime.timedelta(seconds=2)
32
33
@dataclasses.dataclass
class DeviceState:
    """All objects related to operating p2p snippet RPCs.

    Attributes:
        ad: The Android device controller object.
        p2p_device: The object that represents a Wi-Fi p2p device.
        broadcast_receiver: The object for getting events that represent
            Wi-Fi p2p broadcast intents on device.
        channel_ids: The IDs of all p2p channels initialized by Mobly snippets.
            This list is mutated in place when extra channels are initialized.
        upnp_response_listeners: Maps channel IDs to UPnP listeners. These
            listeners correspond to UpnpServiceResponseListener on device.
        dns_sd_response_listeners: Maps channel IDs to DNS SD listeners. These
            listeners listen for callback invocation of
            DnsSdServiceResponseListener and DnsSdTxtRecordListener.
    """

    ad: android_device.AndroidDevice
    p2p_device: constants.WifiP2pDevice
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2
    # Annotated as a concrete list (not an abstract Sequence) because
    # `init_extra_channel` appends newly created channel IDs to it.
    channel_ids: list[int] = dataclasses.field(default_factory=list)
    upnp_response_listeners: dict[
        int, callback_handler_v2.CallbackHandlerV2
    ] = dataclasses.field(default_factory=dict)
    dns_sd_response_listeners: dict[
        int, callback_handler_v2.CallbackHandlerV2
    ] = dataclasses.field(default_factory=dict)
57
58
def setup_wifi_p2p(ad: android_device.AndroidDevice) -> DeviceState:
    """Sets up Wi-Fi p2p for automation tests on an Android device.

    Initializes Wi-Fi p2p through the snippet, deletes all persistent groups,
    and fetches the local p2p device information.

    Args:
        ad: The Android device controller object.

    Returns:
        A `DeviceState` holding the device, its p2p device information, the
        broadcast receiver, and the ID of the initially created channel.
    """
    broadcast_receiver = _init_wifi_p2p(ad)
    _delete_all_persistent_groups(ad)
    p2p_device = _get_p2p_device(ad)
    # The device information is obtained through the "request device info"
    # RPC, so an anonymized address points at a permission problem with that
    # API rather than with the connection info API.
    asserts.assert_not_equal(
        p2p_device.device_address,
        constants.ANONYMIZED_MAC_ADDRESS,
        f'{ad} failed to get p2p device MAC address, please check permissions '
        'required by API WifiP2pManager#requestDeviceInfo',
    )
    return DeviceState(
        ad=ad,
        p2p_device=p2p_device,
        broadcast_receiver=broadcast_receiver,
        # The return value of the initialize RPC is recorded as the first
        # channel ID.
        channel_ids=[broadcast_receiver.ret_value],
    )
76
77
def _init_wifi_p2p(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Registers the snippet app with the Wi-Fi p2p framework.

    Call this before performing any other p2p operation.

    Args:
        ad: The Android device controller object.

    Returns:
        The broadcast receiver from which you can get snippet events
        corresponding to Wi-Fi p2p intents received on device.
    """
    receiver = ad.wifi.wifiP2pInitialize()

    # A "p2p disabled" state event may arrive first, so keep waiting until an
    # event reporting the enabled state shows up.
    receiver.waitForEvent(
        event_name=constants.WIFI_P2P_STATE_CHANGED_ACTION,
        predicate=lambda state_event: (
            state_event.data[constants.EXTRA_WIFI_STATE]
            == constants.ExtraWifiState.WIFI_P2P_STATE_ENABLED
        ),
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    return receiver
108
109
def _capture_p2p_intents(
    ad: android_device.AndroidDevice,
) -> callback_handler_v2.CallbackHandlerV2:
    """Begins capturing Wi-Fi p2p intents on the device.

    Args:
        ad: The Android device controller object.

    Returns:
        The handler returned by the snippet capture RPC.
    """
    return ad.wifi.wifiP2pCaptureP2pIntents()
116
117
def _delete_all_persistent_groups(
    ad: android_device.AndroidDevice,
) -> None:
    """Deletes every persistent Wi-Fi p2p group reported by the device.

    Args:
        ad: The Android device controller object.

    Raises:
        RuntimeError: If deleting any persistent group does not report
            success.
    """
    existing_groups = _request_persistent_group_info(ad)
    ad.log.debug(
        'Wi-Fi p2p persistent groups before delete: %s', existing_groups
    )
    for p2p_group in existing_groups:
        response = ad.wifi.wifiP2pDeletePersistentGroup(p2p_group.network_id)
        if (
            response[constants.EVENT_KEY_CALLBACK_NAME]
            == constants.ACTION_LISTENER_ON_SUCCESS
        ):
            continue
        # Any other callback is a failure; surface its reason.
        failure = constants.ActionListenerOnFailure(
            response[constants.EVENT_KEY_REASON]
        )
        raise RuntimeError(
            'Failed to delete persistent group with network id '
            f'{p2p_group.network_id}. Reason: {failure.name}'
        )
    remaining_groups = _request_persistent_group_info(ad)
    ad.log.debug(
        'Wi-Fi p2p persistent groups after delete: %s', remaining_groups
    )
137
138
def _request_persistent_group_info(
    ad: android_device.AndroidDevice,
) -> Sequence[constants.WifiP2pGroup]:
    """Asks the device for its persistent Wi-Fi p2p groups.

    Args:
        ad: The Android device controller object.

    Returns:
        The groups parsed from the 'groupList' field of the
        "persistent group info available" event.
    """
    handler = ad.wifi.wifiP2pRequestPersistentGroupInfo()
    info_event = handler.waitAndGet(
        event_name=constants.ON_PERSISTENT_GROUP_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    return constants.WifiP2pGroup.from_dict_list(info_event.data['groupList'])
150
151
def _get_p2p_device(
    ad: android_device.AndroidDevice,
) -> constants.WifiP2pDevice:
    """Fetches the Wi-Fi p2p device information of the given device.

    Args:
        ad: The Android device controller object.

    Returns:
        The p2p device parsed from the "device info available" event.
    """
    handler = ad.wifi.wifiP2pRequestDeviceInfo()
    info_event = handler.waitAndGet(
        event_name=constants.ON_DEVICE_INFO_AVAILABLE,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
    device_dict = info_event.data[constants.EVENT_KEY_P2P_DEVICE]
    return constants.WifiP2pDevice.from_dict(device_dict)
164
165
def init_extra_channel(device: DeviceState) -> int:
    """Creates one more p2p channel on the device and records its ID.

    Args:
        device: The device state; the new ID is appended to its
            `channel_ids`.

    Returns:
        The ID of the newly initialized channel.
    """
    new_channel_id = device.ad.wifi.wifiP2pInitExtraChannel()
    device.channel_ids.append(new_channel_id)
    return new_channel_id
171
172
def discover_p2p_peer(
    requester: DeviceState,
    responder: DeviceState,
) -> constants.WifiP2pDevice:
    """Initiates p2p peer discovery for the requester to find the responder.

    This initiates p2p discovery on both devices and checks that the
    requester can discover responder. The test is failed through
    `asserts.fail` if the responder is not found in time or if more than one
    peer matches the responder's MAC address.

    Args:
        requester: The device that must discover the responder.
        responder: The device expected to be discovered.

    Returns:
        The responder's p2p device as discovered on the requester.
    """
    requester.ad.log.debug('Discovering Wi-Fi p2p peer %s.', responder.ad)
    # Clear events in broadcast receiver before initiating peer discovery so
    # that only peers-changed events received from now on are examined.
    _clear_events(requester, constants.WIFI_P2P_PEERS_CHANGED_ACTION)

    # Initiate peer discovery.
    responder.ad.wifi.wifiP2pDiscoverPeers()
    requester.ad.wifi.wifiP2pDiscoverPeers()

    # Wait until found the p2p peer device with expected MAC address.
    expected_address = responder.p2p_device.device_address

    def _filter_target_p2p_device(event) -> Sequence[constants.WifiP2pDevice]:
        """Returns the peers in the event that match the expected address."""
        peers = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        return [
            peer for peer in peers if peer.device_address == expected_address
        ]

    try:
        event = requester.broadcast_receiver.waitForEvent(
            event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
            predicate=lambda event: bool(_filter_target_p2p_device(event)),
            timeout=_DEFAULT_TIMEOUT.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        asserts.fail(
            f'{requester.ad} did not find the responder device. Expected '
            f'responder MAC: {expected_address}.'
        )

    # There should be only one expected p2p peer.
    peers = _filter_target_p2p_device(event)
    if not peers:
        asserts.fail(
            f'{requester.ad} did not find the responder device. Expected '
            f'responder MAC: {expected_address}, found event: {event}.'
        )
    if len(peers) > 1:
        asserts.fail(
            f'{requester.ad} found more than one responder device. Expected '
            f'responder MAC: {expected_address}, found event: {event}.'
        )
    return peers[0]
230
231
def discover_group_owner(
    client: DeviceState,
    group_owner_address: str,
) -> constants.WifiP2pDevice:
    """Initiates p2p peer discovery for the client to find expected group owner.

    This requires that p2p group has already been established on the group
    owner. The test is failed through `asserts.fail` if the group owner is
    not found in time or if more than one peer matches.

    Args:
        client: The device acts as p2p client.
        group_owner_address: The expected MAC address of the group owner.

    Returns:
        The peer p2p device discovered on the client.
    """
    client.ad.log.debug(
        'Discovering Wi-Fi p2p group owner %s.', group_owner_address
    )
    client.ad.wifi.wifiP2pDiscoverPeers()

    # Wait until found the p2p peer device with expected MAC address. It must
    # be a group owner.
    def _filter_target_group_owner(event) -> Sequence[constants.WifiP2pDevice]:
        """Returns the group-owner peers that match the expected address."""
        peers = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        return [
            peer
            for peer in peers
            if peer.device_address == group_owner_address
            and peer.is_group_owner
        ]

    try:
        event = client.broadcast_receiver.waitForEvent(
            event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
            predicate=lambda event: bool(_filter_target_group_owner(event)),
            timeout=_GROUP_OWNER_DISCOVERY_TIMEOUT.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        asserts.fail(
            f'{client.ad} did not find the group owner device. Expected group '
            f'owner MAC: {group_owner_address}.'
        )

    # There should be only one expected p2p peer.
    peers = _filter_target_group_owner(event)
    if not peers:
        asserts.fail(
            f'{client.ad} did not find the group owner device. Expected group '
            f'owner MAC: {group_owner_address}, got event: {event}.'
        )
    if len(peers) > 1:
        asserts.fail(
            f'{client.ad} found more than one group owner devices. Expected '
            f'group owner MAC: {group_owner_address}, got event: {event}.'
        )
    return peers[0]
292
293
def create_group(
    device: DeviceState, config: constants.WifiP2pConfig | None = None
):
    """Creates a Wi-Fi p2p group on the given device.

    Args:
        device: The device that creates the group.
        config: Optional p2p configuration; when given, its dict form is
            passed to the snippet, otherwise None is passed.
    """
    # Drop stale connection-changed events so the wait below only sees
    # events produced by this group creation.
    _clear_events(device, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)

    if config:
        config_dict = config.to_dict()
    else:
        config_dict = None
    device.ad.wifi.wifiP2pCreateGroup(config_dict)

    # Block until an event reports groupFormed=True.
    _wait_connection_notice(device.broadcast_receiver)
305
306
def p2p_connect(
    requester: DeviceState,
    responder: DeviceState,
    config: constants.WifiP2pConfig,
) -> None:
    """Establishes Wi-Fi p2p connection with WPS configuration.

    The requester sends a connection request; depending on the WPS setup in
    `config`, the invitation is accepted or a PIN is exchanged through the
    snippet RPCs. Afterwards the connection status is verified on both
    devices.

    Args:
        requester: The requester device.
        responder: The responder device.
        config: The Wi-Fi p2p configuration.
    """
    logging.info(
        'Establishing a p2p connection through p2p configuration %s.', config
    )

    # Clear events in broadcast receiver of both devices.
    for device in (requester, responder):
        _clear_events(device, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
        _clear_events(device, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)

    requester.ad.wifi.wifiP2pConnect(config.to_dict())
    requester.ad.log.info('Sent P2P connect invitation to responder.')

    # Connect with WPS config requires user interaction through UI. Each UI
    # step is preceded by a short pause.
    wps_setup = config.wps_setup
    if wps_setup == constants.WpsInfo.PBC:
        time.sleep(_DEFAULT_UI_RESPONSE_TIME.total_seconds())
        responder.ad.wifi.wifiP2pAcceptInvitation(
            requester.p2p_device.device_name
        )
        responder.ad.log.info('Accepted connect invitation.')
    elif wps_setup == constants.WpsInfo.DISPLAY:
        # The PIN is read on the requester and entered on the responder.
        time.sleep(_DEFAULT_UI_RESPONSE_TIME.total_seconds())
        display_pin = requester.ad.wifi.wifiP2pGetPinCode(
            responder.p2p_device.device_name
        )
        requester.ad.log.info('p2p connection PIN code: %s', display_pin)
        time.sleep(_DEFAULT_UI_RESPONSE_TIME.total_seconds())
        responder.ad.wifi.wifiP2pEnterPin(
            display_pin, requester.p2p_device.device_name
        )
        responder.ad.log.info('Enetered PIN code.')
    elif wps_setup == constants.WpsInfo.KEYPAD:
        # The PIN is read on the responder and entered on the requester.
        time.sleep(_DEFAULT_UI_RESPONSE_TIME.total_seconds())
        keypad_pin = responder.ad.wifi.wifiP2pGetKeypadPinCode(
            requester.p2p_device.device_name
        )
        responder.ad.log.info('p2p connection Keypad PIN code: %s', keypad_pin)
        time.sleep(_DEFAULT_UI_RESPONSE_TIME.total_seconds())
        requester.ad.wifi.wifiP2pEnterPin(
            keypad_pin, responder.p2p_device.device_name
        )
        requester.ad.log.info('Enetered Keypad PIN code.')
    elif wps_setup is not None:
        asserts.fail(f'Unsupported WPS configuration: {config.wps_setup}')

    # Check p2p status on the requester first, then on the responder.
    for device, peer in ((requester, responder), (responder, requester)):
        peer_address = peer.p2p_device.device_address
        _wait_connection_notice(device.broadcast_receiver)
        _wait_peer_connected(device.broadcast_receiver, peer_address)
        device.ad.log.debug(
            'Connected with device %s through wifi p2p.', peer_address
        )
    logging.info('Established wifi p2p connection.')
385
386
def p2p_reconnect(
    requester: DeviceState,
    responder: DeviceState,
    config: constants.WifiP2pConfig,
) -> None:
    """Establishes Wi-Fi p2p connection without any UI interaction.

    This method instructs the requester to initiate a connection request and
    then verifies the connection status on both devices. Unlike
    `p2p_connect`, it performs no invitation acceptance or PIN exchange.

    Args:
        requester: The requester device.
        responder: The responder device.
        config: The Wi-Fi p2p configuration.
    """
    logging.info(
        'Establishing a p2p connection through p2p configuration %s.', config
    )

    # Clear events in broadcast receiver of both devices.
    for device in (requester, responder):
        _clear_events(device, constants.WIFI_P2P_PEERS_CHANGED_ACTION)
        _clear_events(device, constants.WIFI_P2P_CONNECTION_CHANGED_ACTION)

    requester.ad.wifi.wifiP2pConnect(config.to_dict())
    requester.ad.log.info('Sent P2P connect invitation to responder.')

    # Check p2p status on the requester first, then on the responder.
    for device, peer in ((requester, responder), (responder, requester)):
        peer_address = peer.p2p_device.device_address
        _wait_connection_notice(device.broadcast_receiver)
        _wait_peer_connected(device.broadcast_receiver, peer_address)
        device.ad.log.info(
            'Connected with device %s through wifi p2p.', peer_address
        )
    logging.info('Established wifi p2p connection.')
438
439
def _wait_peer_connected(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2, peer_address: str
):
    """Waits for event that indicates expected Wi-Fi p2p peer is connected.

    Args:
        broadcast_receiver: The receiver to wait on for peers-changed events.
        peer_address: The MAC address of the peer expected to be connected.
    """

    def _is_peer_connected(event):
        # Use the shared peer-list key constant, as the other peers-changed
        # predicates in this module do.
        devices = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        return any(
            device.device_address == peer_address
            and device.status == constants.WifiP2pDeviceStatus.CONNECTED
            for device in devices
        )

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_is_peer_connected,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
460
461
def _wait_connection_notice(
    broadcast_receiver: callback_handler_v2.CallbackHandlerV2,
):
    """Waits for event that indicates a p2p connection is established.

    Blocks until a connection-changed event whose p2p info reports a formed
    group is received.

    Args:
        broadcast_receiver: The receiver to wait on for connection-changed
            events.
    """

    def _is_group_formed(event):
        try:
            p2p_info = constants.WifiP2pInfo.from_dict(
                event.data[constants.EVENT_KEY_P2P_INFO]
            )
            return p2p_info.group_formed
        except KeyError:
            # A KeyError while reading the p2p info is treated as "group not
            # formed" so that waiting continues.
            return False

    # Only the arrival of a matching event matters; the event is not used.
    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_is_group_formed,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
481
482
def remove_group_and_verify_disconnected(
    requester: DeviceState,
    responder: DeviceState,
    is_group_negotiation: bool,
):
    """Stops p2p connection and verifies disconnection status on devices.

    Args:
        requester: The device that initiates the group removal.
        responder: The other device of the connection.
        is_group_negotiation: When true, the responder is also expected to
            receive a disconnection notice.
    """
    logging.info('Stopping wifi p2p connection.')

    # Clear events in broadcast receiver of both devices.
    for device in (requester, responder):
        for stale_event_name in (
            constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
            constants.ON_DEVICE_INFO_AVAILABLE,
        ):
            _clear_events(device, stale_event_name)

    # Requester initiates p2p group removal.
    requester.ad.wifi.wifiP2pRemoveGroup()

    requester_address = requester.p2p_device.device_address
    responder_address = responder.p2p_device.device_address

    # Check p2p status on requester.
    _wait_disconnection_notice(requester.broadcast_receiver)
    _wait_peer_disconnected(requester.broadcast_receiver, responder_address)
    requester.ad.log.debug(
        'Disconnected with device %s through wifi p2p.', responder_address
    )

    # Check p2p status on responder. The disconnection notice is only
    # awaited there for group negotiation.
    if is_group_negotiation:
        _wait_disconnection_notice(responder.broadcast_receiver)
    _wait_peer_disconnected(responder.broadcast_receiver, requester_address)
    responder.ad.log.debug(
        'Disconnected with device %s through wifi p2p.', requester_address
    )

    logging.info('Stopped wifi p2p connection.')
522
523
def _wait_disconnection_notice(broadcast_receiver):
    """Waits for a connection-changed event reporting no formed group.

    Args:
        broadcast_receiver: The receiver to wait on for connection-changed
            events.
    """

    def _group_no_longer_formed(event):
        p2p_info_dict = event.data[constants.EVENT_KEY_P2P_INFO]
        p2p_info = constants.WifiP2pInfo.from_dict(p2p_info_dict)
        return not p2p_info.group_formed

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_CONNECTION_CHANGED_ACTION,
        predicate=_group_no_longer_formed,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
538
539
def _wait_peer_disconnected(broadcast_receiver, target_address):
    """Waits for a peers-changed event showing the target is not connected.

    Args:
        broadcast_receiver: The receiver to wait on for peers-changed events.
        target_address: The MAC address of the peer expected to disconnect.
    """

    def _target_not_connected(event):
        peer_list = constants.WifiP2pDevice.from_dict_list(
            event.data[constants.EVENT_KEY_PEER_LIST]
        )
        # Only the first peer with the target address is considered.
        target = next(
            (
                peer
                for peer in peer_list
                if peer.device_address == target_address
            ),
            None,
        )
        if target is None:
            # Target device not found also means it is disconnected.
            return True
        return target.status != constants.WifiP2pDeviceStatus.CONNECTED

    broadcast_receiver.waitForEvent(
        event_name=constants.WIFI_P2P_PEERS_CHANGED_ACTION,
        predicate=_target_not_connected,
        timeout=_DEFAULT_TIMEOUT.total_seconds(),
    )
558
559
def _clear_events(device: DeviceState, event_name):
    """Drains the named events from the device's broadcast receiver.

    Args:
        device: The device whose broadcast receiver is drained.
        event_name: The name of the events to fetch and discard.
    """
    # Fetching all pending events of this name is what clears them; the
    # fetched events are only counted for the debug log.
    drained_events = device.broadcast_receiver.getAll(event_name)
    device.ad.log.debug(
        'Cleared %d events of event name %s', len(drained_events), event_name
    )
566
567
def teardown_wifi_p2p(ad: android_device.AndroidDevice):
    """Destroys all resources initialized in `setup_wifi_p2p`.

    Stops peer discovery, cancels any connect attempt and removes the
    current group, then closes p2p on the snippet side.

    Args:
        ad: The Android device controller object.
    """
    try:
        ad.wifi.wifiP2pStopPeerDiscovery()
        ad.wifi.wifiP2pCancelConnect()
        ad.wifi.wifiP2pRemoveGroup()
    finally:
        # Make sure to call `p2pClose` even if a cleanup RPC above raised,
        # otherwise `setup_wifi_p2p` won't be able to run again.
        ad.wifi.p2pClose()
578
579
def add_upnp_local_service(device: DeviceState, config: dict):
    """Registers a local UPnP service for p2p on the device.

    Args:
        device: The device on which the service is added.
        config: A dict providing the 'uuid', 'device' and 'services' entries
            passed to the snippet.
    """
    service_args = (config['uuid'], config['device'], config['services'])
    device.ad.wifi.wifiP2pAddUpnpLocalService(*service_args)
585
586
def add_bonjour_local_service(device: DeviceState, config: dict):
    """Registers a local Bonjour service for p2p on the device.

    Args:
        device: The device on which the service is added.
        config: A dict providing the 'instance_name', 'service_type' and
            'txt_map' entries passed to the snippet.
    """
    service_args = (
        config['instance_name'],
        config['service_type'],
        config['txt_map'],
    )
    device.ad.wifi.wifiP2pAddBonjourLocalService(*service_args)
592
593
def set_upnp_response_listener(
    device: DeviceState, channel_id: int | None = None
):
    """Set response listener for Upnp service.

    The created listener is stored in `device.upnp_response_listeners` under
    the channel ID.

    Args:
        device: The device on which the listener is set.
        channel_id: The channel to set the listener on. Defaults to the first
            channel of the device when None.
    """
    # Compare against None explicitly so that a channel ID of 0 is honored
    # instead of silently falling back to the first channel.
    if channel_id is None:
        channel_id = device.channel_ids[0]
    upnp_response_listener = device.ad.wifi.wifiP2pSetUpnpResponseListener(
        channel_id
    )
    device.upnp_response_listeners[channel_id] = upnp_response_listener
603
604
def set_dns_sd_response_listeners(
    device: DeviceState, channel_id: int | None = None
):
    """Set response listener for Bonjour service.

    The created listener is stored in `device.dns_sd_response_listeners`
    under the channel ID.

    Args:
        device: The device on which the listeners are set.
        channel_id: The channel to set the listeners on. Defaults to the
            first channel of the device when None.
    """
    # Compare against None explicitly so that a channel ID of 0 is honored
    # instead of silently falling back to the first channel.
    if channel_id is None:
        channel_id = device.channel_ids[0]
    listener = device.ad.wifi.wifiP2pSetDnsSdResponseListeners(channel_id)
    device.dns_sd_response_listeners[channel_id] = listener
612
613
def reset_p2p_service_state(
    ad: android_device.AndroidDevice, channel_id
):
    """Resets the p2p service discovery state of one channel on the device.

    Clears service requests, unsets the DNS SD and UPnP response listeners,
    and clears local services, in that order.

    Args:
        ad: The Android device controller object.
        channel_id: The channel whose service state is reset.
    """
    wifi_snippet = ad.wifi
    wifi_snippet.wifiP2pClearServiceRequests(channel_id)
    wifi_snippet.wifiP2pUnsetDnsSdResponseListeners(channel_id)
    wifi_snippet.wifiP2pUnsetUpnpResponseListener(channel_id)
    wifi_snippet.wifiP2pClearLocalServices(channel_id)
622
623
def check_discovered_services(
    requester: DeviceState,
    expected_src_device_address: str,
    expected_dns_sd_sequence: Sequence[tuple[str, str]],
    expected_dns_txt_sequence: Sequence[tuple[str, dict[str, str]]],
    expected_upnp_sequence: Sequence[str],
    channel_id: int | None = None,
):
    """Checks the discovered service responses are as expected.

    This checks all services discovered within the timeout `_DEFAULT_TIMEOUT`.
    If any expected service sequence is empty, this checks that no such service
    are received within timeout.

    The three checks run one after another and share a single time budget:
    each later check only gets the time left over from the earlier ones.

    Args:
        requester: The device that is discovering services.
        expected_src_device_address: Only check services advertised from this
            device address.
        expected_dns_sd_sequence: Expected DNS SD responses.
        expected_dns_txt_sequence: Expected DNS SD TXT records.
        expected_upnp_sequence: Expected UPnP services.
        channel_id: The channel to check for expected services. Defaults to
            the first channel of the requester when None.
    """
    # Compare against None explicitly so that a channel ID of 0 is honored.
    if channel_id is None:
        channel_id = requester.channel_ids[0]
    timeout = _DEFAULT_TIMEOUT
    # Use a monotonic clock so wall-clock adjustments during the test cannot
    # distort the remaining time budget.
    start_time = time.monotonic()

    def _remaining_timeout() -> datetime.timedelta:
        elapsed = datetime.timedelta(seconds=time.monotonic() - start_time)
        return timeout - elapsed

    check_discovered_dns_sd_response(
        requester,
        expected_responses=expected_dns_sd_sequence,
        expected_src_device_address=expected_src_device_address,
        channel_id=channel_id,
        timeout=timeout,
    )
    check_discovered_dns_sd_txt_record(
        requester,
        expected_records=expected_dns_txt_sequence,
        expected_src_device_address=expected_src_device_address,
        channel_id=channel_id,
        timeout=_remaining_timeout(),
    )
    check_discovered_upnp_services(
        requester,
        expected_services=expected_upnp_sequence,
        expected_src_device_address=expected_src_device_address,
        channel_id=channel_id,
        timeout=_remaining_timeout(),
    )
673
674
def check_discovered_upnp_services(
    device: DeviceState,
    expected_services: Sequence[str],
    expected_src_device_address: str,
    channel_id: int | None = None,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
    """Check discovered Upnp services.

    If no services are expected, check that no UPnP service appear within
    timeout. Otherwise, wait for all expected services within timeout and
    fail the test through `asserts.fail` if some are still missing.

    This assumes that Upnp service listener is set by
    `set_upnp_response_listener`.

    Args:
        device: The device that is discovering Upnp services.
        expected_services: The expected Upnp services.
        expected_src_device_address: This only checks services that are from the
            expected source device.
        channel_id: The channel to check for expected services. Defaults to
            the first channel of the device when None.
        timeout: The wait timeout.
    """
    # Compare against None explicitly so that a channel ID of 0 is honored.
    if channel_id is None:
        channel_id = device.channel_ids[0]
    callback_handler = device.upnp_response_listeners[channel_id]
    if not expected_services:
        _check_no_discovered_service(
            ad=device.ad,
            callback_handler=callback_handler,
            event_name=constants.ON_UPNP_SERVICE_AVAILABLE,
            expected_src_device_address=expected_src_device_address,
            timeout=timeout,
        )
        return

    # Services still awaited; entries are removed as matching events arrive.
    pending_services = set(expected_services)

    def _all_service_received(event):
        src_device = constants.WifiP2pDevice.from_dict(
            event.data['sourceDevice']
        )
        if src_device.device_address != expected_src_device_address:
            return False
        for service in event.data['serviceList']:
            if service in pending_services:
                device.ad.log.debug('Received upnp services: %s', service)
                pending_services.remove(service)
        return not pending_services

    device.ad.log.debug('Waiting for UpnP services: %s', pending_services)
    # Set to a small timeout to allow pulling all received events
    if timeout.total_seconds() <= 1:
        timeout = datetime.timedelta(seconds=1)
    try:
        callback_handler.waitForEvent(
            event_name=constants.ON_UPNP_SERVICE_AVAILABLE,
            predicate=_all_service_received,
            timeout=timeout.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        # Report only the services that were never received.
        asserts.fail(
            f'{device.ad} Timed out waiting for services: {pending_services}'
        )
738
739
def check_discovered_dns_sd_response(
    device: DeviceState,
    expected_responses: Sequence[tuple[str, str]],
    expected_src_device_address: str,
    channel_id: int | None = None,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
    """Check discovered DNS SD responses.

    If no responses are expected, check that no DNS SD response appear within
    timeout. Otherwise, wait for all expected responses within timeout and
    fail the test through `asserts.fail` if some are still missing.

    This assumes that Bonjour service listener is set by
    `set_dns_sd_response_listeners`.

    Args:
        device: The device that is discovering DNS SD responses.
        expected_responses: The expected DNS SD responses, each given as an
            `(instance_name, registration_type)` tuple.
        expected_src_device_address: This only checks services that are from the
            expected source device.
        channel_id: The channel to check for expected responses. Defaults to
            the first channel of the device when None.
        timeout: The wait timeout.
    """
    # Compare against None explicitly so that a channel ID of 0 is honored.
    if channel_id is None:
        channel_id = device.channel_ids[0]
    callback_handler = device.dns_sd_response_listeners[channel_id]
    if not expected_responses:
        _check_no_discovered_service(
            device.ad,
            callback_handler=callback_handler,
            event_name=constants.ON_DNS_SD_SERVICE_AVAILABLE,
            expected_src_device_address=expected_src_device_address,
            timeout=timeout,
        )
        return

    # Responses still awaited; a list is kept so that duplicated expected
    # entries each require their own matching event.
    pending_responses = list(expected_responses)

    def _all_service_received(event):
        src_device = constants.WifiP2pDevice.from_dict(
            event.data['sourceDevice']
        )
        if src_device.device_address != expected_src_device_address:
            return False
        registration_type = event.data['registrationType']
        instance_name = event.data['instanceName']
        service_tuple = (instance_name, registration_type)
        device.ad.log.debug('Received DNS SD response: %s', service_tuple)
        if service_tuple in pending_responses:
            pending_responses.remove(service_tuple)
        return not pending_responses

    device.ad.log.debug('Waiting for DNS SD services: %s', pending_responses)
    # Set to a small timeout to allow pulling all received events
    if timeout.total_seconds() <= 1:
        timeout = datetime.timedelta(seconds=1)
    try:
        callback_handler.waitForEvent(
            event_name=constants.ON_DNS_SD_SERVICE_AVAILABLE,
            predicate=_all_service_received,
            timeout=timeout.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        # Report only the responses that were never received.
        asserts.fail(
            f'{device.ad} Timed out waiting for services: {pending_responses}'
        )
806
807
def check_discovered_dns_sd_txt_record(
    device: DeviceState,
    expected_records: Sequence[tuple[str, dict[str, str]]],
    expected_src_device_address: str,
    channel_id: int | None = None,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
    """Check discovered DNS SD TXT records.

    If no records are expected, check that no DNS SD TXT record appears within
    timeout. Otherwise, wait for all expected records within timeout and fail
    the test through `asserts.fail` if some of them are still missing.

    This assumes that Bonjour service listener is set by
    `set_dns_sd_response_listeners`.

    Args:
        device: The device that is discovering DNS SD TXT records.
        expected_records: The expected DNS SD TXT records, each one a
            (full domain name, TXT record map) pair.
        expected_src_device_address: This only checks records that are from
            the expected source device.
        channel_id: The channel to check for expected records. When None, the
            first entry of `device.channel_ids` is used.
        timeout: The wait timeout. Values of one second or less are treated
            as one second.
    """
    # Compare against None explicitly so that a channel ID of 0 is honoured
    # instead of silently falling back to the first channel.
    if channel_id is None:
        channel_id = device.channel_ids[0]
    # Listeners are keyed by channel ID, not by the position of the ID inside
    # `device.channel_ids`.
    callback_handler = device.dns_sd_response_listeners[channel_id]
    if not expected_records:
        _check_no_discovered_service(
            device.ad,
            callback_handler=callback_handler,
            event_name=constants.ON_DNS_SD_TXT_RECORD_AVAILABLE,
            expected_src_device_address=expected_src_device_address,
            timeout=timeout,
        )
        return

    # Received records are compared as tuples below, so normalize each
    # expected pair to a tuple; a pair given as a list would never match.
    pending_records = [tuple(record) for record in expected_records]
    device.ad.log.debug('Expected DNS SD TXT records: %s', pending_records)

    def _all_service_received(event):
        """Consumes one event; returns True once nothing is pending."""
        src_device = constants.WifiP2pDevice.from_dict(
            event.data['sourceDevice']
        )
        if src_device.device_address != expected_src_device_address:
            return False
        received_record = (
            event.data['fullDomainName'],
            event.data['txtRecordMap'],
        )
        device.ad.log.debug('Received DNS SD TXT record: %s', received_record)
        if received_record in pending_records:
            pending_records.remove(received_record)
        return not pending_records

    device.ad.log.debug('Waiting for DNS SD TXT records: %s', pending_records)
    # Set to a small timeout to allow pulling all received events
    timeout = max(timeout, datetime.timedelta(seconds=1))
    try:
        callback_handler.waitForEvent(
            event_name=constants.ON_DNS_SD_TXT_RECORD_AVAILABLE,
            predicate=_all_service_received,
            timeout=timeout.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        asserts.fail(
            f'{device.ad} Timed out waiting for DNS SD TXT records: '
            f'{pending_records}'
        )
875
876
def _check_no_discovered_service(
    ad: android_device.AndroidDevice,
    callback_handler: callback_handler_v2.CallbackHandlerV2,
    event_name: str,
    expected_src_device_address: str,
    timeout: datetime.timedelta = _DEFAULT_TIMEOUT,
):
    """Asserts that the given source device produces no `event_name` event.

    Waits on `callback_handler` for an event named `event_name` whose source
    device address equals `expected_src_device_address`. Timing out is the
    passing outcome; receiving such an event fails the assertion.

    Args:
        ad: The Android device, used only in the failure message.
        callback_handler: The handler to wait for events on.
        event_name: The name of the event that must not be received.
        expected_src_device_address: Only events from this source device
            address count as a discovery.
        timeout: The wait timeout. Values of one second or less are treated
            as one second.
    """
    def _from_expected_source(event):
        source = constants.WifiP2pDevice.from_dict(event.data['sourceDevice'])
        return source.device_address == expected_src_device_address

    # Wait at least one second so that already received events can be pulled.
    wait_time = max(timeout, datetime.timedelta(seconds=1))
    try:
        found = callback_handler.waitForEvent(
            event_name=event_name,
            predicate=_from_expected_source,
            timeout=wait_time.total_seconds(),
        )
    except errors.CallbackHandlerTimeoutError:
        # No qualified service showed up in time, which is what we want.
        return
    asserts.assert_is_none(
        found,
        f'{ad} should not discover p2p service. Discovered: {found}',
    )
907