• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import os
6import shutil
7import stat
8import sys
9import tempfile
10import unittest
11import uuid
12import zipfile
13
14import mock
15
16from dependency_manager import dependency_manager_util
17from dependency_manager import exceptions
18
19
20class DependencyManagerUtilTest(unittest.TestCase):
21  # This class intentionally uses actual file I/O to test real system behavior.
22
23  def setUp(self):
24    self.tmp_dir = os.path.abspath(tempfile.mkdtemp(prefix='telemetry'))
25    self.sub_dir = os.path.join(self.tmp_dir, 'sub_dir')
26    os.mkdir(self.sub_dir)
27
28    self.read_only_path = (os.path.join(self.tmp_dir, 'read_only'))
29    with open(self.read_only_path, 'w+') as read_file:
30      read_file.write('Read-only file')
31    os.chmod(self.read_only_path, stat.S_IRUSR)
32
33    self.writable_path = (os.path.join(self.tmp_dir, 'writable'))
34    with open(self.writable_path, 'w+') as writable_file:
35      writable_file.write('Writable file')
36    os.chmod(self.writable_path, stat.S_IRUSR | stat.S_IWUSR)
37
38    self.executable_path = (os.path.join(self.tmp_dir, 'executable'))
39    with open(self.executable_path, 'w+') as executable_file:
40      executable_file.write('Executable file')
41    os.chmod(self.executable_path, stat.S_IRWXU)
42
43    self.sub_read_only_path = (os.path.join(self.sub_dir, 'read_only'))
44    with open(self.sub_read_only_path, 'w+') as read_file:
45      read_file.write('Read-only sub file')
46    os.chmod(self.sub_read_only_path, stat.S_IRUSR)
47
48    self.sub_writable_path = (os.path.join(self.sub_dir, 'writable'))
49    with open(self.sub_writable_path, 'w+') as writable_file:
50      writable_file.write('Writable sub file')
51    os.chmod(self.sub_writable_path, stat.S_IRUSR | stat.S_IWUSR)
52
53    self.sub_executable_path = (os.path.join(self.sub_dir, 'executable'))
54    with open(self.sub_executable_path, 'w+') as executable_file:
55      executable_file.write('Executable sub file')
56    os.chmod(self.sub_executable_path, stat.S_IRWXU)
57
58    self.AssertExpectedDirFiles(self.tmp_dir)
59    self.archive_path = self.CreateZipArchiveFromDir(self.tmp_dir)
60
61  def tearDown(self):
62    if os.path.isdir(self.tmp_dir):
63      dependency_manager_util.RemoveDir(self.tmp_dir)
64    if os.path.isfile(self.archive_path):
65      os.remove(self.archive_path)
66
67  def AssertExpectedDirFiles(self, top_dir):
68    sub_dir = os.path.join(top_dir, 'sub_dir')
69    read_only_path = (os.path.join(top_dir, 'read_only'))
70    writable_path = (os.path.join(top_dir, 'writable'))
71    executable_path = (os.path.join(top_dir, 'executable'))
72    sub_read_only_path = (os.path.join(sub_dir, 'read_only'))
73    sub_writable_path = (os.path.join(sub_dir, 'writable'))
74    sub_executable_path = (os.path.join(sub_dir, 'executable'))
75    # assert contents as expected
76    self.assertTrue(os.path.isdir(top_dir))
77    self.assertTrue(os.path.isdir(sub_dir))
78    self.assertTrue(os.path.isfile(read_only_path))
79    self.assertTrue(os.path.isfile(writable_path))
80    self.assertTrue(os.path.isfile(executable_path))
81    self.assertTrue(os.path.isfile(sub_read_only_path))
82    self.assertTrue(os.path.isfile(sub_writable_path))
83    self.assertTrue(os.path.isfile(sub_executable_path))
84
85    # assert permissions as expected
86    self.assertTrue(
87        stat.S_IRUSR & stat.S_IMODE(os.stat(read_only_path).st_mode))
88    self.assertTrue(
89        stat.S_IRUSR & stat.S_IMODE(os.stat(sub_read_only_path).st_mode))
90    self.assertTrue(
91        stat.S_IRUSR & stat.S_IMODE(os.stat(writable_path).st_mode))
92    self.assertTrue(
93        stat.S_IWUSR & stat.S_IMODE(os.stat(writable_path).st_mode))
94    self.assertTrue(
95        stat.S_IRUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode))
96    self.assertTrue(
97        stat.S_IWUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode))
98    if not sys.platform.startswith('win'):
99      self.assertEqual(
100          stat.S_IRWXU,
101          stat.S_IRWXU & stat.S_IMODE(os.stat(executable_path).st_mode))
102      self.assertEqual(
103          stat.S_IRWXU,
104          stat.S_IRWXU & stat.S_IMODE(os.stat(sub_executable_path).st_mode))
105
106  def CreateZipArchiveFromDir(self, dir_path):
107    try:
108      base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
109      archive_path = shutil.make_archive(base_path, 'zip', dir_path)
110      self.assertTrue(os.path.exists(archive_path))
111      self.assertTrue(zipfile.is_zipfile(archive_path))
112    except:
113      if os.path.isfile(archive_path):
114        os.remove(archive_path)
115      raise
116    return archive_path
117
118  def testRemoveDirWithSubDir(self):
119    dependency_manager_util.RemoveDir(self.tmp_dir)
120
121    self.assertFalse(os.path.exists(self.tmp_dir))
122    self.assertFalse(os.path.exists(self.sub_dir))
123    self.assertFalse(os.path.exists(self.read_only_path))
124    self.assertFalse(os.path.exists(self.writable_path))
125    self.assertFalse(os.path.isfile(self.executable_path))
126    self.assertFalse(os.path.exists(self.sub_read_only_path))
127    self.assertFalse(os.path.exists(self.sub_writable_path))
128    self.assertFalse(os.path.isfile(self.sub_executable_path))
129
130  def testUnzipFile(self):
131    self.AssertExpectedDirFiles(self.tmp_dir)
132    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
133    dependency_manager_util.UnzipArchive(self.archive_path, unzip_path)
134    self.AssertExpectedDirFiles(unzip_path)
135    self.AssertExpectedDirFiles(self.tmp_dir)
136    dependency_manager_util.RemoveDir(unzip_path)
137
138  def testUnzipFileContainingLongPath(self):
139    try:
140      dir_path = self.tmp_dir
141      if sys.platform.startswith('win'):
142        dir_path = u'\\\\?\\' + dir_path
143
144      archive_suffix = ''
145      # 260 is the Windows API path length limit.
146      while len(archive_suffix) < 260:
147        archive_suffix = os.path.join(archive_suffix, 'really')
148      contents_dir_path = os.path.join(dir_path, archive_suffix)
149      os.makedirs(contents_dir_path)
150      filename = os.path.join(contents_dir_path, 'longpath.txt')
151      open(filename, 'a').close()
152
153      base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
154      archive_path = shutil.make_archive(base_path, 'zip', dir_path)
155      self.assertTrue(os.path.exists(archive_path))
156      self.assertTrue(zipfile.is_zipfile(archive_path))
157    except:
158      if os.path.isfile(archive_path):
159        os.remove(archive_path)
160      raise
161
162    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
163    dependency_manager_util.UnzipArchive(archive_path, unzip_path)
164    dependency_manager_util.RemoveDir(unzip_path)
165
166  def testUnzipFileFailure(self):
167    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
168    self.assertFalse(os.path.exists(unzip_path))
169    with mock.patch(
170        'dependency_manager.dependency_manager_util.zipfile.ZipFile.extractall'  # pylint: disable=line-too-long
171        ) as zipfile_mock:
172      zipfile_mock.side_effect = IOError
173      self.assertRaises(
174          IOError, dependency_manager_util.UnzipArchive, self.archive_path,
175          unzip_path)
176    self.AssertExpectedDirFiles(self.tmp_dir)
177    self.assertFalse(os.path.exists(unzip_path))
178
179  def testVerifySafeArchivePasses(self):
180    with zipfile.ZipFile(self.archive_path) as archive:
181      dependency_manager_util.VerifySafeArchive(archive)
182
183  def testVerifySafeArchiveFailsOnRelativePathWithPardir(self):
184    tmp_file = tempfile.NamedTemporaryFile(delete=False)
185    tmp_file_name = tmp_file.name
186    tmp_file.write('Bad file!')
187    tmp_file.close()
188    with zipfile.ZipFile(self.archive_path, 'w') as archive:
189      archive.write(tmp_file_name, '../../foo')
190      self.assertRaises(
191          exceptions.ArchiveError, dependency_manager_util.VerifySafeArchive,
192          archive)
193
194