1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright 2019 The ChromiumOS Authors 5# Use of this source code is governed by a BSD-style license that can be 6# found in the LICENSE file. 7 8"""Script to image a ChromeOS device. 9 10This script images a remote ChromeOS device with a specific image." 11""" 12 13 14__author__ = "asharif@google.com (Ahmad Sharif)" 15 16import argparse 17import filecmp 18import getpass 19import glob 20import os 21import re 22import shutil 23import sys 24import tempfile 25import time 26 27from cros_utils import command_executer 28from cros_utils import locks 29from cros_utils import logger 30from cros_utils import misc 31from cros_utils.file_utils import FileUtils 32 33 34checksum_file = "/usr/local/osimage_checksum_file" 35lock_file = "/tmp/image_chromeos_lock/image_chromeos_lock" 36 37 38def Usage(parser, message): 39 print("ERROR: %s" % message) 40 parser.print_help() 41 sys.exit(0) 42 43 44def CheckForCrosFlash(chromeos_root, remote, log_level): 45 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 46 47 # Check to see if remote machine has cherrypy, ctypes 48 command = "python -c 'import cherrypy, ctypes'" 49 ret = cmd_executer.CrosRunCommand( 50 command, chromeos_root=chromeos_root, machine=remote 51 ) 52 logger.GetLogger().LogFatalIf( 53 ret == 255, "Failed ssh to %s (for checking cherrypy)" % remote 54 ) 55 logger.GetLogger().LogFatalIf( 56 ret != 0, 57 "Failed to find cherrypy or ctypes on remote '{}', " 58 "cros flash cannot work.".format(remote), 59 ) 60 61 62def DisableCrosBeeps(chromeos_root, remote, log_level): 63 """Disable annoying chromebooks beeps after reboots.""" 64 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 65 66 command = "/usr/share/vboot/bin/set_gbb_flags.sh 0x1" 67 logger.GetLogger().LogOutput("Trying to disable beeping.") 68 69 ret, o, _ = cmd_executer.CrosRunCommandWOutput( 70 command, chromeos_root=chromeos_root, machine=remote 71 ) 72 if ret != 0: 73 logger.GetLogger().LogOutput(o) 74 logger.GetLogger().LogOutput("Failed to disable beeps.") 75 76 77def FindChromeOSImage(image_file, chromeos_root): 78 """Find path for ChromeOS image inside chroot. 79 80 This function could be called with image paths that are either inside 81 or outside the chroot. In either case the path needs to be translated 82 to an real/absolute path inside the chroot. 83 Example input paths: 84 /usr/local/google/home/uname/chromeos/chroot/tmp/my-test-images/image 85 ~/trunk/src/build/images/board/latest/image 86 /tmp/peppy-release/R67-1235.0.0/image 87 88 Corresponding example output paths: 89 /tmp/my-test-images/image 90 /home/uname/trunk/src/build/images/board/latest/image 91 /tmp/peppy-release/R67-1235.0,0/image 92 """ 93 94 # Get the name of the user, for "/home/<user>" part of the path. 95 whoami = getpass.getuser() 96 # Get the full path for the chroot dir, including 'chroot' 97 real_chroot_dir = os.path.join(os.path.realpath(chromeos_root), "chroot") 98 # Get the full path for the chromeos root, excluding 'chroot' 99 real_chromeos_root = os.path.realpath(chromeos_root) 100 101 # If path name starts with real_chroot_dir, remove that piece, but assume 102 # the rest of the path is correct. 103 if image_file.find(real_chroot_dir) != -1: 104 chroot_image = image_file[len(real_chroot_dir) :] 105 # If path name starts with chromeos_root, excluding 'chroot', replace the 106 # chromeos_root with the prefix: '/home/<username>/trunk'. 107 elif image_file.find(real_chromeos_root) != -1: 108 chroot_image = image_file[len(real_chromeos_root) :] 109 chroot_image = "/home/%s/trunk%s" % (whoami, chroot_image) 110 # Else assume the path is already internal, so leave it alone. 111 else: 112 chroot_image = image_file 113 114 return chroot_image 115 116 117def DoImage(argv): 118 """Image ChromeOS.""" 119 120 parser = argparse.ArgumentParser() 121 parser.add_argument( 122 "-c", 123 "--chromeos_root", 124 dest="chromeos_root", 125 help="Target directory for ChromeOS installation.", 126 ) 127 parser.add_argument("-r", "--remote", dest="remote", help="Target device.") 128 parser.add_argument( 129 "-i", "--image", dest="image", help="Image binary file." 130 ) 131 parser.add_argument( 132 "-b", "--board", dest="board", help="Target board override." 133 ) 134 parser.add_argument( 135 "-f", 136 "--force", 137 dest="force", 138 action="store_true", 139 default=False, 140 help="Force an image even if it is non-test.", 141 ) 142 parser.add_argument( 143 "-n", 144 "--no_lock", 145 dest="no_lock", 146 default=False, 147 action="store_true", 148 help="Do not attempt to lock remote before imaging. " 149 "This option should only be used in cases where the " 150 "exclusive lock has already been acquired (e.g. in " 151 "a script that calls this one).", 152 ) 153 parser.add_argument( 154 "-l", 155 "--logging_level", 156 dest="log_level", 157 default="verbose", 158 help="Amount of logging to be used. Valid levels are " 159 "'quiet', 'average', and 'verbose'.", 160 ) 161 parser.add_argument("-a", "--image_args", dest="image_args") 162 163 options = parser.parse_args(argv[1:]) 164 165 if not options.log_level in command_executer.LOG_LEVEL: 166 Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'") 167 else: 168 log_level = options.log_level 169 170 # Common initializations 171 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 172 l = logger.GetLogger() 173 174 if options.chromeos_root is None: 175 Usage(parser, "--chromeos_root must be set") 176 177 if options.remote is None: 178 Usage(parser, "--remote must be set") 179 180 options.chromeos_root = os.path.expanduser(options.chromeos_root) 181 182 if options.board is None: 183 board = cmd_executer.CrosLearnBoard( 184 options.chromeos_root, options.remote 185 ) 186 else: 187 board = options.board 188 189 if options.image is None: 190 images_dir = misc.GetImageDir(options.chromeos_root, board) 191 image = os.path.join(images_dir, "latest", "chromiumos_test_image.bin") 192 if not os.path.exists(image): 193 image = os.path.join(images_dir, "latest", "chromiumos_image.bin") 194 is_xbuddy_image = False 195 else: 196 image = options.image 197 is_xbuddy_image = image.startswith("xbuddy://") 198 if not is_xbuddy_image: 199 image = os.path.expanduser(image) 200 201 if not is_xbuddy_image: 202 image = os.path.realpath(image) 203 204 if not os.path.exists(image) and not is_xbuddy_image: 205 Usage(parser, "Image file: " + image + " does not exist!") 206 207 try: 208 should_unlock = False 209 if not options.no_lock: 210 try: 211 _ = locks.AcquireLock( 212 list(options.remote.split()), options.chromeos_root 213 ) 214 should_unlock = True 215 except Exception as e: 216 raise RuntimeError("Error acquiring machine: %s" % str(e)) 217 218 reimage = False 219 local_image = False 220 if not is_xbuddy_image: 221 local_image = True 222 image_checksum = FileUtils().Md5File(image, log_level=log_level) 223 224 command = "cat " + checksum_file 225 ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput( 226 command, 227 chromeos_root=options.chromeos_root, 228 machine=options.remote, 229 ) 230 231 device_checksum = device_checksum.strip() 232 image_checksum = str(image_checksum) 233 234 l.LogOutput("Image checksum: " + image_checksum) 235 l.LogOutput("Device checksum: " + device_checksum) 236 237 if image_checksum != device_checksum: 238 [found, located_image] = LocateOrCopyImage( 239 options.chromeos_root, image, board=board 240 ) 241 242 reimage = True 243 l.LogOutput("Checksums do not match. Re-imaging...") 244 245 chroot_image = FindChromeOSImage( 246 located_image, options.chromeos_root 247 ) 248 249 is_test_image = IsImageModdedForTest( 250 options.chromeos_root, chroot_image, log_level 251 ) 252 253 if not is_test_image and not options.force: 254 logger.GetLogger().LogFatal( 255 "Have to pass --force to image a " "non-test image!" 256 ) 257 else: 258 reimage = True 259 found = True 260 l.LogOutput("Using non-local image; Re-imaging...") 261 262 if reimage: 263 # If the device has /tmp mounted as noexec, image_to_live.sh can fail. 264 command = "mount -o remount,rw,exec /tmp" 265 cmd_executer.CrosRunCommand( 266 command, 267 chromeos_root=options.chromeos_root, 268 machine=options.remote, 269 ) 270 271 # Check to see if cros flash will work for the remote machine. 272 CheckForCrosFlash(options.chromeos_root, options.remote, log_level) 273 274 # Disable the annoying chromebook beeps after reboot. 275 DisableCrosBeeps(options.chromeos_root, options.remote, log_level) 276 277 cros_flash_args = [ 278 "cros", 279 "flash", 280 "--board=%s" % board, 281 "--clobber-stateful", 282 options.remote, 283 ] 284 if local_image: 285 cros_flash_args.append(chroot_image) 286 else: 287 cros_flash_args.append(image) 288 289 command = " ".join(cros_flash_args) 290 291 # Workaround for crosbug.com/35684. 292 os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600) 293 294 if log_level == "average": 295 cmd_executer.SetLogLevel("verbose") 296 retries = 0 297 while True: 298 if log_level == "quiet": 299 l.LogOutput("CMD : %s" % command) 300 ret = cmd_executer.ChrootRunCommand( 301 options.chromeos_root, command, command_timeout=1800 302 ) 303 if ret == 0 or retries >= 2: 304 break 305 retries += 1 306 if log_level == "quiet": 307 l.LogOutput("Imaging failed. Retry # %d." % retries) 308 309 if log_level == "average": 310 cmd_executer.SetLogLevel(log_level) 311 312 logger.GetLogger().LogFatalIf(ret, "Image command failed") 313 314 # Unfortunately cros_image_to_target.py sometimes returns early when the 315 # machine isn't fully up yet. 316 ret = EnsureMachineUp( 317 options.chromeos_root, options.remote, log_level 318 ) 319 320 # If this is a non-local image, then the ret returned from 321 # EnsureMachineUp is the one that will be returned by this function; 322 # in that case, make sure the value in 'ret' is appropriate. 323 if not local_image and ret: 324 ret = 0 325 else: 326 ret = 1 327 328 if local_image: 329 if log_level == "average": 330 l.LogOutput("Verifying image.") 331 command = "echo %s > %s && chmod -w %s" % ( 332 image_checksum, 333 checksum_file, 334 checksum_file, 335 ) 336 ret = cmd_executer.CrosRunCommand( 337 command, 338 chromeos_root=options.chromeos_root, 339 machine=options.remote, 340 ) 341 logger.GetLogger().LogFatalIf(ret, "Writing checksum failed.") 342 343 successfully_imaged = VerifyChromeChecksum( 344 options.chromeos_root, 345 chroot_image, 346 options.remote, 347 log_level, 348 ) 349 logger.GetLogger().LogFatalIf( 350 not successfully_imaged, "Image verification failed!" 351 ) 352 TryRemountPartitionAsRW( 353 options.chromeos_root, options.remote, log_level 354 ) 355 356 if not found: 357 temp_dir = os.path.dirname(located_image) 358 l.LogOutput("Deleting temp image dir: %s" % temp_dir) 359 shutil.rmtree(temp_dir) 360 l.LogOutput("Image updated.") 361 else: 362 l.LogOutput("Checksums match, skip image update and reboot.") 363 command = "reboot && exit" 364 _ = cmd_executer.CrosRunCommand( 365 command, 366 chromeos_root=options.chromeos_root, 367 machine=options.remote, 368 ) 369 # Wait 30s after reboot. 370 time.sleep(30) 371 372 finally: 373 if should_unlock: 374 locks.ReleaseLock( 375 list(options.remote.split()), options.chromeos_root 376 ) 377 378 return ret 379 380 381def LocateOrCopyImage(chromeos_root, image, board=None): 382 l = logger.GetLogger() 383 if board is None: 384 board_glob = "*" 385 else: 386 board_glob = board 387 388 chromeos_root_realpath = os.path.realpath(chromeos_root) 389 image = os.path.realpath(image) 390 391 if image.startswith("%s/" % chromeos_root_realpath): 392 return [True, image] 393 394 # First search within the existing build dirs for any matching files. 395 images_glob = "%s/src/build/images/%s/*/*.bin" % ( 396 chromeos_root_realpath, 397 board_glob, 398 ) 399 images_list = glob.glob(images_glob) 400 for potential_image in images_list: 401 if filecmp.cmp(potential_image, image): 402 l.LogOutput( 403 "Found matching image %s in chromeos_root." % potential_image 404 ) 405 return [True, potential_image] 406 # We did not find an image. Copy it in the src dir and return the copied 407 # file. 408 if board is None: 409 board = "" 410 base_dir = "%s/src/build/images/%s" % (chromeos_root_realpath, board) 411 if not os.path.isdir(base_dir): 412 os.makedirs(base_dir) 413 temp_dir = tempfile.mkdtemp(prefix="%s/tmp" % base_dir) 414 new_image = "%s/%s" % (temp_dir, os.path.basename(image)) 415 l.LogOutput( 416 "No matching image found. Copying %s to %s" % (image, new_image) 417 ) 418 shutil.copyfile(image, new_image) 419 return [False, new_image] 420 421 422def GetImageMountCommand(image, rootfs_mp, stateful_mp): 423 image_dir = os.path.dirname(image) 424 image_file = os.path.basename(image) 425 mount_command = ( 426 "cd /mnt/host/source/src/scripts &&" 427 "./mount_gpt_image.sh --from=%s --image=%s" 428 " --safe --read_only" 429 " --rootfs_mountpt=%s" 430 " --stateful_mountpt=%s" 431 % (image_dir, image_file, rootfs_mp, stateful_mp) 432 ) 433 return mount_command 434 435 436def MountImage( 437 chromeos_root, 438 image, 439 rootfs_mp, 440 stateful_mp, 441 log_level, 442 unmount=False, 443 extra_commands="", 444): 445 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 446 command = GetImageMountCommand(image, rootfs_mp, stateful_mp) 447 if unmount: 448 command = "%s --unmount" % command 449 if extra_commands: 450 command = "%s ; %s" % (command, extra_commands) 451 ret, out, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) 452 logger.GetLogger().LogFatalIf(ret, "Mount/unmount command failed!") 453 return out 454 455 456def IsImageModdedForTest(chromeos_root, image, log_level): 457 if log_level != "verbose": 458 log_level = "quiet" 459 command = "mktemp -d" 460 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 461 _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( 462 chromeos_root, command 463 ) 464 _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( 465 chromeos_root, command 466 ) 467 rootfs_mp = rootfs_mp.strip() 468 stateful_mp = stateful_mp.strip() 469 lsb_release_file = os.path.join(rootfs_mp, "etc/lsb-release") 470 extra = "grep CHROMEOS_RELEASE_TRACK %s | grep -i test" % lsb_release_file 471 output = MountImage( 472 chromeos_root, 473 image, 474 rootfs_mp, 475 stateful_mp, 476 log_level, 477 extra_commands=extra, 478 ) 479 is_test_image = re.search("test", output, re.IGNORECASE) 480 MountImage( 481 chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True 482 ) 483 return is_test_image 484 485 486def VerifyChromeChecksum(chromeos_root, image, remote, log_level): 487 command = "mktemp -d" 488 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 489 _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( 490 chromeos_root, command 491 ) 492 _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( 493 chromeos_root, command 494 ) 495 rootfs_mp = rootfs_mp.strip() 496 stateful_mp = stateful_mp.strip() 497 chrome_file = "%s/opt/google/chrome/chrome" % rootfs_mp 498 extra = "md5sum %s" % chrome_file 499 out = MountImage( 500 chromeos_root, 501 image, 502 rootfs_mp, 503 stateful_mp, 504 log_level, 505 extra_commands=extra, 506 ) 507 image_chrome_checksum = out.strip().split()[0] 508 MountImage( 509 chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True 510 ) 511 512 command = "md5sum /opt/google/chrome/chrome" 513 [_, o, _] = cmd_executer.CrosRunCommandWOutput( 514 command, chromeos_root=chromeos_root, machine=remote 515 ) 516 device_chrome_checksum = o.split()[0] 517 return image_chrome_checksum.strip() == device_chrome_checksum.strip() 518 519 520# Remount partition as writable. 521# TODO: auto-detect if an image is built using --noenable_rootfs_verification. 522def TryRemountPartitionAsRW(chromeos_root, remote, log_level): 523 l = logger.GetLogger() 524 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 525 command = "sudo mount -o remount,rw /" 526 ret = cmd_executer.CrosRunCommand( 527 command, 528 chromeos_root=chromeos_root, 529 machine=remote, 530 terminated_timeout=10, 531 ) 532 if ret: 533 ## Safely ignore. 534 l.LogWarning( 535 "Failed to remount partition as rw, " 536 "probably the image was not built with " 537 '"--noenable_rootfs_verification", ' 538 "you can safely ignore this." 539 ) 540 else: 541 l.LogOutput("Re-mounted partition as writable.") 542 543 544def EnsureMachineUp(chromeos_root, remote, log_level): 545 l = logger.GetLogger() 546 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 547 timeout = 600 548 magic = "abcdefghijklmnopqrstuvwxyz" 549 command = "echo %s" % magic 550 start_time = time.time() 551 while True: 552 current_time = time.time() 553 if current_time - start_time > timeout: 554 l.LogError( 555 "Timeout of %ss reached. Machine still not up. Aborting." 556 % timeout 557 ) 558 return False 559 ret = cmd_executer.CrosRunCommand( 560 command, chromeos_root=chromeos_root, machine=remote 561 ) 562 if not ret: 563 return True 564 565 566if __name__ == "__main__": 567 retval = DoImage(sys.argv) 568 sys.exit(retval) 569