• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import time
17import re
18import os
19import pathlib
20import math
21import shutil
22import fnmatch
23import posixpath
24import subprocess
25import tempfile
26from retry import retry
27from collections import namedtuple
28from datetime import datetime
29from xml.etree import ElementTree
30from contextlib import contextmanager
31from statistics import median
32
33from acts import utils
34from acts import asserts
35from acts import signals
36from acts.libs.proc import job
37from acts.controllers.adb_lib.error import AdbCommandError
38from acts.controllers.android_device import list_adb_devices
39from acts.controllers.android_device import list_fastboot_devices
40from acts.controllers.android_device import DEFAULT_QXDM_LOG_PATH
41from acts.controllers.android_device import SL4A_APK_NAME
42from acts_contrib.test_utils.gnss.gnss_measurement import GnssMeasurement
43from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
44from acts_contrib.test_utils.tel import tel_logging_utils as tlutils
45from acts_contrib.test_utils.tel import tel_test_utils as tutils
46from acts_contrib.test_utils.gnss import gnssstatus_utils
47from acts_contrib.test_utils.gnss import gnss_constant
48from acts_contrib.test_utils.gnss import supl
49from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationCommandBuilder
50from acts_contrib.test_utils.instrumentation.device.command.instrumentation_command_builder import InstrumentationTestCommandBuilder
51from acts.utils import get_current_epoch_time
52from acts.utils import epoch_to_human_time
53from acts_contrib.test_utils.gnss.gnss_defines import BCM_GPS_XML_PATH
54from acts_contrib.test_utils.gnss.gnss_defines import BCM_NVME_STO_PATH
55
56WifiEnums = wutils.WifiEnums
57FIRST_FIXED_MAX_WAITING_TIME = 60
58UPLOAD_TO_SPONGE_PREFIX = "TestResult "
59PULL_TIMEOUT = 300
60GNSSSTATUS_LOG_PATH = (
61    "/storage/emulated/0/Android/data/com.android.gpstool/files/")
62QXDM_MASKS = ["GPS.cfg", "GPS-general.cfg", "default.cfg"]
63TTFF_REPORT = namedtuple(
64    "TTFF_REPORT", "utc_time ttff_loop ttff_sec ttff_pe ttff_ant_cn "
65                   "ttff_base_cn ttff_haccu")
66TRACK_REPORT = namedtuple(
67    "TRACK_REPORT", "l5flag pe ant_top4cn ant_cn base_top4cn base_cn device_time report_time")
68LOCAL_PROP_FILE_CONTENTS = """\
69log.tag.LocationManagerService=VERBOSE
70log.tag.GnssLocationProvider=VERBOSE
71log.tag.GnssMeasurementsProvider=VERBOSE
72log.tag.GpsNetInitiatedHandler=VERBOSE
73log.tag.GnssNetInitiatedHandler=VERBOSE
74log.tag.GnssNetworkConnectivityHandler=VERBOSE
75log.tag.ConnectivityService=VERBOSE
76log.tag.ConnectivityManager=VERBOSE
77log.tag.GnssVisibilityControl=VERBOSE
78log.tag.NtpTimeHelper=VERBOSE
79log.tag.NtpTrustedTime=VERBOSE
80log.tag.GnssPsdsDownloader=VERBOSE
81log.tag.Gnss=VERBOSE
82log.tag.GnssConfiguration=VERBOSE"""
83LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE = """\
84log.tag.ImsPhone=VERBOSE
85log.tag.GsmCdmaPhone=VERBOSE
86log.tag.Phone=VERBOSE
87log.tag.GCoreFlp=VERBOSE"""
88TEST_PACKAGE_NAME = "com.google.android.apps.maps"
89LOCATION_PERMISSIONS = [
90    "android.permission.ACCESS_FINE_LOCATION",
91    "android.permission.ACCESS_COARSE_LOCATION"
92]
93GNSSTOOL_PACKAGE_NAME = "com.android.gpstool"
94GNSSTOOL_PERMISSIONS = [
95    "android.permission.ACCESS_FINE_LOCATION",
96    "android.permission.READ_EXTERNAL_STORAGE",
97    "android.permission.ACCESS_COARSE_LOCATION",
98    "android.permission.CALL_PHONE",
99    "android.permission.WRITE_CONTACTS",
100    "android.permission.CAMERA",
101    "android.permission.WRITE_EXTERNAL_STORAGE",
102    "android.permission.READ_CONTACTS",
103    "android.permission.ACCESS_BACKGROUND_LOCATION"
104]
105DISABLE_LTO_FILE_CONTENTS = """\
106LONGTERM_PSDS_SERVER_1="http://"
107LONGTERM_PSDS_SERVER_2="http://"
108LONGTERM_PSDS_SERVER_3="http://"
109NORMAL_PSDS_SERVER="http://"
110REALTIME_PSDS_SERVER="http://"
111"""
112DISABLE_LTO_FILE_CONTENTS_R = """\
113XTRA_SERVER_1="http://"
114XTRA_SERVER_2="http://"
115XTRA_SERVER_3="http://"
116"""
117_BRCM_DUTY_CYCLE_PATTERN = re.compile(r".*PGLOR,\d+,STA.*")
118
119
120class GnssTestUtilsError(Exception):
121    pass
122
123
124def remount_device(ad):
125    """Remount device file system to read and write.
126
127    Args:
128        ad: An AndroidDevice object.
129    """
130    for retries in range(5):
131        ad.root_adb()
132        if ad.adb.getprop("ro.boot.veritymode") == "enforcing":
133            ad.adb.disable_verity()
134            reboot(ad)
135        remount_result = ad.adb.remount()
136        ad.log.info("Attempt %d - %s" % (retries + 1, remount_result))
137        if "remount succeeded" in remount_result:
138            break
139
140
141def reboot(ad):
142    """Reboot device and check if mobile data is available.
143
144    Args:
145        ad: An AndroidDevice object.
146    """
147    ad.log.info("Reboot device to make changes take effect.")
148    # TODO(diegowchung): remove the timeout setting after p23 back to normal
149    ad.reboot(timeout=600)
150    ad.unlock_screen(password=None)
151    if not is_mobile_data_on(ad):
152        set_mobile_data(ad, True)
153    utils.sync_device_time(ad)
154
155
156def enable_gnss_verbose_logging(ad):
157    """Enable GNSS VERBOSE Logging and persistent logcat.
158
159    Args:
160        ad: An AndroidDevice object.
161    """
162    remount_device(ad)
163    ad.log.info("Enable GNSS VERBOSE Logging and persistent logcat.")
164    if check_chipset_vendor_by_qualcomm(ad):
165        ad.adb.shell("echo -e '\nDEBUG_LEVEL = 5' >> /vendor/etc/gps.conf")
166    else:
167        ad.adb.shell("echo LogEnabled=true >> /data/vendor/gps/libgps.conf")
168        ad.adb.shell("chown gps.system /data/vendor/gps/libgps.conf")
169    if is_device_wearable(ad):
170       PROP_CONTENTS = LOCAL_PROP_FILE_CONTENTS + LOCAL_PROP_FILE_CONTENTS_FOR_WEARABLE
171    else:
172        PROP_CONTENTS = LOCAL_PROP_FILE_CONTENTS
173    ad.adb.shell("echo %r >> /data/local.prop" % PROP_CONTENTS)
174    ad.adb.shell("chmod 644 /data/local.prop")
175    ad.adb.shell("setprop persist.logd.logpersistd.size 20000")
176    ad.adb.shell("setprop persist.logd.size 16777216")
177    ad.adb.shell("setprop persist.vendor.radio.adb_log_on 1")
178    ad.adb.shell("setprop persist.logd.logpersistd logcatd")
179    ad.adb.shell("setprop log.tag.copresGcore VERBOSE")
180    ad.adb.shell("sync")
181
182
183def get_am_flags(value):
184    """Returns the (value, type) flags for a given python value."""
185    if type(value) is bool:
186        return str(value).lower(), 'boolean'
187    elif type(value) is str:
188        return value, 'string'
189    raise ValueError("%s should be either 'boolean' or 'string'" % value)
190
191
192def enable_compact_and_particle_fusion_log(ad):
193    """Enable CompactLog, FLP particle fusion log and disable gms
194    location-based quake monitoring.
195
196    Args:
197        ad: An AndroidDevice object.
198    """
199    ad.root_adb()
200    ad.log.info("Enable FLP flags and Disable GMS location-based quake "
201                "monitoring.")
202    overrides = {
203        'compact_log_enabled': True,
204        'flp_use_particle_fusion': True,
205        'flp_particle_fusion_extended_bug_report': True,
206        'flp_event_log_size': '86400',
207        'proks_config': '28',
208        'flp_particle_fusion_bug_report_window_sec': '86400',
209        'flp_particle_fusion_bug_report_max_buffer_size': '86400',
210        'seismic_data_collection': False,
211        'Ealert__enable': False,
212    }
213    for flag, python_value in overrides.items():
214        value, type = get_am_flags(python_value)
215        cmd = ("am broadcast -a com.google.android.gms.phenotype.FLAG_OVERRIDE "
216               "--es package com.google.android.location --es user \* "
217               "--esa flags %s --esa values %s --esa types %s "
218               "com.google.android.gms" % (flag, value, type))
219        ad.adb.shell(cmd, ignore_status=True)
220    ad.adb.shell("am force-stop com.google.android.gms")
221    ad.adb.shell("am broadcast -a com.google.android.gms.INITIALIZE")
222
223
224def disable_xtra_throttle(ad):
225    """Disable XTRA throttle will have no limit to download XTRA data.
226
227    Args:
228        ad: An AndroidDevice object.
229    """
230    remount_device(ad)
231    ad.log.info("Disable XTRA Throttle.")
232    ad.adb.shell("echo -e '\nXTRA_TEST_ENABLED=1' >> /vendor/etc/gps.conf")
233    ad.adb.shell("echo -e '\nXTRA_THROTTLE_ENABLED=0' >> /vendor/etc/gps.conf")
234
235
236def enable_supl_mode(ad):
237    """Enable SUPL back on for next test item.
238
239    Args:
240        ad: An AndroidDevice object.
241    """
242    remount_device(ad)
243    ad.log.info("Enable SUPL mode.")
244    ad.adb.shell("echo -e '\nSUPL_MODE=1' >> /etc/gps_debug.conf")
245
246
247def disable_supl_mode(ad):
248    """Kill SUPL to test XTRA/LTO only test item.
249
250    Args:
251        ad: An AndroidDevice object.
252    """
253    remount_device(ad)
254    ad.log.info("Disable SUPL mode.")
255    ad.adb.shell("echo -e '\nSUPL_MODE=0' >> /etc/gps_debug.conf")
256    if not check_chipset_vendor_by_qualcomm(ad):
257        supl.set_supl_over_wifi_state(ad, False)
258
259
260def enable_vendor_orbit_assistance_data(ad):
261    """Enable vendor assistance features.
262        For Qualcomm: Enable XTRA
263        For Broadcom: Enable LTO
264
265    Args:
266        ad: An AndroidDevice object.
267    """
268    ad.root_adb()
269    if is_device_wearable(ad):
270        lto_mode_wearable(ad, True)
271    elif check_chipset_vendor_by_qualcomm(ad):
272        disable_xtra_throttle(ad)
273        reboot(ad)
274    else:
275        lto_mode(ad, True)
276
277
278def disable_vendor_orbit_assistance_data(ad):
279    """Disable vendor assistance features.
280
281    For Qualcomm: disable XTRA
282    For Broadcom: disable LTO
283
284    Args:
285        ad: An AndroidDevice object.
286    """
287    ad.root_adb()
288    if is_device_wearable(ad):
289        lto_mode_wearable(ad, False)
290    elif check_chipset_vendor_by_qualcomm(ad):
291        disable_qualcomm_orbit_assistance_data(ad)
292    else:
293        lto_mode(ad, False)
294
295def gla_mode(ad, state: bool):
296    """Enable or disable Google Location Accuracy feature.
297
298    Args:
299        ad: An AndroidDevice object.
300        state: True to enable GLA, False to disable GLA.
301    """
302    ad.root_adb()
303    if state:
304        ad.adb.shell('settings put global assisted_gps_enabled 1')
305        ad.log.info("Modify current GLA Mode to MS_BASED mode")
306    else:
307        ad.adb.shell('settings put global assisted_gps_enabled 0')
308        ad.log.info("Modify current GLA Mode to standalone mode")
309
310    out = int(ad.adb.shell("settings get global assisted_gps_enabled"))
311    if out == 1:
312        ad.log.info("GLA is enabled, MS_BASED mode")
313    else:
314        ad.log.info("GLA is disabled, standalone mode")
315
316
317def disable_qualcomm_orbit_assistance_data(ad):
318    """Disable assiatance features for Qualcomm project.
319
320    Args:
321        ad: An AndroidDevice object.
322    """
323    ad.log.info("Disable XTRA-daemon until next reboot.")
324    ad.adb.shell("killall xtra-daemon", ignore_status=True)
325
326
327def disable_private_dns_mode(ad):
328    """Due to b/118365122, it's better to disable private DNS mode while
329       testing. 8.8.8.8 private dns sever is unstable now, sometimes server
330       will not response dns query suddenly.
331
332    Args:
333        ad: An AndroidDevice object.
334    """
335    tutils.get_operator_name(ad.log, ad, subId=None)
336    if ad.adb.shell("settings get global private_dns_mode") != "off":
337        ad.log.info("Disable Private DNS mode.")
338        ad.adb.shell("settings put global private_dns_mode off")
339
340
341def _init_device(ad):
342    """Init GNSS test devices.
343
344    Args:
345        ad: An AndroidDevice object.
346    """
347    check_location_service(ad)
348    enable_gnss_verbose_logging(ad)
349    prepare_gps_overlay(ad)
350    set_screen_always_on(ad)
351    ad.log.info("Setting Bluetooth state to False")
352    ad.droid.bluetoothToggleState(False)
353    set_wifi_and_bt_scanning(ad, True)
354    disable_private_dns_mode(ad)
355    init_gtw_gpstool(ad)
356    if is_device_wearable(ad):
357        disable_battery_defend(ad)
358
359
360def prepare_gps_overlay(ad):
361    """Set pixellogger gps log mask to
362    resolve gps logs unreplayable from brcm vendor
363    """
364    if not check_chipset_vendor_by_qualcomm(ad):
365        overlay_file = "/data/vendor/gps/overlay/gps_overlay.xml"
366        xml_file = generate_gps_overlay_xml(ad)
367        try:
368            ad.log.info("Push gps_overlay to device")
369            ad.adb.push(xml_file, overlay_file)
370            ad.adb.shell(f"chmod 777 {overlay_file}")
371        finally:
372            xml_folder = os.path.abspath(os.path.join(xml_file, os.pardir))
373            shutil.rmtree(xml_folder)
374
375
376def generate_gps_overlay_xml(ad):
377    """For r11 devices, the overlay setting is 'Replayable default'
378    For other brcm devices, the setting is 'Replayable debug'
379
380    Returns:
381        path to the xml file
382    """
383    root_attrib = {
384        "xmlns": "http://www.glpals.com/",
385        "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
386        "xsi:schemaLocation": "http://www.glpals.com/ glconfig.xsd",
387    }
388    sub_attrib = {"EnableOnChipStopNotification": "true"}
389    if not is_device_wearable(ad):
390        sub_attrib["LogPriMask"] = "LOG_DEBUG"
391        sub_attrib["LogFacMask"] = "LOG_GLLIO | LOG_GLLAPI | LOG_NMEA | LOG_RAWDATA"
392        sub_attrib["OnChipLogPriMask"] = "LOG_DEBUG"
393        sub_attrib["OnChipLogFacMask"] = "LOG_GLLIO | LOG_GLLAPI | LOG_NMEA | LOG_RAWDATA"
394
395    temp_path = tempfile.mkdtemp()
396    xml_file = os.path.join(temp_path, "gps_overlay.xml")
397
398    root = ElementTree.Element('glgps')
399    for key, value in root_attrib.items():
400        root.attrib[key] = value
401
402    ad.log.debug("Sub attrib is %s", sub_attrib)
403
404    sub = ElementTree.SubElement(root, 'gll')
405    for key, value in sub_attrib.items():
406        sub.attrib[key] = value
407
408    xml = ElementTree.ElementTree(root)
409    xml.write(xml_file, xml_declaration=True, encoding="utf-8", method="xml")
410    return xml_file
411
412
413def connect_to_wifi_network(ad, network):
414    """Connection logic for open and psk wifi networks.
415
416    Args:
417        ad: An AndroidDevice object.
418        network: Dictionary with network info.
419    """
420    SSID = network[WifiEnums.SSID_KEY]
421    ad.ed.clear_all_events()
422    wutils.reset_wifi(ad)
423    wutils.start_wifi_connection_scan_and_ensure_network_found(ad, SSID)
424    for i in range(5):
425        wutils.wifi_connect(ad, network, check_connectivity=False)
426        # Validates wifi connection with ping_gateway=False to avoid issue like
427        # b/254913994.
428        if wutils.validate_connection(ad, ping_gateway=False):
429            ad.log.info("WiFi connection is validated")
430            return
431    raise signals.TestError("Failed to connect WiFi")
432
433def set_wifi_and_bt_scanning(ad, state=True):
434    """Set Wi-Fi and Bluetooth scanning on/off in Settings -> Location
435
436    Args:
437        ad: An AndroidDevice object.
438        state: True to turn on "Wi-Fi and Bluetooth scanning".
439            False to turn off "Wi-Fi and Bluetooth scanning".
440    """
441    ad.root_adb()
442    if state:
443        ad.adb.shell("settings put global wifi_scan_always_enabled 1")
444        ad.adb.shell("settings put global ble_scan_always_enabled 1")
445        ad.log.info("Wi-Fi and Bluetooth scanning are enabled")
446    else:
447        ad.adb.shell("settings put global wifi_scan_always_enabled 0")
448        ad.adb.shell("settings put global ble_scan_always_enabled 0")
449        ad.log.info("Wi-Fi and Bluetooth scanning are disabled")
450
451
452def check_location_service(ad):
453    """Set location service on.
454       Verify if location service is available.
455
456    Args:
457        ad: An AndroidDevice object.
458    """
459    remount_device(ad)
460    utils.set_location_service(ad, True)
461    ad.adb.shell("cmd location set-location-enabled true")
462    location_mode = int(ad.adb.shell("settings get secure location_mode"))
463    ad.log.info("Current Location Mode >> %d" % location_mode)
464    if location_mode != 3:
465        raise signals.TestError("Failed to turn Location on")
466
467
468def delete_device_folder(ad, folder):
469    ad.log.info("Folder to be deleted: %s" % folder)
470    folder_contents = ad.adb.shell(f"ls {folder}", ignore_status=True)
471    ad.log.debug("Contents to be deleted: %s" % folder_contents)
472    ad.adb.shell("rm -rf %s" % folder, ignore_status=True)
473
474
475def remove_pixel_logger_folder(ad):
476    if check_chipset_vendor_by_qualcomm(ad):
477        folder = "/sdcard/Android/data/com.android.pixellogger/files/logs/diag_logs"
478    else:
479        folder = "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/"
480
481    delete_device_folder(ad, folder)
482
483
484def clear_logd_gnss_qxdm_log(ad):
485    """Clear /data/misc/logd,
486    /storage/emulated/0/Android/data/com.android.gpstool/files and
487    /data/vendor/radio/diag_logs/logs from previous test item then reboot.
488
489    Args:
490        ad: An AndroidDevice object.
491    """
492    remount_device(ad)
493    ad.log.info("Clear Logd, GNSS and PixelLogger Log from previous test item.")
494    folders_should_be_removed = ["/data/misc/logd"]
495    ad.adb.shell(
496        'find %s -name "*.txt" -type f -delete' % GNSSSTATUS_LOG_PATH,
497        ignore_status=True)
498    if check_chipset_vendor_by_qualcomm(ad):
499        output_path = posixpath.join(DEFAULT_QXDM_LOG_PATH, "logs")
500        folders_should_be_removed += [output_path]
501    else:
502        always_on_logger_log_path = ("/data/vendor/gps/logs")
503        folders_should_be_removed += [always_on_logger_log_path]
504    for folder in folders_should_be_removed:
505        delete_device_folder(ad, folder)
506    remove_pixel_logger_folder(ad)
507    if not is_device_wearable(ad):
508        reboot(ad)
509
510
511def get_gnss_qxdm_log(ad, qdb_path=None):
512    """Get /storage/emulated/0/Android/data/com.android.gpstool/files and
513    /data/vendor/radio/diag_logs/logs for test item.
514
515    Args:
516        ad: An AndroidDevice object.
517        qdb_path: The path of qdsp6m.qdb on different projects.
518    """
519    log_path = ad.device_log_path
520    os.makedirs(log_path, exist_ok=True)
521    gnss_log_name = "gnssstatus_log_%s_%s" % (ad.model, ad.serial)
522    gnss_log_path = posixpath.join(log_path, gnss_log_name)
523    os.makedirs(gnss_log_path, exist_ok=True)
524    ad.log.info("Pull GnssStatus Log to %s" % gnss_log_path)
525    ad.adb.pull("%s %s" % (GNSSSTATUS_LOG_PATH + ".", gnss_log_path),
526                timeout=PULL_TIMEOUT, ignore_status=True)
527    shutil.make_archive(gnss_log_path, "zip", gnss_log_path)
528    shutil.rmtree(gnss_log_path, ignore_errors=True)
529    if check_chipset_vendor_by_qualcomm(ad):
530        output_path = (
531            "/sdcard/Android/data/com.android.pixellogger/files/logs/"
532            "diag_logs/.")
533    else:
534        output_path = (
535            "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
536    qxdm_log_name = "PixelLogger_%s_%s" % (ad.model, ad.serial)
537    qxdm_log_path = posixpath.join(log_path, qxdm_log_name)
538    os.makedirs(qxdm_log_path, exist_ok=True)
539    ad.log.info("Pull PixelLogger Log %s to %s" % (output_path,
540                                                   qxdm_log_path))
541    ad.adb.pull("%s %s" % (output_path, qxdm_log_path),
542                timeout=PULL_TIMEOUT, ignore_status=True)
543    if check_chipset_vendor_by_qualcomm(ad):
544        for path in qdb_path:
545            output = ad.adb.pull("%s %s" % (path, qxdm_log_path),
546                                 timeout=PULL_TIMEOUT, ignore_status=True)
547            if "No such file or directory" in output:
548                continue
549            break
550    shutil.make_archive(qxdm_log_path, "zip", qxdm_log_path)
551    shutil.rmtree(qxdm_log_path, ignore_errors=True)
552
553
554def set_mobile_data(ad, state):
555    """Set mobile data on or off and check mobile data state.
556
557    Args:
558        ad: An AndroidDevice object.
559        state: True to enable mobile data. False to disable mobile data.
560    """
561    ad.root_adb()
562    if state:
563        if is_device_wearable(ad):
564            ad.log.info("Enable wearable mobile data.")
565            ad.adb.shell("settings put global cell_on 1")
566        else:
567            ad.log.info("Enable mobile data via RPC call.")
568            ad.droid.telephonyToggleDataConnection(True)
569    else:
570        if is_device_wearable(ad):
571            ad.log.info("Disable wearable mobile data.")
572            ad.adb.shell("settings put global cell_on 0")
573        else:
574            ad.log.info("Disable mobile data via RPC call.")
575            ad.droid.telephonyToggleDataConnection(False)
576    time.sleep(5)
577    ret_val = is_mobile_data_on(ad)
578    if state and ret_val:
579        ad.log.info("Mobile data is enabled and set to %s" % ret_val)
580    elif not state and not ret_val:
581        ad.log.info("Mobile data is disabled and set to %s" % ret_val)
582    else:
583        ad.log.error("Mobile data is at unknown state and set to %s" % ret_val)
584
585
586def gnss_trigger_modem_ssr_by_adb(ad, dwelltime=60):
587    """Trigger modem SSR crash by adb and verify if modem crash and recover
588    successfully.
589
590    Args:
591        ad: An AndroidDevice object.
592        dwelltime: Waiting time for modem reset. Default is 60 seconds.
593
594    Returns:
595        True if success.
596        False if failed.
597    """
598    begin_time = get_current_epoch_time()
599    ad.root_adb()
600    cmds = ("echo restart > /sys/kernel/debug/msm_subsys/modem",
601            r"echo 'at+cfun=1,1\r' > /dev/at_mdm0")
602    for cmd in cmds:
603        ad.log.info("Triggering modem SSR crash by %s" % cmd)
604        output = ad.adb.shell(cmd, ignore_status=True)
605        if "No such file or directory" in output:
606            continue
607        break
608    time.sleep(dwelltime)
609    ad.send_keycode("HOME")
610    logcat_results = ad.search_logcat("SSRObserver", begin_time)
611    if logcat_results:
612        for ssr in logcat_results:
613            if "mSubsystem='modem', mCrashReason" in ssr["log_message"]:
614                ad.log.debug(ssr["log_message"])
615                ad.log.info("Triggering modem SSR crash successfully.")
616                return True
617        raise signals.TestError("Failed to trigger modem SSR crash")
618    raise signals.TestError("No SSRObserver found in logcat")
619
620
621def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
622    """Trigger modem SSR crash by mds tool and verify if modem crash and recover
623    successfully.
624
625    Args:
626        ad: An AndroidDevice object.
627        dwelltime: Waiting time for modem reset. Default is 60 seconds.
628    """
629    mds_check = ad.adb.shell("pm path com.google.mdstest")
630    if not mds_check:
631        raise signals.TestError("MDS Tool is not properly installed.")
632    ad.root_adb()
633    cmd = ('am instrument -w -e request "4b 25 03 00" '
634           '"com.google.mdstest/com.google.mdstest.instrument'
635           '.ModemCommandInstrumentation"')
636    ad.log.info("Triggering modem SSR crash by MDS")
637    output = ad.adb.shell(cmd, ignore_status=True)
638    ad.log.debug(output)
639    time.sleep(dwelltime)
640    ad.send_keycode("HOME")
641    if "SUCCESS" in output:
642        ad.log.info("Triggering modem SSR crash by MDS successfully.")
643    else:
644        raise signals.TestError(
645            "Failed to trigger modem SSR crash by MDS. \n%s" % output)
646
647
648def check_xtra_download(ad, begin_time):
649    """Verify XTRA download success log message in logcat.
650
651    Args:
652        ad: An AndroidDevice object.
653        begin_time: test begin time
654
655    Returns:
656        True: xtra_download if XTRA downloaded and injected successfully
657        otherwise return False.
658    """
659    ad.send_keycode("HOME")
660    if check_chipset_vendor_by_qualcomm(ad):
661        xtra_results = ad.search_logcat("XTRA download success. "
662                                        "inject data into modem", begin_time)
663        if xtra_results:
664            ad.log.debug("%s" % xtra_results[-1]["log_message"])
665            ad.log.info("XTRA downloaded and injected successfully.")
666            return True
667        ad.log.error("XTRA downloaded FAIL.")
668    else:
669        if is_device_wearable(ad):
670            lto_results = ad.search_logcat("GnssLocationProvider: "
671                                           "calling native_inject_psds_data", begin_time)
672        else:
673            lto_results = ad.search_logcat("GnssPsdsAidl: injectPsdsData: "
674                                           "psdsType: 1", begin_time)
675        if lto_results:
676            ad.log.debug("%s" % lto_results[-1]["log_message"])
677            ad.log.info("LTO downloaded and injected successfully.")
678            return True
679        ad.log.error("LTO downloaded and inject FAIL.")
680    return False
681
682
683def pull_package_apk(ad, package_name):
684    """Pull apk of given package_name from device.
685
686    Args:
687        ad: An AndroidDevice object.
688        package_name: Package name of apk to pull.
689
690    Returns:
691        The temp path of pulled apk.
692    """
693    out = ad.adb.shell("pm path %s" % package_name)
694    result = re.search(r"package:(.*)", out)
695    if not result:
696        raise signals.TestError("Couldn't find apk of %s" % package_name)
697    else:
698        apk_source = result.group(1)
699        ad.log.info("Get apk of %s from %s" % (package_name, apk_source))
700        apk_path = tempfile.mkdtemp()
701        ad.pull_files([apk_source], apk_path)
702    return apk_path
703
704
705def pull_gnss_cfg_file(ad, file):
706    """Pull given gnss cfg file from device.
707
708    Args:
709        ad: An AndroidDevice object.
710        file: CFG file in device to pull.
711
712    Returns:
713        The temp path of pulled gnss cfg file in host.
714    """
715    ad.root_adb()
716    host_dest = tempfile.mkdtemp()
717    ad.pull_files(file, host_dest)
718    for path_key in os.listdir(host_dest):
719        if fnmatch.fnmatch(path_key, "*.cfg"):
720            gnss_cfg_file = os.path.join(host_dest, path_key)
721            break
722    else:
723        raise signals.TestError("No cfg file is found in %s" % host_dest)
724    return gnss_cfg_file
725
726
727def reinstall_package_apk(ad, package_name, apk_path):
728    """Reinstall apk of given package_name.
729
730    Args:
731        ad: An AndroidDevice object.
732        package_name: Package name of apk.
733        apk_path: The temp path of pulled apk.
734    """
735    for path_key in os.listdir(apk_path):
736        if fnmatch.fnmatch(path_key, "*.apk"):
737            apk_path = os.path.join(apk_path, path_key)
738            break
739    else:
740        raise signals.TestError("No apk is found in %s" % apk_path)
741    ad.log.info("Re-install %s with path: %s" % (package_name, apk_path))
742    ad.adb.shell("settings put global verifier_verify_adb_installs 0")
743    ad.adb.install("-r -d -g --user 0 %s" % apk_path)
744    package_check = ad.adb.shell("pm path %s" % package_name)
745    if not package_check:
746        tutils.abort_all_tests(
747            ad.log, "%s is not properly re-installed." % package_name)
748    ad.log.info("%s is re-installed successfully." % package_name)
749
750
751def init_gtw_gpstool(ad):
752    """Init GTW_GPSTool apk.
753
754    Args:
755        ad: An AndroidDevice object.
756    """
757    remount_device(ad)
758    gpstool_path = pull_package_apk(ad, "com.android.gpstool")
759    reinstall_package_apk(ad, "com.android.gpstool", gpstool_path)
760    shutil.rmtree(gpstool_path, ignore_errors=True)
761
762
763def fastboot_factory_reset(ad, state=True):
764    """Factory reset the device in fastboot mode.
765       Pull sl4a apk from device. Terminate all sl4a sessions,
766       Reboot the device to bootloader,
767       factory reset the device by fastboot.
768       Reboot the device. wait for device to complete booting
769       Re-install and start an sl4a session.
770
771    Args:
772        ad: An AndroidDevice object.
773        State: True for exit_setup_wizard, False for not exit_setup_wizard.
774
775    Returns:
776        True if factory reset process complete.
777    """
778    status = True
779    mds_path = ""
780    gnss_cfg_file = ""
781    gnss_cfg_path = "/vendor/etc/mdlog"
782    default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
783    sl4a_path = pull_package_apk(ad, SL4A_APK_NAME)
784    gpstool_path = pull_package_apk(ad, "com.android.gpstool")
785    if check_chipset_vendor_by_qualcomm(ad):
786        mds_path = pull_package_apk(ad, "com.google.mdstest")
787        gnss_cfg_file = pull_gnss_cfg_file(ad, default_gnss_cfg)
788    stop_pixel_logger(ad)
789    ad.stop_services()
790    for i in range(1, 4):
791        try:
792            if ad.serial in list_adb_devices():
793                ad.log.info("Reboot to bootloader")
794                ad.adb.reboot("bootloader", ignore_status=True)
795                time.sleep(10)
796            if ad.serial in list_fastboot_devices():
797                ad.log.info("Factory reset in fastboot")
798                ad.fastboot._w(timeout=300, ignore_status=True)
799                time.sleep(30)
800                ad.log.info("Reboot in fastboot")
801                ad.fastboot.reboot()
802            ad.wait_for_boot_completion()
803            ad.root_adb()
804            if ad.skip_sl4a:
805                break
806            if ad.is_sl4a_installed():
807                break
808            if is_device_wearable(ad):
809                ad.log.info("Wait 5 mins for wearable projects system busy time.")
810                time.sleep(300)
811            reinstall_package_apk(ad, SL4A_APK_NAME, sl4a_path)
812            reinstall_package_apk(ad, "com.android.gpstool", gpstool_path)
813            if check_chipset_vendor_by_qualcomm(ad):
814                reinstall_package_apk(ad, "com.google.mdstest", mds_path)
815                ad.push_system_file(gnss_cfg_file, gnss_cfg_path)
816            time.sleep(10)
817            break
818        except Exception as e:
819            ad.log.error(e)
820            if i == attempts:
821                tutils.abort_all_tests(ad.log, str(e))
822            time.sleep(5)
823    try:
824        ad.start_adb_logcat()
825    except Exception as e:
826        ad.log.error(e)
827    if state:
828        ad.exit_setup_wizard()
829    if ad.skip_sl4a:
830        return status
831    tutils.bring_up_sl4a(ad)
832    for path in [sl4a_path, gpstool_path, mds_path, gnss_cfg_file]:
833        shutil.rmtree(path, ignore_errors=True)
834    return status
835
836
837def clear_aiding_data_by_gtw_gpstool(ad):
838    """Launch GTW GPSTool and Clear all GNSS aiding data.
839       Wait 5 seconds for GTW GPStool to clear all GNSS aiding
840       data properly.
841
842    Args:
843        ad: An AndroidDevice object.
844    """
845    if not check_chipset_vendor_by_qualcomm(ad):
846        delete_lto_file(ad)
847    ad.log.info("Launch GTW GPSTool and Clear all GNSS aiding data")
848    ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode clear")
849    time.sleep(10)
850
851
852def start_gnss_by_gtw_gpstool(ad,
853                              state,
854                              api_type="gnss",
855                              bgdisplay=False,
856                              freq=0,
857                              lowpower=False,
858                              meas=False):
859    """Start or stop GNSS on GTW_GPSTool.
860
861    Args:
862        ad: An AndroidDevice object.
863        state: True to start GNSS. False to Stop GNSS.
864        api_type: Different API for location fix. Use gnss/flp/nmea
865        bgdisplay: true to run GTW when Display off. false to not run GTW when
866          Display off.
867        freq: An integer to set location update frequency.
868        meas: A Boolean to set GNSS measurement registration.
869        lowpower: A boolean to set GNSS LowPowerMode.
870    """
871    cmd = "am start -S -n com.android.gpstool/.GPSTool --es mode gps"
872    if not state:
873        ad.log.info("Stop %s on GTW_GPSTool." % api_type)
874        cmd = "am broadcast -a com.android.gpstool.stop_gps_action"
875    else:
876        options = ("--es type {} --ei freq {} --ez BG {} --ez meas {} --ez "
877                   "lowpower {}").format(api_type, freq, bgdisplay, meas, lowpower)
878        cmd = cmd + " " + options
879    ad.adb.shell(cmd, ignore_status=True, timeout = 300)
880    time.sleep(3)
881
882
883def process_gnss_by_gtw_gpstool(ad,
884                                criteria,
885                                api_type="gnss",
886                                clear_data=True,
887                                meas_flag=False,
888                                freq=0,
889                                bg_display=False):
890    """Launch GTW GPSTool and Clear all GNSS aiding data
891       Start GNSS tracking on GTW_GPSTool.
892
893    Args:
894        ad: An AndroidDevice object.
895        criteria: Criteria for current test item.
896        api_type: Different API for location fix. Use gnss/flp/nmea
897        clear_data: True to clear GNSS aiding data. False is not to. Default
898        set to True.
899        meas_flag: True to enable GnssMeasurement. False is not to. Default
900        set to False.
901        freq: An integer to set location update frequency. Default set to 0.
902        bg_display: To enable GPS tool bg display or not
903
904    Returns:
905        First fix datetime obj
906
907    Raises:
908        signals.TestFailure: when first fixed is over criteria or not even get first fixed
909    """
910    retries = 3
911    for i in range(retries):
912        if not ad.is_adb_logcat_on:
913            ad.start_adb_logcat()
914        check_adblog_functionality(ad)
915        check_location_runtime_permissions(
916            ad, GNSSTOOL_PACKAGE_NAME, GNSSTOOL_PERMISSIONS)
917        begin_time = get_current_epoch_time()
918        if clear_data:
919            clear_aiding_data_by_gtw_gpstool(ad)
920        ad.log.info("Start %s on GTW_GPSTool - attempt %d" % (api_type.upper(),
921                                                              i+1))
922        start_gnss_by_gtw_gpstool(ad, state=True, api_type=api_type, meas=meas_flag, freq=freq,
923                                  bgdisplay=bg_display)
924        for _ in range(10 + criteria):
925            logcat_results = ad.search_logcat("First fixed", begin_time)
926            if logcat_results:
927                ad.log.debug(logcat_results[-1]["log_message"])
928                first_fixed = int(logcat_results[-1]["log_message"].split()[-1])
929                ad.log.info("%s First fixed = %.3f seconds" %
930                            (api_type.upper(), first_fixed/1000))
931                if (first_fixed/1000) <= criteria:
932                    return logcat_results[-1]["datetime_obj"]
933                start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
934                raise signals.TestFailure("Fail to get %s location fixed "
935                                          "within %d seconds criteria."
936                                          % (api_type.upper(), criteria))
937            time.sleep(1)
938        check_current_focus_app(ad)
939        start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
940    raise signals.TestFailure("Fail to get %s location fixed within %d "
941                              "attempts." % (api_type.upper(), retries))
942
943
944def start_ttff_by_gtw_gpstool(ad,
945                              ttff_mode,
946                              iteration,
947                              aid_data=False,
948                              raninterval=False,
949                              mininterval=10,
950                              maxinterval=40,
951                              hot_warm_sleep=300,
952                              timeout=60):
953    """Identify which TTFF mode for different test items.
954
955    Args:
956        ad: An AndroidDevice object.
957        ttff_mode: TTFF Test mode for current test item.
958        iteration: Iteration of TTFF cycles.
959        aid_data: Boolean for identify aid_data existed or not
960        raninterval: Boolean for identify random interval of TTFF in enable or not.
961        mininterval: Minimum value of random interval pool. The unit is second.
962        maxinterval: Maximum value of random interval pool. The unit is second.
963        hot_warm_sleep: Wait time for acquiring Almanac.
964        timeout: TTFF time out. The unit is second.
965    Returns:
966        latest_start_time: (Datetime) the start time of latest successful TTFF
967    """
968    begin_time = get_current_epoch_time()
969    ad.log.debug("[start_ttff] Search logcat start time: %s" % begin_time)
970    if (ttff_mode == "hs" or ttff_mode == "ws") and not aid_data:
971        ad.log.info("Wait {} seconds to start TTFF {}...".format(
972            hot_warm_sleep, ttff_mode.upper()))
973        time.sleep(hot_warm_sleep)
974    if ttff_mode == "cs":
975        ad.log.info("Start TTFF Cold Start...")
976        time.sleep(3)
977    elif ttff_mode == "csa":
978        ad.log.info("Start TTFF CSWith Assist...")
979        time.sleep(3)
980    for i in range(1, 4):
981        try:
982            ad.log.info(f"Before sending TTFF gms version is {get_gms_version(ad)}")
983            ad.adb.shell("am broadcast -a com.android.gpstool.ttff_action "
984                         "--es ttff {} --es cycle {}  --ez raninterval {} "
985                         "--ei mininterval {} --ei maxinterval {}".format(
986                         ttff_mode, iteration, raninterval, mininterval,
987                         maxinterval))
988        except job.TimeoutError:
989            # If this is the last retry and we still get timeout error, raises the timeoutError.
990            if i == 3:
991                raise
992            # Currently we encounter lots of timeout issue in Qualcomm devices. But so far we don't
993            # know the root cause yet. In order to continue the test, we ignore the timeout for
994            # retry.
995            ad.log.warn("Send TTFF command timeout.")
996            ad.log.info(f"Current gms version is {get_gms_version(ad)}")
997            # Wait 2 second to retry
998            time.sleep(2)
999            continue
1000        time.sleep(1)
1001        result = ad.search_logcat("act=com.android.gpstool.start_test_action", begin_time)
1002        if result:
1003            ad.log.debug("TTFF start log %s" % result)
1004            latest_start_time = max(list(map(lambda x: x['datetime_obj'], result)))
1005            ad.log.info("Send TTFF start_test_action successfully.")
1006            return latest_start_time
1007    else:
1008        check_current_focus_app(ad)
1009        raise signals.TestError("Fail to send TTFF start_test_action.")
1010
1011
1012def gnss_tracking_via_gtw_gpstool(ad,
1013                                  criteria,
1014                                  api_type="gnss",
1015                                  testtime=60,
1016                                  meas_flag=False,
1017                                  freq=0,
1018                                  is_screen_off=False):
1019    """Start GNSS/FLP tracking tests for input testtime on GTW_GPSTool.
1020
1021    Args:
1022        ad: An AndroidDevice object.
1023        criteria: Criteria for current TTFF.
1024        api_type: Different API for location fix. Use gnss/flp/nmea
1025        testtime: Tracking test time for minutes. Default set to 60 minutes.
1026        meas_flag: True to enable GnssMeasurement. False is not to. Default
1027        set to False.
1028        freq: An integer to set location update frequency. Default set to 0.
1029        is_screen_off: whether to turn off during tracking
1030    """
1031    process_gnss_by_gtw_gpstool(
1032        ad, criteria=criteria, api_type=api_type, meas_flag=meas_flag, freq=freq,
1033        bg_display=is_screen_off)
1034    ad.log.info("Start %s tracking test for %d minutes" % (api_type.upper(),
1035                                                           testtime))
1036    begin_time = get_current_epoch_time()
1037    with set_screen_status(ad, off=is_screen_off):
1038        wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type)
1039        ad.log.info("Successfully tested for %d minutes" % testtime)
1040    start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
1041
1042
1043def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss",
1044                                  ignore_hal_crash=False):
1045    """Waits for GNSS tracking to finish and detect GNSS crash during the waiting time.
1046
1047    Args:
1048        ad: An AndroidDevice object.
1049        begin_time: The start time of tracking.
1050        api_type: Different API for location fix. Use gnss/flp/nmea
1051        testtime: Tracking test time for minutes.
1052        ignore_hal_crash: To ignore HAL crash error no not.
1053    """
1054    while get_current_epoch_time() - begin_time < testtime * 60 * 1000:
1055        detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash)
1056        # add sleep here to avoid too many request and cause device not responding
1057        time.sleep(1)
1058
1059def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location):
1060    """Run GNSS TTFF test with selected mode and parse the results.
1061
1062    Args:
1063        mode: "cs", "ws" or "hs"
1064        criteria: Criteria for the TTFF.
1065
1066    Returns:
1067        ttff_data: A dict of all TTFF data.
1068    """
1069    # Before running TTFF, we will run tracking and try to get first fixed.
1070    # But the TTFF before TTFF doesn't apply to any criteria, so we set a maximum value.
1071    process_gnss_by_gtw_gpstool(ad, criteria=FIRST_FIXED_MAX_WAITING_TIME)
1072    ttff_start_time = start_ttff_by_gtw_gpstool(ad, mode, test_cycle)
1073    ttff_data = process_ttff_by_gtw_gpstool(ad, ttff_start_time, true_location)
1074    result = check_ttff_data(ad, ttff_data, gnss_constant.TTFF_MODE.get(mode), criteria)
1075    asserts.assert_true(
1076        result, "TTFF %s fails to reach designated criteria: %d "
1077                "seconds." % (gnss_constant.TTFF_MODE.get(mode), criteria))
1078    return ttff_data
1079
1080def parse_gtw_gpstool_log(ad, true_position, api_type="gnss", validate_gnssstatus=False):
1081    """Process GNSS/FLP API logs from GTW GPSTool and output track_data to
1082    test_run_info for ACTS plugin to parse and display on MobileHarness as
1083    Property.
1084
1085    Args:
1086        ad: An AndroidDevice object.
1087        true_position: Coordinate as [latitude, longitude] to calculate
1088        position error.
1089        api_type: Different API for location fix. Use gnss/flp/nmea
1090        validate_gnssstatus: Validate gnssstatus or not
1091
1092    Returns:
1093        A dict of location reported from GPSTool
1094            {<utc_time>: TRACK_REPORT, ...}
1095    """
1096    gnssstatus_count = 0
1097    test_logfile = {}
1098    track_data = {}
1099    ant_top4_cn = 0
1100    ant_cn = 0
1101    base_top4_cn = 0
1102    base_cn = 0
1103    track_lat = 0
1104    track_long = 0
1105    l5flag = "false"
1106    gps_datetime_pattern = re.compile("(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}\.\d{0,5})")
1107    gps_datetime_format = "%Y/%m/%d %H:%M:%S.%f"
1108    file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
1109                                  % GNSSSTATUS_LOG_PATH))
1110    if file_count != 1:
1111        ad.log.warn("%d API logs exist." % file_count)
1112    dir_file = ad.adb.shell("ls -tr %s" % GNSSSTATUS_LOG_PATH).split()
1113    for path_key in dir_file:
1114        if fnmatch.fnmatch(path_key, "*.txt"):
1115            logpath = posixpath.join(GNSSSTATUS_LOG_PATH, path_key)
1116            out = ad.adb.shell("wc -c %s" % logpath)
1117            file_size = int(out.split(" ")[0])
1118            if file_size < 2000:
1119                ad.log.info("Skip log %s due to log size %d bytes" %
1120                            (path_key, file_size))
1121                continue
1122            test_logfile = logpath
1123    if not test_logfile:
1124        raise signals.TestError("Failed to get test log file in device.")
1125    lines = ad.adb.shell("cat %s" % test_logfile).split("\n")
1126    gnss_svid_container = gnssstatus_utils.GnssSvidContainer()
1127    for line in lines:
1128        if line.startswith('Fix'):
1129            try:
1130                gnss_status = gnssstatus_utils.GnssStatus(line)
1131                gnssstatus_count += 1
1132            except gnssstatus_utils.RegexParseException as e:
1133                ad.log.warn(e)
1134                continue
1135
1136            gnss_svid_container.add_satellite(gnss_status)
1137            if validate_gnssstatus:
1138                gnss_status.validate_gnssstatus()
1139
1140        if "Antenna_History Avg Top4" in line:
1141            ant_top4_cn = float(line.split(":")[-1].strip())
1142        elif "Antenna_History Avg" in line:
1143            ant_cn = float(line.split(":")[-1].strip())
1144        elif "Baseband_History Avg Top4" in line:
1145            base_top4_cn = float(line.split(":")[-1].strip())
1146        elif "Baseband_History Avg" in line:
1147            base_cn = float(line.split(":")[-1].strip())
1148        elif "L5 used in fix" in line:
1149            l5flag = line.split(":")[-1].strip()
1150        elif "Latitude" in line:
1151            track_lat = float(line.split(":")[-1].strip())
1152        elif "Longitude" in line:
1153            track_long = float(line.split(":")[-1].strip())
1154        elif "Read:" in line:
1155            target = re.search(gps_datetime_pattern, line)
1156            device_time = datetime.strptime(target.group(1), gps_datetime_format)
1157        elif "Time" in line:
1158            target = re.search(gps_datetime_pattern, line)
1159            track_utc = target.group(1)
1160            report_time = datetime.strptime(track_utc, gps_datetime_format)
1161            if track_utc in track_data.keys():
1162                continue
1163            pe = calculate_position_error(track_lat, track_long, true_position)
1164            track_data[track_utc] = TRACK_REPORT(l5flag=l5flag,
1165                                                 pe=pe,
1166                                                 ant_top4cn=ant_top4_cn,
1167                                                 ant_cn=ant_cn,
1168                                                 base_top4cn=base_top4_cn,
1169                                                 base_cn=base_cn,
1170                                                 device_time=device_time,
1171                                                 report_time=report_time,
1172                                                 )
1173    ad.log.info("Total %d gnssstatus samples verified" %gnssstatus_count)
1174    ad.log.debug(track_data)
1175    prop_basename = UPLOAD_TO_SPONGE_PREFIX + f"{api_type.upper()}_tracking_"
1176    time_list = sorted(track_data.keys())
1177    l5flag_list = [track_data[key].l5flag for key in time_list]
1178    pe_list = [float(track_data[key].pe) for key in time_list]
1179    ant_top4cn_list = [float(track_data[key].ant_top4cn) for key in time_list]
1180    ant_cn_list = [float(track_data[key].ant_cn) for key in time_list]
1181    base_top4cn_list = [float(track_data[key].base_top4cn) for key in time_list]
1182    base_cn_list = [float(track_data[key].base_cn) for key in time_list]
1183    ad.log.info(prop_basename+"StartTime %s" % time_list[0].replace(" ", "-"))
1184    ad.log.info(prop_basename+"EndTime %s" % time_list[-1].replace(" ", "-"))
1185    ad.log.info(prop_basename+"TotalFixPoints %d" % len(time_list))
1186    ad.log.info(prop_basename+"L5FixRate "+'{percent:.2%}'.format(
1187        percent=l5flag_list.count("true")/len(l5flag_list)))
1188    ad.log.info(prop_basename+"AvgDis %.1f" % (sum(pe_list)/len(pe_list)))
1189    ad.log.info(prop_basename+"MaxDis %.1f" % max(pe_list))
1190    ad.log.info(prop_basename+"Ant_AvgTop4Signal %.1f" % ant_top4cn_list[-1])
1191    ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_cn_list[-1])
1192    ad.log.info(prop_basename+"Base_AvgTop4Signal %.1f" % base_top4cn_list[-1])
1193    ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_cn_list[-1])
1194    _log_svid_info(gnss_svid_container, prop_basename, ad)
1195    return track_data
1196
1197
1198def verify_gps_time_should_be_close_to_device_time(ad, tracking_result):
1199    """Check the time gap between GPS time and device time.
1200
1201    In normal cases, the GPS time should be close to device time. But if GPS week rollover happens,
1202    the GPS time may goes back to 20 years ago. In order to capture this issue, we assert the time
1203    diff between the GPS time and device time.
1204
1205    Args:
1206        ad: The device under test.
1207        tracking_result: The result we get from GNSS tracking.
1208    """
1209    ad.log.info("Validating GPS/Device time difference")
1210    max_time_diff_in_seconds = 2.0
1211    exceed_report = []
1212    for report in tracking_result.values():
1213        time_diff_in_seconds = abs((report.report_time - report.device_time).total_seconds())
1214        if time_diff_in_seconds > max_time_diff_in_seconds:
1215            message = (f"GPS time: {report.report_time}  Device time: {report.device_time} "
1216                       f"diff: {time_diff_in_seconds}")
1217            exceed_report.append(message)
1218    fail_message = (f"The following items exceed {max_time_diff_in_seconds}s\n" +
1219                     "\n".join(exceed_report))
1220    asserts.assert_false(exceed_report, msg=fail_message)
1221
1222
1223def validate_location_fix_rate(ad, location_reported, run_time, fix_rate_criteria):
1224    """Check location reported count
1225
1226    The formula is "total_fix_points / (run_time * 60)"
1227    When the result is lower than fix_rate_criteria, fail the test case
1228
1229    Args:
1230        ad: AndroidDevice object
1231        location_reported: (Enumerate) Contains the reported location
1232        run_time: (int) How many minutes do we need to verify
1233        fix_rate_criteria: The threshold of the pass criteria
1234            if we expect fix rate to be 99%, then fix_rate_criteria should be 0.99
1235    """
1236    ad.log.info("Validating fix rate")
1237    pass_criteria = run_time * 60 * fix_rate_criteria
1238    actual_location_count = len(location_reported)
1239
1240    # The fix rate may exceed 100% occasionally, to standardlize the result
1241    # set maximum fix rate to 100%
1242    actual_fix_rate = min(1, (actual_location_count / (run_time * 60)))
1243    actual_fix_rate_percentage = f"{actual_fix_rate:.0%}"
1244
1245    log_prefix = UPLOAD_TO_SPONGE_PREFIX + f"FIX_RATE_"
1246    ad.log.info("%sresult %s" % (log_prefix, actual_fix_rate_percentage))
1247    ad.log.debug("Actual location count %s" % actual_location_count)
1248
1249    fail_message = (f"Fail to meet criteria. Expect to have at least {pass_criteria} location count"
1250                    f" Actual: {actual_location_count}")
1251    asserts.assert_true(pass_criteria <= actual_location_count, msg=fail_message)
1252
1253
1254def _log_svid_info(container, log_prefix, ad):
1255    """Write GnssSvidContainer svid information into logger
1256    Args:
1257        container: A GnssSvidContainer object
1258        log_prefix:
1259            A prefix used to specify the log will be upload to dashboard
1260        ad: An AndroidDevice object
1261    """
1262    for sv_type, svids in container.used_in_fix.items():
1263        message = f"{log_prefix}{sv_type} {len(svids)}"
1264        ad.log.info(message)
1265        ad.log.debug("Satellite used in fix %s ids are: %s", sv_type, svids)
1266
1267    for sv_type, svids in container.not_used_in_fix.items():
1268        ad.log.debug("Satellite not used in fix %s ids are: %s", sv_type, svids)
1269
1270
1271def process_ttff_by_gtw_gpstool(ad, begin_time, true_position, api_type="gnss"):
1272    """Process TTFF and record results in ttff_data.
1273
1274    Args:
1275        ad: An AndroidDevice object.
1276        begin_time: test begin time.
1277        true_position: Coordinate as [latitude, longitude] to calculate
1278        position error.
1279        api_type: Different API for location fix. Use gnss/flp/nmea
1280
1281    Returns:
1282        ttff_data: A dict of all TTFF data.
1283    """
1284    ttff_lat = 0
1285    ttff_lon = 0
1286    utc_time = epoch_to_human_time(get_current_epoch_time())
1287    ttff_data = {}
1288    ttff_loop_time = get_current_epoch_time()
1289    while True:
1290        if get_current_epoch_time() - ttff_loop_time >= 120000:
1291            raise signals.TestError("Fail to search specific GPSService "
1292                                    "message in logcat. Abort test.")
1293        if not ad.is_adb_logcat_on:
1294            ad.start_adb_logcat()
1295        logcat_results = ad.search_logcat("write TTFF log", ttff_loop_time)
1296        if logcat_results:
1297            ttff_loop_time = get_current_epoch_time()
1298            ttff_log = logcat_results[-1]["log_message"].split()
1299            ttff_loop = int(ttff_log[8].split(":")[-1])
1300            ttff_sec = float(ttff_log[11])
1301            if ttff_sec != 0.0:
1302                ttff_ant_cn = float(ttff_log[18].strip("]"))
1303                ttff_base_cn = float(ttff_log[25].strip("]"))
1304                if api_type == "gnss":
1305                    gnss_results = ad.search_logcat("GPSService: Check item",
1306                                                    begin_time)
1307                    if gnss_results:
1308                        ad.log.debug(gnss_results[-1]["log_message"])
1309                        gnss_location_log = \
1310                            gnss_results[-1]["log_message"].split()
1311                        ttff_lat = float(
1312                            gnss_location_log[8].split("=")[-1].strip(","))
1313                        ttff_lon = float(
1314                            gnss_location_log[9].split("=")[-1].strip(","))
1315                        loc_time = int(
1316                            gnss_location_log[10].split("=")[-1].strip(","))
1317                        utc_time = epoch_to_human_time(loc_time)
1318                        ttff_haccu = float(
1319                            gnss_location_log[11].split("=")[-1].strip(","))
1320                elif api_type == "flp":
1321                    flp_results = ad.search_logcat("GPSService: FLP Location",
1322                                                   begin_time)
1323                    if flp_results:
1324                        ad.log.debug(flp_results[-1]["log_message"])
1325                        flp_location_log = flp_results[-1][
1326                            "log_message"].split()
1327                        ttff_lat = float(flp_location_log[8].split(",")[0])
1328                        ttff_lon = float(flp_location_log[8].split(",")[1])
1329                        ttff_haccu = float(flp_location_log[9].split("=")[1])
1330                        utc_time = epoch_to_human_time(get_current_epoch_time())
1331            else:
1332                ttff_ant_cn = float(ttff_log[19].strip("]"))
1333                ttff_base_cn = float(ttff_log[26].strip("]"))
1334                ttff_lat = 0
1335                ttff_lon = 0
1336                ttff_haccu = 0
1337                utc_time = epoch_to_human_time(get_current_epoch_time())
1338            ad.log.debug("TTFF Loop %d - (Lat, Lon) = (%s, %s)" % (ttff_loop,
1339                                                                   ttff_lat,
1340                                                                   ttff_lon))
1341            ttff_pe = calculate_position_error(
1342                ttff_lat, ttff_lon, true_position)
1343            ttff_data[ttff_loop] = TTFF_REPORT(utc_time=utc_time,
1344                                               ttff_loop=ttff_loop,
1345                                               ttff_sec=ttff_sec,
1346                                               ttff_pe=ttff_pe,
1347                                               ttff_ant_cn=ttff_ant_cn,
1348                                               ttff_base_cn=ttff_base_cn,
1349                                               ttff_haccu=ttff_haccu)
1350            ad.log.info("UTC Time = %s, Loop %d = %.1f seconds, "
1351                        "Position Error = %.1f meters, "
1352                        "Antenna Average Signal = %.1f dbHz, "
1353                        "Baseband Average Signal = %.1f dbHz, "
1354                        "Horizontal Accuracy = %.1f meters" % (utc_time,
1355                                                                 ttff_loop,
1356                                                                 ttff_sec,
1357                                                                 ttff_pe,
1358                                                                 ttff_ant_cn,
1359                                                                 ttff_base_cn,
1360                                                                 ttff_haccu))
1361        stop_gps_results = ad.search_logcat("stop gps test", begin_time)
1362        if stop_gps_results:
1363            ad.send_keycode("HOME")
1364            break
1365        crash_result = ad.search_logcat("Force finishing activity "
1366                                        "com.android.gpstool/.GPSTool",
1367                                        begin_time)
1368        if crash_result:
1369            raise signals.TestError("GPSTool crashed. Abort test.")
1370        # wait 5 seconds to avoid logs not writing into logcat yet
1371        time.sleep(5)
1372    return ttff_data
1373
1374
1375def check_ttff_data(ad, ttff_data, ttff_mode, criteria):
1376    """Verify all TTFF results from ttff_data.
1377
1378    Args:
1379        ad: An AndroidDevice object.
1380        ttff_data: TTFF data of secs, position error and signal strength.
1381        ttff_mode: TTFF Test mode for current test item.
1382        criteria: Criteria for current test item.
1383
1384    Returns:
1385        True: All TTFF results are within criteria.
1386        False: One or more TTFF results exceed criteria or Timeout.
1387    """
1388    ad.log.info("%d iterations of TTFF %s tests finished."
1389                % (len(ttff_data.keys()), ttff_mode))
1390    ad.log.info("%s PASS criteria is %d seconds" % (ttff_mode, criteria))
1391    ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))
1392    if len(ttff_data.keys()) == 0:
1393        ad.log.error("GTW_GPSTool didn't process TTFF properly.")
1394        raise ValueError("No ttff loop is done")
1395
1396    ttff_property_key_and_value(ad, ttff_data, ttff_mode)
1397
1398    if any(float(ttff_data[key].ttff_sec) == 0.0 for key in ttff_data.keys()):
1399        ad.log.error("One or more TTFF %s Timeout" % ttff_mode)
1400        return False
1401    elif any(float(ttff_data[key].ttff_sec) >= criteria for key in
1402             ttff_data.keys()):
1403        ad.log.error("One or more TTFF %s are over test criteria %d seconds"
1404                     % (ttff_mode, criteria))
1405        return False
1406    ad.log.info("All TTFF %s are within test criteria %d seconds."
1407                % (ttff_mode, criteria))
1408    return True
1409
1410
1411def ttff_property_key_and_value(ad, ttff_data, ttff_mode):
1412    """Output ttff_data to test_run_info for ACTS plugin to parse and display
1413    on MobileHarness as Property.
1414
1415    Args:
1416        ad: An AndroidDevice object.
1417        ttff_data: TTFF data of secs, position error and signal strength.
1418        ttff_mode: TTFF Test mode for current test item.
1419    """
1420    timeout_ttff = 61
1421    prop_basename = "TestResult "+ttff_mode.replace(" ", "_")+"_TTFF_"
1422    sec_list = [float(ttff_data[key].ttff_sec) for key in ttff_data.keys()]
1423    pe_list = [float(ttff_data[key].ttff_pe) for key in ttff_data.keys()]
1424    ant_cn_list = [float(ttff_data[key].ttff_ant_cn) for key in
1425                   ttff_data.keys()]
1426    base_cn_list = [float(ttff_data[key].ttff_base_cn) for key in
1427                    ttff_data.keys()]
1428    haccu_list = [float(ttff_data[key].ttff_haccu) for key in
1429                    ttff_data.keys()]
1430    timeoutcount = sec_list.count(0.0)
1431    sec_list = sorted(sec_list)
1432    if len(sec_list) == timeoutcount:
1433        median_ttff = avgttff = timeout_ttff
1434    else:
1435        avgttff = sum(sec_list)/(len(sec_list) - timeoutcount)
1436        median_ttff = median(sec_list)
1437    if timeoutcount != 0:
1438        maxttff = timeout_ttff
1439    else:
1440        maxttff = max(sec_list)
1441    avgdis = sum(pe_list)/len(pe_list)
1442    maxdis = max(pe_list)
1443    ant_avgcn = sum(ant_cn_list)/len(ant_cn_list)
1444    base_avgcn = sum(base_cn_list)/len(base_cn_list)
1445    avg_haccu = sum(haccu_list)/len(haccu_list)
1446    ad.log.info(prop_basename+"AvgTime %.1f" % avgttff)
1447    ad.log.info(prop_basename+"MedianTime %.1f" % median_ttff)
1448    ad.log.info(prop_basename+"MaxTime %.1f" % maxttff)
1449    ad.log.info(prop_basename+"TimeoutCount %d" % timeoutcount)
1450    ad.log.info(prop_basename+"AvgDis %.1f" % avgdis)
1451    ad.log.info(prop_basename+"MaxDis %.1f" % maxdis)
1452    ad.log.info(prop_basename+"Ant_AvgSignal %.1f" % ant_avgcn)
1453    ad.log.info(prop_basename+"Base_AvgSignal %.1f" % base_avgcn)
1454    ad.log.info(prop_basename+"Avg_Horizontal_Accuracy %.1f" % avg_haccu)
1455
1456
1457def calculate_position_error(latitude, longitude, true_position):
1458    """Use haversine formula to calculate position error base on true location
1459    coordinate.
1460
1461    Args:
1462        latitude: latitude of location fixed in the present.
1463        longitude: longitude of location fixed in the present.
1464        true_position: [latitude, longitude] of true location coordinate.
1465
1466    Returns:
1467        position_error of location fixed in the present.
1468    """
1469    radius = 6371009
1470    dlat = math.radians(latitude - true_position[0])
1471    dlon = math.radians(longitude - true_position[1])
1472    a = math.sin(dlat/2) * math.sin(dlat/2) + \
1473        math.cos(math.radians(true_position[0])) * \
1474        math.cos(math.radians(latitude)) * math.sin(dlon/2) * math.sin(dlon/2)
1475    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
1476    return radius * c
1477
1478
1479def launch_google_map(ad):
1480    """Launch Google Map via intent.
1481
1482    Args:
1483        ad: An AndroidDevice object.
1484    """
1485    ad.log.info("Launch Google Map.")
1486    try:
1487        if is_device_wearable(ad):
1488            cmd = ("am start -S -n com.google.android.apps.maps/"
1489                   "com.google.android.apps.gmmwearable.MainActivity")
1490        else:
1491            cmd = ("am start -S -n com.google.android.apps.maps/"
1492                   "com.google.android.maps.MapsActivity")
1493        ad.adb.shell(cmd)
1494        ad.send_keycode("BACK")
1495        ad.force_stop_apk("com.google.android.apps.maps")
1496        ad.adb.shell(cmd)
1497    except Exception as e:
1498        ad.log.error(e)
1499        raise signals.TestError("Failed to launch google map.")
1500    check_current_focus_app(ad)
1501
1502
1503def check_current_focus_app(ad):
1504    """Check to see current focused window and app.
1505
1506    Args:
1507        ad: An AndroidDevice object.
1508    Returns:
1509        string: the current focused window / app
1510    """
1511    time.sleep(1)
1512    current = ad.adb.shell(
1513        "dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'")
1514    ad.log.debug("\n"+current)
1515    return current
1516
1517
1518def check_location_api(ad, retries):
1519    """Verify if GnssLocationProvider API reports location.
1520
1521    Args:
1522        ad: An AndroidDevice object.
1523        retries: Retry time.
1524
1525    Returns:
1526        True: GnssLocationProvider API reports location.
1527        otherwise return False.
1528    """
1529    for i in range(retries):
1530        begin_time = get_current_epoch_time()
1531        ad.log.info("Try to get location report from GnssLocationProvider API "
1532                    "- attempt %d" % (i+1))
1533        while get_current_epoch_time() - begin_time <= 30000:
1534            logcat_results = ad.search_logcat("reportLocation", begin_time)
1535            if logcat_results:
1536                ad.log.info("%s" % logcat_results[-1]["log_message"])
1537                ad.log.info("GnssLocationProvider reports location "
1538                            "successfully.")
1539                return True
1540        if not ad.is_adb_logcat_on:
1541            ad.start_adb_logcat()
1542    ad.log.error("GnssLocationProvider is unable to report location.")
1543    return False
1544
1545
1546def check_network_location(ad, retries, location_type, criteria=30):
1547    """Verify if NLP reports location after requesting via GPSTool.
1548
1549    Args:
1550        ad: An AndroidDevice object.
1551        retries: Retry time.
1552        location_type: cell or wifi.
1553        criteria: expected nlp return time, default 30 seconds
1554
1555    Returns:
1556        True: NLP reports location.
1557        otherwise return False.
1558    """
1559    criteria = criteria * 1000
1560    search_pattern = ("GPSTool : networkLocationType = %s" % location_type)
1561    for i in range(retries):
1562        # Capture the begin time 1 seconds before due to time gap.
1563        begin_time = get_current_epoch_time() - 1000
1564        ad.log.info("Try to get NLP status - attempt %d" % (i+1))
1565        ad.adb.shell(
1566            "am start -S -n com.android.gpstool/.GPSTool --es mode nlp")
1567        while get_current_epoch_time() - begin_time <= criteria:
1568            # Search pattern in 1 second interval
1569            time.sleep(1)
1570            result = ad.search_logcat(search_pattern, begin_time)
1571            if result:
1572                ad.log.info("Pattern Found: %s." % result[-1]["log_message"])
1573                ad.send_keycode("BACK")
1574                return True
1575        if not ad.is_adb_logcat_on:
1576            ad.start_adb_logcat()
1577        ad.send_keycode("BACK")
1578    ad.log.error("Unable to report network location \"%s\"." % location_type)
1579    return False
1580
1581
1582def set_attenuator_gnss_signal(ad, attenuator, atten_value):
1583    """Set attenuation value for different GNSS signal.
1584
1585    Args:
1586        ad: An AndroidDevice object.
1587        attenuator: The attenuator object.
1588        atten_value: attenuation value
1589    """
1590    try:
1591        ad.log.info(
1592            "Set attenuation value to \"%d\" for GNSS signal." % atten_value)
1593        attenuator[0].set_atten(atten_value)
1594    except Exception as e:
1595        ad.log.error(e)
1596
1597
1598def set_battery_saver_mode(ad, state):
1599    """Enable or disable battery saver mode via adb.
1600
1601    Args:
1602        ad: An AndroidDevice object.
1603        state: True is enable Battery Saver mode. False is disable.
1604    """
1605    ad.root_adb()
1606    if state:
1607        ad.log.info("Enable Battery Saver mode.")
1608        ad.adb.shell("cmd battery unplug")
1609        ad.adb.shell("settings put global low_power 1")
1610    else:
1611        ad.log.info("Disable Battery Saver mode.")
1612        ad.adb.shell("settings put global low_power 0")
1613        ad.adb.shell("cmd battery reset")
1614
1615
1616def set_gnss_qxdm_mask(ad, masks):
1617    """Find defined gnss qxdm mask and set as default logging mask.
1618
1619    Args:
1620        ad: An AndroidDevice object.
1621        masks: Defined gnss qxdm mask.
1622    """
1623    try:
1624        for mask in masks:
1625            if not tlutils.find_qxdm_log_mask(ad, mask):
1626                continue
1627            tlutils.set_qxdm_logger_command(ad, mask)
1628            break
1629    except Exception as e:
1630        ad.log.error(e)
1631        raise signals.TestError("Failed to set any QXDM masks.")
1632
1633
1634def start_youtube_video(ad, url=None, retries=0):
1635    """Start youtube video and verify if audio is in music state.
1636
1637    Args:
1638        ad: An AndroidDevice object.
1639        url: Youtube video url.
1640        retries: Retry times if audio is not in music state.
1641
1642    Returns:
1643        True if youtube video is playing normally.
1644        False if youtube video is not playing properly.
1645    """
1646    for i in range(retries):
1647        ad.log.info("Open an youtube video - attempt %d" % (i+1))
1648        cmd = ("am start -n com.google.android.youtube/"
1649               "com.google.android.youtube.UrlActivity -d \"%s\"" % url)
1650        ad.adb.shell(cmd)
1651        time.sleep(2)
1652        out = ad.adb.shell(
1653            "dumpsys activity | grep NewVersionAvailableActivity")
1654        if out:
1655            ad.log.info("Skip Youtube New Version Update.")
1656            ad.send_keycode("BACK")
1657        if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1):
1658            ad.log.info("Started a video in youtube, audio is in MUSIC state")
1659            return True
1660        ad.log.info("Force-Stop youtube and reopen youtube again.")
1661        ad.force_stop_apk("com.google.android.youtube")
1662    check_current_focus_app(ad)
1663    raise signals.TestError("Started a video in youtube, "
1664                            "but audio is not in MUSIC state")
1665
1666
1667def get_gms_version(ad):
1668    cmd = "dumpsys package com.google.android.gms | grep versionName"
1669    return ad.adb.shell(cmd).split("\n")[0].split("=")[1]
1670
1671
1672def get_baseband_and_gms_version(ad, extra_msg=""):
1673    """Get current radio baseband and GMSCore version of AndroidDevice object.
1674
1675    Args:
1676        ad: An AndroidDevice object.
1677        extra_msg: Extra message before or after the change.
1678    """
1679    mpss_version = ""
1680    brcm_gps_version = ""
1681    brcm_sensorhub_version = ""
1682    try:
1683        build_version = ad.adb.getprop("ro.build.id")
1684        baseband_version = ad.adb.getprop("gsm.version.baseband")
1685        gms_version = get_gms_version(ad)
1686        if check_chipset_vendor_by_qualcomm(ad):
1687            mpss_version = ad.adb.shell(
1688                "cat /sys/devices/soc0/images | grep MPSS | cut -d ':' -f 3")
1689        else:
1690            brcm_gps_version = ad.adb.shell("cat /data/vendor/gps/chip.info")
1691            sensorhub_version = ad.adb.shell(
1692                "cat /vendor/firmware/SensorHub.patch | grep ChangeList")
1693            brcm_sensorhub_version = re.compile(
1694                r'<ChangeList=(\w+)>').search(sensorhub_version).group(1)
1695        if not extra_msg:
1696            ad.log.info("TestResult Build_Version %s" % build_version)
1697            ad.log.info("TestResult Baseband_Version %s" % baseband_version)
1698            ad.log.info(
1699                "TestResult GMS_Version %s" % gms_version.replace(" ", ""))
1700            if check_chipset_vendor_by_qualcomm(ad):
1701                ad.log.info("TestResult MPSS_Version %s" % mpss_version)
1702            else:
1703                ad.log.info("TestResult GPS_Version %s" % brcm_gps_version)
1704                ad.log.info(
1705                    "TestResult SensorHub_Version %s" % brcm_sensorhub_version)
1706        else:
1707            ad.log.info(
1708                "%s, Baseband_Version = %s" % (extra_msg, baseband_version))
1709    except Exception as e:
1710        ad.log.error(e)
1711
1712
1713def start_toggle_gnss_by_gtw_gpstool(ad, iteration):
1714    """Send toggle gnss off/on start_test_action
1715
1716    Args:
1717        ad: An AndroidDevice object.
1718        iteration: Iteration of toggle gnss off/on cycles.
1719    """
1720    msg_list = []
1721    begin_time = get_current_epoch_time()
1722    try:
1723        for i in range(1, 4):
1724            ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool "
1725                         "--es mode toggle --es cycle %d" % iteration)
1726            time.sleep(1)
1727            if is_device_wearable(ad):
1728                # Wait 20 seconds for Wearable low performance time.
1729                time.sleep(20)
1730                if ad.search_logcat("ToggleGPS onResume",
1731                                begin_time):
1732                    ad.log.info("Send ToggleGPS start_test_action successfully.")
1733                    break
1734            elif ad.search_logcat("cmp=com.android.gpstool/.ToggleGPS",
1735                                begin_time):
1736                ad.log.info("Send ToggleGPS start_test_action successfully.")
1737                break
1738        else:
1739            check_current_focus_app(ad)
1740            raise signals.TestError("Fail to send ToggleGPS "
1741                                    "start_test_action within 3 attempts.")
1742        time.sleep(2)
1743        if is_device_wearable(ad):
1744            test_start = ad.search_logcat("GPSService: create toggle GPS log",
1745                                      begin_time)
1746        else:
1747            test_start = ad.search_logcat("GPSTool_ToggleGPS: startService",
1748                                      begin_time)
1749        if test_start:
1750            ad.log.info(test_start[-1]["log_message"].split(":")[-1].strip())
1751        else:
1752            raise signals.TestError("Fail to start toggle GPS off/on test.")
1753        # Every iteration is expected to finish within 4 minutes.
1754        while get_current_epoch_time() - begin_time <= iteration * 240000:
1755            crash_end = ad.search_logcat("Force finishing activity "
1756                                         "com.android.gpstool/.GPSTool",
1757                                         begin_time)
1758            if crash_end:
1759                raise signals.TestError("GPSTool crashed. Abort test.")
1760            toggle_results = ad.search_logcat("GPSTool : msg", begin_time)
1761            if toggle_results:
1762                for toggle_result in toggle_results:
1763                    msg = toggle_result["log_message"]
1764                    if not msg in msg_list:
1765                        ad.log.info(msg.split(":")[-1].strip())
1766                        msg_list.append(msg)
1767                    if "timeout" in msg:
1768                        raise signals.TestFailure("Fail to get location fixed "
1769                                                  "within 60 seconds.")
1770                    if "Test end" in msg:
1771                        raise signals.TestPass("Completed quick toggle GNSS "
1772                                               "off/on test.")
1773        raise signals.TestFailure("Fail to finish toggle GPS off/on test "
1774                                  "within %d minutes" % (iteration * 4))
1775    finally:
1776        ad.send_keycode("HOME")
1777
1778
1779def grant_location_permission(ad, option):
1780    """Grant or revoke location related permission.
1781
1782    Args:
1783        ad: An AndroidDevice object.
1784        option: Boolean to grant or revoke location related permissions.
1785    """
1786    action = "grant" if option else "revoke"
1787    for permission in LOCATION_PERMISSIONS:
1788        ad.log.info(
1789            "%s permission:%s on %s" % (action, permission, TEST_PACKAGE_NAME))
1790        ad.adb.shell("pm %s %s %s" % (action, TEST_PACKAGE_NAME, permission))
1791
1792
1793def check_location_runtime_permissions(ad, package, permissions):
1794    """Check if runtime permissions are granted on selected package.
1795
1796    Args:
1797        ad: An AndroidDevice object.
1798        package: Apk package name to check.
1799        permissions: A list of permissions to be granted.
1800    """
1801    for _ in range(3):
1802        location_runtime_permission = ad.adb.shell(
1803            "dumpsys package %s | grep ACCESS_FINE_LOCATION" % package)
1804        if "true" not in location_runtime_permission:
1805            ad.log.info("ACCESS_FINE_LOCATION is NOT granted on %s" % package)
1806            for permission in permissions:
1807                ad.log.debug("Grant %s on %s" % (permission, package))
1808                ad.adb.shell("pm grant %s %s" % (package, permission))
1809        else:
1810            ad.log.info("ACCESS_FINE_LOCATION is granted on %s" % package)
1811            break
1812    else:
1813        raise signals.TestError(
1814            "Fail to grant ACCESS_FINE_LOCATION on %s" % package)
1815
1816
1817def install_mdstest_app(ad, mdsapp):
1818    """
1819        Install MDS test app in DUT
1820
1821        Args:
1822            ad: An Android Device Object
1823            mdsapp: Installation path of MDSTest app
1824    """
1825    if not ad.is_apk_installed("com.google.mdstest"):
1826        ad.adb.install("-r %s" % mdsapp, timeout=300, ignore_status=True)
1827
1828
1829def write_modemconfig(ad, mdsapp, nvitem_dict, modemparfile):
1830    """
1831        Modify the NV items using modem_tool.par
1832        Note: modem_tool.par
1833
1834        Args:
1835            ad:  An Android Device Object
1836            mdsapp: Installation path of MDSTest app
1837            nvitem_dict: dictionary of NV items and values.
1838            modemparfile: modem_tool.par path.
1839    """
1840    ad.log.info("Verify MDSTest app installed in DUT")
1841    install_mdstest_app(ad, mdsapp)
1842    os.system("chmod 777 %s" % modemparfile)
1843    for key, value in nvitem_dict.items():
1844        if key.isdigit():
1845            op_name = "WriteEFS"
1846        else:
1847            op_name = "WriteNV"
1848        ad.log.info("Modifying the NV{!r} using {}".format(key, op_name))
1849        job.run("{} --op {} --item {} --data '{}'".
1850                format(modemparfile, op_name, key, value))
1851        time.sleep(2)
1852
1853
1854def verify_modemconfig(ad, nvitem_dict, modemparfile):
1855    """
1856        Verify the NV items using modem_tool.par
1857        Note: modem_tool.par
1858
1859        Args:
1860            ad:  An Android Device Object
1861            nvitem_dict: dictionary of NV items and values
1862            modemparfile: modem_tool.par path.
1863    """
1864    os.system("chmod 777 %s" % modemparfile)
1865    for key, value in nvitem_dict.items():
1866        if key.isdigit():
1867            op_name = "ReadEFS"
1868        else:
1869            op_name = "ReadNV"
1870        # Sleeptime to avoid Modem communication error
1871        time.sleep(5)
1872        result = job.run(
1873            "{} --op {} --item {}".format(modemparfile, op_name, key))
1874        output = str(result.stdout)
1875        ad.log.info("Actual Value for NV{!r} is {!r}".format(key, output))
1876        if not value.casefold() in output:
1877            ad.log.error("NV Value is wrong {!r} in {!r}".format(value, result))
1878            raise ValueError(
1879                "could not find {!r} in {!r}".format(value, result))
1880
1881
1882def check_ttff_pe(ad, ttff_data, ttff_mode, pe_criteria):
1883    """Verify all TTFF results from ttff_data.
1884
1885    Args:
1886        ad: An AndroidDevice object.
1887        ttff_data: TTFF data of secs, position error and signal strength.
1888        ttff_mode: TTFF Test mode for current test item.
1889        pe_criteria: Criteria for current test item.
1890
1891    """
1892    ad.log.info("%d iterations of TTFF %s tests finished."
1893                % (len(ttff_data.keys()), ttff_mode))
1894    ad.log.info("%s PASS criteria is %f meters" % (ttff_mode, pe_criteria))
1895    ad.log.debug("%s TTFF data: %s" % (ttff_mode, ttff_data))
1896
1897    if len(ttff_data.keys()) == 0:
1898        ad.log.error("GTW_GPSTool didn't process TTFF properly.")
1899        raise signals.TestFailure("GTW_GPSTool didn't process TTFF properly.")
1900
1901    elif any(float(ttff_data[key].ttff_pe) >= pe_criteria for key in
1902             ttff_data.keys()):
1903        ad.log.error("One or more TTFF %s are over test criteria %f meters"
1904                     % (ttff_mode, pe_criteria))
1905        raise signals.TestFailure("GTW_GPSTool didn't process TTFF properly.")
1906    else:
1907        ad.log.info("All TTFF %s are within test criteria %f meters." % (
1908            ttff_mode, pe_criteria))
1909        return True
1910
1911
1912def check_adblog_functionality(ad):
1913    """Restart adb logcat if system can't write logs into file after checking
1914    adblog file size.
1915
1916    Args:
1917        ad: An AndroidDevice object.
1918    """
1919    logcat_path = os.path.join(ad.device_log_path, "adblog_%s_debug.txt" %
1920                               ad.serial)
1921    if not os.path.exists(logcat_path):
1922        raise signals.TestError("Logcat file %s does not exist." % logcat_path)
1923    original_log_size = os.path.getsize(logcat_path)
1924    ad.log.debug("Original adblog size is %d" % original_log_size)
1925    time.sleep(.5)
1926    current_log_size = os.path.getsize(logcat_path)
1927    ad.log.debug("Current adblog size is %d" % current_log_size)
1928    if current_log_size == original_log_size:
1929        ad.log.warn("System can't write logs into file. Restart adb "
1930                    "logcat process now.")
1931        ad.stop_adb_logcat()
1932        ad.start_adb_logcat()
1933
1934
1935def build_instrumentation_call(package,
1936                               runner,
1937                               test_methods=None,
1938                               options=None):
1939    """Build an instrumentation call for the tests
1940
1941    Args:
1942        package: A string to identify test package.
1943        runner: A string to identify test runner.
1944        test_methods: A dictionary contains {class_name, test_method}.
1945        options: A dictionary constant {key, value} param for test.
1946
1947    Returns:
1948        An instrumentation call command.
1949    """
1950    if test_methods is None:
1951        test_methods = {}
1952        cmd_builder = InstrumentationCommandBuilder()
1953    else:
1954        cmd_builder = InstrumentationTestCommandBuilder()
1955    if options is None:
1956        options = {}
1957    cmd_builder.set_manifest_package(package)
1958    cmd_builder.set_runner(runner)
1959    cmd_builder.add_flag("-w")
1960    for class_name, test_method in test_methods.items():
1961        cmd_builder.add_test_method(class_name, test_method)
1962    for option_key, option_value in options.items():
1963        cmd_builder.add_key_value_param(option_key, option_value)
1964    return cmd_builder.build()
1965
1966
1967def check_chipset_vendor_by_qualcomm(ad):
1968    """Check if chipset vendor is by Qualcomm.
1969
1970    Args:
1971        ad: An AndroidDevice object.
1972
1973    Returns:
1974        True if it's by Qualcomm. False irf not.
1975    """
1976    ad.root_adb()
1977    soc = str(ad.adb.shell("getprop gsm.version.ril-impl"))
1978    ad.log.debug("SOC = %s" % soc)
1979    return "Qualcomm" in soc
1980
1981
1982def delete_lto_file(ad):
1983    """Delete downloaded LTO files.
1984
1985    Args:
1986        ad: An AndroidDevice object.
1987    """
1988    remount_device(ad)
1989    status = ad.adb.shell("rm -rf /data/vendor/gps/lto*")
1990    ad.log.info("Delete downloaded LTO files.\n%s" % status)
1991
1992
1993def lto_mode(ad, state):
1994    """Enable or Disable LTO mode.
1995
1996    Args:
1997        ad: An AndroidDevice object.
1998        state: True to enable. False to disable.
1999    """
2000    server_list = ["LONGTERM_PSDS_SERVER_1",
2001                   "LONGTERM_PSDS_SERVER_2",
2002                   "LONGTERM_PSDS_SERVER_3",
2003                   "NORMAL_PSDS_SERVER",
2004                   "REALTIME_PSDS_SERVER"]
2005    delete_lto_file(ad)
2006    if state:
2007        tmp_path = tempfile.mkdtemp()
2008        ad.pull_files("/etc/gps_debug.conf", tmp_path)
2009        gps_conf_path = os.path.join(tmp_path, "gps_debug.conf")
2010        gps_conf_file = open(gps_conf_path, "r")
2011        lines = gps_conf_file.readlines()
2012        gps_conf_file.close()
2013        fout = open(gps_conf_path, "w")
2014        for line in lines:
2015            for server in server_list:
2016                if server in line:
2017                    line = line.replace(line, "")
2018            fout.write(line)
2019        fout.close()
2020        ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
2021        ad.log.info("Push back modified gps_debug.conf")
2022        ad.log.info("LTO/RTO/RTI enabled")
2023        shutil.rmtree(tmp_path, ignore_errors=True)
2024    else:
2025        ad.adb.shell("echo %r >> /etc/gps_debug.conf" %
2026                     DISABLE_LTO_FILE_CONTENTS)
2027        ad.log.info("LTO/RTO/RTI disabled")
2028    reboot(ad)
2029
2030
2031def lto_mode_wearable(ad, state):
2032    """Enable or Disable LTO mode for wearable in Android R release.
2033
2034    Args:
2035        ad: An AndroidDevice object.
2036        state: True to enable. False to disable.
2037    """
2038    rto_enable = '    RtoEnable="true"\n'
2039    rto_disable = '    RtoEnable="false"\n'
2040    rti_enable = '    RtiEnable="true"\n'
2041    rti_disable = '    RtiEnable="false"\n'
2042    sync_lto_enable = '    HttpDirectSyncLto="true"\n'
2043    sync_lto_disable = '    HttpDirectSyncLto="false"\n'
2044    server_list = ["XTRA_SERVER_1", "XTRA_SERVER_2", "XTRA_SERVER_3"]
2045    delete_lto_file(ad)
2046    tmp_path = tempfile.mkdtemp()
2047    ad.pull_files("/vendor/etc/gnss/gps.xml", tmp_path)
2048    gps_xml_path = os.path.join(tmp_path, "gps.xml")
2049    gps_xml_file = open(gps_xml_path, "r")
2050    lines = gps_xml_file.readlines()
2051    gps_xml_file.close()
2052    fout = open(gps_xml_path, "w")
2053    for line in lines:
2054        if state:
2055            if rto_disable in line:
2056                line = line.replace(line, rto_enable)
2057                ad.log.info("RTO enabled")
2058            elif rti_disable in line:
2059                line = line.replace(line, rti_enable)
2060                ad.log.info("RTI enabled")
2061            elif sync_lto_disable in line:
2062                line = line.replace(line, sync_lto_enable)
2063                ad.log.info("LTO sync enabled")
2064        else:
2065            if rto_enable in line:
2066                line = line.replace(line, rto_disable)
2067                ad.log.info("RTO disabled")
2068            elif rti_enable in line:
2069                line = line.replace(line, rti_disable)
2070                ad.log.info("RTI disabled")
2071            elif sync_lto_enable in line:
2072                line = line.replace(line, sync_lto_disable)
2073                ad.log.info("LTO sync disabled")
2074        fout.write(line)
2075    fout.close()
2076    ad.push_system_file(gps_xml_path, "/vendor/etc/gnss/gps.xml")
2077    ad.log.info("Push back modified gps.xml")
2078    shutil.rmtree(tmp_path, ignore_errors=True)
2079    if state:
2080        xtra_tmp_path = tempfile.mkdtemp()
2081        ad.pull_files("/etc/gps_debug.conf", xtra_tmp_path)
2082        gps_conf_path = os.path.join(xtra_tmp_path, "gps_debug.conf")
2083        gps_conf_file = open(gps_conf_path, "r")
2084        lines = gps_conf_file.readlines()
2085        gps_conf_file.close()
2086        fout = open(gps_conf_path, "w")
2087        for line in lines:
2088            for server in server_list:
2089                if server in line:
2090                    line = line.replace(line, "")
2091            fout.write(line)
2092        fout.close()
2093        ad.push_system_file(gps_conf_path, "/etc/gps_debug.conf")
2094        ad.log.info("Push back modified gps_debug.conf")
2095        ad.log.info("LTO/RTO/RTI enabled")
2096        shutil.rmtree(xtra_tmp_path, ignore_errors=True)
2097    else:
2098        ad.adb.shell(
2099            "echo %r >> /etc/gps_debug.conf" % DISABLE_LTO_FILE_CONTENTS_R)
2100        ad.log.info("LTO/RTO/RTI disabled")
2101
2102
2103def start_pixel_logger(ad, max_log_size_mb=100, max_number_of_files=500):
2104    """adb to start pixel logger for GNSS logging.
2105
2106    Args:
2107        ad: An AndroidDevice object.
2108        max_log_size_mb: Determines when to create a new log file if current
2109            one reaches the size limit.
2110        max_number_of_files: Determines how many log files can be saved on DUT.
2111    """
2112    retries = 3
2113    start_timeout_sec = 60
2114    default_gnss_cfg = "/vendor/etc/mdlog/DEFAULT+SECURITY+FULLDPL+GPS.cfg"
2115    if check_chipset_vendor_by_qualcomm(ad):
2116        start_cmd = ("am startservice -a com.android.pixellogger."
2117                     "service.logging.LoggingService.ACTION_START_LOGGING "
2118                     "-e intent_key_cfg_path '%s' "
2119                     "--ei intent_key_max_log_size_mb %d "
2120                     "--ei intent_key_max_number_of_files %d" %
2121                     (default_gnss_cfg, max_log_size_mb, max_number_of_files))
2122    else:
2123        start_cmd = ("am startservice -a com.android.pixellogger."
2124                     "service.logging.LoggingService.ACTION_START_LOGGING "
2125                     "-e intent_logger brcm_gps "
2126                     "--ei intent_key_max_log_size_mb %d "
2127                     "--ei intent_key_max_number_of_files %d" %
2128                     (max_log_size_mb, max_number_of_files))
2129    for attempt in range(retries):
2130        begin_time = get_current_epoch_time() - 3000
2131        ad.log.info("Start Pixel Logger - Attempt %d" % (attempt + 1))
2132        ad.adb.shell(start_cmd)
2133        while get_current_epoch_time() - begin_time <= start_timeout_sec * 1000:
2134            if not ad.is_adb_logcat_on:
2135                ad.start_adb_logcat()
2136            if check_chipset_vendor_by_qualcomm(ad):
2137                start_result = ad.search_logcat(
2138                    "ModemLogger: Start logging", begin_time)
2139            else:
2140                start_result = ad.search_logcat("startRecording", begin_time)
2141            if start_result:
2142                ad.log.info("Pixel Logger starts recording successfully.")
2143                return True
2144        stop_pixel_logger(ad)
2145    else:
2146        ad.log.warn("Pixel Logger fails to start recording in %d seconds "
2147                    "within %d attempts." % (start_timeout_sec, retries))
2148
2149
2150def stop_pixel_logger(ad):
2151    """adb to stop pixel logger for GNSS logging.
2152
2153    Args:
2154        ad: An AndroidDevice object.
2155    """
2156    retries = 3
2157    stop_timeout_sec = 60
2158    zip_timeout_sec = 30
2159    if check_chipset_vendor_by_qualcomm(ad):
2160        stop_cmd = ("am startservice -a com.android.pixellogger."
2161                    "service.logging.LoggingService.ACTION_STOP_LOGGING")
2162    else:
2163        stop_cmd = ("am startservice -a com.android.pixellogger."
2164                    "service.logging.LoggingService.ACTION_STOP_LOGGING "
2165                    "-e intent_logger brcm_gps")
2166    for attempt in range(retries):
2167        begin_time = get_current_epoch_time() - 3000
2168        ad.log.info("Stop Pixel Logger - Attempt %d" % (attempt + 1))
2169        ad.adb.shell(stop_cmd)
2170        while get_current_epoch_time() - begin_time <= stop_timeout_sec * 1000:
2171            if not ad.is_adb_logcat_on:
2172                ad.start_adb_logcat()
2173            stop_result = ad.search_logcat(
2174                "LoggingService: Stopping service", begin_time)
2175            if stop_result:
2176                ad.log.info("Pixel Logger stops successfully.")
2177                zip_end_time = time.time() + zip_timeout_sec
2178                while time.time() < zip_end_time:
2179                    zip_file_created = ad.search_logcat(
2180                        "FileUtil: Zip file has been created", begin_time)
2181                    if zip_file_created:
2182                        ad.log.info("Pixel Logger created zip file "
2183                                    "successfully.")
2184                        return True
2185                else:
2186                    ad.log.warn("Pixel Logger failed to create zip file.")
2187                    return False
2188        ad.force_stop_apk("com.android.pixellogger")
2189    else:
2190        ad.log.warn("Pixel Logger fails to stop in %d seconds within %d "
2191                    "attempts." % (stop_timeout_sec, retries))
2192
2193
2194def launch_eecoexer(ad):
2195    """Launch EEcoexer.
2196
2197    Args:
2198        ad: An AndroidDevice object.
2199    Raise:
2200        signals.TestError if DUT fails to launch EEcoexer
2201    """
2202    launch_cmd = ("am start -a android.intent.action.MAIN -n"
2203                  "com.google.eecoexer"
2204                  "/.MainActivity")
2205    ad.adb.shell(launch_cmd)
2206    try:
2207        ad.log.info("Launch EEcoexer.")
2208    except Exception as e:
2209        ad.log.error(e)
2210        raise signals.TestError("Failed to launch EEcoexer.")
2211
2212
2213def execute_eecoexer_function(ad, eecoexer_args):
2214    """Execute EEcoexer commands.
2215
2216    Args:
2217        ad: An AndroidDevice object.
2218        eecoexer_args: EEcoexer function arguments
2219    """
2220    cat_index = eecoexer_args.split(',')[:2]
2221    cat_index = ','.join(cat_index)
2222    enqueue_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
2223                   " --es sms_body ENQUEUE,{}".format(eecoexer_args))
2224    exe_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
2225               " --es sms_body EXECUTE")
2226    wait_for_cmd = ("am broadcast -a com.google.eecoexer.action.LISTENER"
2227                   " --es sms_body WAIT_FOR_COMPLETE,{}".format(cat_index))
2228    ad.log.info("EEcoexer Add Enqueue: {}".format(eecoexer_args))
2229    ad.adb.shell(enqueue_cmd)
2230    ad.log.info("EEcoexer Excute.")
2231    ad.adb.shell(exe_cmd)
2232    ad.log.info("Wait EEcoexer for complete")
2233    ad.adb.shell(wait_for_cmd)
2234
2235
2236def get_process_pid(ad, process_name):
2237    """Gets the process PID
2238
2239    Args:
2240        ad: The device under test
2241        process_name: The name of the process
2242
2243    Returns:
2244        The PID of the process
2245    """
2246    command = f"ps -A | grep {process_name} |  awk '{{print $2}}'"
2247    pid = ad.adb.shell(command)
2248    return pid
2249
2250
2251def restart_gps_daemons(ad):
2252    """Restart GPS daemons by killing services of gpsd, lhd and scd.
2253
2254    Args:
2255        ad: An AndroidDevice object.
2256    """
2257    gps_daemons_list = ["gpsd", "lhd", "scd"]
2258    ad.root_adb()
2259    for service in gps_daemons_list:
2260        time.sleep(3)
2261        ad.log.info("Kill GPS daemon \"%s\"" % service)
2262        service_pid = get_process_pid(ad, service)
2263        ad.log.debug("%s PID: %s" % (service, service_pid))
2264        ad.adb.shell(f"kill -9 {service_pid}")
2265        # Wait 3 seconds for daemons and services to start.
2266        time.sleep(3)
2267
2268        new_pid = get_process_pid(ad, service)
2269        ad.log.debug("%s new PID: %s" % (service, new_pid))
2270        if not new_pid or service_pid == new_pid:
2271            raise signals.TestError("Unable to restart \"%s\"" % service)
2272
2273        ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % (
2274            service, service_pid, new_pid))
2275
2276
2277def is_device_wearable(ad):
2278    """Check device is wearable project or not.
2279
2280    Args:
2281        ad: An AndroidDevice object.
2282    """
2283    package = ad.adb.getprop("ro.cw.home_package_names")
2284    ad.log.debug("[ro.cw.home_package_names]: [%s]" % package)
2285    return "wearable" in package
2286
2287
2288def is_mobile_data_on(ad):
2289    """Check if mobile data of device is on.
2290
2291    Args:
2292        ad: An AndroidDevice object.
2293    """
2294    if is_device_wearable(ad):
2295        cell_on = ad.adb.shell("settings get global cell_on")
2296        ad.log.debug("Current mobile status is %s" % cell_on)
2297        return "1" in cell_on
2298    else:
2299        return ad.droid.telephonyIsDataEnabled()
2300
2301
2302def human_to_epoch_time(human_time):
2303    """Convert human readable time to epoch time.
2304
2305    Args:
2306        human_time: Human readable time. (Ex: 2020-08-04 13:24:28.900)
2307
2308    Returns:
2309        epoch: Epoch time in milliseconds.
2310    """
2311    if "/" in human_time:
2312        human_time.replace("/", "-")
2313    try:
2314        epoch_start = datetime.utcfromtimestamp(0)
2315        if "." in human_time:
2316            epoch_time = datetime.strptime(human_time, "%Y-%m-%d %H:%M:%S.%f")
2317        else:
2318            epoch_time = datetime.strptime(human_time, "%Y-%m-%d %H:%M:%S")
2319        epoch = int((epoch_time - epoch_start).total_seconds() * 1000)
2320        return epoch
2321    except ValueError:
2322        return None
2323
2324
2325def _get_dpo_info_from_logcat(ad, begin_time):
2326    """Gets the DPO info from logcat.
2327
2328    Args:
2329        ad: The device under test.
2330        begin_time: The start time of the log.
2331    """
2332    dpo_results = ad.search_logcat("HardwareClockDiscontinuityCount",
2333                                   begin_time)
2334    if not dpo_results:
2335        raise signals.TestError(
2336            "No \"HardwareClockDiscontinuityCount\" is found in logs.")
2337    return dpo_results
2338
2339
2340def check_dpo_rate_via_gnss_meas(ad, begin_time, dpo_threshold):
2341    """Check DPO engage rate through "HardwareClockDiscontinuityCount" in
2342    GnssMeasurement callback.
2343
2344    Args:
2345        ad: An AndroidDevice object.
2346        begin_time: test begin time.
2347        dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)
2348    """
2349    time_regex = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3})'
2350    dpo_results = _get_dpo_info_from_logcat(ad, begin_time)
2351    ad.log.info(dpo_results[0]["log_message"])
2352    ad.log.info(dpo_results[-1]["log_message"])
2353    start_time = re.compile(
2354        time_regex).search(dpo_results[0]["log_message"]).group(1)
2355    end_time = re.compile(
2356        time_regex).search(dpo_results[-1]["log_message"]).group(1)
2357    gnss_start_epoch = human_to_epoch_time(start_time)
2358    gnss_stop_epoch = human_to_epoch_time(end_time)
2359    test_time_in_sec = round((gnss_stop_epoch - gnss_start_epoch) / 1000) + 1
2360    first_dpo_count = int(dpo_results[0]["log_message"].split()[-1])
2361    final_dpo_count = int(dpo_results[-1]["log_message"].split()[-1])
2362    dpo_rate = ((final_dpo_count - first_dpo_count)/test_time_in_sec)
2363    dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)
2364    ad.log.info("DPO is ON for %d seconds during %d seconds test." % (
2365        final_dpo_count - first_dpo_count, test_time_in_sec))
2366    ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
2367    threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
2368    asserts.assert_true(dpo_rate * 100 > dpo_threshold,
2369                        "DPO only engaged %s in %d seconds test with "
2370                        "threshold %s." % (dpo_engage_rate,
2371                                           test_time_in_sec,
2372                                           threshold))
2373
2374
2375def parse_brcm_nmea_log(ad, nmea_pattern, brcm_error_log_allowlist, stop_logger=True):
2376    """Parse specific NMEA pattern out of BRCM NMEA log.
2377
2378    Args:
2379        ad: An AndroidDevice object.
2380        nmea_pattern: Specific NMEA pattern to parse.
2381        brcm_error_log_allowlist: Benign error logs to exclude.
2382        stop_logger: To stop pixel logger or not.
2383
2384    Returns:
2385        brcm_log_list: A list of specific NMEA pattern logs.
2386    """
2387    brcm_log_list = []
2388    brcm_log_error_pattern = ["lhd: FS: Start Failsafe dump", "E slog"]
2389    brcm_error_log_list = []
2390    pixellogger_path = (
2391        "/sdcard/Android/data/com.android.pixellogger/files/logs/gps/.")
2392    if not isinstance(nmea_pattern, re.Pattern):
2393        nmea_pattern = re.compile(nmea_pattern)
2394
2395    with tempfile.TemporaryDirectory() as tmp_dir:
2396        try:
2397            ad.pull_files(pixellogger_path, tmp_dir)
2398        except AdbCommandError:
2399            raise FileNotFoundError("No pixel logger folders found")
2400
2401        # Although we don't rely on the zip file, stop pixel logger here to avoid
2402        # wasting resources.
2403        if stop_logger:
2404            stop_pixel_logger(ad)
2405
2406        tmp_path = pathlib.Path(tmp_dir)
2407        log_folders = sorted([x for x in tmp_path.iterdir() if x.is_dir()])
2408        if not log_folders:
2409            raise FileNotFoundError("No BRCM logs found.")
2410        # The folder name is a string of datetime, the latest one will be in the last index.
2411        gl_logs = log_folders[-1].glob("**/gl*.log")
2412
2413        for nmea_log_path in gl_logs:
2414            ad.log.info("Parsing log pattern of \"%s\" in %s" % (nmea_pattern,
2415                                                                 nmea_log_path))
2416            with open(nmea_log_path, "r", encoding="UTF-8", errors="ignore") as lines:
2417                for line in lines:
2418                    line = line.strip()
2419                    if nmea_pattern.fullmatch(line):
2420                        brcm_log_list.append(line)
2421                    for attr in brcm_log_error_pattern:
2422                        if attr in line:
2423                            benign_log = False
2424                            for regex_pattern in brcm_error_log_allowlist:
2425                                if re.search(regex_pattern, line):
2426                                    benign_log = True
2427                                    ad.log.debug("\"%s\" is in allow-list and removed "
2428                                                "from error." % line)
2429                            if not benign_log:
2430                                brcm_error_log_list.append(line)
2431
2432    brcm_error_log = "".join(brcm_error_log_list)
2433    return brcm_log_list, brcm_error_log
2434
2435
2436def _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist, stop_pixel_logger=True):
2437    """Gets the power log from pixel logger.
2438
2439    Args:
2440        ad: The device under test.
2441        brcm_error_log_allow_list: The allow list to ignore certain error in pixel logger.
2442        stop_pixel_logger: To disable pixel logger when getting the log.
2443    """
2444    pglor_list, brcm_error_log = parse_brcm_nmea_log(
2445        ad, _BRCM_DUTY_CYCLE_PATTERN, brcm_error_log_allowlist, stop_pixel_logger)
2446    if not pglor_list:
2447        raise signals.TestFailure("Fail to get DPO logs from pixel logger")
2448
2449    return pglor_list, brcm_error_log
2450
2451
2452def check_dpo_rate_via_brcm_log(ad, dpo_threshold, brcm_error_log_allowlist):
2453    """Check DPO engage rate through "$PGLOR,11,STA" in BRCM Log.
2454    D - Disabled, Always full power.
2455    F - Enabled, now in full power mode.
2456    S - Enabled, now in power save mode.
2457    H - Host off load mode.
2458
2459    Args:
2460        ad: An AndroidDevice object.
2461        dpo_threshold: The value to set threshold. (Ex: dpo_threshold = 60)
2462        brcm_error_log_allowlist: Benign error logs to exclude.
2463    """
2464    always_full_power_count = 0
2465    full_power_count = 0
2466    power_save_count = 0
2467    pglor_list, brcm_error_log = _get_power_mode_log_from_pixel_logger(ad, brcm_error_log_allowlist)
2468
2469    for pglor in pglor_list:
2470        power_res = re.compile(r',P,(\w),').search(pglor).group(1)
2471        if power_res == "D":
2472            always_full_power_count += 1
2473        elif power_res == "F":
2474            full_power_count += 1
2475        elif power_res == "S":
2476            power_save_count += 1
2477    ad.log.info(sorted(pglor_list)[0])
2478    ad.log.info(sorted(pglor_list)[-1])
2479    ad.log.info("TestResult Total_Count %d" % len(pglor_list))
2480    ad.log.info("TestResult Always_Full_Power_Count %d" %
2481                always_full_power_count)
2482    ad.log.info("TestResult Full_Power_Mode_Count %d" % full_power_count)
2483    ad.log.info("TestResult Power_Save_Mode_Count %d" % power_save_count)
2484    dpo_rate = (power_save_count / len(pglor_list))
2485    dpo_engage_rate = "{percent:.2%}".format(percent=dpo_rate)
2486    ad.log.info("Power Save Mode is ON for %d seconds during %d seconds test."
2487                % (power_save_count, len(pglor_list)))
2488    ad.log.info("TestResult DPO_Engage_Rate " + dpo_engage_rate)
2489    threshold = "{percent:.0%}".format(percent=dpo_threshold / 100)
2490    asserts.assert_true((dpo_rate * 100 > dpo_threshold) and not brcm_error_log,
2491                        "Power Save Mode only engaged %s in %d seconds test "
2492                        "with threshold %s.\nAbnormal behavior found as below."
2493                        "\n%s" % (dpo_engage_rate,
2494                                  len(pglor_list),
2495                                  threshold,
2496                                  brcm_error_log))
2497
2498
2499def process_pair(watch, phone):
2500    """Pair phone to watch via Bluetooth in OOBE.
2501
2502    Args:
2503        watch: A wearable project.
2504        phone: A pixel phone.
2505    """
2506    check_location_service(phone)
2507    utils.sync_device_time(phone)
2508    bt_model_name = watch.adb.getprop("ro.product.model")
2509    bt_sn_name = watch.adb.getprop("ro.serialno")
2510    bluetooth_name = bt_model_name +" " + bt_sn_name[10:]
2511    fastboot_factory_reset(watch, False)
2512    # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
2513    watch.log.info("Wait 1 min for wearable system busy time.")
2514    time.sleep(60)
2515    watch.adb.shell("input keyevent 4")
2516    # Clear Denali paired data in phone.
2517    phone.adb.shell("pm clear com.google.android.gms")
2518    phone.adb.shell("pm clear com.google.android.apps.wear.companion")
2519    phone.adb.shell("am start -S -n com.google.android.apps.wear.companion/"
2520                        "com.google.android.apps.wear.companion.application.RootActivity")
2521    uia_click(phone, "Continue")
2522    uia_click(phone, "More")
2523    uia_click(phone, "I agree")
2524    uia_click(phone, "I accept")
2525    uia_click(phone, bluetooth_name)
2526    uia_click(phone, "Pair")
2527    uia_click(phone, "Skip")
2528    uia_click(phone, "Next")
2529    uia_click(phone, "Skip")
2530    uia_click(phone, "Done")
2531    # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
2532    watch.log.info("Wait 3 mins for complete pairing process.")
2533    time.sleep(180)
2534    set_screen_always_on(watch)
2535    check_location_service(watch)
2536    enable_gnss_verbose_logging(watch)
2537
2538
2539def is_bluetooth_connected(watch, phone):
2540    """Check if device's Bluetooth status is connected or not.
2541
2542    Args:
2543    watch: A wearable project
2544    phone: A pixel phone.
2545    """
2546    return watch.droid.bluetoothIsDeviceConnected(phone.droid.bluetoothGetLocalAddress())
2547
2548
2549def detect_crash_during_tracking(ad, begin_time, api_type, ignore_hal_crash=False):
2550    """Check if GNSS or GPSTool crash happened druing GNSS Tracking.
2551
2552    Args:
2553    ad: An AndroidDevice object.
2554    begin_time: Start Time to check if crash happened in logs.
2555    api_type: Using GNSS or FLP reading method in GNSS tracking.
2556    ignore_hal_crash: In BRCM devices, once the HAL is being killed, it will write error/fatal logs.
2557      Ignore this error if the error logs are expected.
2558    """
2559    gnss_crash_list = [".*Fatal signal.*gnss",
2560                       ".*Fatal signal.*xtra"]
2561    if not ignore_hal_crash:
2562        gnss_crash_list += [".*Fatal signal.*gpsd", ".*F DEBUG.*gnss"]
2563    if not ad.is_adb_logcat_on:
2564        ad.start_adb_logcat()
2565    for attr in gnss_crash_list:
2566        gnss_crash_result = ad.adb.shell(
2567            "logcat -d | grep -E -i '%s'" % attr, ignore_status=True, timeout = 300)
2568        if gnss_crash_result:
2569            start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
2570            raise signals.TestFailure(
2571                "Test failed due to GNSS HAL crashed. \n%s" %
2572                gnss_crash_result)
2573    gpstool_crash_result = ad.search_logcat("Force finishing activity "
2574                                            "com.android.gpstool/.GPSTool",
2575                                            begin_time)
2576    if gpstool_crash_result:
2577            raise signals.TestError("GPSTool crashed. Abort test.")
2578
2579
2580def is_wearable_btwifi(ad):
2581    """Check device is wearable btwifi sku or not.
2582
2583    Args:
2584        ad: An AndroidDevice object.
2585    """
2586    package = ad.adb.getprop("ro.product.product.name")
2587    ad.log.debug("[ro.product.product.name]: [%s]" % package)
2588    return "btwifi" in package
2589
2590
2591def compare_watch_phone_location(ad,watch_file, phone_file):
2592    """Compare watch and phone's FLP location to see if the same or not.
2593
2594    Args:
2595        ad: An AndroidDevice object.
2596        watch_file: watch's FLP locations
2597        phone_file: phone's FLP locations
2598    """
2599    not_match_location_counts = 0
2600    not_match_location = []
2601    for watch_key, watch_value in watch_file.items():
2602        if phone_file.get(watch_key):
2603            lat_ads = abs(float(watch_value[0]) - float(phone_file[watch_key][0]))
2604            lon_ads = abs(float(watch_value[1]) - float(phone_file[watch_key][1]))
2605            if lat_ads > 0.000002 or lon_ads > 0.000002:
2606                not_match_location_counts += 1
2607                not_match_location += (watch_key, watch_value, phone_file[watch_key])
2608    if not_match_location_counts > 0:
2609        ad.log.info("There are %s not match locations: %s" %(not_match_location_counts, not_match_location))
2610        ad.log.info("Watch's locations are not using Phone's locations.")
2611        return False
2612    else:
2613        ad.log.info("Watch's locations are using Phone's location.")
2614        return True
2615
2616
2617def check_tracking_file(ad):
2618    """Check tracking file in device and save "Latitude", "Longitude", and "Time" information.
2619
2620    Args:
2621        ad: An AndroidDevice object.
2622
2623    Returns:
2624        location_reports: A dict with [latitude, longitude]
2625    """
2626    location_reports = dict()
2627    test_logfile = {}
2628    file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
2629                                  % GNSSSTATUS_LOG_PATH))
2630    if file_count != 1:
2631        ad.log.error("%d API logs exist." % file_count)
2632    dir_file = ad.adb.shell("ls %s" % GNSSSTATUS_LOG_PATH).split()
2633    for path_key in dir_file:
2634        if fnmatch.fnmatch(path_key, "*.txt"):
2635            logpath = posixpath.join(GNSSSTATUS_LOG_PATH, path_key)
2636            out = ad.adb.shell("wc -c %s" % logpath)
2637            file_size = int(out.split(" ")[0])
2638            if file_size < 10:
2639                ad.log.info("Skip log %s due to log size %d bytes" %
2640                            (path_key, file_size))
2641                continue
2642            test_logfile = logpath
2643    if not test_logfile:
2644        raise signals.TestError("Failed to get test log file in device.")
2645    lines = ad.adb.shell("cat %s" % test_logfile).split("\n")
2646    for file_data in lines:
2647        if "Latitude:" in file_data:
2648            file_lat = ("%.6f" %float(file_data[9:]))
2649        elif "Longitude:" in file_data:
2650            file_long = ("%.6f" %float(file_data[11:]))
2651        elif "Time:" in file_data:
2652            file_time = (file_data[17:25])
2653            location_reports[file_time] = [file_lat, file_long]
2654    return location_reports
2655
2656
2657def uia_click(ad, matching_text):
2658    """Use uiautomator to click objects.
2659
2660    Args:
2661        ad: An AndroidDevice object.
2662        matching_text: Text of the target object to click
2663    """
2664    if ad.uia(textMatches=matching_text).wait.exists(timeout=60000):
2665
2666        ad.uia(textMatches=matching_text).click()
2667        ad.log.info("Click button %s" % matching_text)
2668    else:
2669        ad.log.error("No button named %s" % matching_text)
2670
2671
2672def delete_bcm_nvmem_sto_file(ad):
2673    """Delete BCM's NVMEM ephemeris gldata.sto.
2674
2675    Args:
2676        ad: An AndroidDevice object.
2677    """
2678    remount_device(ad)
2679    rm_cmd = "rm -rf {}".format(BCM_NVME_STO_PATH)
2680    status = ad.adb.shell(rm_cmd)
2681    ad.log.info("Delete BCM's NVMEM ephemeris files.\n%s" % status)
2682
2683
2684def bcm_gps_xml_update_option(ad,
2685                           option,
2686                           search_line=None,
2687                           append_txt=None,
2688                           update_txt=None,
2689                           delete_txt=None,
2690                           gps_xml_path=BCM_GPS_XML_PATH):
2691    """Append parameter setting in gps.xml for BCM solution
2692
2693    Args:
2694        option: A str to identify the operation (add/update/delete).
2695        ad: An AndroidDevice object.
2696        search_line: Pattern matching of target
2697        line for appending new line data.
2698        append_txt: New lines that will be appended after the search_line.
2699        update_txt: New line to update the original file.
2700        delete_txt: lines to delete from the original file.
2701        gps_xml_path: gps.xml file location of DUT
2702    """
2703    remount_device(ad)
2704    #Update gps.xml
2705    tmp_log_path = tempfile.mkdtemp()
2706    ad.pull_files(gps_xml_path, tmp_log_path)
2707    gps_xml_tmp_path = os.path.join(tmp_log_path, "gps.xml")
2708    gps_xml_file = open(gps_xml_tmp_path, "r")
2709    lines = gps_xml_file.readlines()
2710    gps_xml_file.close()
2711    fout = open(gps_xml_tmp_path, "w")
2712    if option == "add":
2713        for line in lines:
2714            if line.strip() in append_txt:
2715                ad.log.info("{} is already in the file. Skip".format(append_txt))
2716                continue
2717            fout.write(line)
2718            if search_line in line:
2719                for add_txt in append_txt:
2720                    fout.write(add_txt)
2721                    ad.log.info("Add new line: '{}' in gps.xml.".format(add_txt))
2722    elif option == "update":
2723        for line in lines:
2724            if search_line in line:
2725                ad.log.info(line)
2726                fout.write(update_txt)
2727                ad.log.info("Update line: '{}' in gps.xml.".format(update_txt))
2728                continue
2729            fout.write(line)
2730    elif option == "delete":
2731        for line in lines:
2732            if delete_txt in line:
2733                ad.log.info("Delete line: '{}' in gps.xml.".format(line.strip()))
2734                continue
2735            fout.write(line)
2736    fout.close()
2737
2738    # Update gps.xml with gps_new.xml
2739    ad.push_system_file(gps_xml_tmp_path, gps_xml_path)
2740
2741    # remove temp folder
2742    shutil.rmtree(tmp_log_path, ignore_errors=True)
2743
2744def bcm_gps_ignore_warmstandby(ad):
2745    """ remove warmstandby setting in BCM gps.xml to reset tracking filter
2746    Args:
2747        ad: An AndroidDevice object.
2748    """
2749    search_line_tag = '<gll\n'
2750    delete_line_str = 'WarmStandbyTimeout1Seconds'
2751    bcm_gps_xml_update_option(ad,
2752                              "delete",
2753                              search_line_tag,
2754                              append_txt=None,
2755                              update_txt=None,
2756                              delete_txt=delete_line_str)
2757
2758    search_line_tag = '<gll\n'
2759    delete_line_str = 'WarmStandbyTimeout2Seconds'
2760    bcm_gps_xml_update_option(ad,
2761                              "delete",
2762                              search_line_tag,
2763                              append_txt=None,
2764                              update_txt=None,
2765                              delete_txt=delete_line_str)
2766
2767def bcm_gps_ignore_rom_alm(ad):
2768    """ Update BCM gps.xml with ignoreRomAlm="True"
2769    Args:
2770        ad: An AndroidDevice object.
2771    """
2772    search_line_tag = '<hal\n'
2773    append_line_str = ['       IgnoreJniTime=\"true\"\n']
2774    bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
2775
2776    search_line_tag = '<gll\n'
2777    append_line_str = ['       IgnoreRomAlm=\"true\"\n',
2778                       '       AutoColdStartSignal=\"SIMULATED\"\n',
2779                       '       IgnoreJniTime=\"true\"\n']
2780    bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
2781
2782
2783def check_inject_time(ad):
2784    """Check if watch could get the UTC time.
2785
2786    Args:
2787        ad: An AndroidDevice object.
2788    """
2789    for i in range(1, 6):
2790        time.sleep(10)
2791        inject_time_results = ad.search_logcat("GPSIC.OUT.gps_inject_time")
2792        ad.log.info("Check time injected - attempt %s" % i)
2793        if inject_time_results:
2794            ad.log.info("Time is injected successfully.")
2795            return True
2796    raise signals.TestFailure("Fail to get time injected within %s attempts." % i)
2797
2798def recover_paired_status(watch, phone):
2799    """Recover Bluetooth paired status if not paired.
2800
2801    Args:
2802        watch: A wearable project.
2803        phone: A pixel phone.
2804    """
2805    for _ in range(3):
2806        watch.log.info("Switch Bluetooth Off-On to recover paired status.")
2807        for status in (False, True):
2808            watch.droid.bluetoothToggleState(status)
2809            phone.droid.bluetoothToggleState(status)
2810            # TODO (chenstanley)Need to re-structure for better code and test flow instead of simply waiting
2811            watch.log.info("Wait for Bluetooth auto re-connect.")
2812            time.sleep(10)
2813        if is_bluetooth_connected(watch, phone):
2814            watch.log.info("Success to recover paired status.")
2815            return True
2816    raise signals.TestFailure("Fail to recover BT paired status in 3 attempts.")
2817
2818def push_lhd_overlay(ad):
2819    """Push lhd_overlay.conf to device in /data/vendor/gps/overlay/
2820
2821    ad:
2822        ad: An AndroidDevice object.
2823    """
2824    overlay_name = "lhd_overlay.conf"
2825    overlay_asset = ad.adb.shell("ls /data/vendor/gps/overlay/")
2826    if overlay_name in overlay_asset:
2827        ad.log.info(f"{overlay_name} already in device, skip.")
2828        return
2829
2830    temp_path = tempfile.mkdtemp()
2831    file_path = os.path.join(temp_path, overlay_name)
2832    lhd_content = 'Lhe477xDebugFlags=RPC:FACILITY=2097151:LOG_INFO:STDOUT_PUTS:STDOUT_LOG\n'\
2833                  'LogLevel=*:E\nLogLevel=*:W\nLogLevel=*:I\nLog=LOGCAT\nLogEnabled=true\n'
2834    overlay_path = "/data/vendor/gps/overlay/"
2835    with open(file_path, "w") as f:
2836        f.write(lhd_content)
2837    ad.log.info("Push lhd_overlay to device")
2838    ad.adb.push(file_path, overlay_path)
2839
2840
2841def disable_ramdump(ad):
2842    """Disable ramdump so device will reboot when about to enter ramdump
2843
2844    Once device enter ramdump, it will take a while to generate dump file
2845    The process may take a while and block all the tests.
2846    By disabling the ramdump mode, device will reboot instead of entering ramdump mode
2847
2848    Args:
2849        ad: An AndroidDevice object.
2850    """
2851    ad.log.info("Enter bootloader mode")
2852    ad.stop_services()
2853    ad.adb.reboot("bootloader")
2854    for _ in range(1,9):
2855        if ad.is_bootloader:
2856            break
2857        time.sleep(1)
2858    else:
2859        raise signals.TestFailure("can't enter bootloader mode")
2860    ad.log.info("Disable ramdump")
2861    ad.fastboot.oem("ramdump disable")
2862    ad.fastboot.reboot()
2863    ad.wait_for_boot_completion()
2864    ad.root_adb()
2865    tutils.bring_up_sl4a(ad)
2866    ad.start_adb_logcat()
2867
2868
2869def deep_suspend_device(ad):
2870    """Force DUT to enter deep suspend mode
2871
2872    When DUT is connected to PCs, it won't enter deep suspend mode
2873    by pressing power button.
2874
2875    To force DUT enter deep suspend mode, we need to send the
2876    following command to DUT  "echo mem >/sys/power/state"
2877
2878    To make sure the DUT stays in deep suspend mode for a while,
2879    it will send the suspend command 3 times with 15s interval
2880
2881    Args:
2882        ad: An AndroidDevice object.
2883    """
2884    ad.log.info("Ready to go to deep suspend mode")
2885    begin_time = get_device_time(ad)
2886    ad.droid.goToSleepNow()
2887    ensure_power_manager_is_dozing(ad, begin_time)
2888    ad.stop_services()
2889    try:
2890        command = "echo deep > /sys/power/mem_sleep && echo mem > /sys/power/state"
2891        for i in range(1, 4):
2892            ad.log.debug(f"Send deep suspend command round {i}")
2893            ad.adb.shell(command, ignore_status=True)
2894            # sleep here to ensure the device stays enough time in deep suspend mode
2895            time.sleep(15)
2896            if not _is_device_enter_deep_suspend(ad):
2897                raise signals.TestFailure("Device didn't enter deep suspend mode")
2898        ad.log.info("Wake device up now")
2899    except Exception:
2900        # when exception happen, it's very likely the device is rebooting
2901        # to ensure the test can go on, wait for the device is ready
2902        ad.log.warn("Device may be rebooting, wait for it")
2903        ad.wait_for_boot_completion()
2904        ad.root_adb()
2905        raise
2906    finally:
2907        tutils.bring_up_sl4a(ad)
2908        ad.start_adb_logcat()
2909        ad.droid.wakeUpNow()
2910
2911
2912def get_device_time(ad):
2913    """Get current datetime from device
2914
2915    Args:
2916        ad: An AndroidDevice object.
2917
2918    Returns:
2919        datetime object
2920    """
2921    result = ad.adb.shell("date +\"%Y-%m-%d %T.%3N\"")
2922    return datetime.strptime(result, "%Y-%m-%d %H:%M:%S.%f")
2923
2924
2925def ensure_power_manager_is_dozing(ad, begin_time):
2926    """Check if power manager is in dozing
2927    When device is sleeping, power manager should goes to doze mode.
2928    To ensure that, we check the log every 1 second (maximum to 3 times)
2929
2930    Args:
2931        ad: An AndroidDevice object.
2932        begin_time: datetime, used as the starting point to search log
2933    """
2934    keyword = "PowerManagerService: Dozing"
2935    ad.log.debug("Log search start time: %s" % begin_time)
2936    for i in range(0,3):
2937        result = ad.search_logcat(keyword, begin_time)
2938        if result:
2939            break
2940        ad.log.debug("Power manager is not dozing... retry in 1 second")
2941        time.sleep(1)
2942    else:
2943        ad.log.warn("Power manager didn't enter dozing")
2944
2945
2946
2947def _is_device_enter_deep_suspend(ad):
2948    """Check device has been enter deep suspend mode
2949
2950    If device has entered deep suspend mode, we should be able to find keyword
2951    "suspend entry (deep)"
2952
2953    Args:
2954        ad: An AndroidDevice object.
2955
2956    Returns:
2957        bool: True / False -> has / has not entered deep suspend
2958    """
2959    cmd = f"adb -s {ad.serial} logcat -d|grep \"suspend entry (deep)\""
2960    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
2961                               stderr=subprocess.PIPE, shell=True)
2962    result, _ = process.communicate()
2963    ad.log.debug(f"suspend result = {result}")
2964
2965    return bool(result)
2966
2967
2968def check_location_report_interval(ad, location_reported_time_src, total_seconds, tolerance):
2969    """Validate the interval between two location reported time
2970    Normally the interval should be around 1 second but occasionally it may up to nearly 2 seconds
2971    So we set up a tolerance - 99% of reported interval should be less than 1.3 seconds
2972
2973    We validate the interval backward, because the wrong interval mostly happened at the end
2974    Args:
2975        ad: An AndroidDevice object.
2976        location_reported_time_src: A list of reported time(in string) from GPS tool
2977        total_seconds: (int) how many seconds has the GPS been enabled
2978        tolerance: (float) set how many ratio of error should be accepted
2979                   if we want to set tolerance to be 1% then pass 0.01 as tolerance value
2980    """
2981    ad.log.info("Checking location report frequency")
2982    error_count = 0
2983    error_tolerance = max(1, int(total_seconds * tolerance))
2984    expected_longest_interval = 1.3
2985    location_reported_time = list(map(lambda x: datetime.strptime(x, "%Y/%m/%d %H:%M:%S.%f"),
2986                                      location_reported_time_src))
2987    location_reported_time = sorted(location_reported_time)
2988    last_gps_report_time = location_reported_time[-1]
2989    ad.log.debug("Location report time: %s" % location_reported_time)
2990
2991    for reported_time in reversed(location_reported_time):
2992        time_diff = last_gps_report_time - reported_time
2993        if time_diff.total_seconds() > expected_longest_interval:
2994            error_count += 1
2995        last_gps_report_time = reported_time
2996
2997    if error_count > error_tolerance:
2998        fail_message = (f"Interval longer than {expected_longest_interval}s "
2999                        f"exceed tolerance count: {error_tolerance}, error count: {error_count}")
3000        ad.log.error(fail_message)
3001
3002
3003@contextmanager
3004def set_screen_status(ad, off=True):
3005    """Set screen on / off
3006
3007    A context manager function, can be used with "with" statement.
3008    example:
3009        with set_screen_status(ad, off=True):
3010            do anything you want during screen is off
3011    Once the function end, it will turn on the screen
3012    Args:
3013        ad: AndroidDevice object
3014        off: (bool) True -> turn off screen / False -> leave screen as it is
3015    """
3016    try:
3017        if off:
3018            ad.droid.goToSleepNow()
3019        yield ad
3020    finally:
3021        ad.droid.wakeUpNow()
3022        ensure_device_screen_is_on(ad)
3023
3024
3025@contextmanager
3026def full_gnss_measurement(ad):
3027    """Context manager function to enable full gnss measurement"""
3028    try:
3029        ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 1")
3030        yield ad
3031    finally:
3032        ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 0")
3033
3034
3035def ensure_device_screen_is_on(ad):
3036    """Make sure the screen is on
3037
3038    Will try 3 times, each with 1 second interval
3039
3040    Raise:
3041        GnssTestUtilsError: if screen can't be turn on after 3 tries
3042    """
3043    for _ in range(3):
3044        # when NotificationShade appears in focus window, it indicates the screen is still off
3045        if "NotificationShade" not in check_current_focus_app(ad):
3046            break
3047        time.sleep(1)
3048    else:
3049        raise GnssTestUtilsError("Device screen is not on after 3 tries")
3050
3051
3052def start_qxdm_and_tcpdump_log(ad, enable):
3053    """Start QXDM and adb tcpdump if collect_logs is True.
3054    Args:
3055        ad: AndroidDevice object
3056        enable: (bool) True -> start collecting
3057                       False -> not start collecting
3058    """
3059    if enable:
3060        start_pixel_logger(ad)
3061        tlutils.start_adb_tcpdump(ad)
3062
3063
3064def set_screen_always_on(ad):
3065    """Ensure the sceen will not turn off and display the correct app screen
3066    for wearable, we also disable the charing screen,
3067    otherwise the charing screen will keep popping up and block the GPS tool
3068    """
3069    if is_device_wearable(ad):
3070        ad.adb.shell("settings put global stay_on_while_plugged_in 7")
3071        ad.adb.shell("setprop persist.enable_charging_experience false")
3072    else:
3073        ad.adb.shell("settings put system screen_off_timeout 1800000")
3074
3075
3076def validate_adr_rate(ad, pass_criteria):
3077    """Check the ADR rate
3078
3079    Args:
3080        ad: AndroidDevice object
3081        pass_criteria: (float) the passing ratio, 1 = 100%, 0.5 = 50%
3082    """
3083    adr_statistic = GnssMeasurement(ad).get_adr_static()
3084
3085    ad.log.info("ADR threshold: {0:.1%}".format(pass_criteria))
3086    ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_valid_rate {0:.1%}".format(adr_statistic.valid_rate))
3087    ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
3088                "ADR_usable_rate {0:.1%}".format(adr_statistic.usable_rate))
3089    ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_total_count %s" % adr_statistic.total_count)
3090    ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_valid_count %s" % adr_statistic.valid_count)
3091    ad.log.info(UPLOAD_TO_SPONGE_PREFIX + "ADR_reset_count %s" % adr_statistic.reset_count)
3092    ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
3093                "ADR_cycle_slip_count %s" % adr_statistic.cycle_slip_count)
3094    ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
3095                "ADR_half_cycle_reported_count %s" % adr_statistic.half_cycle_reported_count)
3096    ad.log.info(UPLOAD_TO_SPONGE_PREFIX +
3097                "ADR_half_cycle_resolved_count %s" % adr_statistic.half_cycle_resolved_count)
3098
3099    asserts.assert_true(
3100        (pass_criteria < adr_statistic.valid_rate) and (pass_criteria < adr_statistic.usable_rate),
3101        f"ADR valid rate: {adr_statistic.valid_rate:.1%}, "
3102        f"ADR usable rate: {adr_statistic.usable_rate:.1%} "
3103        f"Lower than expected: {pass_criteria:.1%}"
3104    )
3105
3106
3107def pair_to_wearable(watch, phone):
3108    """Pair watch to phone.
3109
3110    Args:
3111        watch: A wearable project.
3112        phone: A pixel phone.
3113    Raise:
3114        TestFailure: If pairing process could not success after 3 tries.
3115    """
3116    for _ in range(3):
3117        process_pair(watch, phone)
3118        if is_bluetooth_connected(watch, phone):
3119            watch.log.info("Pairing successfully.")
3120            return True
3121    raise signals.TestFailure("Pairing is not successfully.")
3122
3123
3124def disable_battery_defend(ad):
3125    """Disable battery defend config to prevent battery defend message pop up
3126    after connecting to the same charger for 4 days in a row.
3127
3128    Args:
3129        ad: A wearable project.
3130    """
3131    for _ in range(5):
3132        remount_device(ad)
3133        ad.adb.shell("setprop vendor.battery.defender.disable 1")
3134        # To simulate cable unplug and the status will be recover after device reboot.
3135        ad.adb.shell("cmd battery unplug")
3136        # Sleep 3 seconds for waiting adb commend changes config and simulates cable unplug.
3137        time.sleep(3)
3138        config_setting = ad.adb.shell("getprop vendor.battery.defender.state")
3139        if config_setting == "DISABLED":
3140            ad.log.info("Disable Battery Defend setting successfully.")
3141            break
3142
3143
3144def restart_hal_service(ad):
3145    """Restart HAL service by killing the pid.
3146
3147    Gets the pid by ps command and pass the pid to kill command. Then we get the pid of HAL service
3148    again to see if the pid changes(pid should be different after HAL restart). If not, we will
3149    retry up to 4 times before raising Test Failure.
3150
3151    Args:
3152        ad: AndroidDevice object
3153    """
3154    ad.log.info("Restart HAL service")
3155    hal_process_name = "'android.hardware.gnss@[[:digit:]]\{1,2\}\.[[:digit:]]\{1,2\}-service'"
3156    hal_pid = get_process_pid(ad, hal_process_name)
3157    ad.log.info("HAL pid: %s" % hal_pid)
3158
3159    # Retry kill process if the PID is the same as original one
3160    for _ in range(4):
3161        ad.log.info("Kill HAL service")
3162        ad.adb.shell(f"kill -9 {hal_pid}")
3163
3164        # Waits for the HAL service to restart up to 4 seconds.
3165        for _ in range(4):
3166            new_hal_pid = get_process_pid(ad, hal_process_name)
3167            ad.log.info("New HAL pid: %s" % new_hal_pid)
3168            if new_hal_pid:
3169                if hal_pid != new_hal_pid:
3170                    return
3171                break
3172            time.sleep(1)
3173    else:
3174        raise signals.TestFailure("HAL service can't be killed")
3175
3176
3177def run_ttff(ad, mode, criteria, test_cycle, base_lat_long, collect_logs=False):
3178    """Verify TTFF functionality with mobile data.
3179
3180    Args:
3181        mode: "cs", "ws" or "hs"
3182        criteria: Criteria for the test.
3183
3184    Returns:
3185        ttff_data: A dict of all TTFF data.
3186    """
3187    start_qxdm_and_tcpdump_log(ad, collect_logs)
3188    return run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, base_lat_long)
3189
3190
3191def re_register_measurement_callback(dut):
3192    """Send command to unregister then register measurement callback.
3193
3194    Args:
3195        dut: The device under test.
3196    """
3197    dut.log.info("Reregister measurement callback")
3198    dut.adb.shell("am broadcast -a com.android.gpstool.stop_meas_action")
3199    time.sleep(1)
3200    dut.adb.shell("am broadcast -a com.android.gpstool.start_meas_action")
3201    time.sleep(1)
3202
3203
3204def check_power_save_mode_status(ad, full_power, begin_time, brcm_error_allowlist):
3205    """Checks the power save mode status.
3206
3207    For Broadcom:
3208        Gets NEMA sentences from pixel logger and retrieve the status [F, S, D].
3209        F,S => not in full power mode
3210        D => in full power mode
3211    For Qualcomm:
3212        Gets the HardwareClockDiscontinuityCount from logcat. In full power mode, the
3213        HardwareClockDiscontinuityCount should not be increased.
3214
3215    Args:
3216        ad: The device under test.
3217        full_power: The device is in full power mode or not.
3218        begin_time: It is used to get the correct logcat information for qualcomm.
3219        brcm_error_allowlist: It is used to ignore certain error in pixel logger.
3220    """
3221    if check_chipset_vendor_by_qualcomm(ad):
3222        _check_qualcomm_power_save_mode(ad, full_power, begin_time)
3223    else:
3224        _check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist)
3225
3226
3227def _check_qualcomm_power_save_mode(ad, full_power, begin_time):
3228    dpo_results = _get_dpo_info_from_logcat(ad, begin_time)
3229    first_dpo_count = int(dpo_results[0]["log_message"].split()[-1])
3230    final_dpo_count = int(dpo_results[-1]["log_message"].split()[-1])
3231    dpo_count_diff = final_dpo_count - first_dpo_count
3232    ad.log.debug("The DPO count diff is {diff}".format(diff=dpo_count_diff))
3233    if full_power:
3234        asserts.assert_equal(dpo_count_diff, 0, msg="DPO count diff should be 0")
3235    else:
3236        asserts.assert_true(dpo_count_diff > 0, msg="DPO count diff should be more than 0")
3237
3238
3239def _check_broadcom_power_save_mode(ad, full_power, brcm_error_allowlist):
3240    power_save_log, _ = _get_power_mode_log_from_pixel_logger(
3241        ad, brcm_error_allowlist, stop_pixel_logger=False)
3242    power_status = re.compile(r',P,(\w),').search(power_save_log[-2]).group(1)
3243    ad.log.debug("The power status is {status}".format(status=power_status))
3244    if full_power:
3245        asserts.assert_true(power_status == "D", msg="Should be in full power mode")
3246    else:
3247        asserts.assert_true(power_status in ["F", "S"], msg="Should not be in full power mode")
3248
3249@contextmanager
3250def run_gnss_tracking(ad, criteria, meas_flag):
3251    """A context manager to enable gnss tracking and stops at the end.
3252
3253    Args:
3254        ad: The device under test.
3255        criteria: The criteria for First Fixed.
3256        meas_flag: A flag to turn on measurement log or not.
3257    """
3258    process_gnss_by_gtw_gpstool(ad, criteria=criteria, meas_flag=meas_flag)
3259    try:
3260        yield
3261    finally:
3262        start_gnss_by_gtw_gpstool(ad, state=False)
3263
3264def log_current_epoch_time(ad, sponge_key):
3265    """Logs current epoch timestamp in second.
3266
3267    Args:
3268        sponge_key: The key name of the sponge property.
3269    """
3270    current_epoch_time = get_current_epoch_time() // 1000
3271    ad.log.info(f"TestResult {sponge_key} {current_epoch_time}")