• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2021 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for the thread analyzer."""
16
17import unittest
18from pw_thread.thread_analyzer import ThreadInfo, ThreadSnapshotAnalyzer
19from pw_thread_protos import thread_pb2
20import pw_tokenizer
21from pw_tokenizer import tokens
22
23
24class ThreadInfoTest(unittest.TestCase):
25    """Tests that the ThreadInfo class produces expected results."""
26
27    def test_empty_thread(self):
28        thread_info = ThreadInfo(thread_pb2.Thread())
29        expected = '\n'.join(
30            (
31                'Est CPU usage: unknown',
32                'Stack info',
33                '  Current usage:   0x???????? - 0x???????? (size unknown)',
34                '  Est peak usage:  size unknown',
35                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
36            )
37        )
38        self.assertFalse(thread_info.has_stack_size_limit())
39        self.assertFalse(thread_info.has_stack_used())
40        self.assertEqual(expected, str(thread_info))
41
42    def test_thread_with_cpu_usage(self):
43        thread = thread_pb2.Thread()
44        thread.cpu_usage_hundredths = 1234
45        thread_info = ThreadInfo(thread)
46
47        expected = '\n'.join(
48            (
49                'Est CPU usage: 12.34%',
50                'Stack info',
51                '  Current usage:   0x???????? - 0x???????? (size unknown)',
52                '  Est peak usage:  size unknown',
53                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
54            )
55        )
56        self.assertFalse(thread_info.has_stack_size_limit())
57        self.assertFalse(thread_info.has_stack_used())
58        self.assertEqual(expected, str(thread_info))
59
60    def test_thread_with_stack_pointer(self):
61        thread = thread_pb2.Thread()
62        thread.stack_pointer = 0x5AC6A86C
63        thread_info = ThreadInfo(thread)
64
65        expected = '\n'.join(
66            (
67                'Est CPU usage: unknown',
68                'Stack info',
69                '  Current usage:   0x???????? - 0x5ac6a86c (size unknown)',
70                '  Est peak usage:  size unknown',
71                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
72            )
73        )
74        self.assertFalse(thread_info.has_stack_size_limit())
75        self.assertFalse(thread_info.has_stack_used())
76        self.assertEqual(expected, str(thread_info))
77
78    def test_thread_with_stack_usage(self):
79        thread = thread_pb2.Thread()
80        thread.stack_start_pointer = 0x5AC6B86C
81        thread.stack_pointer = 0x5AC6A86C
82        thread_info = ThreadInfo(thread)
83
84        expected = '\n'.join(
85            (
86                'Est CPU usage: unknown',
87                'Stack info',
88                '  Current usage:   0x5ac6b86c - 0x5ac6a86c (4096 bytes)',
89                '  Est peak usage:  size unknown',
90                '  Stack limits:    0x5ac6b86c - 0x???????? (size unknown)',
91            )
92        )
93        self.assertFalse(thread_info.has_stack_size_limit())
94        self.assertTrue(thread_info.has_stack_used())
95        self.assertEqual(expected, str(thread_info))
96
97    def test_thread_with_zero_size_stack(self):
98        thread = thread_pb2.Thread()
99        thread.stack_start_pointer = 0x5AC6B86C
100        thread.stack_end_pointer = 0x5AC6B86C
101        thread.stack_pointer = 0x5AC6A86C
102        thread.stack_pointer_est_peak = 0x5AC6A86C
103        thread_info = ThreadInfo(thread)
104
105        # pylint: disable=line-too-long
106        expected = '\n'.join(
107            (
108                'Est CPU usage: unknown',
109                'Stack info',
110                '  Current usage:   0x5ac6b86c - 0x5ac6a86c (4096 bytes, NaN%)',
111                '  Est peak usage:  4096 bytes, NaN%',
112                '  Stack limits:    0x5ac6b86c - 0x5ac6b86c (WARNING: total stack size is 0 bytes)',
113            )
114        )
115        # pylint: enable=line-too-long
116        self.assertTrue(thread_info.has_stack_size_limit())
117        self.assertTrue(thread_info.has_stack_used())
118        self.assertEqual(expected, str(thread_info))
119
120    def test_thread_with_all_stack_info(self):
121        thread = thread_pb2.Thread()
122        thread.stack_start_pointer = 0x5AC6B86C
123        thread.stack_end_pointer = 0x5AC6986C
124        thread.stack_pointer = 0x5AC6A86C
125        thread_info = ThreadInfo(thread)
126
127        # pylint: disable=line-too-long
128        expected = '\n'.join(
129            (
130                'Est CPU usage: unknown',
131                'Stack info',
132                '  Current usage:   0x5ac6b86c - 0x5ac6a86c (4096 bytes, 50.00%)',
133                '  Est peak usage:  size unknown',
134                '  Stack limits:    0x5ac6b86c - 0x5ac6986c (8192 bytes)',
135            )
136        )
137        # pylint: enable=line-too-long
138        self.assertTrue(thread_info.has_stack_size_limit())
139        self.assertTrue(thread_info.has_stack_used())
140        self.assertEqual(expected, str(thread_info))
141
142
143class ThreadSnapshotAnalyzerTest(unittest.TestCase):
144    """Tests that the ThreadSnapshotAnalyzer class produces expected results."""
145
146    def test_no_threads(self):
147        analyzer = ThreadSnapshotAnalyzer(thread_pb2.SnapshotThreadInfo())
148        self.assertEqual('', str(analyzer))
149
150    def test_one_empty_thread(self):
151        snapshot = thread_pb2.SnapshotThreadInfo()
152        snapshot.threads.append(thread_pb2.Thread())
153        expected = '\n'.join(
154            (
155                'Thread State',
156                '  1 thread running.',
157                '',
158                'Thread (UNKNOWN): [unnamed thread]',
159                'Est CPU usage: unknown',
160                'Stack info',
161                '  Current usage:   0x???????? - 0x???????? (size unknown)',
162                '  Est peak usage:  size unknown',
163                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
164                '',
165            )
166        )
167        analyzer = ThreadSnapshotAnalyzer(snapshot)
168        self.assertEqual(analyzer.active_thread(), None)
169        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)
170
171    def test_two_threads(self):
172        """Ensures multiple threads are printed correctly."""
173        snapshot = thread_pb2.SnapshotThreadInfo()
174
175        temp_thread = thread_pb2.Thread()
176        temp_thread.name = 'Idle'.encode()
177        temp_thread.state = thread_pb2.ThreadState.Enum.READY
178        temp_thread.stack_start_pointer = 0x2001AC00
179        temp_thread.stack_end_pointer = 0x2001AA00
180        temp_thread.stack_pointer = 0x2001AB0C
181        temp_thread.stack_pointer_est_peak = 0x2001AA00
182        snapshot.threads.append(temp_thread)
183
184        temp_thread = thread_pb2.Thread()
185        temp_thread.name = 'Alice'.encode()
186        temp_thread.stack_start_pointer = 0x2001B000
187        temp_thread.stack_pointer = 0x2001AE20
188        temp_thread.state = thread_pb2.ThreadState.Enum.BLOCKED
189        snapshot.threads.append(temp_thread)
190
191        # pylint: disable=line-too-long
192        expected = '\n'.join(
193            (
194                'Thread State',
195                '  2 threads running.',
196                '',
197                'Thread (READY): Idle',
198                'Est CPU usage: unknown',
199                'Stack info',
200                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
201                '  Est peak usage:  512 bytes, 100.00%',
202                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
203                '',
204                'Thread (BLOCKED): Alice',
205                'Est CPU usage: unknown',
206                'Stack info',
207                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
208                '  Est peak usage:  size unknown',
209                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
210                '',
211            )
212        )
213        # pylint: enable=line-too-long
214        analyzer = ThreadSnapshotAnalyzer(snapshot)
215        self.assertEqual(analyzer.active_thread(), None)
216        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)
217
218    def test_interrupts_with_thread(self):
219        """Ensures interrupts are properly reported as active."""
220        snapshot = thread_pb2.SnapshotThreadInfo()
221
222        temp_thread = thread_pb2.Thread()
223        temp_thread.name = 'Idle'.encode()
224        temp_thread.state = thread_pb2.ThreadState.Enum.READY
225        temp_thread.stack_start_pointer = 0x2001AC00
226        temp_thread.stack_end_pointer = 0x2001AA00
227        temp_thread.stack_pointer = 0x2001AB0C
228        temp_thread.stack_pointer_est_peak = 0x2001AA00
229        snapshot.threads.append(temp_thread)
230
231        temp_thread = thread_pb2.Thread()
232        temp_thread.name = 'Main/Handler'.encode()
233        temp_thread.stack_start_pointer = 0x2001B000
234        temp_thread.stack_pointer = 0x2001AE20
235        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
236        snapshot.threads.append(temp_thread)
237
238        # pylint: disable=line-too-long
239        expected = '\n'.join(
240            (
241                'Thread State',
242                '  2 threads running, Main/Handler active at the time of capture.',
243                '                     ~~~~~~~~~~~~',
244                '',
245                # Ensure the active thread is moved to the top of the list.
246                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
247                'Est CPU usage: unknown',
248                'Stack info',
249                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
250                '  Est peak usage:  size unknown',
251                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
252                '',
253                'Thread (READY): Idle',
254                'Est CPU usage: unknown',
255                'Stack info',
256                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
257                '  Est peak usage:  512 bytes, 100.00%',
258                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
259                '',
260            )
261        )
262        # pylint: enable=line-too-long
263        analyzer = ThreadSnapshotAnalyzer(snapshot)
264        self.assertEqual(analyzer.active_thread(), temp_thread)
265        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)
266
267    def test_active_thread(self):
268        """Ensures the 'active' thread is highlighted."""
269        snapshot = thread_pb2.SnapshotThreadInfo()
270
271        temp_thread = thread_pb2.Thread()
272        temp_thread.name = 'Idle'.encode()
273        temp_thread.state = thread_pb2.ThreadState.Enum.READY
274        temp_thread.stack_start_pointer = 0x2001AC00
275        temp_thread.stack_end_pointer = 0x2001AA00
276        temp_thread.stack_pointer = 0x2001AB0C
277        temp_thread.stack_pointer_est_peak = 0x2001AC00 + 0x100
278        snapshot.threads.append(temp_thread)
279
280        temp_thread = thread_pb2.Thread()
281        temp_thread.name = 'Main/Handler'.encode()
282        temp_thread.active = True
283        temp_thread.stack_start_pointer = 0x2001B000
284        temp_thread.stack_pointer = 0x2001AE20
285        temp_thread.stack_pointer_est_peak = 0x2001B000 + 0x200
286        temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER
287        snapshot.threads.append(temp_thread)
288
289        # pylint: disable=line-too-long
290        expected = '\n'.join(
291            (
292                'Thread State',
293                '  2 threads running, Main/Handler active at the time of capture.',
294                '                     ~~~~~~~~~~~~',
295                '',
296                # Ensure the active thread is moved to the top of the list.
297                'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]',
298                'Est CPU usage: unknown',
299                'Stack info',
300                '  Current usage:   0x2001b000 - 0x2001ae20 (480 bytes)',
301                '  Est peak usage:  512 bytes',
302                '  Stack limits:    0x2001b000 - 0x???????? (size unknown)',
303                '',
304                'Thread (READY): Idle',
305                'Est CPU usage: unknown',
306                'Stack info',
307                '  Current usage:   0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)',
308                '  Est peak usage:  256 bytes, 50.00%',
309                '  Stack limits:    0x2001ac00 - 0x2001aa00 (512 bytes)',
310                '',
311            )
312        )
313        # pylint: enable=line-too-long
314        analyzer = ThreadSnapshotAnalyzer(snapshot)
315
316        # Ensure the active thread is found.
317        self.assertEqual(analyzer.active_thread(), temp_thread)
318        self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected)
319
320    def test_tokenized_thread_name(self):
321        """Ensures a tokenized thread name is detokenized."""
322        snapshot = thread_pb2.SnapshotThreadInfo()
323        detokenizer = pw_tokenizer.Detokenizer(
324            tokens.Database(
325                [
326                    tokens.TokenizedStringEntry(
327                        0x46BE7497, 'The thread for Kuzco'
328                    ),
329                ]
330            )
331        )
332
333        temp_thread = thread_pb2.Thread()
334        temp_thread.name = b'\x97\x74\xBE\x46'
335        snapshot.threads.append(temp_thread)
336        temp_thread.name = b'\x5D\xA8\x66\xAE'
337        snapshot.threads.append(temp_thread)
338
339        # pylint: disable=line-too-long
340        expected = '\n'.join(
341            (
342                'Thread State',
343                '  2 threads running.',
344                '',
345                'Thread (UNKNOWN): The thread for Kuzco',
346                'Est CPU usage: unknown',
347                'Stack info',
348                '  Current usage:   0x???????? - 0x???????? (size unknown)',
349                '  Est peak usage:  size unknown',
350                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
351                '',
352                'Thread (UNKNOWN): $Xahmrg==',
353                'Est CPU usage: unknown',
354                'Stack info',
355                '  Current usage:   0x???????? - 0x???????? (size unknown)',
356                '  Est peak usage:  size unknown',
357                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
358                '',
359            )
360        )
361        # pylint: enable=line-too-long
362        analyzer = ThreadSnapshotAnalyzer(snapshot, tokenizer_db=detokenizer)
363
364        # Ensure text dump matches expected contents.
365        self.assertEqual(str(analyzer), expected)
366
367    def test_no_db_tokenized_thread_name(self):
368        """Ensures a tokenized thread name is detokenized."""
369        snapshot = thread_pb2.SnapshotThreadInfo()
370
371        temp_thread = thread_pb2.Thread()
372        temp_thread.name = b'\x97\x74\xBE\x46'
373        snapshot.threads.append(temp_thread)
374        temp_thread.name = b'\x5D\xA8\x66\xAE'
375        snapshot.threads.append(temp_thread)
376
377        # pylint: disable=line-too-long
378        expected = '\n'.join(
379            (
380                'Thread State',
381                '  2 threads running.',
382                '',
383                'Thread (UNKNOWN): $l3S+Rg==',
384                'Est CPU usage: unknown',
385                'Stack info',
386                '  Current usage:   0x???????? - 0x???????? (size unknown)',
387                '  Est peak usage:  size unknown',
388                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
389                '',
390                'Thread (UNKNOWN): $Xahmrg==',
391                'Est CPU usage: unknown',
392                'Stack info',
393                '  Current usage:   0x???????? - 0x???????? (size unknown)',
394                '  Est peak usage:  size unknown',
395                '  Stack limits:    0x???????? - 0x???????? (size unknown)',
396                '',
397            )
398        )
399        # pylint: enable=line-too-long
400        analyzer = ThreadSnapshotAnalyzer(snapshot)
401
402        # Ensure text dump matches expected contents.
403        self.assertEqual(str(analyzer), expected)
404
405
406if __name__ == '__main__':
407    unittest.main()
408