1#!/usr/bin/env python3 2# Copyright 2021 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Tests for the thread analyzer.""" 16 17import unittest 18from pw_thread.thread_analyzer import ThreadInfo, ThreadSnapshotAnalyzer 19from pw_thread_protos import thread_pb2 20import pw_tokenizer 21from pw_tokenizer import tokens 22 23 24class ThreadInfoTest(unittest.TestCase): 25 """Tests that the ThreadInfo class produces expected results.""" 26 27 def test_empty_thread(self): 28 thread_info = ThreadInfo(thread_pb2.Thread()) 29 expected = '\n'.join( 30 ( 31 'Est CPU usage: unknown', 32 'Stack info', 33 ' Current usage: 0x???????? - 0x???????? (size unknown)', 34 ' Est peak usage: size unknown', 35 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 36 ) 37 ) 38 self.assertFalse(thread_info.has_stack_size_limit()) 39 self.assertFalse(thread_info.has_stack_used()) 40 self.assertEqual(expected, str(thread_info)) 41 42 def test_thread_with_cpu_usage(self): 43 thread = thread_pb2.Thread() 44 thread.cpu_usage_hundredths = 1234 45 thread_info = ThreadInfo(thread) 46 47 expected = '\n'.join( 48 ( 49 'Est CPU usage: 12.34%', 50 'Stack info', 51 ' Current usage: 0x???????? - 0x???????? (size unknown)', 52 ' Est peak usage: size unknown', 53 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 54 ) 55 ) 56 self.assertFalse(thread_info.has_stack_size_limit()) 57 self.assertFalse(thread_info.has_stack_used()) 58 self.assertEqual(expected, str(thread_info)) 59 60 def test_thread_with_stack_pointer(self): 61 thread = thread_pb2.Thread() 62 thread.stack_pointer = 0x5AC6A86C 63 thread_info = ThreadInfo(thread) 64 65 expected = '\n'.join( 66 ( 67 'Est CPU usage: unknown', 68 'Stack info', 69 ' Current usage: 0x???????? - 0x5ac6a86c (size unknown)', 70 ' Est peak usage: size unknown', 71 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 72 ) 73 ) 74 self.assertFalse(thread_info.has_stack_size_limit()) 75 self.assertFalse(thread_info.has_stack_used()) 76 self.assertEqual(expected, str(thread_info)) 77 78 def test_thread_with_stack_usage(self): 79 thread = thread_pb2.Thread() 80 thread.stack_start_pointer = 0x5AC6B86C 81 thread.stack_pointer = 0x5AC6A86C 82 thread_info = ThreadInfo(thread) 83 84 expected = '\n'.join( 85 ( 86 'Est CPU usage: unknown', 87 'Stack info', 88 ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes)', 89 ' Est peak usage: size unknown', 90 ' Stack limits: 0x5ac6b86c - 0x???????? (size unknown)', 91 ) 92 ) 93 self.assertFalse(thread_info.has_stack_size_limit()) 94 self.assertTrue(thread_info.has_stack_used()) 95 self.assertEqual(expected, str(thread_info)) 96 97 def test_thread_with_zero_size_stack(self): 98 thread = thread_pb2.Thread() 99 thread.stack_start_pointer = 0x5AC6B86C 100 thread.stack_end_pointer = 0x5AC6B86C 101 thread.stack_pointer = 0x5AC6A86C 102 thread.stack_pointer_est_peak = 0x5AC6A86C 103 thread_info = ThreadInfo(thread) 104 105 # pylint: disable=line-too-long 106 expected = '\n'.join( 107 ( 108 'Est CPU usage: unknown', 109 'Stack info', 110 ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes, NaN%)', 111 ' Est peak usage: 4096 bytes, NaN%', 112 ' Stack limits: 0x5ac6b86c - 0x5ac6b86c (WARNING: total stack size is 0 bytes)', 113 ) 114 ) 115 # pylint: enable=line-too-long 116 self.assertTrue(thread_info.has_stack_size_limit()) 117 self.assertTrue(thread_info.has_stack_used()) 118 self.assertEqual(expected, str(thread_info)) 119 120 def test_thread_with_all_stack_info(self): 121 thread = thread_pb2.Thread() 122 thread.stack_start_pointer = 0x5AC6B86C 123 thread.stack_end_pointer = 0x5AC6986C 124 thread.stack_pointer = 0x5AC6A86C 125 thread_info = ThreadInfo(thread) 126 127 # pylint: disable=line-too-long 128 expected = '\n'.join( 129 ( 130 'Est CPU usage: unknown', 131 'Stack info', 132 ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes, 50.00%)', 133 ' Est peak usage: size unknown', 134 ' Stack limits: 0x5ac6b86c - 0x5ac6986c (8192 bytes)', 135 ) 136 ) 137 # pylint: enable=line-too-long 138 self.assertTrue(thread_info.has_stack_size_limit()) 139 self.assertTrue(thread_info.has_stack_used()) 140 self.assertEqual(expected, str(thread_info)) 141 142 143class ThreadSnapshotAnalyzerTest(unittest.TestCase): 144 """Tests that the ThreadSnapshotAnalyzer class produces expected results.""" 145 146 def test_no_threads(self): 147 analyzer = ThreadSnapshotAnalyzer(thread_pb2.SnapshotThreadInfo()) 148 self.assertEqual('', str(analyzer)) 149 150 def test_one_empty_thread(self): 151 snapshot = thread_pb2.SnapshotThreadInfo() 152 snapshot.threads.append(thread_pb2.Thread()) 153 expected = '\n'.join( 154 ( 155 'Thread State', 156 ' 1 thread running.', 157 '', 158 'Thread (UNKNOWN): [unnamed thread]', 159 'Est CPU usage: unknown', 160 'Stack info', 161 ' Current usage: 0x???????? - 0x???????? (size unknown)', 162 ' Est peak usage: size unknown', 163 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 164 '', 165 ) 166 ) 167 analyzer = ThreadSnapshotAnalyzer(snapshot) 168 self.assertEqual(analyzer.active_thread(), None) 169 self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) 170 171 def test_two_threads(self): 172 """Ensures multiple threads are printed correctly.""" 173 snapshot = thread_pb2.SnapshotThreadInfo() 174 175 temp_thread = thread_pb2.Thread() 176 temp_thread.name = 'Idle'.encode() 177 temp_thread.state = thread_pb2.ThreadState.Enum.READY 178 temp_thread.stack_start_pointer = 0x2001AC00 179 temp_thread.stack_end_pointer = 0x2001AA00 180 temp_thread.stack_pointer = 0x2001AB0C 181 temp_thread.stack_pointer_est_peak = 0x2001AA00 182 snapshot.threads.append(temp_thread) 183 184 temp_thread = thread_pb2.Thread() 185 temp_thread.name = 'Alice'.encode() 186 temp_thread.stack_start_pointer = 0x2001B000 187 temp_thread.stack_pointer = 0x2001AE20 188 temp_thread.state = thread_pb2.ThreadState.Enum.BLOCKED 189 snapshot.threads.append(temp_thread) 190 191 # pylint: disable=line-too-long 192 expected = '\n'.join( 193 ( 194 'Thread State', 195 ' 2 threads running.', 196 '', 197 'Thread (READY): Idle', 198 'Est CPU usage: unknown', 199 'Stack info', 200 ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', 201 ' Est peak usage: 512 bytes, 100.00%', 202 ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', 203 '', 204 'Thread (BLOCKED): Alice', 205 'Est CPU usage: unknown', 206 'Stack info', 207 ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', 208 ' Est peak usage: size unknown', 209 ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', 210 '', 211 ) 212 ) 213 # pylint: enable=line-too-long 214 analyzer = ThreadSnapshotAnalyzer(snapshot) 215 self.assertEqual(analyzer.active_thread(), None) 216 self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) 217 218 def test_interrupts_with_thread(self): 219 """Ensures interrupts are properly reported as active.""" 220 snapshot = thread_pb2.SnapshotThreadInfo() 221 222 temp_thread = thread_pb2.Thread() 223 temp_thread.name = 'Idle'.encode() 224 temp_thread.state = thread_pb2.ThreadState.Enum.READY 225 temp_thread.stack_start_pointer = 0x2001AC00 226 temp_thread.stack_end_pointer = 0x2001AA00 227 temp_thread.stack_pointer = 0x2001AB0C 228 temp_thread.stack_pointer_est_peak = 0x2001AA00 229 snapshot.threads.append(temp_thread) 230 231 temp_thread = thread_pb2.Thread() 232 temp_thread.name = 'Main/Handler'.encode() 233 temp_thread.stack_start_pointer = 0x2001B000 234 temp_thread.stack_pointer = 0x2001AE20 235 temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER 236 snapshot.threads.append(temp_thread) 237 238 # pylint: disable=line-too-long 239 expected = '\n'.join( 240 ( 241 'Thread State', 242 ' 2 threads running, Main/Handler active at the time of capture.', 243 ' ~~~~~~~~~~~~', 244 '', 245 # Ensure the active thread is moved to the top of the list. 246 'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]', 247 'Est CPU usage: unknown', 248 'Stack info', 249 ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', 250 ' Est peak usage: size unknown', 251 ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', 252 '', 253 'Thread (READY): Idle', 254 'Est CPU usage: unknown', 255 'Stack info', 256 ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', 257 ' Est peak usage: 512 bytes, 100.00%', 258 ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', 259 '', 260 ) 261 ) 262 # pylint: enable=line-too-long 263 analyzer = ThreadSnapshotAnalyzer(snapshot) 264 self.assertEqual(analyzer.active_thread(), temp_thread) 265 self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) 266 267 def test_active_thread(self): 268 """Ensures the 'active' thread is highlighted.""" 269 snapshot = thread_pb2.SnapshotThreadInfo() 270 271 temp_thread = thread_pb2.Thread() 272 temp_thread.name = 'Idle'.encode() 273 temp_thread.state = thread_pb2.ThreadState.Enum.READY 274 temp_thread.stack_start_pointer = 0x2001AC00 275 temp_thread.stack_end_pointer = 0x2001AA00 276 temp_thread.stack_pointer = 0x2001AB0C 277 temp_thread.stack_pointer_est_peak = 0x2001AC00 + 0x100 278 snapshot.threads.append(temp_thread) 279 280 temp_thread = thread_pb2.Thread() 281 temp_thread.name = 'Main/Handler'.encode() 282 temp_thread.active = True 283 temp_thread.stack_start_pointer = 0x2001B000 284 temp_thread.stack_pointer = 0x2001AE20 285 temp_thread.stack_pointer_est_peak = 0x2001B000 + 0x200 286 temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER 287 snapshot.threads.append(temp_thread) 288 289 # pylint: disable=line-too-long 290 expected = '\n'.join( 291 ( 292 'Thread State', 293 ' 2 threads running, Main/Handler active at the time of capture.', 294 ' ~~~~~~~~~~~~', 295 '', 296 # Ensure the active thread is moved to the top of the list. 297 'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]', 298 'Est CPU usage: unknown', 299 'Stack info', 300 ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', 301 ' Est peak usage: 512 bytes', 302 ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', 303 '', 304 'Thread (READY): Idle', 305 'Est CPU usage: unknown', 306 'Stack info', 307 ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', 308 ' Est peak usage: 256 bytes, 50.00%', 309 ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', 310 '', 311 ) 312 ) 313 # pylint: enable=line-too-long 314 analyzer = ThreadSnapshotAnalyzer(snapshot) 315 316 # Ensure the active thread is found. 317 self.assertEqual(analyzer.active_thread(), temp_thread) 318 self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) 319 320 def test_tokenized_thread_name(self): 321 """Ensures a tokenized thread name is detokenized.""" 322 snapshot = thread_pb2.SnapshotThreadInfo() 323 detokenizer = pw_tokenizer.Detokenizer( 324 tokens.Database( 325 [ 326 tokens.TokenizedStringEntry( 327 0x46BE7497, 'The thread for Kuzco' 328 ), 329 ] 330 ) 331 ) 332 333 temp_thread = thread_pb2.Thread() 334 temp_thread.name = b'\x97\x74\xBE\x46' 335 snapshot.threads.append(temp_thread) 336 temp_thread.name = b'\x5D\xA8\x66\xAE' 337 snapshot.threads.append(temp_thread) 338 339 # pylint: disable=line-too-long 340 expected = '\n'.join( 341 ( 342 'Thread State', 343 ' 2 threads running.', 344 '', 345 'Thread (UNKNOWN): The thread for Kuzco', 346 'Est CPU usage: unknown', 347 'Stack info', 348 ' Current usage: 0x???????? - 0x???????? (size unknown)', 349 ' Est peak usage: size unknown', 350 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 351 '', 352 'Thread (UNKNOWN): $Xahmrg==', 353 'Est CPU usage: unknown', 354 'Stack info', 355 ' Current usage: 0x???????? - 0x???????? (size unknown)', 356 ' Est peak usage: size unknown', 357 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 358 '', 359 ) 360 ) 361 # pylint: enable=line-too-long 362 analyzer = ThreadSnapshotAnalyzer(snapshot, tokenizer_db=detokenizer) 363 364 # Ensure text dump matches expected contents. 365 self.assertEqual(str(analyzer), expected) 366 367 def test_no_db_tokenized_thread_name(self): 368 """Ensures a tokenized thread name is detokenized.""" 369 snapshot = thread_pb2.SnapshotThreadInfo() 370 371 temp_thread = thread_pb2.Thread() 372 temp_thread.name = b'\x97\x74\xBE\x46' 373 snapshot.threads.append(temp_thread) 374 temp_thread.name = b'\x5D\xA8\x66\xAE' 375 snapshot.threads.append(temp_thread) 376 377 # pylint: disable=line-too-long 378 expected = '\n'.join( 379 ( 380 'Thread State', 381 ' 2 threads running.', 382 '', 383 'Thread (UNKNOWN): $l3S+Rg==', 384 'Est CPU usage: unknown', 385 'Stack info', 386 ' Current usage: 0x???????? - 0x???????? (size unknown)', 387 ' Est peak usage: size unknown', 388 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 389 '', 390 'Thread (UNKNOWN): $Xahmrg==', 391 'Est CPU usage: unknown', 392 'Stack info', 393 ' Current usage: 0x???????? - 0x???????? (size unknown)', 394 ' Est peak usage: size unknown', 395 ' Stack limits: 0x???????? - 0x???????? (size unknown)', 396 '', 397 ) 398 ) 399 # pylint: enable=line-too-long 400 analyzer = ThreadSnapshotAnalyzer(snapshot) 401 402 # Ensure text dump matches expected contents. 403 self.assertEqual(str(analyzer), expected) 404 405 406if __name__ == '__main__': 407 unittest.main() 408