1from datetime import datetime 2import os 3import pathlib 4import re 5import shutil 6import tempfile 7from collections import defaultdict 8 9 10_EVENT_TIME_FORMAT = "%Y/%m/%d %H:%M:%S.%f" 11_EVENT_TIME_PATTERN = re.compile(r"(\d+(?:\/\d+){2}\s\d{2}(?::\d{2}){2}\.\d+)\sRead:") 12_GNSS_CLOCK_START_LOG_PATTERN = re.compile(r"^GnssClock:") 13_GNSS_CLOCK_TIME_NANOS_PATTERN = re.compile(f"^\s+TimeNanos\s*=\s*([-]?\d*)") 14_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN = re.compile(f"^\s+FullBiasNanos\s*=\s*([-]?\d*)") 15_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN = re.compile(f"^\s+elapsedRealtimeNanos\s*=\s*([-]?\d*)") 16 17class AdrInfo: 18 """Represent one ADR value 19 An ADR value is a decimal number range from 0 - 31 20 21 How to parse the ADR value: 22 First, transform the decimal number to binary then we will get a 5 bit number 23 The meaning of each bit is as follow: 24 0 0 0 0 0 25 HalfCycleReported HalfCycleResolved CycleSlip Reset Valid 26 Special rule: 27 For an ADR value in binary fits the pattern: * * 0 0 1, we call it a usable ADR 28 More insight of ADR value: 29 go/adrstates 30 31 Attributes: 32 is_valid: (bool) 33 is_reset: (bool) 34 is_cycle_slip: (bool) 35 is_half_cycle_resolved: (bool) 36 is_half_cycle_reported: (bool) 37 is_usable: (bool) 38 """ 39 def __init__(self, adr_value: int, count: int): 40 src = bin(int(adr_value)) 41 self._valid = int(src[-1]) 42 self._reset = int(src[-2]) 43 self._cycle_slip = int(src[-3]) 44 self._half_cycle_resolved = int(src[-4]) 45 self._half_cycle_reported = int(src[-5]) 46 self.count = count 47 48 @property 49 def is_usable(self): 50 return self.is_valid and not self.is_reset and not self.is_cycle_slip 51 52 @property 53 def is_valid(self): 54 return bool(self._valid) 55 56 @property 57 def is_reset(self): 58 return bool(self._reset) 59 60 @property 61 def is_cycle_slip(self): 62 return bool(self._cycle_slip) 63 64 @property 65 def is_half_cycle_resolved(self): 66 return bool(self._half_cycle_resolved) 67 68 @property 69 def is_half_cycle_reported(self): 70 return bool(self._half_cycle_reported) 71 72 73class AdrStatistic: 74 """Represent the ADR statistic 75 76 Attributes: 77 usable_count: (int) 78 valid_count: (int) 79 reset_count: (int) 80 cycle_slip_count: (int) 81 half_cycle_resolved_count: (int) 82 half_cycle_reported_count: (int) 83 total_count: (int) 84 usable_rate: (float) 85 usable_count / total_count 86 valid_rate: (float) 87 valid_count / total_count 88 """ 89 def __init__(self): 90 self.usable_count = 0 91 self.valid_count = 0 92 self.reset_count = 0 93 self.cycle_slip_count = 0 94 self.half_cycle_resolved_count = 0 95 self.half_cycle_reported_count = 0 96 self.total_count = 0 97 98 @property 99 def usable_rate(self): 100 denominator = max(1, self.total_count) 101 result = self.usable_count / denominator 102 return round(result, 3) 103 104 @property 105 def valid_rate(self): 106 denominator = max(1, self.total_count) 107 result = self.valid_count / denominator 108 return round(result, 3) 109 110 def add_adr_info(self, adr_info: AdrInfo): 111 """Add ADR info record to increase the statistic 112 113 Args: 114 adr_info: AdrInfo object 115 """ 116 if adr_info.is_valid: 117 self.valid_count += adr_info.count 118 if adr_info.is_reset: 119 self.reset_count += adr_info.count 120 if adr_info.is_cycle_slip: 121 self.cycle_slip_count += adr_info.count 122 if adr_info.is_half_cycle_resolved: 123 self.half_cycle_resolved_count += adr_info.count 124 if adr_info.is_half_cycle_reported: 125 self.half_cycle_reported_count += adr_info.count 126 if adr_info.is_usable: 127 self.usable_count += adr_info.count 128 self.total_count += adr_info.count 129 130 131class GnssClockSubEvent: 132 time_nanos: int 133 full_bias_nanos: int 134 elapsed_real_time_nanos: int 135 136 def __init__(self, event_time): 137 self.event_time = event_time 138 139 def parse(self, line): 140 if _GNSS_CLOCK_TIME_NANOS_PATTERN.search(line): 141 self.time_nanos = int(_GNSS_CLOCK_TIME_NANOS_PATTERN.search(line).group(1)) 142 elif _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line): 143 self.full_bias_nanos = int( 144 _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line).group(1)) 145 elif _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line): 146 self.elapsed_real_time_nanos = int( 147 _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line).group(1)) 148 149 def __repr__(self) -> str: 150 return (f"event time: {self.event_time}, " 151 f"timenanos: {self.time_nanos}, " 152 f"full_bias: {self.full_bias_nanos}, " 153 f"elapsed_realtime: {self.elapsed_real_time_nanos}") 154 155 @property 156 def gps_elapsed_realtime_diff(self): 157 return self.time_nanos - self.full_bias_nanos - self.elapsed_real_time_nanos 158 159 160class GnssMeasurement: 161 """Represent the content of measurement file generated by gps tool""" 162 163 FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt" 164 165 def __init__(self, ad): 166 self.ad = ad 167 168 def _generate_local_temp_path(self, file_name="file.txt"): 169 """Generate a file path for temporarily usage 170 171 Returns: 172 string: local file path 173 """ 174 folder = tempfile.mkdtemp() 175 file_path = os.path.join(folder, file_name) 176 return file_path 177 178 def _get_latest_measurement_file_path(self): 179 """Get the latest measurement file path on device 180 181 Returns: 182 string: file path on device 183 """ 184 command = f"ls -tr {self.FILE_PATTERN} | tail -1" 185 result = self.ad.adb.shell(command) 186 return result 187 188 def get_latest_measurement_file(self): 189 """Pull the latest measurement file from device to local 190 191 Returns: 192 string: local file path to the measurement file 193 194 Raise: 195 FileNotFoundError: can't get measurement file from device 196 """ 197 self.ad.log.info("Get measurement file from device") 198 dest = self._generate_local_temp_path(file_name="measurement.txt") 199 src = self._get_latest_measurement_file_path() 200 if not src: 201 raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}") 202 self.ad.pull_files(src, dest) 203 return dest 204 205 def _get_adr_src_value(self): 206 """Get ADR value from measurement file 207 208 Returns: 209 dict: {<ADR_value>: count, <ADR_value>: count...} 210 """ 211 try: 212 file_path = self.get_latest_measurement_file() 213 adr_src = defaultdict(int) 214 adr_src_regex = re.compile("=\s(\d*)") 215 with open(file_path) as f: 216 for line in f: 217 if "AccumulatedDeltaRangeState" in line: 218 value = re.search(adr_src_regex, line) 219 if not value: 220 self.ad.log.warn("Can't get ADR value %s" % line) 221 continue 222 key = value.group(1) 223 adr_src[key] += 1 224 return adr_src 225 finally: 226 folder = pathlib.PurePosixPath(file_path).parent 227 shutil.rmtree(folder, ignore_errors=True) 228 229 def get_adr_static(self): 230 """Get ADR statistic 231 232 Summarize ADR value from measurement file 233 234 Returns: 235 AdrStatistic object 236 """ 237 self.ad.log.info("Get ADR statistic") 238 adr_src = self._get_adr_src_value() 239 adr_static = AdrStatistic() 240 for key, value in adr_src.items(): 241 self.ad.log.debug("ADR value: %s - count: %s" % (key, value)) 242 adr_info = AdrInfo(key, value) 243 adr_static.add_adr_info(adr_info) 244 return adr_static 245 246 def get_gnss_clock_info(self): 247 sub_events = [] 248 event_time = None 249 with tempfile.TemporaryDirectory() as folder: 250 local_measurement_file = os.path.join(folder, "measurement_file") 251 self.ad.pull_files(self._get_latest_measurement_file_path(), local_measurement_file) 252 with open(local_measurement_file) as f: 253 for line in f: 254 if re.search(_EVENT_TIME_PATTERN, line): 255 event_time = re.search(_EVENT_TIME_PATTERN, line).group(1) 256 event_time = datetime.strptime(event_time, _EVENT_TIME_FORMAT) 257 elif re.search(_GNSS_CLOCK_START_LOG_PATTERN, line): 258 sub_events.append(GnssClockSubEvent(event_time)) 259 elif line.startswith(" "): 260 sub_events[-1].parse(line) 261 return sub_events 262 263