1# Copyright 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import os 6import shutil 7import stat 8import sys 9import tempfile 10import unittest 11import uuid 12import zipfile 13 14import mock 15 16from dependency_manager import dependency_manager_util 17from dependency_manager import exceptions 18 19 20class DependencyManagerUtilTest(unittest.TestCase): 21 # This class intentionally uses actual file I/O to test real system behavior. 22 23 def setUp(self): 24 self.tmp_dir = os.path.abspath(tempfile.mkdtemp(prefix='telemetry')) 25 self.sub_dir = os.path.join(self.tmp_dir, 'sub_dir') 26 os.mkdir(self.sub_dir) 27 28 self.read_only_path = (os.path.join(self.tmp_dir, 'read_only')) 29 with open(self.read_only_path, 'w+') as read_file: 30 read_file.write('Read-only file') 31 os.chmod(self.read_only_path, stat.S_IRUSR) 32 33 self.writable_path = (os.path.join(self.tmp_dir, 'writable')) 34 with open(self.writable_path, 'w+') as writable_file: 35 writable_file.write('Writable file') 36 os.chmod(self.writable_path, stat.S_IRUSR | stat.S_IWUSR) 37 38 self.executable_path = (os.path.join(self.tmp_dir, 'executable')) 39 with open(self.executable_path, 'w+') as executable_file: 40 executable_file.write('Executable file') 41 os.chmod(self.executable_path, stat.S_IRWXU) 42 43 self.sub_read_only_path = (os.path.join(self.sub_dir, 'read_only')) 44 with open(self.sub_read_only_path, 'w+') as read_file: 45 read_file.write('Read-only sub file') 46 os.chmod(self.sub_read_only_path, stat.S_IRUSR) 47 48 self.sub_writable_path = (os.path.join(self.sub_dir, 'writable')) 49 with open(self.sub_writable_path, 'w+') as writable_file: 50 writable_file.write('Writable sub file') 51 os.chmod(self.sub_writable_path, stat.S_IRUSR | stat.S_IWUSR) 52 53 self.sub_executable_path = (os.path.join(self.sub_dir, 'executable')) 54 with open(self.sub_executable_path, 'w+') as executable_file: 55 executable_file.write('Executable sub file') 56 os.chmod(self.sub_executable_path, stat.S_IRWXU) 57 58 self.AssertExpectedDirFiles(self.tmp_dir) 59 self.archive_path = self.CreateZipArchiveFromDir(self.tmp_dir) 60 61 def tearDown(self): 62 if os.path.isdir(self.tmp_dir): 63 dependency_manager_util.RemoveDir(self.tmp_dir) 64 if os.path.isfile(self.archive_path): 65 os.remove(self.archive_path) 66 67 def AssertExpectedDirFiles(self, top_dir): 68 sub_dir = os.path.join(top_dir, 'sub_dir') 69 read_only_path = (os.path.join(top_dir, 'read_only')) 70 writable_path = (os.path.join(top_dir, 'writable')) 71 executable_path = (os.path.join(top_dir, 'executable')) 72 sub_read_only_path = (os.path.join(sub_dir, 'read_only')) 73 sub_writable_path = (os.path.join(sub_dir, 'writable')) 74 sub_executable_path = (os.path.join(sub_dir, 'executable')) 75 # assert contents as expected 76 self.assertTrue(os.path.isdir(top_dir)) 77 self.assertTrue(os.path.isdir(sub_dir)) 78 self.assertTrue(os.path.isfile(read_only_path)) 79 self.assertTrue(os.path.isfile(writable_path)) 80 self.assertTrue(os.path.isfile(executable_path)) 81 self.assertTrue(os.path.isfile(sub_read_only_path)) 82 self.assertTrue(os.path.isfile(sub_writable_path)) 83 self.assertTrue(os.path.isfile(sub_executable_path)) 84 85 # assert permissions as expected 86 self.assertTrue( 87 stat.S_IRUSR & stat.S_IMODE(os.stat(read_only_path).st_mode)) 88 self.assertTrue( 89 stat.S_IRUSR & stat.S_IMODE(os.stat(sub_read_only_path).st_mode)) 90 self.assertTrue( 91 stat.S_IRUSR & stat.S_IMODE(os.stat(writable_path).st_mode)) 92 self.assertTrue( 93 stat.S_IWUSR & stat.S_IMODE(os.stat(writable_path).st_mode)) 94 self.assertTrue( 95 stat.S_IRUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode)) 96 self.assertTrue( 97 stat.S_IWUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode)) 98 if not sys.platform.startswith('win'): 99 self.assertEqual( 100 stat.S_IRWXU, 101 stat.S_IRWXU & stat.S_IMODE(os.stat(executable_path).st_mode)) 102 self.assertEqual( 103 stat.S_IRWXU, 104 stat.S_IRWXU & stat.S_IMODE(os.stat(sub_executable_path).st_mode)) 105 106 def CreateZipArchiveFromDir(self, dir_path): 107 try: 108 base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 109 archive_path = shutil.make_archive(base_path, 'zip', dir_path) 110 self.assertTrue(os.path.exists(archive_path)) 111 self.assertTrue(zipfile.is_zipfile(archive_path)) 112 except: 113 if os.path.isfile(archive_path): 114 os.remove(archive_path) 115 raise 116 return archive_path 117 118 def testRemoveDirWithSubDir(self): 119 dependency_manager_util.RemoveDir(self.tmp_dir) 120 121 self.assertFalse(os.path.exists(self.tmp_dir)) 122 self.assertFalse(os.path.exists(self.sub_dir)) 123 self.assertFalse(os.path.exists(self.read_only_path)) 124 self.assertFalse(os.path.exists(self.writable_path)) 125 self.assertFalse(os.path.isfile(self.executable_path)) 126 self.assertFalse(os.path.exists(self.sub_read_only_path)) 127 self.assertFalse(os.path.exists(self.sub_writable_path)) 128 self.assertFalse(os.path.isfile(self.sub_executable_path)) 129 130 def testUnzipFile(self): 131 self.AssertExpectedDirFiles(self.tmp_dir) 132 unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 133 dependency_manager_util.UnzipArchive(self.archive_path, unzip_path) 134 self.AssertExpectedDirFiles(unzip_path) 135 self.AssertExpectedDirFiles(self.tmp_dir) 136 dependency_manager_util.RemoveDir(unzip_path) 137 138 def testUnzipFileContainingLongPath(self): 139 try: 140 dir_path = self.tmp_dir 141 if sys.platform.startswith('win'): 142 dir_path = u'\\\\?\\' + dir_path 143 144 archive_suffix = '' 145 # 260 is the Windows API path length limit. 146 while len(archive_suffix) < 260: 147 archive_suffix = os.path.join(archive_suffix, 'really') 148 contents_dir_path = os.path.join(dir_path, archive_suffix) 149 os.makedirs(contents_dir_path) 150 filename = os.path.join(contents_dir_path, 'longpath.txt') 151 open(filename, 'a').close() 152 153 base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 154 archive_path = shutil.make_archive(base_path, 'zip', dir_path) 155 self.assertTrue(os.path.exists(archive_path)) 156 self.assertTrue(zipfile.is_zipfile(archive_path)) 157 except: 158 if os.path.isfile(archive_path): 159 os.remove(archive_path) 160 raise 161 162 unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 163 dependency_manager_util.UnzipArchive(archive_path, unzip_path) 164 dependency_manager_util.RemoveDir(unzip_path) 165 166 def testUnzipFileFailure(self): 167 unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 168 self.assertFalse(os.path.exists(unzip_path)) 169 with mock.patch( 170 'dependency_manager.dependency_manager_util.zipfile.ZipFile.extractall' # pylint: disable=line-too-long 171 ) as zipfile_mock: 172 zipfile_mock.side_effect = IOError 173 self.assertRaises( 174 IOError, dependency_manager_util.UnzipArchive, self.archive_path, 175 unzip_path) 176 self.AssertExpectedDirFiles(self.tmp_dir) 177 self.assertFalse(os.path.exists(unzip_path)) 178 179 def testVerifySafeArchivePasses(self): 180 with zipfile.ZipFile(self.archive_path) as archive: 181 dependency_manager_util.VerifySafeArchive(archive) 182 183 def testVerifySafeArchiveFailsOnRelativePathWithPardir(self): 184 tmp_file = tempfile.NamedTemporaryFile(delete=False) 185 tmp_file_name = tmp_file.name 186 tmp_file.write('Bad file!') 187 tmp_file.close() 188 with zipfile.ZipFile(self.archive_path, 'w') as archive: 189 archive.write(tmp_file_name, '../../foo') 190 self.assertRaises( 191 exceptions.ArchiveError, dependency_manager_util.VerifySafeArchive, 192 archive) 193 194