• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import pathlib
3import re
4import shutil
5import tempfile
6from collections import defaultdict
7
8
class AdrInfo:
    """Represent one ADR value
    An ADR value is a decimal number range from 0 - 31

    How to parse the ADR value:
        First, transform the decimal number to binary then we will get a 5 bit number
        The meaning of each bit is as follow:
                 0                  0               0         0       0
        HalfCycleReported   HalfCycleResolved   CycleSlip   Reset   Valid
    Special rule:
        For an ADR value in binary fits the pattern: * * 0 0 1, we call it a usable ADR
    More insight of ADR value:
        go/adrstates

    Attributes:
        count: number of occurrences of this ADR value (stored as given)
        is_valid: (bool)
        is_reset: (bool)
        is_cycle_slip: (bool)
        is_half_cycle_resolved: (bool)
        is_half_cycle_reported: (bool)
        is_usable: (bool)
    """
    def __init__(self, adr_value: int, count: int):
        """Decode the five state bits of an ADR value

        Args:
            adr_value: the ADR value; anything accepted by int(), e.g. 17 or "17"
            count: number of occurrences of this ADR value

        Raises:
            ValueError: adr_value can't be converted by int() or is negative
        """
        state = int(adr_value)
        if state < 0:
            raise ValueError(f"ADR value must not be negative: {adr_value!r}")
        # Use bit masks rather than indexing into bin(): bin() is not zero
        # padded, so indexing a value below 16 runs into the "0b" prefix.
        self._valid = state & 1
        self._reset = (state >> 1) & 1
        self._cycle_slip = (state >> 2) & 1
        self._half_cycle_resolved = (state >> 3) & 1
        self._half_cycle_reported = (state >> 4) & 1
        self.count = count

    @property
    def is_usable(self):
        """True when valid and neither reset nor cycle slip is set"""
        return self.is_valid and not self.is_reset and not self.is_cycle_slip

    @property
    def is_valid(self):
        """True when bit 0 (Valid) is set"""
        return bool(self._valid)

    @property
    def is_reset(self):
        """True when bit 1 (Reset) is set"""
        return bool(self._reset)

    @property
    def is_cycle_slip(self):
        """True when bit 2 (CycleSlip) is set"""
        return bool(self._cycle_slip)

    @property
    def is_half_cycle_resolved(self):
        """True when bit 3 (HalfCycleResolved) is set"""
        return bool(self._half_cycle_resolved)

    @property
    def is_half_cycle_reported(self):
        """True when bit 4 (HalfCycleReported) is set"""
        return bool(self._half_cycle_reported)
63
64
class AdrStatistic:
    """Accumulate counters over a set of ADR records

    Attributes:
        usable_count: (int)
        valid_count: (int)
        reset_count: (int)
        cycle_slip_count: (int)
        half_cycle_resolved_count: (int)
        half_cycle_reported_count: (int)
        total_count: (int)
        usable_rate: (float)
            usable_count / total_count, rounded to 3 digits
        valid_rate: (float)
            valid_count / total_count, rounded to 3 digits
    """

    # Counter attributes, in the order they are created on a new instance.
    _COUNTER_NAMES = (
        "usable_count",
        "valid_count",
        "reset_count",
        "cycle_slip_count",
        "half_cycle_resolved_count",
        "half_cycle_reported_count",
        "total_count",
    )

    # (flag on the ADR record, counter it feeds), in evaluation order.
    _FLAG_TO_COUNTER = (
        ("is_valid", "valid_count"),
        ("is_reset", "reset_count"),
        ("is_cycle_slip", "cycle_slip_count"),
        ("is_half_cycle_resolved", "half_cycle_resolved_count"),
        ("is_half_cycle_reported", "half_cycle_reported_count"),
        ("is_usable", "usable_count"),
    )

    def __init__(self):
        for counter_name in self._COUNTER_NAMES:
            setattr(self, counter_name, 0)

    def _rate_of(self, numerator):
        """Divide numerator by total_count (at least 1) and round to 3 digits"""
        return round(numerator / max(1, self.total_count), 3)

    @property
    def usable_rate(self):
        return self._rate_of(self.usable_count)

    @property
    def valid_rate(self):
        return self._rate_of(self.valid_count)

    def add_adr_info(self, adr_info: AdrInfo):
        """Fold one ADR record into the counters

        Every flag that is set on the record raises its counter by the
        record's count; total_count is always raised by the record's count.

        Args:
            adr_info: AdrInfo object
        """
        for flag_name, counter_name in self._FLAG_TO_COUNTER:
            if getattr(adr_info, flag_name):
                current = getattr(self, counter_name)
                setattr(self, counter_name, current + adr_info.count)
        self.total_count += adr_info.count
