• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2
3# Copyright (C) 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import logging
18import os.path
19import select
20import sys
21import time
22import collections
23import socket
24import gflags as flags  # http://code.google.com/p/python-gflags/
25import pkgutil
26import threading
27import Queue
28
# Queues used to signal across threads: anything put on signal_exit_q tells the
# listening thread to exit; anything put on signal_abort makes the test runner
# abort before/within the next test.
signal_exit_q = Queue.Queue()
signal_abort = Queue.Queue()

# Let this script know about the power monitor implementations: the
# 'power_monitors' package lives next to this script, so the script's
# *directory* (not its file name) has to be on the import path.
sys.path = [os.path.dirname(os.path.abspath(__file__))] + sys.path
available_monitors = [name for _, name, _ in pkgutil.iter_modules(
    [os.path.join(os.path.dirname(__file__),'power_monitors')]) if not name.startswith('_')]

APK = os.path.join( os.path.dirname(__file__), '..', "CtsVerifier.apk")

FLAGS = flags.FLAGS

# whether to use a strict delay to ensure screen is off, or attempt to use power measurements
USE_STRICT_DELAY = False
if USE_STRICT_DELAY:
    DELAY_SCREEN_OFF = 30.0
else:
    DELAY_SCREEN_OFF = 2.0

# whether to log data collected to a file for each sensor run:
LOG_DATA_TO_FILE = True

# Only errors are logged by default; info/debug messages in this script are suppressed.
logging.getLogger().setLevel(logging.ERROR)
53
def do_import(name):
    """Dynamically import the module called `name` and return it.

    `name` may be dotted (e.g. "package.sub.module"); the innermost module is
    returned rather than the top-level package.
    """
    # __import__ hands back the top-level package, so descend one attribute
    # per remaining dotted component to reach the requested module.
    module = __import__(name)
    for attribute in name.split('.')[1:]:
        module = getattr(module, attribute)
    return module
