1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus, gobject, logging, os, stat 6from dbus.mainloop.glib import DBusGMainLoop 7 8import common 9from autotest_lib.client.bin import utils 10from autotest_lib.client.common_lib import autotemp, error 11from mainloop import ExceptionForward 12from mainloop import GenericTesterMainLoop 13 14 15"""This module contains several helper classes for writing tests to verify the 16CrosDisks DBus interface. In particular, the CrosDisksTester class can be used 17to derive functional tests that interact with the CrosDisks server over DBus. 18""" 19 20 21class ExceptionSuppressor(object): 22 """A context manager class for suppressing certain types of exception. 23 24 An instance of this class is expected to be used with the with statement 25 and takes a set of exception classes at instantiation, which are types of 26 exception to be suppressed (and logged) in the code block under the with 27 statement. 28 29 Example: 30 31 with ExceptionSuppressor(OSError, IOError): 32 # An exception, which is a sub-class of OSError or IOError, is 33 # suppressed in the block code under the with statement. 34 """ 35 def __init__(self, *args): 36 self.__suppressed_exc_types = (args) 37 38 def __enter__(self): 39 return self 40 41 def __exit__(self, exc_type, exc_value, traceback): 42 if exc_type and issubclass(exc_type, self.__suppressed_exc_types): 43 try: 44 logging.exception('Suppressed exception: %s(%s)', 45 exc_type, exc_value) 46 except Exception: 47 pass 48 return True 49 return False 50 51 52class DBusClient(object): 53 """ A base class of a DBus proxy client to test a DBus server. 54 55 This class is expected to be used along with a GLib main loop and provides 56 some convenient functions for testing the DBus API exposed by a DBus server. 57 """ 58 def __init__(self, main_loop, bus, bus_name, object_path): 59 """Initializes the instance. 60 61 Args: 62 main_loop: The GLib main loop. 63 bus: The bus where the DBus server is connected to. 64 bus_name: The bus name owned by the DBus server. 65 object_path: The object path of the DBus server. 66 """ 67 self.__signal_content = {} 68 self.main_loop = main_loop 69 self.signal_timeout_in_seconds = 10 70 logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"', 71 bus_name, object_path) 72 self.proxy_object = bus.get_object(bus_name, object_path) 73 74 def clear_signal_content(self, signal_name): 75 """Clears the content of the signal. 76 77 Args: 78 signal_name: The name of the signal. 79 """ 80 if signal_name in self.__signal_content: 81 self.__signal_content[signal_name] = None 82 83 def get_signal_content(self, signal_name): 84 """Gets the content of a signal. 85 86 Args: 87 signal_name: The name of the signal. 88 89 Returns: 90 The content of a signal or None if the signal is not being handled. 91 """ 92 return self.__signal_content.get(signal_name) 93 94 def handle_signal(self, interface, signal_name, argument_names=()): 95 """Registers a signal handler to handle a given signal. 96 97 Args: 98 interface: The DBus interface of the signal. 99 signal_name: The name of the signal. 100 argument_names: A list of argument names that the signal contains. 101 """ 102 if signal_name in self.__signal_content: 103 return 104 105 self.__signal_content[signal_name] = None 106 107 def signal_handler(*args): 108 self.__signal_content[signal_name] = dict(zip(argument_names, args)) 109 110 logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"', 111 signal_name, ', '.join(argument_names), interface) 112 self.proxy_object.connect_to_signal(signal_name, signal_handler, 113 interface) 114 115 def wait_for_signal(self, signal_name): 116 """Waits for the reception of a signal. 117 118 Args: 119 signal_name: The name of the signal to wait for. 120 121 Returns: 122 The content of the signal. 123 """ 124 if signal_name not in self.__signal_content: 125 return None 126 127 def check_signal_content(): 128 context = self.main_loop.get_context() 129 while context.iteration(False): 130 pass 131 return self.__signal_content[signal_name] is not None 132 133 logging.debug('Waiting for D-Bus signal "%s"', signal_name) 134 utils.poll_for_condition(condition=check_signal_content, 135 desc='%s signal' % signal_name, 136 timeout=self.signal_timeout_in_seconds) 137 content = self.__signal_content[signal_name] 138 logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content) 139 self.__signal_content[signal_name] = None 140 return content 141 142 def expect_signal(self, signal_name, expected_content): 143 """Waits the the reception of a signal and verifies its content. 144 145 Args: 146 signal_name: The name of the signal to wait for. 147 expected_content: The expected content of the signal, which can be 148 partially specified. Only specified fields are 149 compared between the actual and expected content. 150 151 Returns: 152 The actual content of the signal. 153 154 Raises: 155 error.TestFail: A test failure when there is a mismatch between the 156 actual and expected content of the signal. 157 """ 158 actual_content = self.wait_for_signal(signal_name) 159 logging.debug("%s signal: expected=%s actual=%s", 160 signal_name, expected_content, actual_content) 161 for argument, expected_value in expected_content.iteritems(): 162 if argument not in actual_content: 163 raise error.TestFail( 164 ('%s signal missing "%s": expected=%s, actual=%s') % 165 (signal_name, argument, expected_content, actual_content)) 166 167 if actual_content[argument] != expected_value: 168 raise error.TestFail( 169 ('%s signal not matched on "%s": expected=%s, actual=%s') % 170 (signal_name, argument, expected_content, actual_content)) 171 return actual_content 172 173 174class CrosDisksClient(DBusClient): 175 """A DBus proxy client for testing the CrosDisks DBus server. 176 """ 177 178 CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks' 179 CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks' 180 CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks' 181 DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties' 182 FORMAT_COMPLETED_SIGNAL = 'FormatCompleted' 183 FORMAT_COMPLETED_SIGNAL_ARGUMENTS = ( 184 'status', 'path' 185 ) 186 MOUNT_COMPLETED_SIGNAL = 'MountCompleted' 187 MOUNT_COMPLETED_SIGNAL_ARGUMENTS = ( 188 'status', 'source_path', 'source_type', 'mount_path' 189 ) 190 RENAME_COMPLETED_SIGNAL = 'RenameCompleted' 191 RENAME_COMPLETED_SIGNAL_ARGUMENTS = ( 192 'status', 'path' 193 ) 194 195 def __init__(self, main_loop, bus): 196 """Initializes the instance. 197 198 Args: 199 main_loop: The GLib main loop. 200 bus: The bus where the DBus server is connected to. 201 """ 202 super(CrosDisksClient, self).__init__(main_loop, bus, 203 self.CROS_DISKS_BUS_NAME, 204 self.CROS_DISKS_OBJECT_PATH) 205 self.interface = dbus.Interface(self.proxy_object, 206 self.CROS_DISKS_INTERFACE) 207 self.properties = dbus.Interface(self.proxy_object, 208 self.DBUS_PROPERTIES_INTERFACE) 209 self.handle_signal(self.CROS_DISKS_INTERFACE, 210 self.FORMAT_COMPLETED_SIGNAL, 211 self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS) 212 self.handle_signal(self.CROS_DISKS_INTERFACE, 213 self.MOUNT_COMPLETED_SIGNAL, 214 self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS) 215 self.handle_signal(self.CROS_DISKS_INTERFACE, 216 self.RENAME_COMPLETED_SIGNAL, 217 self.RENAME_COMPLETED_SIGNAL_ARGUMENTS) 218 219 def enumerate_auto_mountable_devices(self): 220 """Invokes the CrosDisks EnumerateAutoMountableDevices method. 221 222 Returns: 223 A list of sysfs paths of devices that are auto-mountable by 224 CrosDisks. 225 """ 226 return self.interface.EnumerateAutoMountableDevices() 227 228 def enumerate_devices(self): 229 """Invokes the CrosDisks EnumerateMountableDevices method. 230 231 Returns: 232 A list of sysfs paths of devices that are recognized by 233 CrosDisks. 234 """ 235 return self.interface.EnumerateDevices() 236 237 def get_device_properties(self, path): 238 """Invokes the CrosDisks GetDeviceProperties method. 239 240 Args: 241 path: The device path. 242 243 Returns: 244 The properties of the device in a dictionary. 245 """ 246 return self.interface.GetDeviceProperties(path) 247 248 def format(self, path, filesystem_type=None, options=None): 249 """Invokes the CrosDisks Format method. 250 251 Args: 252 path: The device path to format. 253 filesystem_type: The filesystem type used for formatting the device. 254 options: A list of options used for formatting the device. 255 """ 256 if filesystem_type is None: 257 filesystem_type = '' 258 if options is None: 259 options = [] 260 self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL) 261 self.interface.Format(path, filesystem_type, 262 dbus.Array(options, signature='s')) 263 264 def wait_for_format_completion(self): 265 """Waits for the CrosDisks FormatCompleted signal. 266 267 Returns: 268 The content of the FormatCompleted signal. 269 """ 270 return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL) 271 272 def expect_format_completion(self, expected_content): 273 """Waits and verifies for the CrosDisks FormatCompleted signal. 274 275 Args: 276 expected_content: The expected content of the FormatCompleted 277 signal, which can be partially specified. 278 Only specified fields are compared between the 279 actual and expected content. 280 281 Returns: 282 The actual content of the FormatCompleted signal. 283 284 Raises: 285 error.TestFail: A test failure when there is a mismatch between the 286 actual and expected content of the FormatCompleted 287 signal. 288 """ 289 return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL, 290 expected_content) 291 292 def rename(self, path, volume_name=None): 293 """Invokes the CrosDisks Rename method. 294 295 Args: 296 path: The device path to rename. 297 volume_name: The new name used for renaming. 298 """ 299 if volume_name is None: 300 volume_name = '' 301 self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL) 302 self.interface.Rename(path, volume_name) 303 304 def wait_for_rename_completion(self): 305 """Waits for the CrosDisks RenameCompleted signal. 306 307 Returns: 308 The content of the RenameCompleted signal. 309 """ 310 return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL) 311 312 def expect_rename_completion(self, expected_content): 313 """Waits and verifies for the CrosDisks RenameCompleted signal. 314 315 Args: 316 expected_content: The expected content of the RenameCompleted 317 signal, which can be partially specified. 318 Only specified fields are compared between the 319 actual and expected content. 320 321 Returns: 322 The actual content of the RenameCompleted signal. 323 324 Raises: 325 error.TestFail: A test failure when there is a mismatch between the 326 actual and expected content of the RenameCompleted 327 signal. 328 """ 329 return self.expect_signal(self.RENAME_COMPLETED_SIGNAL, 330 expected_content) 331 332 def mount(self, path, filesystem_type=None, options=None): 333 """Invokes the CrosDisks Mount method. 334 335 Args: 336 path: The device path to mount. 337 filesystem_type: The filesystem type used for mounting the device. 338 options: A list of options used for mounting the device. 339 """ 340 if filesystem_type is None: 341 filesystem_type = '' 342 if options is None: 343 options = [] 344 self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL) 345 self.interface.Mount(path, filesystem_type, 346 dbus.Array(options, signature='s')) 347 348 def unmount(self, path, options=None): 349 """Invokes the CrosDisks Unmount method. 350 351 Args: 352 path: The device or mount path to unmount. 353 options: A list of options used for unmounting the path. 354 """ 355 if options is None: 356 options = [] 357 self.interface.Unmount(path, dbus.Array(options, signature='s')) 358 359 def wait_for_mount_completion(self): 360 """Waits for the CrosDisks MountCompleted signal. 361 362 Returns: 363 The content of the MountCompleted signal. 364 """ 365 return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL) 366 367 def expect_mount_completion(self, expected_content): 368 """Waits and verifies for the CrosDisks MountCompleted signal. 369 370 Args: 371 expected_content: The expected content of the MountCompleted 372 signal, which can be partially specified. 373 Only specified fields are compared between the 374 actual and expected content. 375 376 Returns: 377 The actual content of the MountCompleted signal. 378 379 Raises: 380 error.TestFail: A test failure when there is a mismatch between the 381 actual and expected content of the MountCompleted 382 signal. 383 """ 384 return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL, 385 expected_content) 386 387 388class CrosDisksTester(GenericTesterMainLoop): 389 """A base tester class for testing the CrosDisks server. 390 391 A derived class should override the get_tests method to return a list of 392 test methods. The perform_one_test method invokes each test method in the 393 list to verify some functionalities of CrosDisks server. 394 """ 395 def __init__(self, test): 396 bus_loop = DBusGMainLoop(set_as_default=True) 397 bus = dbus.SystemBus(mainloop=bus_loop) 398 self.main_loop = gobject.MainLoop() 399 super(CrosDisksTester, self).__init__(test, self.main_loop) 400 self.cros_disks = CrosDisksClient(self.main_loop, bus) 401 402 def get_tests(self): 403 """Returns a list of test methods to be invoked by perform_one_test. 404 405 A derived class should override this method. 406 407 Returns: 408 A list of test methods. 409 """ 410 return [] 411 412 @ExceptionForward 413 def perform_one_test(self): 414 """Exercises each test method in the list returned by get_tests. 415 """ 416 tests = self.get_tests() 417 self.remaining_requirements = set([test.func_name for test in tests]) 418 for test in tests: 419 test() 420 self.requirement_completed(test.func_name) 421 422 423class FilesystemTestObject(object): 424 """A base class to represent a filesystem test object. 425 426 A filesystem test object can be a file, directory or symbolic link. 427 A derived class should override the _create and _verify method to implement 428 how the test object should be created and verified, respectively, on a 429 filesystem. 430 """ 431 def __init__(self, path, content, mode): 432 """Initializes the instance. 433 434 Args: 435 path: The relative path of the test object. 436 content: The content of the test object. 437 mode: The file permissions given to the test object. 438 """ 439 self._path = path 440 self._content = content 441 self._mode = mode 442 443 def create(self, base_dir): 444 """Creates the test object in a base directory. 445 446 Args: 447 base_dir: The base directory where the test object is created. 448 449 Returns: 450 True if the test object is created successfully or False otherwise. 451 """ 452 if not self._create(base_dir): 453 logging.debug('Failed to create filesystem test object at "%s"', 454 os.path.join(base_dir, self._path)) 455 return False 456 return True 457 458 def verify(self, base_dir): 459 """Verifies the test object in a base directory. 460 461 Args: 462 base_dir: The base directory where the test object is expected to be 463 found. 464 465 Returns: 466 True if the test object is found in the base directory and matches 467 the expected content, or False otherwise. 468 """ 469 if not self._verify(base_dir): 470 logging.debug('Failed to verify filesystem test object at "%s"', 471 os.path.join(base_dir, self._path)) 472 return False 473 return True 474 475 def _create(self, base_dir): 476 return False 477 478 def _verify(self, base_dir): 479 return False 480 481 482class FilesystemTestDirectory(FilesystemTestObject): 483 """A filesystem test object that represents a directory.""" 484 485 def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \ 486 stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH): 487 super(FilesystemTestDirectory, self).__init__(path, content, mode) 488 489 def _create(self, base_dir): 490 path = os.path.join(base_dir, self._path) if self._path else base_dir 491 492 if self._path: 493 with ExceptionSuppressor(OSError): 494 os.makedirs(path) 495 os.chmod(path, self._mode) 496 497 if not os.path.isdir(path): 498 return False 499 500 for content in self._content: 501 if not content.create(path): 502 return False 503 return True 504 505 def _verify(self, base_dir): 506 path = os.path.join(base_dir, self._path) if self._path else base_dir 507 if not os.path.isdir(path): 508 return False 509 510 for content in self._content: 511 if not content.verify(path): 512 return False 513 return True 514 515 516class FilesystemTestFile(FilesystemTestObject): 517 """A filesystem test object that represents a file.""" 518 519 def __init__(self, path, content, mode=stat.S_IRUSR|stat.S_IWUSR| \ 520 stat.S_IRGRP|stat.S_IROTH): 521 super(FilesystemTestFile, self).__init__(path, content, mode) 522 523 def _create(self, base_dir): 524 path = os.path.join(base_dir, self._path) 525 with ExceptionSuppressor(IOError): 526 with open(path, 'wb+') as f: 527 f.write(self._content) 528 with ExceptionSuppressor(OSError): 529 os.chmod(path, self._mode) 530 return True 531 return False 532 533 def _verify(self, base_dir): 534 path = os.path.join(base_dir, self._path) 535 with ExceptionSuppressor(IOError): 536 with open(path, 'rb') as f: 537 return f.read() == self._content 538 return False 539 540 541class DefaultFilesystemTestContent(FilesystemTestDirectory): 542 def __init__(self): 543 super(DefaultFilesystemTestContent, self).__init__('', [ 544 FilesystemTestFile('file1', '0123456789'), 545 FilesystemTestDirectory('dir1', [ 546 FilesystemTestFile('file1', ''), 547 FilesystemTestFile('file2', 'abcdefg'), 548 FilesystemTestDirectory('dir2', [ 549 FilesystemTestFile('file3', 'abcdefg'), 550 ]), 551 ]), 552 ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH) 553 554 555class VirtualFilesystemImage(object): 556 def __init__(self, block_size, block_count, filesystem_type, 557 *args, **kwargs): 558 """Initializes the instance. 559 560 Args: 561 block_size: The number of bytes of each block in the image. 562 block_count: The number of blocks in the image. 563 filesystem_type: The filesystem type to be given to the mkfs 564 program for formatting the image. 565 566 Keyword Args: 567 mount_filesystem_type: The filesystem type to be given to the 568 mount program for mounting the image. 569 mkfs_options: A list of options to be given to the mkfs program. 570 """ 571 self._block_size = block_size 572 self._block_count = block_count 573 self._filesystem_type = filesystem_type 574 self._mount_filesystem_type = kwargs.get('mount_filesystem_type') 575 if self._mount_filesystem_type is None: 576 self._mount_filesystem_type = filesystem_type 577 self._mkfs_options = kwargs.get('mkfs_options') 578 if self._mkfs_options is None: 579 self._mkfs_options = [] 580 self._image_file = None 581 self._loop_device = None 582 self._loop_device_stat = None 583 self._mount_dir = None 584 585 def __del__(self): 586 with ExceptionSuppressor(Exception): 587 self.clean() 588 589 def __enter__(self): 590 self.create() 591 return self 592 593 def __exit__(self, exc_type, exc_value, traceback): 594 self.clean() 595 return False 596 597 def _remove_temp_path(self, temp_path): 598 """Removes a temporary file or directory created using autotemp.""" 599 if temp_path: 600 with ExceptionSuppressor(Exception): 601 path = temp_path.name 602 temp_path.clean() 603 logging.debug('Removed "%s"', path) 604 605 def _remove_image_file(self): 606 """Removes the image file if one has been created.""" 607 self._remove_temp_path(self._image_file) 608 self._image_file = None 609 610 def _remove_mount_dir(self): 611 """Removes the mount directory if one has been created.""" 612 self._remove_temp_path(self._mount_dir) 613 self._mount_dir = None 614 615 @property 616 def image_file(self): 617 """Gets the path of the image file. 618 619 Returns: 620 The path of the image file or None if no image file has been 621 created. 622 """ 623 return self._image_file.name if self._image_file else None 624 625 @property 626 def loop_device(self): 627 """Gets the loop device where the image file is attached to. 628 629 Returns: 630 The path of the loop device where the image file is attached to or 631 None if no loop device is attaching the image file. 632 """ 633 return self._loop_device 634 635 @property 636 def mount_dir(self): 637 """Gets the directory where the image file is mounted to. 638 639 Returns: 640 The directory where the image file is mounted to or None if no 641 mount directory has been created. 642 """ 643 return self._mount_dir.name if self._mount_dir else None 644 645 def create(self): 646 """Creates a zero-filled image file with the specified size. 647 648 The created image file is temporary and removed when clean() 649 is called. 650 """ 651 self.clean() 652 self._image_file = autotemp.tempfile(unique_id='fsImage') 653 try: 654 logging.debug('Creating zero-filled image file at "%s"', 655 self._image_file.name) 656 utils.run('dd if=/dev/zero of=%s bs=%s count=%s' % 657 (self._image_file.name, self._block_size, 658 self._block_count)) 659 except error.CmdError as exc: 660 self._remove_image_file() 661 message = 'Failed to create filesystem image: %s' % exc 662 raise RuntimeError(message) 663 664 def clean(self): 665 """Removes the image file if one has been created. 666 667 Before removal, the image file is detached from the loop device that 668 it is attached to. 669 """ 670 self.detach_from_loop_device() 671 self._remove_image_file() 672 673 def attach_to_loop_device(self): 674 """Attaches the created image file to a loop device. 675 676 Creates the image file, if one has not been created, by calling 677 create(). 678 679 Returns: 680 The path of the loop device where the image file is attached to. 681 """ 682 if self._loop_device: 683 return self._loop_device 684 685 if not self._image_file: 686 self.create() 687 688 logging.debug('Attaching image file "%s" to loop device', 689 self._image_file.name) 690 utils.run('losetup -f %s' % self._image_file.name) 691 output = utils.system_output('losetup -j %s' % self._image_file.name) 692 # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)" 693 self._loop_device = output.split(':')[0] 694 logging.debug('Attached image file "%s" to loop device "%s"', 695 self._image_file.name, self._loop_device) 696 697 self._loop_device_stat = os.stat(self._loop_device) 698 logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)', 699 self._loop_device, 700 self._loop_device_stat.st_uid, 701 self._loop_device_stat.st_gid, 702 stat.S_IMODE(self._loop_device_stat.st_mode)) 703 return self._loop_device 704 705 def detach_from_loop_device(self): 706 """Detaches the image file from the loop device.""" 707 if not self._loop_device: 708 return 709 710 self.unmount() 711 712 logging.debug('Cleaning up remaining mount points of loop device "%s"', 713 self._loop_device) 714 utils.run('umount -f %s' % self._loop_device, ignore_status=True) 715 716 logging.debug('Restore ownership/permissions of loop device "%s"', 717 self._loop_device) 718 os.chmod(self._loop_device, 719 stat.S_IMODE(self._loop_device_stat.st_mode)) 720 os.chown(self._loop_device, 721 self._loop_device_stat.st_uid, self._loop_device_stat.st_gid) 722 723 logging.debug('Detaching image file "%s" from loop device "%s"', 724 self._image_file.name, self._loop_device) 725 utils.run('losetup -d %s' % self._loop_device) 726 self._loop_device = None 727 728 def format(self): 729 """Formats the image file as the specified filesystem.""" 730 self.attach_to_loop_device() 731 try: 732 logging.debug('Formatting image file at "%s" as "%s" filesystem', 733 self._image_file.name, self._filesystem_type) 734 utils.run('yes | mkfs -t %s %s %s' % 735 (self._filesystem_type, ' '.join(self._mkfs_options), 736 self._loop_device)) 737 logging.debug('blkid: %s', utils.system_output( 738 'blkid -c /dev/null %s' % self._loop_device, 739 ignore_status=True)) 740 except error.CmdError as exc: 741 message = 'Failed to format filesystem image: %s' % exc 742 raise RuntimeError(message) 743 744 def mount(self, options=None): 745 """Mounts the image file to a directory. 746 747 Args: 748 options: An optional list of mount options. 749 """ 750 if self._mount_dir: 751 return self._mount_dir.name 752 753 if options is None: 754 options = [] 755 756 options_arg = ','.join(options) 757 if options_arg: 758 options_arg = '-o ' + options_arg 759 760 self.attach_to_loop_device() 761 self._mount_dir = autotemp.tempdir(unique_id='fsImage') 762 try: 763 logging.debug('Mounting image file "%s" (%s) to directory "%s"', 764 self._image_file.name, self._loop_device, 765 self._mount_dir.name) 766 utils.run('mount -t %s %s %s %s' % 767 (self._mount_filesystem_type, options_arg, 768 self._loop_device, self._mount_dir.name)) 769 except error.CmdError as exc: 770 self._remove_mount_dir() 771 message = ('Failed to mount virtual filesystem image "%s": %s' % 772 (self._image_file.name, exc)) 773 raise RuntimeError(message) 774 return self._mount_dir.name 775 776 def unmount(self): 777 """Unmounts the image file from the mounted directory.""" 778 if not self._mount_dir: 779 return 780 781 try: 782 logging.debug('Unmounting image file "%s" (%s) from directory "%s"', 783 self._image_file.name, self._loop_device, 784 self._mount_dir.name) 785 utils.run('umount %s' % self._mount_dir.name) 786 except error.CmdError as exc: 787 message = ('Failed to unmount virtual filesystem image "%s": %s' % 788 (self._image_file.name, exc)) 789 raise RuntimeError(message) 790 finally: 791 self._remove_mount_dir() 792 793 def get_volume_label(self): 794 """Gets volume name information of |self._loop_device| 795 796 @return a string with volume name if it exists. 797 """ 798 # This script is run as root in a normal autotest run, 799 # so this works: It doesn't have access to the necessary info 800 # when run as a non-privileged user 801 cmd = "blkid -c /dev/null -o udev %s" % self._loop_device 802 output = utils.system_output(cmd, ignore_status=True) 803 804 for line in output.splitlines(): 805 udev_key, udev_val = line.split('=') 806 807 if udev_key == 'ID_FS_LABEL': 808 return udev_val 809 810 return None 811