1import os 2import pathlib 3import re 4import shutil 5import tempfile 6from collections import defaultdict 7 8 9class AdrInfo: 10 """Represent one ADR value 11 An ADR value is a decimal number range from 0 - 31 12 13 How to parse the ADR value: 14 First, transform the decimal number to binary then we will get a 5 bit number 15 The meaning of each bit is as follow: 16 0 0 0 0 0 17 HalfCycleReported HalfCycleResolved CycleSlip Reset Valid 18 Special rule: 19 For an ADR value in binary fits the pattern: * * 0 0 1, we call it a usable ADR 20 More insight of ADR value: 21 go/adrstates 22 23 Attributes: 24 is_valid: (bool) 25 is_reset: (bool) 26 is_cycle_slip: (bool) 27 is_half_cycle_resolved: (bool) 28 is_half_cycle_reported: (bool) 29 is_usable: (bool) 30 """ 31 def __init__(self, adr_value: int, count: int): 32 src = bin(int(adr_value)) 33 self._valid = int(src[-1]) 34 self._reset = int(src[-2]) 35 self._cycle_slip = int(src[-3]) 36 self._half_cycle_resolved = int(src[-4]) 37 self._half_cycle_reported = int(src[-5]) 38 self.count = count 39 40 @property 41 def is_usable(self): 42 return self.is_valid and not self.is_reset and not self.is_cycle_slip 43 44 @property 45 def is_valid(self): 46 return bool(self._valid) 47 48 @property 49 def is_reset(self): 50 return bool(self._reset) 51 52 @property 53 def is_cycle_slip(self): 54 return bool(self._cycle_slip) 55 56 @property 57 def is_half_cycle_resolved(self): 58 return bool(self._half_cycle_resolved) 59 60 @property 61 def is_half_cycle_reported(self): 62 return bool(self._half_cycle_reported) 63 64 65class AdrStatistic: 66 """Represent the ADR statistic 67 68 Attributes: 69 usable_count: (int) 70 valid_count: (int) 71 reset_count: (int) 72 cycle_slip_count: (int) 73 half_cycle_resolved_count: (int) 74 half_cycle_reported_count: (int) 75 total_count: (int) 76 usable_rate: (float) 77 usable_count / total_count 78 valid_rate: (float) 79 valid_count / total_count 80 """ 81 def __init__(self): 82 self.usable_count = 0 83 self.valid_count = 0 84 self.reset_count = 0 85 self.cycle_slip_count = 0 86 self.half_cycle_resolved_count = 0 87 self.half_cycle_reported_count = 0 88 self.total_count = 0 89 90 @property 91 def usable_rate(self): 92 denominator = max(1, self.total_count) 93 result = self.usable_count / denominator 94 return round(result, 3) 95 96 @property 97 def valid_rate(self): 98 denominator = max(1, self.total_count) 99 result = self.valid_count / denominator 100 return round(result, 3) 101 102 def add_adr_info(self, adr_info: AdrInfo): 103 """Add ADR info record to increase the statistic 104 105 Args: 106 adr_info: AdrInfo object 107 """ 108 if adr_info.is_valid: 109 self.valid_count += adr_info.count 110 if adr_info.is_reset: 111 self.reset_count += adr_info.count 112 if adr_info.is_cycle_slip: 113 self.cycle_slip_count += adr_info.count 114 if adr_info.is_half_cycle_resolved: 115 self.half_cycle_resolved_count += adr_info.count 116 if adr_info.is_half_cycle_reported: 117 self.half_cycle_reported_count += adr_info.count 118 if adr_info.is_usable: 119 self.usable_count += adr_info.count 120 self.total_count += adr_info.count 121 122 123 124class GnssMeasurement: 125 """Represent the content of measurement file generated by gps tool""" 126 127 FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt" 128 129 def __init__(self, ad): 130 self.ad = ad 131 132 def _generate_local_temp_path(self, file_name="file.txt"): 133 """Generate a file path for temporarily usage 134 135 Returns: 136 string: local file path 137 """ 138 folder = tempfile.mkdtemp() 139 file_path = os.path.join(folder, file_name) 140 return file_path 141 142 def _get_latest_measurement_file_path(self): 143 """Get the latest measurement file path on device 144 145 Returns: 146 string: file path on device 147 """ 148 command = f"ls -tr {self.FILE_PATTERN} | tail -1" 149 result = self.ad.adb.shell(command) 150 return result 151 152 def get_latest_measurement_file(self): 153 """Pull the latest measurement file from device to local 154 155 Returns: 156 string: local file path to the measurement file 157 158 Raise: 159 FileNotFoundError: can't get measurement file from device 160 """ 161 self.ad.log.info("Get measurement file from device") 162 dest = self._generate_local_temp_path(file_name="measurement.txt") 163 src = self._get_latest_measurement_file_path() 164 if not src: 165 raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}") 166 self.ad.pull_files(src, dest) 167 return dest 168 169 def _get_adr_src_value(self): 170 """Get ADR value from measurement file 171 172 Returns: 173 dict: {<ADR_value>: count, <ADR_value>: count...} 174 """ 175 try: 176 file_path = self.get_latest_measurement_file() 177 adr_src = defaultdict(int) 178 adr_src_regex = re.compile("=\s(\d*)") 179 with open(file_path) as f: 180 for line in f: 181 if "AccumulatedDeltaRangeState" in line: 182 value = re.search(adr_src_regex, line) 183 if not value: 184 self.ad.log.warn("Can't get ADR value %s" % line) 185 continue 186 key = value.group(1) 187 adr_src[key] += 1 188 return adr_src 189 finally: 190 folder = pathlib.PurePosixPath(file_path).parent 191 shutil.rmtree(folder, ignore_errors=True) 192 193 def get_adr_static(self): 194 """Get ADR statistic 195 196 Summarize ADR value from measurement file 197 198 Returns: 199 AdrStatistic object 200 """ 201 self.ad.log.info("Get ADR statistic") 202 adr_src = self._get_adr_src_value() 203 adr_static = AdrStatistic() 204 for key, value in adr_src.items(): 205 self.ad.log.debug("ADR value: %s - count: %s" % (key, value)) 206 adr_info = AdrInfo(key, value) 207 adr_static.add_adr_info(adr_info) 208 return adr_static 209