• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from datetime import datetime
2import os
3import pathlib
4import re
5import shutil
6import tempfile
7from collections import defaultdict
8
9
10_EVENT_TIME_FORMAT = "%Y/%m/%d %H:%M:%S.%f"
11_EVENT_TIME_PATTERN = re.compile(r"(\d+(?:\/\d+){2}\s\d{2}(?::\d{2}){2}\.\d+)\sRead:")
12_GNSS_CLOCK_START_LOG_PATTERN = re.compile(r"^GnssClock:")
13_GNSS_CLOCK_TIME_NANOS_PATTERN = re.compile(f"^\s+TimeNanos\s*=\s*([-]?\d*)")
14_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN = re.compile(f"^\s+FullBiasNanos\s*=\s*([-]?\d*)")
15_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN = re.compile(f"^\s+elapsedRealtimeNanos\s*=\s*([-]?\d*)")
16
17class AdrInfo:
18    """Represent one ADR value
19    An ADR value is a decimal number range from 0 - 31
20
21    How to parse the ADR value:
22        First, transform the decimal number to binary then we will get a 5 bit number
23        The meaning of each bit is as follow:
24                 0                  0               0         0       0
25        HalfCycleReported   HalfCycleResolved   CycleSlip   Reset   Valid
26    Special rule:
27        For an ADR value in binary fits the pattern: * * 0 0 1, we call it a usable ADR
28    More insight of ADR value:
29        go/adrstates
30
31    Attributes:
32        is_valid: (bool)
33        is_reset: (bool)
34        is_cycle_slip: (bool)
35        is_half_cycle_resolved: (bool)
36        is_half_cycle_reported: (bool)
37        is_usable: (bool)
38    """
39    def __init__(self, adr_value: int, count: int):
40        src = bin(int(adr_value))
41        self._valid = int(src[-1])
42        self._reset = int(src[-2])
43        self._cycle_slip = int(src[-3])
44        self._half_cycle_resolved = int(src[-4])
45        self._half_cycle_reported = int(src[-5])
46        self.count = count
47
48    @property
49    def is_usable(self):
50        return self.is_valid and not self.is_reset and not self.is_cycle_slip
51
52    @property
53    def is_valid(self):
54        return bool(self._valid)
55
56    @property
57    def is_reset(self):
58        return bool(self._reset)
59
60    @property
61    def is_cycle_slip(self):
62        return bool(self._cycle_slip)
63
64    @property
65    def is_half_cycle_resolved(self):
66        return bool(self._half_cycle_resolved)
67
68    @property
69    def is_half_cycle_reported(self):
70        return bool(self._half_cycle_reported)
71
72
73class AdrStatistic:
74    """Represent the ADR statistic
75
76    Attributes:
77        usable_count: (int)
78        valid_count: (int)
79        reset_count: (int)
80        cycle_slip_count: (int)
81        half_cycle_resolved_count: (int)
82        half_cycle_reported_count: (int)
83        total_count: (int)
84        usable_rate: (float)
85            usable_count / total_count
86        valid_rate: (float)
87            valid_count / total_count
88    """
89    def __init__(self):
90        self.usable_count = 0
91        self.valid_count = 0
92        self.reset_count = 0
93        self.cycle_slip_count = 0
94        self.half_cycle_resolved_count = 0
95        self.half_cycle_reported_count = 0
96        self.total_count = 0
97
98    @property
99    def usable_rate(self):
100        denominator = max(1, self.total_count)
101        result = self.usable_count / denominator
102        return round(result, 3)
103
104    @property
105    def valid_rate(self):
106        denominator = max(1, self.total_count)
107        result = self.valid_count / denominator
108        return round(result, 3)
109
110    def add_adr_info(self, adr_info: AdrInfo):
111        """Add ADR info record to increase the statistic
112
113        Args:
114            adr_info: AdrInfo object
115        """
116        if adr_info.is_valid:
117            self.valid_count += adr_info.count
118        if adr_info.is_reset:
119            self.reset_count += adr_info.count
120        if adr_info.is_cycle_slip:
121            self.cycle_slip_count += adr_info.count
122        if adr_info.is_half_cycle_resolved:
123            self.half_cycle_resolved_count += adr_info.count
124        if adr_info.is_half_cycle_reported:
125            self.half_cycle_reported_count += adr_info.count
126        if adr_info.is_usable:
127            self.usable_count += adr_info.count
128        self.total_count += adr_info.count
129
130
131class GnssClockSubEvent:
132    time_nanos: int
133    full_bias_nanos: int
134    elapsed_real_time_nanos: int
135
136    def __init__(self, event_time):
137        self.event_time = event_time
138
139    def parse(self, line):
140        if _GNSS_CLOCK_TIME_NANOS_PATTERN.search(line):
141            self.time_nanos = int(_GNSS_CLOCK_TIME_NANOS_PATTERN.search(line).group(1))
142        elif _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line):
143            self.full_bias_nanos = int(
144                _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line).group(1))
145        elif _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line):
146            self.elapsed_real_time_nanos = int(
147                _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line).group(1))
148
149    def __repr__(self) -> str:
150        return (f"event time: {self.event_time}, "
151                f"timenanos: {self.time_nanos}, "
152                f"full_bias: {self.full_bias_nanos}, "
153                f"elapsed_realtime: {self.elapsed_real_time_nanos}")
154
155    @property
156    def gps_elapsed_realtime_diff(self):
157        return self.time_nanos - self.full_bias_nanos - self.elapsed_real_time_nanos
158
159
160class GnssMeasurement:
161    """Represent the content of measurement file generated by gps tool"""
162
163    FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt"
164
165    def __init__(self, ad):
166        self.ad = ad
167
168    def _generate_local_temp_path(self, file_name="file.txt"):
169        """Generate a file path for temporarily usage
170
171        Returns:
172            string: local file path
173        """
174        folder = tempfile.mkdtemp()
175        file_path = os.path.join(folder, file_name)
176        return file_path
177
178    def _get_latest_measurement_file_path(self):
179        """Get the latest measurement file path on device
180
181        Returns:
182            string: file path on device
183        """
184        command = f"ls -tr {self.FILE_PATTERN} | tail -1"
185        result = self.ad.adb.shell(command)
186        return result
187
188    def get_latest_measurement_file(self):
189        """Pull the latest measurement file from device to local
190
191        Returns:
192            string: local file path to the measurement file
193
194        Raise:
195            FileNotFoundError: can't get measurement file from device
196        """
197        self.ad.log.info("Get measurement file from device")
198        dest = self._generate_local_temp_path(file_name="measurement.txt")
199        src = self._get_latest_measurement_file_path()
200        if not src:
201            raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}")
202        self.ad.pull_files(src, dest)
203        return dest
204
205    def _get_adr_src_value(self):
206        """Get ADR value from measurement file
207
208        Returns:
209            dict: {<ADR_value>: count, <ADR_value>: count...}
210        """
211        try:
212            file_path = self.get_latest_measurement_file()
213            adr_src = defaultdict(int)
214            adr_src_regex = re.compile("=\s(\d*)")
215            with open(file_path) as f:
216                for line in f:
217                    if "AccumulatedDeltaRangeState" in line:
218                        value = re.search(adr_src_regex, line)
219                        if not value:
220                            self.ad.log.warn("Can't get ADR value %s" % line)
221                            continue
222                        key = value.group(1)
223                        adr_src[key] += 1
224            return adr_src
225        finally:
226            folder = pathlib.PurePosixPath(file_path).parent
227            shutil.rmtree(folder, ignore_errors=True)
228
229    def get_adr_static(self):
230        """Get ADR statistic
231
232        Summarize ADR value from measurement file
233
234        Returns:
235            AdrStatistic object
236        """
237        self.ad.log.info("Get ADR statistic")
238        adr_src = self._get_adr_src_value()
239        adr_static = AdrStatistic()
240        for key, value in adr_src.items():
241            self.ad.log.debug("ADR value: %s - count: %s" % (key, value))
242            adr_info = AdrInfo(key, value)
243            adr_static.add_adr_info(adr_info)
244        return adr_static
245
246    def get_gnss_clock_info(self):
247        sub_events = []
248        event_time = None
249        with tempfile.TemporaryDirectory() as folder:
250            local_measurement_file = os.path.join(folder, "measurement_file")
251            self.ad.pull_files(self._get_latest_measurement_file_path(), local_measurement_file)
252            with open(local_measurement_file) as f:
253                for line in f:
254                    if re.search(_EVENT_TIME_PATTERN, line):
255                        event_time = re.search(_EVENT_TIME_PATTERN, line).group(1)
256                        event_time = datetime.strptime(event_time, _EVENT_TIME_FORMAT)
257                    elif re.search(_GNSS_CLOCK_START_LOG_PATTERN, line):
258                        sub_events.append(GnssClockSubEvent(event_time))
259                    elif line.startswith(" "):
260                        sub_events[-1].parse(line)
261        return sub_events
262
263