1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Stress test utility for repeating actions repeatedly on android devices. 16 17Configures multiple devices to simultaneously run through the same set of 18actions over and over, while keeping logs from various sources. Primarily 19designed for playing audio to the devices and scanning their log output for 20events, while running other adb commands in between. 21""" 22from __future__ import absolute_import 23from __future__ import division 24from __future__ import print_function 25 26import datetime 27from email import encoders 28from email.mime import text 29import email.mime.base as base 30import email.mime.multipart as multipart 31import logging 32import mimetypes 33import os 34import platform 35import re 36import shlex 37import signal 38import smtplib 39import socket 40import subprocess 41import sys 42import tempfile 43import threading 44import time 45import uuid 46import wave 47from absl import app 48from absl import flags 49import pexpect 50import queue 51import stress_test_common 52import stress_test_pb2 53from google.protobuf import text_format 54 55_SUMMARY_LINES = "-" * 73 56 57if sys.platform.startswith("win"): 58 pexpect = None 59 60_SUMMARY_COLUMNS = ( 61 "| Event Type | Event Count | Consecutive no event |") 62_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|" 63 64FLAGS = flags.FLAGS 65flags.DEFINE_string("notification_address", "", 66 "Email address where to send notification events. Will " 67 "default to $USER@google.com if not provided. No emails " 68 "will be sent if suppress_notification_emails is True.") 69flags.DEFINE_bool("suppress_notification_emails", False, 70 "Prevents emails from being sent as notifications if True.") 71flags.DEFINE_string("test_name", None, 72 "Name of stress test to run. For example, if you set this " 73 "to 'dsp_trigger_sw_rejection', the stress test in " 74 "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will " 75 "be loaded and executed.") 76# flags.mark_flag_as_required("test_name") 77flags.DEFINE_string("output_root", "./", 78 "Path where directory should be generated containing all " 79 "logs from devices and moved files.") 80flags.DEFINE_integer("num_iterations", None, 81 "If set to a positive number, the number of iterations of " 82 "the stress test to run. Otherwise, the test runs " 83 "forever.") 84flags.DEFINE_list("devices", [], 85 "Serial numbers of devices that should be included in the " 86 "stress test. If empty, all devices will be used.") 87flags.DEFINE_integer("print_summary_every_n", 10, 88 "Prints the summary to the log file every n iterations.") 89 90flags.DEFINE_string("email_sender_address", "", 91 "Account to use for sending notification emails.") 92flags.DEFINE_string("email_sender_password", "", 93 "Password to use for notification email account.") 94flags.DEFINE_string("email_smtp_server", "smtp.gmail.com", 95 "SMTP server to use for sending notification emails.") 96flags.DEFINE_integer("email_smtp_port", 465, 97 "Port to use for the notification SMTP server.") 98flags.DEFINE_integer("device_settle_time", 5, 99 "Time to wait for devices to settle.") 100flags.DEFINE_bool("use_sox", platform.system() != "Windows", 101 "Use sox for playback, otherwise, attempt to use platform " 102 "specific features.") 103flags.DEFINE_bool("attach_bugreport", True, 104 "Attach bugreport to email if test failed.") 105flags.DEFINE_bool("delete_data_dir", False, 106 "If true, code will delete all the files generated by this " 107 "test at the end.") 108 109if platform.system().startswith("CYGWIN"): 110 FLAGS.device_settle_time = 30 111 112 113def QueueWorker(worker_queue): 114 while True: 115 work = worker_queue.get() 116 try: 117 work() 118 except: # pylint:disable=bare-except 119 logging.exception("Exception in worker queue - task remains uncompleted.") 120 worker_queue.task_done() 121 122 123def SendNotificationEmail(subject, body, bugreport=None): 124 """Sends an email with the specified subject and body. 125 126 Also attach bugreport if bugreport location is provided as argument 127 128 Args: 129 subject: Subject of the email. 130 body: Body of the email. 131 bugreport: If provided, it will be attach to the email. 132 """ 133 if FLAGS.suppress_notification_emails: 134 logging.info("Email with subject '%s' has been suppressed", subject) 135 return 136 try: 137 # Assemble the message to send. 138 recpient_address = FLAGS.notification_address 139 message = multipart.MIMEMultipart("alternative") 140 message["From"] = "Stress Test on %s" % socket.gethostname() 141 message["To"] = recpient_address 142 message["Subject"] = subject 143 message.attach(text.MIMEText(body, "plain")) 144 message.attach(text.MIMEText("<pre>%s</pre>" % body, "html")) 145 146 if FLAGS.attach_bugreport and bugreport: 147 # buildozer: disable=unused-variable 148 ctype, _ = mimetypes.guess_type(bugreport) 149 maintype, subtype = ctype.split("/", 1) 150 with open(bugreport, "rb") as fp: 151 att = base.MIMEBase(maintype, subtype) 152 att.set_payload(fp.read()) 153 encoders.encode_base64(att) 154 att.add_header("Content-Disposition", "attachment", filename=bugreport) 155 message.attach(att) 156 157 # Send the message from our special account. 158 server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port) 159 server.login(FLAGS.email_sender_address, FLAGS.email_sender_password) 160 server.sendmail(FLAGS.email_sender_address, recpient_address, 161 message.as_string()) 162 server.quit() 163 logging.info("Email with subject '%s' has been sent", subject) 164 except: # pylint:disable=bare-except 165 logging.exception("Failed to send notification email") 166 167 168class ProcessLogger(threading.Thread): 169 170 class EventScanner(object): 171 172 def __init__(self, name, process_name, regexes): 173 """Struct to store the data about an event. 174 175 Args: 176 name: Name of event. 177 process_name: Name of the process that is being logged. 178 regexes: An iteratable of regex strings that indicate an event has 179 happened. 180 """ 181 182 self.name = name 183 self.process_name = process_name 184 self.searches = [re.compile(regex).search for regex in regexes] 185 self.count = 0 186 187 def ScanForEvent(self, line, lock=None): 188 """Checks the line for matches. If found, updates the internal counter.""" 189 190 for search in self.searches: 191 if search(line.decode("utf-8")): 192 # Grab the lock (if provided), update the counter, and release it. 193 if lock: lock.acquire() 194 self.count += 1 195 if lock: lock.release() 196 logging.info("Event '%s' detected on %s", self.name, 197 self.process_name) 198 199 def __init__(self, name, command, output, events, 200 restart_process, repeats_output_when_opened): 201 """Threaded class that monitors processes for events, and logs output. 202 203 Args: 204 name: The name of the process being logged. 205 command: A list of arguments to be passed to the subprocess to execute. 206 output: Name of output file to write process stdout to. If blank or None, 207 will not be generated. 208 events: An iterable of LoggingEventConfigs to look for in the output. 209 restart_process: Restart the process if it terminates by itself. This 210 should typically be true, but false for processes that only should be 211 run once and have their output logged. 212 repeats_output_when_opened: Set to true if the process will repeat the 213 output of a previous call when it is restarted. This will prevent 214 duplicate lines from being logged. 215 """ 216 super(ProcessLogger, self).__init__() 217 self.name = name 218 self.command = command 219 self.restart_process = restart_process 220 self.repeats_output_when_opened = repeats_output_when_opened 221 self.process = None 222 self.lock = threading.Lock() 223 self.looking = False 224 225 # Compile the list of regexes that we're supposed to be looking for. 226 self.events = [] 227 for event in events: 228 self.events.append(ProcessLogger.EventScanner(event.name, self.name, 229 event.regex)) 230 231 if output: 232 stress_test_common.MakeDirsIfNeeded(os.path.dirname(output)) 233 self.output_fp = open(output, "w", encoding="utf-8") 234 logging.info("Logging device info to %s", output) 235 else: 236 self.output_fp = None 237 238 def GetEventCountsSinceLastCall(self): 239 """Returns the counts of all events since this method was last called.""" 240 event_map = {} 241 self.lock.acquire() 242 for event in self.events: 243 event_map[event.name] = event.count 244 event.count = 0 245 self.lock.release() 246 return event_map 247 248 def run(self): 249 last_line = None 250 should_log = True 251 first_run = True 252 skip_exception_line = False 253 self.lock.acquire() 254 last_run_time = 0 255 while self.restart_process: 256 self.lock.release() 257 if not first_run: 258 logging.info("Restarting process %s", "".join(str(self.command))) 259 time_since_last_run = datetime.datetime.now() - last_run_time 260 if time_since_last_run.total_seconds() < 1.0: 261 needed_delay = 1.0 - time_since_last_run.total_seconds() 262 logging.info("Delaying for %.2f seconds", needed_delay) 263 time.sleep(needed_delay) 264 else: 265 first_run = False 266 267 try: 268 if pexpect: 269 self.process = pexpect.spawn(" ".join(self.command), timeout=None) 270 output_source = self.process 271 else: 272 self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE) 273 output_source = self.process.stdout 274 last_run_time = datetime.datetime.now() 275 for line in output_source: 276 # If the process we're logging likes to repeat its output, we need to 277 # look for the last line we saw before we start doing anything with 278 # these lines anymore. 279 if self.repeats_output_when_opened: 280 if not should_log: 281 if last_line == line: 282 should_log = True 283 continue 284 elif skip_exception_line: 285 # ignore the last line which caused UnicodeEncodeError 286 skip_exception_line = False 287 continue 288 289 if self.output_fp: 290 self.output_fp.write(line.decode("utf-8", "backslashreplace").rstrip()) 291 self.output_fp.write("\n") 292 293 # Loop through all events we're watching for, to see if they occur on 294 # this line. If they do, update the fact that we've seen this event. 295 for event in self.events: 296 if self.looking: 297 event.ScanForEvent(line, lock=self.lock) 298 last_line = line 299 except UnicodeEncodeError: 300 logging.exception("UnicodeEncodeError on running logger process") 301 skip_exception_line = True 302 except: # pylint:disable=bare-except 303 logging.exception("Exception encountered running process") 304 finally: 305 if pexpect: 306 self.process.terminate() 307 else: 308 self.process.send_signal(signal.SIGTERM) 309 should_log = False 310 self.lock.acquire() 311 312 self.lock.release() 313 if pexpect: 314 if self.process.exitstatus is not None: 315 logging.info("Process finished - exit code %d", self.process.exitstatus) 316 else: 317 logging.info("Process finished - signal code %d", 318 self.process.signalstatus) 319 else: 320 if self.process.returncode is not None: 321 logging.info("Process finished - return code %d", 322 self.process.returncode) 323 else: 324 logging.info("Process finished - no return code") 325 326 def StopLogging(self): 327 if self.process: 328 self.lock.acquire() 329 self.restart_process = False 330 self.lock.release() 331 332 if pexpect: 333 self.process.kill(signal.SIGHUP) 334 self.process.kill(signal.SIGINT) 335 else: 336 self.process.send_signal(signal.SIGTERM) 337 338 339class Device(object): 340 341 SECONDS_TO_SLEEP_DURING_ROOT = 0.5 342 343 def __init__(self, serial_number, output_root, test_events, expected_result): 344 """Responsible for monitoring a specific device, and pulling files from it. 345 346 The actual work of the constructor will be handled asynchronously, you must 347 call WaitForTasks() before using the device. 348 349 Args: 350 serial_number: The device serial number. 351 output_root: The directory where to output log files/anything pulled from 352 the device. 353 test_events: The events (with conditions) that come from the StressTest 354 that should be evaluated at every iteration, along with a list of 355 actions to take when one of these events occur. For example, if there 356 have not been any detected hotword triggers, a bugreport can be 357 generated. 358 expected_result: Expected event count to pass the test. 359 """ 360 self.serial_number = serial_number 361 self.output_root = output_root 362 self.cmd_string_replacements = {} 363 self.iteration = 0 364 self.cmd_string_replacements["iteration"] = 0 365 self.cmd_string_replacements["serial_number"] = serial_number 366 self.cmd_string_replacements["output_root"] = output_root 367 self.name = None 368 self.process_loggers = [] 369 self.event_log = stress_test_pb2.EventLog() 370 self.cnt_per_iteration = expected_result 371 372 # Prepare the work queue, and offload the rest of the init into it. 373 self.work_queue = queue.Queue() 374 self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue]) 375 self.worker.daemon = True 376 self.worker.name = self.name 377 self.worker.start() 378 self.abort_requested = False 379 self.remove_device = False 380 self.test_events = test_events 381 382 self.work_queue.put(self.__init_async__) 383 384 def __init_async__(self): 385 # Get the device type, and append it to the serial number. 386 self.device_type = self.Command(["shell", "getprop", 387 "ro.product.name"]).strip().decode("utf-8") 388 self.name = "%s_%s" % (self.device_type, self.serial_number) 389 self.worker.name = self.name 390 self.cmd_string_replacements["device"] = self.name 391 logging.info("Setting up device %s", self.name) 392 393 config = stress_test_common.LoadDeviceConfig(self.device_type, 394 self.serial_number) 395 396 # Get the device ready. 397 self.Root() 398 399 # Run any setup commands. 400 for cmd in config.setup_command: 401 result = self.Command( 402 shlex.split(cmd % self.cmd_string_replacements)).strip() 403 if result: 404 for line in result.splitlines(): 405 logging.info(line) 406 407 self.files_to_move = config.file_to_move 408 409 self.event_names = set([event.name for event in config.event]) 410 self.event_counter = {name: 0 for name in self.event_names} 411 self.iterations_since_event = {name: 0 for name in self.event_names} 412 413 for file_to_watch in config.file_to_watch: 414 # Are there any events that match up with this file? 415 events = [x for x in config.event if x.source == file_to_watch.source] 416 417 if file_to_watch.source == "LOGCAT": 418 command = [ 419 "adb", "-s", self.serial_number, "logcat", "-v", "usec", "" 420 ] 421 command.extend(["%s:S" % tag for tag in config.tag_to_suppress]) 422 name = "logcat_" + self.serial_number 423 else: 424 command = [ 425 "adb", "-s", self.serial_number, "shell", 426 "while : ; do cat %s 2>&1; done" % file_to_watch.source 427 ] 428 name = "%s_%s" % (os.path.basename( 429 file_to_watch.source), self.serial_number) 430 431 process_logger = ProcessLogger( 432 name, command, os.path.join( 433 self.output_root, 434 file_to_watch.destination % self.cmd_string_replacements), 435 events, True, file_to_watch.repeats_output_on_open) 436 self.process_loggers.append(process_logger) 437 process_logger.start() 438 439 # Add any of the background processes. 440 for daemon_process in config.daemon_process: 441 # Are there any events that match up with this file? 442 events = [x for x in config.event if x.source == daemon_process.name] 443 command = shlex.split( 444 daemon_process.command % self.cmd_string_replacements) 445 if daemon_process.destination: 446 output = os.path.join( 447 self.output_root, 448 daemon_process.destination % self.cmd_string_replacements) 449 else: 450 output = None 451 name = "%s_%s" % (daemon_process.name, self.serial_number) 452 process_logger = ProcessLogger(name, command, output, events, 453 daemon_process.restart, 454 daemon_process.repeats_output_on_open) 455 self.process_loggers.append(process_logger) 456 process_logger.start() 457 458 # Build up the list of events we can actually process. 459 self.__UpdateEventCounters(number_of_iterations=0) 460 test_events = self.test_events 461 self.test_events = [] 462 for event in test_events: 463 try: 464 eval(event.condition, # pylint:disable=eval-used 465 {"__builtins__": None}, self.__ValuesInEval()) 466 self.test_events.append(event) 467 except Exception as err: # pylint:disable=broad-except 468 logging.error("Test event %s is not compatible with %s", event.name, 469 self.name) 470 logging.error(str(err)) 471 # Make sure that device specific events don't have conditions. 472 self.device_events = [] 473 for event in config.test_event: 474 if not event.name: 475 logging.error("Device %s test event is missing a name", self.name) 476 continue 477 if event.condition: 478 self.test_events.append(event) 479 else: 480 self.device_events.append(event) 481 482 def StartLookingForEvents(self): 483 """Starts all child ProcessLoggers to start looking for events.""" 484 for process_logger in self.process_loggers: 485 process_logger.looking = True 486 487 def __ValuesInEval(self): 488 values_in_eval = {key: value for key, value 489 in list(self.event_counter.items())} 490 for key, value in list(self.iterations_since_event.items()): 491 values_in_eval["iterations_since_%s" % key] = value 492 return values_in_eval 493 494 def __GetExpectedEventCount(self, event): 495 if event == "logcat_iteration": 496 return -1 497 try: 498 event_cnt = getattr(self.cnt_per_iteration, event) 499 except AttributeError: 500 event_cnt = -1 501 logging.exception("%s is not an attribute of expected_result", event) 502 return event_cnt 503 504 def __UpdateEventCounters(self, number_of_iterations=1): 505 # Update the event counters 506 visited_events = set() 507 error_log = [] 508 for process_logger in self.process_loggers: 509 events = process_logger.GetEventCountsSinceLastCall() 510 for event, count in list(events.items()): 511 # Print log when there is any missed event 512 expected_count = self.__GetExpectedEventCount(event) 513 514 if expected_count > 0: 515 if count > expected_count * number_of_iterations: 516 logging.info( 517 "[STRESS_TEST] In iteration %d, got duplicated %s : %d", 518 self.iteration, self.name, count) 519 logging.info("[STRESS_TEST] Will count only : %d", 520 expected_count * number_of_iterations) 521 count = expected_count * number_of_iterations 522 523 if count: 524 self.event_counter[event] += count 525 visited_events.add(event) 526 527 if expected_count >= 0: 528 if expected_count * number_of_iterations != count: 529 error_log.append( 530 _SUMMARY_COL_FORMATT % 531 (event, count, expected_count * number_of_iterations)) 532 533 # Go clear all the events that weren't consecutive. 534 for event in self.iterations_since_event: 535 if event in visited_events: 536 self.iterations_since_event[event] = 0 537 else: 538 self.iterations_since_event[event] += number_of_iterations 539 540 if error_log: 541 logging.info(_SUMMARY_LINES) 542 logging.info(" iteration %d : Something wrong in %s.", 543 self.iteration, self.name) 544 logging.info(_SUMMARY_LINES) 545 logging.info(_SUMMARY_COLUMNS) 546 logging.info(_SUMMARY_LINES) 547 for line in error_log: 548 logging.info(line) 549 logging.info(_SUMMARY_LINES) 550 551 def ProcessEvents(self): 552 """Updates the event_counter and iterations_since_event maps.""" 553 self.work_queue.put(self.__ProcessEventsAsync) 554 555 def __ProcessEventsAsync(self): 556 # Move any files to the local machine that should be moved. 557 if self.files_to_move: 558 for file_to_move in self.files_to_move: 559 try: 560 self.Command(["pull", file_to_move.source, file_to_move.destination]) 561 except: # pylint:disable=bare-except 562 logging.exception("Failed to pull %s", file_to_move.source) 563 564 self.__UpdateEventCounters() 565 566 for event in self.test_events: 567 if eval(event.condition, # pylint:disable=eval-used 568 {"__builtins__": None}, self.__ValuesInEval()): 569 logging.info("Condition has been met for event '%s'", event.name) 570 # Write the updated event log. 571 event_log_details = self.event_log.event.add() 572 event_log_details.iteration = self.iteration 573 event_log_details.name = event.name 574 with open(os.path.join(self.output_root, 575 "%s_event_log.ascii_proto" % self.name), 576 "w") as fp: 577 text_format.PrintMessage(self.event_log, fp) 578 579 # Do whatever other actions that are part of the event. 580 self.__ProcessEventActionQueue(event) 581 582 # Run any device specific actions for this event. 583 for device_event in self.device_events: 584 if device_event.name == event.name: 585 self.__ProcessEventActionQueue(device_event) 586 587 # Set up the next iteration. 588 self.iteration += 1 589 self.cmd_string_replacements["iteration"] = self.iteration 590 591 def __ProcessEventActionQueue(self, event): 592 bugreport = None 593 for action in event.action: 594 if action == "BUGREPORT": 595 bugreport = self.TakeBugReport() 596 elif action.startswith("DUMPSYS "): 597 self.CaptureDumpsys(action[action.find(" ") + 1:]) 598 elif action == "NOTIFY": 599 SendNotificationEmail( 600 "%s had event '%s' occur" % (self.name, event.name), 601 "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport) 602 elif action == "REMOVE_DEVICE": 603 logging.info("Removing %s from the test", self.serial_number) 604 self.remove_device = True 605 elif action == "ABORT": 606 logging.info("Abort requested") 607 self.abort_requested = True 608 else: 609 action %= self.cmd_string_replacements 610 logging.info("Running command %s on %s", action, self.name) 611 result = self.Command(shlex.split(action)).strip() 612 if result: 613 for line in result.splitlines(): 614 logging.info(line) 615 616 def Root(self): 617 self.Command(["root"]) 618 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 619 self.Command(["wait-for-device"]) 620 time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT) 621 622 def Stop(self): 623 """Stops all file loggers attached to this device.""" 624 for process_logger in self.process_loggers: 625 process_logger.StopLogging() 626 self.process_loggers = [] 627 628 def Join(self): 629 for process_logger in self.process_loggers: 630 process_logger.join() 631 self.WaitForTasks() 632 633 def AsyncCommand(self, command, log_output=False): 634 self.work_queue.put( 635 lambda: self.__AsyncCommand(command, log_output=log_output)) 636 637 def __AsyncCommand(self, command, log_output=False): 638 result = self.Command(command).strip() 639 if result and log_output: 640 for line in result.splitlines(): 641 logging.info(line.decode("utf-8")) 642 643 def Command(self, command): 644 """Runs the provided command on this device.""" 645 if command[0] in {"bugreport", "root", "wait-for-device", "shell", 646 "logcat"}: 647 return subprocess.check_output( 648 ["adb", "-s", self.serial_number] + command) 649 elif command[0] == "DUMPSYS": 650 self.CaptureDumpsys(command[1]) 651 return "" 652 elif command[0] == "pull": 653 try: 654 files = subprocess.check_output( 655 ["adb", "-s", self.serial_number, "shell", "ls", command[1]] 656 ).strip().splitlines() 657 except subprocess.CalledProcessError: 658 return "" 659 if len(files) == 1 and "No such file or directory" in files[0]: 660 return "" 661 for source_file in files: 662 destination = os.path.join(self.output_root, 663 command[2] % self.cmd_string_replacements) 664 stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination)) 665 logging.info("Moving %s from %s to %s", source_file, self.name, 666 destination) 667 subprocess.check_output(["adb", "-s", self.serial_number, "pull", 668 source_file, destination]) 669 if FLAGS.delete_data_dir: 670 subprocess.check_output([ 671 "adb", "-s", self.serial_number, "shell", "rm", "-rf", source_file 672 ]) 673 return "" 674 else: 675 return subprocess.check_output(command) 676 677 def TakeBugReport(self): 678 logging.info("Capturing bugreport on %s", self.name) 679 bugreport = os.path.join(self.output_root, 680 "%s_bugreport_iteration_%06d.zip" % 681 (self.name, self.iteration)) 682 sdk = int(self.Command( 683 ["shell", "getprop", "ro.build.version.sdk"]).strip()) 684 if sdk >= 24: # SDK 24 = Android N 685 with open(bugreport, "w") as bugreport_fp: 686 bugreport_fp.write(self.Command(["bugreport", bugreport])) 687 else: 688 bugreport_txt = os.path.join(self.output_root, 689 "%s_bugreport_iteration_%06d.txt" % 690 (self.name, self.iteration)) 691 with open(bugreport_txt, "w") as bugreport_fp: 692 bugreport_fp.write(self.Command(["bugreport"])) 693 self.Command(["zip", bugreport, bugreport_txt]) 694 695 self.Command(["pull", "/data/anr/traces.txt", 696 "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)]) 697 self.Command(["pull", "/data/anr/traces.txt.bugreport", 698 "%s_traces_iteration_%06d.txt.bugreport" % (self.name, 699 self.iteration)]) 700 return bugreport 701 702 def CaptureDumpsys(self, dumpsys_unit): 703 logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name) 704 stress_test_common.MakeDirsIfNeeded(os.path.join(self.output_root, 705 self.name)) 706 with open(os.path.join(self.output_root, self.name, 707 "%s_%06d.txt" % (dumpsys_unit, self.iteration)), 708 "w") as dumpsys_fp: 709 dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit])) 710 711 def WaitForTasks(self): 712 self.work_queue.join() 713 714 def GetSummaryLines(self): 715 lines = [ 716 "Device {}".format(self.name), 717 _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES 718 ] 719 for event, count in sorted(self.event_counter.items()): 720 lines.append(_SUMMARY_COL_FORMATT % ( 721 event, count, self.iterations_since_event[event])) 722 lines.append(_SUMMARY_LINES) 723 return lines 724 725 726def RunAsyncCommand(devices, command): 727 """Helper function for running async commands on many devices.""" 728 for device in devices: 729 device.AsyncCommand(command) 730 for device in devices: 731 device.WaitForTasks() 732 733 734class StressTest(object): 735 """Manages dispatching commands to devices/playing audio and events.""" 736 737 def __init__(self, output_root, test_name): 738 self.output_root = output_root 739 self.devices = [] 740 self.test_name = test_name 741 config = stress_test_pb2.StressTestConfig() 742 config_contents = stress_test_common.GetResourceContents( 743 os.path.join(stress_test_common.RESOURCE_DIR, 744 "stress_test.%s.ascii_proto" % test_name)) 745 text_format.Merge(config_contents, config) 746 self.events = config.event 747 self.setup_commands = config.setup_command 748 self.steps = config.step 749 self.audio_tempfiles = {} 750 self.uuid = str(uuid.uuid4()) 751 self.expected_result = None 752 self.iteration = 0 753 if config.expected_result: 754 self.expected_result = config.expected_result[0] 755 756 # Place all the audio files into temp files. 757 for step in self.steps: 758 if step.audio_file and step.audio_file not in self.audio_tempfiles: 759 # We can't delete the temp file on windows, since it gets nuked too 760 # early. 761 audio_tempfile = tempfile.NamedTemporaryFile( 762 delete=(platform.system() != "Windows"), 763 dir="." if platform.system().startswith("CYGWIN") else None 764 ) 765 if platform.system().startswith("CYGWIN"): 766 audio_tempfile.name = os.path.basename(audio_tempfile.name) 767 self.audio_tempfiles[step.audio_file] = audio_tempfile 768 if FLAGS.use_sox: 769 # Write out the raw PCM samples as a wave file. 770 audio_tempfile.write( 771 stress_test_common.GetResourceContents(step.audio_file)) 772 else: 773 # Make a temporary wave file for playout if we can't use sox. 774 wavefile = wave.open(audio_tempfile, "wb") 775 if step.audio_file_sample_rate <= 0: 776 step.audio_file_sample_rate = 16000 777 wavefile.setframerate(step.audio_file_sample_rate) 778 if step.audio_file_num_channels <= 0: 779 step.audio_file_num_channels = 1 780 wavefile.setnchannels(step.audio_file_num_channels) 781 if not step.audio_file_format: 782 wavefile.setsampwidth(2) 783 elif step.audio_file_format == "s8": 784 wavefile.setsampwidth(1) 785 elif step.audio_file_format == "s16": 786 wavefile.setsampwidth(2) 787 elif step.audio_file_format == "s32": 788 wavefile.setsampwidth(4) 789 else: 790 raise RuntimeError( 791 "Unsupported wave file format for %s" % step.audio_file) 792 wavefile.writeframes(stress_test_common.GetResourceContents( 793 step.audio_file)) 794 wavefile.close() 795 audio_tempfile.flush() 796 797 if platform.system() == "Windows": 798 audio_tempfile.close() 799 800 # Create all the devices that are attached to this machine. 801 for serial_number in self.GetActiveSerialNumbers(): 802 self.devices.append( 803 Device(serial_number, output_root, self.events, self.expected_result)) 804 if not self.devices: 805 raise app.UsageError("No devices connected") 806 807 self.devices.sort(key=lambda x: x.name) 808 809 # Make sure every device is done with their work for setup. 810 for device in self.devices: 811 device.WaitForTasks() 812 813 # Write out the info meta-data proto. Useful for doing analysis of the logs 814 # after the stress test has completed. 815 stress_test_info = stress_test_pb2.StressTestInfo() 816 stress_test_info.test_name = self.test_name 817 stress_test_info.test_description = config.description 818 stress_test_info.uuid = self.uuid 819 for device in self.devices: 820 device_pb = stress_test_info.device.add() 821 device_pb.device_type = device.device_type 822 device_pb.serial_number = device.serial_number 823 824 text_format.PrintMessage(stress_test_info, open(os.path.join( 825 self.output_root, "stress_test_info.ascii_proto"), "w")) 826 827 def GetActiveSerialNumbers(self): 828 serial_numbers = [] 829 for line in sorted( 830 subprocess.check_output(["adb", "devices"]).splitlines()): 831 if line.endswith(b"device"): 832 serial_number = line.split()[0].strip() 833 if FLAGS.devices and serial_number not in FLAGS.devices: 834 continue 835 serial_numbers.append(serial_number.decode("utf-8")) 836 return serial_numbers 837 838 def Start(self): 839 logging.info("Waiting for devices to settle") 840 time.sleep(5) 841 # Make a copy of the device list, as we'll be modifying this actual list. 842 devices = list(self.devices) 843 dropped_devices = [] 844 845 # If we have any setup commands, run them. 846 for command in self.setup_commands: 847 logging.info("Running command %s", command) 848 # Can't use the async command helper function since we need to get at 849 # the device cmd_string_replacements. 850 for device in devices: 851 device.AsyncCommand( 852 shlex.split(command % device.cmd_string_replacements), 853 log_output=True) 854 for device in devices: 855 device.WaitForTasks() 856 857 for device in devices: 858 device.StartLookingForEvents() 859 device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST", 860 "Starting {%s} TZ=$(getprop persist.sys.timezone) " 861 "YEAR=$(date +%%Y)" % self.uuid], True) 862 self.iteration = 0 863 while True: 864 logging.info("Starting iteration %d", self.iteration) 865 # Perform all the actions specified in the test. 866 RunAsyncCommand(devices, [ 867 "shell", "log", "-t", "STRESS_TEST", 868 "Performing iteration %d $(head -n 3 " 869 "/proc/timer_list | tail -n 1)" % self.iteration 870 ]) 871 872 for step in self.steps: 873 if step.delay_before: 874 logging.info("Waiting for %.2f seconds", step.delay_before) 875 time.sleep(step.delay_before) 876 877 if step.audio_file: 878 logging.info("Playing %s", step.audio_file) 879 RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST", 880 "Playing %s" % step.audio_file]) 881 882 if FLAGS.use_sox: 883 subprocess.check_call(["sox", "-q", 884 self.audio_tempfiles[step.audio_file].name, 885 "-d"]) 886 elif platform.system() == "Windows": 887 import winsound # pylint:disable=g-import-not-at-top 888 winsound.PlaySound(self.audio_tempfiles[step.audio_file].name, 889 winsound.SND_FILENAME | winsound.SND_NODEFAULT) 890 else: 891 raise app.RuntimeError("Unsupported platform for audio playback") 892 893 if step.command: 894 logging.info("Running command %s", step.command) 895 # Can't use the async command helper function since we need to get at 896 # the device cmd_string_replacements. 897 for device in devices: 898 device.AsyncCommand( 899 shlex.split(step.command % device.cmd_string_replacements), 900 log_output=True) 901 for device in devices: 902 device.WaitForTasks() 903 904 if step.delay_after: 905 logging.info("Waiting for %.2f seconds", step.delay_after) 906 time.sleep(step.delay_after) 907 908 RunAsyncCommand(devices, [ 909 "shell", "log", "-t", "STRESS_TEST", 910 "Iteration %d complete $(head -n 3 " 911 "/proc/timer_list | tail -n 1)" % self.iteration 912 ]) 913 self.iteration += 1 914 915 # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for 916 # a bit. This throws off the event counts, so we should probably add some 917 # synchronization rules before we trigger any events. 918 919 # Go through each device, update the event counter, and see if we need to 920 # trigger any events. 921 devices_to_remove = [] 922 abort_requested = False 923 active_devices = self.GetActiveSerialNumbers() 924 for device in devices: 925 if device.serial_number in active_devices: 926 device.ProcessEvents() 927 else: 928 logging.error("Dropped device %s", device.name) 929 SendNotificationEmail( 930 "Dropped device %s" % device.name, 931 "Device %s is not longer present in the system" % device.name) 932 dropped_devices.append(device) 933 devices_to_remove.append(device) 934 935 # Check to see if any of the dropped devices have come back. If yes, grab 936 # a bug report. 937 for device in dropped_devices: 938 if device.serial_number in active_devices: 939 logging.info("Device %s reappeared", device.name) 940 device.Root() 941 device.TakeBugReport() 942 943 dropped_devices = [d for d in dropped_devices 944 if d.serial_number not in active_devices] 945 946 for device in devices: 947 device.WaitForTasks() 948 if device.remove_device: 949 devices_to_remove.append(device) 950 if device.abort_requested: 951 abort_requested = True 952 953 # Remove devices from our list of things to monitor if they've been marked 954 # for deletion. 955 if devices_to_remove: 956 for device in devices_to_remove: 957 device.Stop() 958 devices = [d for d in devices if d not in devices_to_remove] 959 960 # Print out the iteration summary. 961 if self.iteration % FLAGS.print_summary_every_n == 0: 962 for line in self.GetSummaryLines(): 963 logging.info(line) 964 965 # See if we need to break out of the outer loop. 966 if abort_requested or not devices: 967 break 968 if FLAGS.num_iterations: 969 if self.iteration >= FLAGS.num_iterations: 970 logging.info("Completed full iteration : %d", self.iteration) 971 break 972 SendNotificationEmail( 973 "Stress test %s completed" % (FLAGS.test_name), 974 "\n".join(["Summary:"] + self.GetSummaryLines())) 975 976 def Stop(self): 977 logging.debug("Stopping devices") 978 for device in self.devices: 979 device.Stop() 980 for device in self.devices: 981 device.Join() 982 983 def GetSummaryLines(self): 984 lines = [ 985 _SUMMARY_LINES, 986 "Conducted %d iterations out of %d" % 987 (self.iteration, FLAGS.num_iterations), 988 _SUMMARY_LINES 989 ] 990 for device in self.devices: 991 lines.extend(device.GetSummaryLines()) 992 lines.append(_SUMMARY_LINES) 993 return lines 994 995 996def main(unused_argv): 997 # Check to make sure that there are no other instances of ADB running - if 998 # there are, print a warning and wait a bit for them to see it and decide if 999 # they want to keep running, knowing that logs may be invalid. 1000 try: 1001 if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"): 1002 print("It looks like there are other instances of adb running. If these " 1003 "other instances are also cating log files, you will not be " 1004 "capturing everything in this stress test (so logs will be " 1005 "invalid).") 1006 print("Continuing in 3...", end=" ") 1007 sys.stdout.flush() 1008 for i in [2, 1, 0]: 1009 time.sleep(1) 1010 if i: 1011 print("%d..." % i, end=" ") 1012 else: 1013 print("") 1014 sys.stdout.flush() 1015 except OSError: 1016 print("Unexpected error:", sys.exc_info()[0]) 1017 if sys.platform.startswith("win"): 1018 pass 1019 else: 1020 raise 1021 1022 # Make the base output directory. 1023 output_root = os.path.join(FLAGS.output_root, "%s_%s" % ( 1024 FLAGS.test_name, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))) 1025 # output_root = os.path.join(FLAGS.output_root, FLAGS.test_name) 1026 stress_test_common.MakeDirsIfNeeded(output_root) 1027 1028 # Set up logging. 1029 formatter = logging.Formatter( 1030 "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s") 1031 root_logger = logging.getLogger() 1032 root_logger.setLevel(logging.INFO) 1033 root_logger.setLevel(logging.DEBUG) 1034 1035 file_handler = logging.FileHandler(os.path.join(output_root, 1036 "stress_test.log")) 1037 file_handler.setFormatter(formatter) 1038 root_logger.addHandler(file_handler) 1039 1040 console_handler = logging.StreamHandler() 1041 console_handler.setFormatter(formatter) 1042 root_logger.addHandler(console_handler) 1043 1044 stress_test = StressTest(output_root, FLAGS.test_name) 1045 try: 1046 stress_test.Start() 1047 finally: 1048 logging.info("Stopping device logging threads") 1049 stress_test.Stop() 1050 for line in stress_test.GetSummaryLines(): 1051 logging.info(line) 1052 if FLAGS.delete_data_dir: 1053 print("Deleting Data Dir") 1054 subprocess.check_output(["rm", "-r", "-f", output_root]) 1055 1056 1057if __name__ == "__main__": 1058 app.run(main) 1059