121
122
123
class GnssMeasurement:
    """Represent the content of measurement file generated by gps tool

    Attributes:
        ad: device object; this class uses ad.log, ad.adb.shell and ad.pull_files
    """

    FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt"

    # "= <digits>". At least one digit is required so that a line without a
    # numeric value is reported and skipped instead of producing an empty key.
    _ADR_VALUE_REGEX = re.compile(r"=\s(\d+)")

    def __init__(self, ad):
        self.ad = ad

    def _generate_local_temp_path(self, file_name="file.txt"):
        """Generate a file path for temporarily usage

        A new temporary folder is created on every call; the caller is
        responsible for removing it.

        Args:
            file_name: name of the file inside the new temporary folder

        Returns:
            string: local file path
        """
        folder = tempfile.mkdtemp()
        return os.path.join(folder, file_name)

    def _get_latest_measurement_file_path(self):
        """Get the latest measurement file path on device

        Returns:
            string: file path on device, as returned by the adb shell command
        """
        # "ls -tr" sorts oldest first, so the last line is the newest file.
        command = f"ls -tr {self.FILE_PATTERN} | tail -1"
        return self.ad.adb.shell(command)

    def get_latest_measurement_file(self):
        """Pull the latest measurement file from device to local

        The file is placed in a fresh temporary folder which the caller
        should remove once done.

        Returns:
            string: local file path to the measurement file

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        self.ad.log.info("Get measurement file from device")
        # Look up the device file first so that no temporary folder is left
        # behind when there is nothing to pull.
        src = self._get_latest_measurement_file_path()
        if not src:
            raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}")
        dest = self._generate_local_temp_path(file_name="measurement.txt")
        try:
            self.ad.pull_files(src, dest)
        except Exception:
            # Don't leak the temporary folder when the pull fails.
            shutil.rmtree(os.path.dirname(dest), ignore_errors=True)
            raise
        return dest

    def _get_adr_src_value(self):
        """Get ADR value from measurement file

        Lines containing "AccumulatedDeltaRangeState" are scanned for
        "= <digits>"; lines without a numeric value are logged and skipped.
        The temporary folder holding the pulled file is always removed.

        Returns:
            dict: {<ADR_value>: count, <ADR_value>: count...}
                keys are the digit strings as found in the file

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        # Pull outside the try block: if this raises there is no local file
        # to clean up, and the original error must reach the caller.
        file_path = self.get_latest_measurement_file()
        try:
            adr_src = defaultdict(int)
            with open(file_path) as f:
                for line in f:
                    if "AccumulatedDeltaRangeState" not in line:
                        continue
                    value = self._ADR_VALUE_REGEX.search(line)
                    if not value:
                        self.ad.log.warn("Can't get ADR value %s" % line)
                        continue
                    adr_src[value.group(1)] += 1
            return adr_src
        finally:
            # os.path follows the host's path rules (file_path is a local path).
            shutil.rmtree(os.path.dirname(file_path), ignore_errors=True)

    def get_adr_static(self):
        """Get ADR statistic

        Summarize ADR value from measurement file

        Returns:
            AdrStatistic object

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        self.ad.log.info("Get ADR statistic")
        adr_src = self._get_adr_src_value()
        adr_static = AdrStatistic()
        for key, value in adr_src.items():
            self.ad.log.debug("ADR value: %s - count: %s" % (key, value))
            adr_info = AdrInfo(key, value)
            adr_static.add_adr_info(adr_info)
        return adr_static
209