61
62
class PowerTest:
    """Class to run a suite of power tests.

    Commands are sent to the device as plain strings over a local TCP port that
    is forwarded through adb to the local abstract socket DOMAIN_NAME. Current
    draw is read from the power monitor module selected with --power_monitor.
    """

    # Thresholds for max allowed power usage per sensor tested
    MAX_ACCEL_POWER = 0.08  # Amps
    MAX_MAG_POWER = 0.08  # Amps
    MAX_GYRO_POWER = 0.08  # Amps
    MAX_SIGMO_POWER = 0.08 # Amps
    MAX_STEP_COUNTER_POWER = 0.08 # Amps
    MAX_STEP_DETECTOR_POWER = 0.08 # Amps


    # 0 means "not connected yet"; _send() replaces it with the forwarded port.
    PORT = 0  # any available port
    DOMAIN_NAME = "/android/cts/powertest"
    SAMPLE_COUNT_NOMINAL = 1000
    RATE_NOMINAL = 100

    # Command strings understood by the device side
    REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
    REQUEST_EXIT = "EXIT"
    REQUEST_RAISE = "RAISE %s %s"
    REQUEST_USER_RESPONSE = "USER RESPONSE %s"
    REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s"
    REQUEST_SENSOR_SWITCH = "SENSOR %s %s"
    REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s"
    REQUEST_SCREEN_OFF = "SCREEN OFF"
    REQUEST_SHOW_MESSAGE = "MESSAGE %s"


    def __init__(self):
        """Open the selected power monitor and establish the device connection."""
        # Set these first so finalize()/__del__ and reportErrorIf() are safe even
        # if construction fails part-way through.
        self._power_monitor = None
        self._current_test = "None"
        power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
        testid = time.strftime("%d_%m_%Y__%H__%M_%S")
        self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid)
        print ("Establishing connection to device...")
        self.setUsbEnabled(True)
        status = self._power_monitor.GetStatus()
        # presumably "sampleRate" is reported in kHz -- TODO confirm against the monitor
        self._native_hz = status["sampleRate"] * 1000
        self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE,
                                                      reportErrors=True)

    def __del__(self):
        self.finalize()

    def finalize(self):
        """To be called upon termination of host connection to device.

        Asks the device to leave its connection loop, removes the adb port
        forwarding and closes the power monitor. The power monitor is closed
        even when the device cannot be reached; in that case the socket error
        still propagates and PORT is left set, so a later call retries.
        """
        try:
            if PowerTest.PORT > 0:
                # tell device side to exit connection loop, and remove the forwarding connection
                self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors=False)
                self.executeLocal("adb forward --remove tcp:%d" % PowerTest.PORT)
            PowerTest.PORT = 0
        finally:
            # getattr: the attribute is missing if __init__ never ran to this point
            monitor = getattr(self, "_power_monitor", None)
            if monitor:
                self._power_monitor = None
                monitor.Close()

    def _send(self, msg, report_errors=True):
        """Connect to the device, send the given command, and then disconnect.

        Returns the device's response (at most 4096 bytes). If the connection
        cannot be made, the failure is reported through reportErrorIf() when
        report_errors is set, otherwise the socket error is raised.
        """
        if PowerTest.PORT == 0:
            # on first attempt to send a command, connect to device via any open port number,
            # forwarding that port to a local socket on the device via adb
            logging.debug("Seeking port for communication...")
            # discover an open port
            dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            dummysocket.bind(("localhost", 0))
            (_, PowerTest.PORT) = dummysocket.getsockname()
            dummysocket.close()
            assert(PowerTest.PORT > 0)
            status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
                    (PowerTest.PORT, PowerTest.DOMAIN_NAME))
            if report_errors:
                self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb")
            logging.info("Forwarding requests over local port %d" % PowerTest.PORT)

        link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                logging.debug("Connecting to device...")
                link.connect (("localhost", PowerTest.PORT))
                logging.debug("Connected.")
            except socket.error:
                if report_errors:
                    # always raises (or exits), so the re-raise below is not reached
                    self.reportErrorIf(True, msg="Unable to communicate with device: connection refused")
                raise
            logging.debug("Sending '%s'" % msg)
            link.sendall(msg)
            logging.debug("Getting response...")
            response = link.recv(4096)
            logging.debug("Got response '%s'" % response)
            return response
        finally:
            # release the socket on every path, including failures
            link.close()

    def queryDevice(self, query):
        """Post a yes/no query to the device, return True upon successful query, False otherwise"""
        logging.info("Querying device with '%s'" % query)
        return self._send(query) == "OK"

    # TODO: abstract device communication (and string commands) into its own class
    def executeOnDevice(self, cmd , reportErrors=True):
        """Execute a (string) command on the remote device and return its response"""
        return self._send(cmd , reportErrors)

    def executeLocal(self, cmd, check_status=True):
        """Execute a shell command locally (on the host) and return its exit status.

        The command is split on single spaces, so arguments must not contain
        spaces. A non-zero status is logged as an error only if check_status.
        """
        from subprocess import call
        status = call(cmd.split(' '))
        if status != 0 and check_status:
            logging.error("Failed to execute \"%s\"" % cmd)
        else:
            logging.debug("Executed \"%s\"" % cmd)
        return status

    def reportErrorIf(self, condition, msg):
        """Report an error condition to the device if condition is True.

        Does nothing when condition is False. Otherwise asks the device to raise
        the error for the current test and then raises Exception(msg) on the
        host. If the device cannot be told, finalizes and exits the script.
        """
        if not condition:
            return
        try:
            logging.error("Exiting on error: %s" % msg)
            self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg), False)
        except Exception:
            logging.error("Unable to communicate with device to report error: %s" % msg)
            self.finalize()
            sys.exit(msg)
        raise Exception(msg)

    def setUsbEnabled(self, enabled, verbose=True):
        """Switch the monitor's USB pass-through on or off.

        When enabling, waits for the device to come back online and restores
        the adb port forwarding if one had been set up.
        """
        val = 1 if enabled else 0
        self._power_monitor.SetUsbPassthrough(val)
        tries = 0

        # Sometimes command won't go through first time, particularly if immediately after a data
        # collection, so allow for retries
        status = self._power_monitor.GetStatus()
        while status is None and tries < 5:
            tries += 1
            time.sleep(2.0)
            logging.error("Retrying get status call...")
            self._power_monitor.StopDataCollection()
            self._power_monitor.SetUsbPassthrough(val)
            status = self._power_monitor.GetStatus()

        if enabled:
            if verbose: print("...USB enabled, waiting for device")
            self.executeLocal ("adb wait-for-device")
            if verbose: print("...device online")
        else:
            if verbose: logging.info("...USB disabled")
        # re-establish port forwarding
        if enabled and PowerTest.PORT > 0:
            status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
                                       (PowerTest.PORT, PowerTest.DOMAIN_NAME))
            self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb")

    def waitForScreenOff(self):
        """Block until the device appears to have its screen off and be suspended.

        With USE_STRICT_DELAY this is a fixed sleep. Otherwise batches of
        measurements are taken until a batch contains enough low-current
        readings; an error is reported if that never happens within the
        allowed number of attempts.
        """
        # disconnect of USB will cause screen to go on, so must wait (1 second more than screen off
        # timeout)
        if USE_STRICT_DELAY:
            time.sleep(DELAY_SCREEN_OFF)
            return

        # need at least 100 clean low-power measurements in a batch to know screen is off
        THRESHOLD_COUNT_LOW_POWER = 100
        CURRENT_LOW_POWER_THRESHOLD = 0.060  # Amps
        TIMEOUT_SCREEN_OFF = 30 # this many tries at most
        count_good = 0
        tries = 0
        print("Waiting for screen off and application processor in suspend mode...")
        while count_good < THRESHOLD_COUNT_LOW_POWER:
            measurements = self.collectMeasurements(THRESHOLD_COUNT_LOW_POWER,
                                                      PowerTest.RATE_NOMINAL,
                                                      ensure_screen_off=False,
                                                      verbose=False)
            count_good = len([m for m in measurements
                               if m < CURRENT_LOW_POWER_THRESHOLD])
            tries += 1
            if count_good >= THRESHOLD_COUNT_LOW_POWER:
                # success, even if it took the very last allowed attempt
                break
            if measurements:
                print("Current usage: %.2f mAmps. Device is probably not in suspend mode.   Waiting..." %
                      (1000.0*(sum(measurements)/len(measurements))))
            if tries >= TIMEOUT_SCREEN_OFF:
                # TODO: dump the state of sensor service to identify if there are features using sensors
                self.reportErrorIf(True,
                    msg="Unable to determine application processor suspend mode status.")
                break
        if DELAY_SCREEN_OFF:
            # add additional delay time if necessary
            time.sleep(DELAY_SCREEN_OFF)
        print("...Screen off and device in suspend mode.")

    def collectMeasurements(self, measurementCount, rate , ensure_screen_off=True, verbose=True,
                             plot_data = False):
        """Collect at least measurementCount current readings, decimated to `rate`.

        Raw samples from the monitor are averaged in consecutive windows of
        (native sample rate / rate) samples, each window yielding one reading.
        Collection is retried up to 5 times in a row when the monitor returns
        no data. plot_data is accepted but not used.

        Returns the list of readings (it may hold more than measurementCount).
        Reports an error, which raises, if too few could be collected.
        """
        assert(measurementCount > 0)
        # must be a whole number of samples since it is used as a slice bound
        decimate_by = int(self._native_hz // rate) or 1
        if ensure_screen_off:
            self.waitForScreenOff()
            print ("Taking measurements...")
        self._power_monitor.StartDataCollection()
        sub_measurements = []
        measurements = []
        tries = 0
        if verbose: print("")
        try:
            while len(measurements) < measurementCount and tries < 5:
                if tries:
                    # previous read returned nothing: restart collection before retrying
                    self._power_monitor.StopDataCollection()
                    self._power_monitor.StartDataCollection()
                    time.sleep(1.0)
                tries += 1
                additional = self._power_monitor.CollectData()
                if additional is not None:
                    tries = 0
                    sub_measurements.extend(additional)
                    while len(sub_measurements) >= decimate_by:
                        # average exactly one decimation window, then consume it
                        window = sub_measurements[:decimate_by]
                        measurements.append(sum(window) / len(window))
                        sub_measurements = sub_measurements[decimate_by:]
                        if verbose:
                            # move the cursor up one line and clear it, so the count updates in place
                            sys.stdout.write("\33[1A\33[2K")
                            print ("MEASURED[%d]: %f" % (len(measurements),measurements[-1]))
        finally:
            self._power_monitor.StopDataCollection()

        self.reportErrorIf(measurementCount > len(measurements),
                            "Unable to collect all requested measurements")
        return measurements

    def request_user_acknowledgment(self, msg):
        """Post message to user on screen and wait for acknowledgment"""
        response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
        self.reportErrorIf(response != "OK", "Unable to request user acknowledgment")

    def setTestResult (self, testname, condition, msg):
        """Print a test's outcome and send it to the device.

        condition may be True (PASS), False (FAIL) or any other value, which is
        sent as given (e.g. "SKIPPED").
        """
        if condition is True:
            val = "PASS"
        elif condition is False:
            val = "FAIL"
        else:
            val = condition
        print ("Test %s : %s" % (testname, val))
        response = self.executeOnDevice(PowerTest.REQUEST_SET_TEST_RESULT % (testname, val, msg))
        self.reportErrorIf(response != "OK", "Unable to send test status to Verifier")

    def setPowerOn(self, sensor, powered_on):
        """Ask the device to switch `sensor` on or off; returns the device's response."""
        state = "ON" if powered_on else "OFF"
        response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH % (state, sensor))
        self.reportErrorIf(response == "ERR", "Unable to set sensor %s state" % sensor)
        logging.info("Set %s %s" % (sensor, state))
        return response

    def _logMeasurements(self, sensor, motion_tag, kind, measurements):
        """Write one measurement per line to a log file under /tmp and return its path."""
        path = "/tmp/cts-power-tests-%s-%s-%s-data.log" % (sensor, motion_tag, kind)
        with open(path, 'w') as f:
            for m in measurements:
                f.write( "%.4f\n"%m)
        return path

    def runPowerTest(self, sensor, max_power_allowed, user_request = None):
        """Measure the extra current drawn with `sensor` enabled and report pass/fail.

        Takes a background reading with all sensors off, then a reading with
        the sensor on, both with the screen off and USB disconnected. The test
        passes if the difference of the averages is within max_power_allowed
        (Amps). If user_request is given, it is shown to the user first and
        the test is labelled as run under motion.

        Errors during the run are printed and recorded as a FAIL result; an
        abort request or keyboard interrupt is not swallowed.
        """
        if not signal_abort.empty():
            sys.exit( signal_abort.get() )
        under_motion = user_request is not None
        motion_tag = "Under_Motion" if under_motion else "Still"
        self._current_test = "%s_Power_Test_While_%s" % (sensor, motion_tag)
        try:
            print ("\n\n---------------------------------")
            if under_motion:
                print ("Running power test on %s under motion." % sensor)
            else:
                print ("Running power test on %s while device is still." % sensor)
            print ("---------------------------------")
            response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
            self.setPowerOn("ALL", False)
            if response == "UNAVAILABLE":
                # report the skip exactly once
                self.setTestResult(self._current_test, condition="SKIPPED",
                                   msg="Sensor %s not available on this device"%sensor)
                return

            # NOTE(review): 'response' here is still the availability reply, not the
            # reply to the "ALL off" request above -- confirm what the device
            # returns for a sensor switch before tightening this check.
            self.reportErrorIf(response != "OK", "Unable to set all sensor off")
            if not signal_abort.empty():
                sys.exit( signal_abort.get() )
            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
            self.setUsbEnabled(False)
            print("Collecting background measurements...")
            measurements = self.collectMeasurements( PowerTest.SAMPLE_COUNT_NOMINAL,
                                                     PowerTest.RATE_NOMINAL,
                                                     plot_data = True)
            if measurements and LOG_DATA_TO_FILE:
                self._logMeasurements(sensor, motion_tag, "background", measurements)
            self.reportErrorIf(not measurements, "No background measurements could be taken")
            backgnd = sum(measurements) / len(measurements)
            self.setUsbEnabled(True)
            self.setPowerOn(sensor, True)
            if under_motion:
                print("===========================================\n" +
                      "==> Please follow the instructions presented on the device\n" +
                      "==========================================="
                     )
                self.request_user_acknowledgment(user_request)
            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
            self.setUsbEnabled(False)
            # NOTE(review): as above, this re-checks the availability reply rather
            # than the result of switching the sensor on.
            self.reportErrorIf(response != "OK", "Unable to set sensor %s ON" % sensor)
            print ("Collecting sensor %s measurements" % sensor)
            measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL,
                                                    PowerTest.RATE_NOMINAL)

            if measurements and LOG_DATA_TO_FILE:
                # The log file is written and closed before it is pushed, so the
                # copy on the device is complete.
                log_path = self._logMeasurements(sensor, motion_tag, "sensor", measurements)
                self.setUsbEnabled(True, verbose = False)
                print("Saving raw data files to device...")
                self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False)
                self.executeLocal("adb push %s %s/." % (log_path, self._external_storage))
                self.setUsbEnabled(False, verbose = False)
            self.reportErrorIf(not measurements, "No measurements could be taken for %s" % sensor)
            avg = sum(measurements) / len(measurements)
            squared = [(m-avg)*(m-avg) for m in measurements]

            import math
            stddev = math.sqrt(sum(squared)/len(squared))
            current_diff = avg - backgnd
            self.setUsbEnabled(True)
            max_power = max(measurements) - avg
            within_limits = current_diff <= max_power_allowed
            # values are reported in milliamps
            figures = ( current_diff*1000.0, backgnd*1000.0, avg*1000.0, stddev*1000.0, max_power*1000.0)
            if within_limits:
                # TODO: fail the test of background > current
                message = ("Draw is within limits. Current:%f Background:%f   Measured: %f Stddev: %f  Peak: %f") % figures
            else:
                message = ("Draw is too high. Current:%f Background:%f   Measured: %f Stddev: %f  Peak: %f") % figures
            self.setTestResult( testname = self._current_test,
                                condition = within_limits,
                                msg = message)
            print("Result: "+message)
        except Exception:
            # Deliberately not a bare except: SystemExit (abort requests) and
            # KeyboardInterrupt must end the run instead of being recorded as a
            # failed test and followed by the next test.
            import traceback
            traceback.print_exc()
            self.setTestResult(self._current_test, condition="FAIL",
                               msg="Exception occurred during run of test.")


    @staticmethod
    def run_tests():
        """Run every sensor power test, first under motion and then with the device still.

        Any exception is printed rather than propagated. On the way out the
        exit queue is signalled and the device connection is finalized.
        """
        testrunner = None
        try:
            GENERIC_MOTION_REQUEST = "\n===> Please press Next and when the screen is off, keep " + \
                "the device under motion with only tiny, slow movements until the screen turns " + \
                "on again.\nPlease refrain from interacting with the screen or pressing any side " + \
                "buttons while measurements are taken."
            USER_STEPS_REQUEST = "\n===> Please press Next and when the screen is off, then move " + \
                "the device to simulate step motion until the screen turns on again.\nPlease " + \
                "refrain from interacting with the screen or pressing any side buttons while " + \
                "measurements are taken."
            # (sensor, allowed draw in Amps, instructions for the user or None), in run order
            test_plan = [
                ("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, GENERIC_MOTION_REQUEST),
                ("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, USER_STEPS_REQUEST),
                ("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, USER_STEPS_REQUEST),
                ("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, GENERIC_MOTION_REQUEST),
                ("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, GENERIC_MOTION_REQUEST),
                ("GYROSCOPE", PowerTest.MAX_GYRO_POWER, GENERIC_MOTION_REQUEST),
                ("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, None),
                ("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, None),
                ("GYROSCOPE", PowerTest.MAX_GYRO_POWER, None),
                ("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, None),
                ("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, None),
                ("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, None),
            ]
            testrunner = PowerTest()
            testrunner.executeOnDevice(PowerTest.REQUEST_SHOW_MESSAGE % "Connected.  Running tests...")
            for sensor, max_power, request in test_plan:
                testrunner.runPowerTest(sensor, max_power, user_request = request)
        except:
            import traceback
            traceback.print_exc()
        finally:
            signal_exit_q.put(0) # anything will signal thread to terminate
            logging.info("TESTS COMPLETE")
            if testrunner:
                try:
                    testrunner.finalize()
                except socket.error:
                    sys.exit("============================\nUnable to connect to device under " + \
                             "test. Make sure the device is connected via the usb pass-through, " + \
                             "the CtsVerifier app is running the SensorPowerTest on the device, " + \
                             "and USB pass-through is enabled.\n===========================")
442
443
def main(argv):
  """Simple command-line interface for a power test application.

  Acts on whichever command-line flags were given, in this order: set the
  output voltage and the current limit, print the monitor status, switch USB
  pass-through, stream samples, and finally run the power test suite.

  Args:
    argv: arguments left over after flag parsing (not used).

  Returns:
    0 once sample collection has finished, 1 if it was interrupted from the
    keyboard, None on every other path.
  """
  useful_flags = ["voltage", "status", "usbpassthrough",
                  "samples", "current", "log", "power_monitor"]
  if not any(FLAGS.get(f, None) is not None for f in useful_flags):
    # the module may have no docstring, in which case __doc__ is None
    print((__doc__ or "").strip())
    print(FLAGS.MainModuleHelp())
    return

  if FLAGS.avg and FLAGS.avg < 0:
    logging.error("--avg must be greater than 0")
    return

  if FLAGS.voltage is not None:
    if FLAGS.voltage > 5.5:
      print("!!WARNING: Voltage higher than typical values!!!")
    # Require explicit confirmation before driving the output.
    try:
      response = raw_input("Voltage of %.3f requested.  Confirm this is correct (Y/N)"%FLAGS.voltage)
    except (EOFError, KeyboardInterrupt):
      sys.exit("Aborting.")
    if response.upper() != "Y":
      sys.exit("Aborting")

  if not FLAGS.power_monitor:
    sys.exit("You must specify a '--power_monitor' option to specify which power monitor type " +
             "you are using.\nOne of:\n  " + "\n  ".join(available_monitors))
  power_monitors = do_import('power_monitors.%s' % FLAGS.power_monitor)
  try:
    mon = power_monitors.Power_Monitor(device=FLAGS.device)
  except:
    import traceback
    traceback.print_exc()
    sys.exit("No power monitors found")

  if FLAGS.voltage is not None:
    # --ramp is a boolean flag (never None), so test its value: --noramp must
    # select the immediate SetVoltage path.
    if FLAGS.ramp:
      mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
    else:
      mon.SetVoltage(FLAGS.voltage)

  if FLAGS.current is not None:
    mon.SetMaxCurrent(FLAGS.current)

  if FLAGS.status:
    items = sorted(mon.GetStatus().items())
    print("\n".join(["%s: %s" % item for item in items]))

  if FLAGS.usbpassthrough:
    if FLAGS.usbpassthrough == 'off':
      mon.SetUsbPassthrough(0)
    elif FLAGS.usbpassthrough == 'on':
      mon.SetUsbPassthrough(1)
    elif FLAGS.usbpassthrough == 'auto':
      mon.SetUsbPassthrough(2)
    else:
      mon.Close()
      sys.exit('bad pass-through flag: %s' % FLAGS.usbpassthrough)

  if FLAGS.samples:
    # Make sure state is normal
    mon.StopDataCollection()
    status = mon.GetStatus()
    native_hz = status["sampleRate"] * 1000

    # Collect and average samples as specified
    mon.StartDataCollection()

    # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
    # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
    # This is the error accumulator in a variation of Bresenham's algorithm.
    emitted = offset = 0
    collected = []
    history_deque = collections.deque()  # past n samples for rolling average

    try:
      last_flush = time.time()
      # --samples=-1 means "collect until interrupted or the monitor runs dry"
      while emitted < FLAGS.samples or FLAGS.samples == -1:
        # The number of raw samples to consume before emitting the next output
        # (ceiling division; kept integral because it is used as a slice bound)
        need = int((native_hz - offset + FLAGS.hz - 1) // FLAGS.hz)
        if need > len(collected):  # still need more input samples
          samples = mon.CollectData()
          if not samples: break
          collected.extend(samples)
        else:
          # Have enough data, generate output samples.
          # Adjust for consuming 'need' input samples.
          offset += need * FLAGS.hz
          while offset >= native_hz:  # maybe multiple, if FLAGS.hz > native_hz
            this_sample = sum(collected[:need]) / need

            # timestamp goes on the same line, ahead of the sample
            if FLAGS.timestamp: sys.stdout.write("%d " % int(time.time()))

            if FLAGS.avg:
              history_deque.appendleft(this_sample)
              if len(history_deque) > FLAGS.avg: history_deque.pop()
              print("%f %f" % (this_sample,
                               sum(history_deque) / len(history_deque)))
            else:
              print("%f" % this_sample)
            sys.stdout.flush()

            offset -= native_hz
            emitted += 1  # adjust for emitting 1 output sample
          collected = collected[need:]
          now = time.time()
          if now - last_flush >= 0.99:  # flush every second
            sys.stdout.flush()
            last_flush = now
    except KeyboardInterrupt:
      print("interrupted")
      return 1
    finally:
      mon.Close()
    return 0

  if FLAGS.run:
    if not FLAGS.power_monitor:
      sys.exit("When running power tests, you must specify which type of power monitor to use" +
               " with '--power_monitor <type of power monitor>'")
    PowerTest.run_tests()
565
566
if __name__ == "__main__":
    # Command-line flags are only defined when this file is run as a script.
    flags.DEFINE_boolean("status", None, "Print power meter status")
    flags.DEFINE_integer("avg", None,
                         "Also report average over last n data points")
    flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)")
    flags.DEFINE_float("current", None, "Set max output current")
    flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)")
    flags.DEFINE_integer("samples", None, "Collect and print this many samples")
    flags.DEFINE_integer("hz", 5000, "Print this many samples/sec")
    flags.DEFINE_string("device", None,
                        "Path to the device in /dev/... (ex:/dev/ttyACM1)")
    flags.DEFINE_boolean("timestamp", None,
                         "Also print integer (seconds) timestamp on each line")
    flags.DEFINE_boolean("ramp", True, "Gradually increase voltage")
    flags.DEFINE_boolean("log", False, "Log progress to a file or not")
    flags.DEFINE_boolean("run", False, "Run the test suite for power")
    flags.DEFINE_string("power_monitor", None, "Type of power monitor to use")
    # Parsing returns the arguments that are not flags; a malformed or unknown
    # flag is reported with the usage text instead of a raw traceback.
    try:
        remaining_argv = FLAGS(sys.argv)
    except flags.FlagsError as e:
        sys.exit("%s\nUsage: %s ARGS\n%s" % (e, sys.argv[0], FLAGS))
    sys.exit(main(remaining_argv))
585
586