• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2015 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Helper functions for tracker file functionality."""
16
17import errno
18import hashlib
19import json
20import os
21import re
22
23from boto import config
24from gslib.exception import CommandException
25from gslib.util import CreateDirIfNeeded
26from gslib.util import GetGsutilStateDir
27from gslib.util import ResumableThreshold
28from gslib.util import UTF8
29
# The maximum length of a file name can vary wildly between different
# operating systems, so we always ensure that tracker file names are
# fewer than 100 characters in order to avoid any such issues.
MAX_TRACKER_FILE_NAME_LENGTH = 100


# Message template used when a tracker file cannot be written. It is formatted
# with two values: the tracker file path and the error description.
TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
    'Couldn\'t write tracker file (%s): %s. This can happen if gsutil is '
    'configured to save tracker files to an unwritable directory.')
39
40
class TrackerFileType(object):
  """String constants naming the kinds of tracker files this module handles.

  The value of each constant is lower-cased and used as the leading part of
  the on-disk tracker file name.
  """
  # Resumable upload to a destination bucket/object.
  UPLOAD = 'upload'
  # Non-sliced resumable download to a local file.
  DOWNLOAD = 'download'
  # A single numbered component of a sliced download.
  DOWNLOAD_COMPONENT = 'download_component'
  # Upload whose tracker name also encodes the source file name.
  PARALLEL_UPLOAD = 'parallel_upload'
  # Parent tracker file of a sliced download.
  SLICED_DOWNLOAD = 'sliced_download'
  # Rewrite; paths for this type come from GetRewriteTrackerFilePath.
  REWRITE = 'rewrite'
48
49
def _HashFilename(filename):
  """Shortens a file name by replacing most of it with its SHA1 digest.

  The resulting name has the form:

      TRACKER_<hash>.<trailing>

  where hash is the SHA1 hex digest of the UTF-8 encoded original name and
  trailing is the last 16 bytes of that encoded name. Max file name lengths
  vary by operating system, so the goal is a name that takes fewer than 100
  characters.

  Args:
    filename: file name to be hashed (unicode, or a UTF-8 encoded str).

  Returns:
    shorter, hashed version of passed file name
  """
  # Work on UTF-8 bytes. Byte strings are decoded first, so input that is
  # not valid UTF-8 raises here rather than being hashed as-is.
  if not isinstance(filename, unicode):
    filename = unicode(filename, UTF8)
  encoded_name = filename.encode(UTF8)
  digest = hashlib.sha1(encoded_name).hexdigest()
  # NOTE(review): the slice is taken on encoded bytes, so for non-ASCII names
  # it may begin in the middle of a multi-byte UTF-8 sequence.
  return ''.join(('TRACKER_', digest, '.', encoded_name[-16:]))
74
75
def CreateTrackerDirIfNeeded():
  """Returns the gsutil tracker file directory, creating it when absent.

  The directory is read from the 'resumable_tracker_dir' option in the
  'GSUtil' section of the boto config; when that option is not set, a
  'tracker-files' directory under the gsutil state directory is used.

  Returns:
    The pathname to the tracker directory.
  """
  default_tracker_dir = os.path.join(GetGsutilStateDir(), 'tracker-files')
  tracker_dir = config.get('GSUtil', 'resumable_tracker_dir',
                           default_tracker_dir)
  CreateDirIfNeeded(tracker_dir)
  return tracker_dir
90
91
def GetRewriteTrackerFilePath(src_bucket_name, src_obj_name, dst_bucket_name,
                              dst_obj_name, api_selector):
  """Builds the tracker file path for a rewrite operation.

  Args:
    src_bucket_name: Source bucket (string).
    src_obj_name: Source object (string).
    dst_bucket_name: Destination bucket (string).
    dst_obj_name: Destination object (string).
    api_selector: API to use for this operation.

  Returns:
    File path to tracker file.
  """
  # The source and destination bucket/object names and the API are all part
  # of the name, so distinct rewrites map to distinct tracker files.
  descriptive_name = 'rewrite__%s__%s__%s__%s__%s.token' % (
      src_bucket_name, src_obj_name, dst_bucket_name, dst_obj_name,
      api_selector)
  # Path separators (forward slash and backslash) become underscores.
  flattened_name = re.sub('[/\\\\]', '_', descriptive_name)
  return _HashAndReturnPath(flattened_name, TrackerFileType.REWRITE)
114
115
def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None,
                       component_num=None):
  """Gets the tracker file name described by the arguments.

  Args:
    dst_url: Destination URL for tracker file.
    tracker_file_type: TrackerFileType for this operation.
    api_selector: API to use for this operation.
    src_url: Source URL for the source file name for parallel uploads.
    component_num: Component number if this is a download component, else None.

  Returns:
    File path to tracker file.

  Raises:
    NotImplementedError: if tracker_file_type is TrackerFileType.REWRITE;
        use GetRewriteTrackerFilePath for rewrites.
    ValueError: if tracker_file_type is not a recognized TrackerFileType.
  """
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    descriptive_name = 'resumable_upload__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, api_selector)
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    descriptive_name = 'resumable_download__%s__%s.etag' % (
        os.path.realpath(dst_url.object_name), api_selector)
  elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
    # Encode the fully-qualified dest file name and the component number
    # into the tracker file name.
    descriptive_name = 'resumable_download__%s__%s__%d.etag' % (
        os.path.realpath(dst_url.object_name), api_selector, component_num)
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file name
    # into the tracker file name.
    descriptive_name = 'parallel_upload__%s__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, src_url, api_selector)
  elif tracker_file_type == TrackerFileType.SLICED_DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    descriptive_name = 'sliced_download__%s__%s.etag' % (
        os.path.realpath(dst_url.object_name), api_selector)
  elif tracker_file_type == TrackerFileType.REWRITE:
    # Should use GetRewriteTrackerFilePath instead.
    raise NotImplementedError()
  else:
    # Fail with a clear message instead of an UnboundLocalError further down.
    raise ValueError('Unsupported tracker file type: %s' %
                     (tracker_file_type,))

  # Path separators (forward slash and backslash) become underscores so the
  # result is a single file name component.
  res_tracker_file_name = re.sub('[/\\\\]', '_', descriptive_name)
  return _HashAndReturnPath(res_tracker_file_name, tracker_file_type)
164
165
def DeleteDownloadTrackerFiles(dst_url, api_selector):
  """Removes every tracker file associated with a download of one object.

  This covers the non-sliced download tracker file as well as the sliced
  download parent tracker file and its component tracker files.

  Args:
    dst_url: StorageUrl describing the destination file.
    api_selector: The Cloud API implementation used.
  """
  # Non-sliced download tracker file first.
  plain_tracker_path = GetTrackerFilePath(
      dst_url, TrackerFileType.DOWNLOAD, api_selector)
  DeleteTrackerFile(plain_tracker_path)

  # Then the sliced download parent and any component tracker files.
  sliced_tracker_paths = GetSlicedDownloadTrackerFilePaths(dst_url,
                                                           api_selector)
  for sliced_tracker_path in sliced_tracker_paths:
    DeleteTrackerFile(sliced_tracker_path)
181
182
def GetSlicedDownloadTrackerFilePaths(dst_url, api_selector,
                                      num_components=None):
  """Gets a list of sliced download tracker file paths.

  The list consists of the parent tracker file path in index 0, and then
  any existing component tracker files in [1:].

  Args:
    dst_url: Destination URL for tracker file.
    api_selector: API to use for this operation.
    num_components: The number of component tracker files, if already known.
                    If not known, the number will be retrieved from the parent
                    tracker file on disk.
  Returns:
    List of tracker file paths. When num_components is not given and the
    parent tracker file cannot be read or does not contain a usable
    'num_components' entry, the list holds only the parent tracker file path.
  """
  parallel_tracker_file_path = GetTrackerFilePath(
      dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector)
  tracker_file_paths = [parallel_tracker_file_path]

  # If we don't know the number of components, check the tracker file.
  if num_components is None:
    try:
      with open(parallel_tracker_file_path, 'r') as tracker_file:
        num_components = json.load(tracker_file)['num_components']
    except (IOError, ValueError, KeyError, TypeError):
      # Missing/unreadable file, invalid JSON, or JSON that is not an object
      # with a 'num_components' key: no component paths can be derived.
      return tracker_file_paths

  tracker_file_paths.extend(
      GetTrackerFilePath(dst_url, TrackerFileType.DOWNLOAD_COMPONENT,
                         api_selector, component_num=i)
      for i in range(num_components))

  return tracker_file_paths
221
222
def _HashAndReturnPath(res_tracker_file_name, tracker_file_type):
  """Converts a descriptive tracker file name into its on-disk path.

  The file name is the lower-cased tracker file type, an underscore, and the
  hashed form of res_tracker_file_name; it is placed in the tracker
  directory, which is created if needed.

  Args:
    res_tracker_file_name: The tracker file name prior to it being hashed.
    tracker_file_type: The TrackerFileType of res_tracker_file_name.

  Returns:
    Final (hashed) tracker file path.
  """
  tracker_dir = CreateTrackerDirIfNeeded()
  hashed_name = _HashFilename(res_tracker_file_name)
  type_prefix = str(tracker_file_type).lower()
  tracker_file_name = '%s_%s' % (type_prefix, hashed_name)
  # Only the file name (not the full path) is held to the length limit.
  assert len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH
  return '%s%s%s' % (tracker_dir, os.sep, tracker_file_name)
241
242
def DeleteTrackerFile(tracker_file_name):
  """Deletes the tracker file at the given path if it exists.

  Args:
    tracker_file_name: Path of the tracker file to delete. If None or empty,
        nothing is done.

  Raises:
    OSError: if the file exists but cannot be removed.
  """
  if not tracker_file_name:
    return
  # Attempt the unlink directly rather than checking for existence first:
  # another thread or process may remove the file between a check and the
  # unlink, and a file that is already gone is not an error here.
  try:
    os.unlink(tracker_file_name)
  except OSError as e:
    if e.errno not in (errno.ENOENT, errno.ENOTDIR):
      raise
246
247
def HashRewriteParameters(
    src_obj_metadata, dst_obj_metadata, projection, src_generation=None,
    gen_match=None, meta_gen_match=None, canned_acl=None, fields=None,
    max_bytes_per_call=None):
  """Creates an MD5 hex digest of the parameters for a rewrite call.

  A rewrite can only be resumed when its input parameters are identical to
  those of the original call, so the rewrite tracker file stores a hash of
  them for easy comparison. If a user performs a same-source/same-destination
  rewrite via a different command (for example, with a changed ACL), the
  hashes will not match and the rewrite restarts from the beginning.

  Args:
    src_obj_metadata: apitools Object describing source object. Must include
      bucket, name, and etag.
    dst_obj_metadata: apitools Object describing destination object. Must
      include bucket and object name.
    projection: Projection used for the API call.
    src_generation: Optional source generation.
    gen_match: Optional generation precondition.
    meta_gen_match: Optional metageneration precondition.
    canned_acl: Optional canned ACL string.
    fields: Optional fields to include in response.
    max_bytes_per_call: Optional maximum bytes rewritten per call.

  Returns:
    MD5 hex digest Hash of the input parameters, or None if required parameters
    are missing.
  """
  # Each check short-circuits, so attributes are only read from metadata
  # objects that are themselves truthy.
  source_complete = (src_obj_metadata and src_obj_metadata.bucket and
                     src_obj_metadata.name and src_obj_metadata.etag)
  if not source_complete:
    return None
  destination_complete = (dst_obj_metadata and dst_obj_metadata.bucket and
                          dst_obj_metadata.name)
  if not destination_complete:
    return None
  if not projection:
    return None

  hashed_params = (
      src_obj_metadata, dst_obj_metadata, projection, src_generation,
      gen_match, meta_gen_match, canned_acl, fields, max_bytes_per_call)
  # The digest covers the string forms of the parameters, in this order,
  # concatenated without separators.
  return hashlib.md5(
      ''.join(str(param) for param in hashed_params)).hexdigest()
293
294
def ReadRewriteTrackerFile(tracker_file_name, rewrite_params_hash):
  """Attempts to read a rewrite tracker file.

  The tracker file holds the parameter hash on its first line and the rewrite
  token on its second line.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.

  Returns:
    String rewrite_token for resuming rewrite requests if a matching tracker
    file exists, None otherwise (which will result in starting a new rewrite).
  """
  # Without a parameter hash there is nothing to compare against.
  if not rewrite_params_hash:
    return None
  try:
    with open(tracker_file_name, 'r') as tracker_file:
      stored_hash = tracker_file.readline().rstrip('\n')
      if stored_hash != rewrite_params_hash:
        return None
      # The line after the hash is the rewrite token.
      return tracker_file.readline().rstrip('\n')
  except IOError as e:
    # A non-existent file is expected the first time a rewrite is attempted;
    # anything else is reported.
    if e.errno != errno.ENOENT:
      print('Couldn\'t read Copy tracker file (%s): %s. Restarting copy '
            'from scratch.' %
            (tracker_file_name, e.strerror))
  return None
326
327
def WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
                            rewrite_token):
  """Writes a rewrite tracker file.

  The file contains the parameter hash on the first line and the rewrite
  token on the second line.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.
    rewrite_token: Rewrite token string returned by the service.
  """
  tracker_contents = '%s\n%s\n' % (rewrite_params_hash, rewrite_token)
  _WriteTrackerFile(tracker_file_name, tracker_contents)
340
341
def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
                                    api_selector, start_byte,
                                    existing_file_size, component_num=None):
  """Checks for a download tracker file and creates one if it does not exist.

  The methodology for determining the download start point differs between
  normal and sliced downloads. For normal downloads, the existing bytes in
  the file are presumed to be correct and have been previously downloaded from
  the server (if a tracker file exists). In this case, the existing file size
  is used to determine the download start point. For sliced downloads, the
  number of bytes previously retrieved from the server cannot be determined
  from the existing file size, and so the number of bytes known to have been
  previously downloaded is retrieved from the tracker file.

  Args:
    src_obj_metadata: Metadata for the source object. Must include etag and
                      generation.
    dst_url: Destination URL for tracker file.
    logger: For outputting log messages.
    api_selector: API to use for this operation.
    start_byte: The start byte of the byte range for this download.
    existing_file_size: Size of existing file for this download on disk.
    component_num: The component number, if this is a component of a parallel
                   download, else None.

  Returns:
    tracker_file_name: The name of the tracker file, if one was used.
    download_start_byte: The first byte that still needs to be downloaded.
  """
  assert src_obj_metadata.etag

  tracker_file_name = None
  if src_obj_metadata.size < ResumableThreshold():
    # Don't create a tracker file for a small download; cross-process resumes
    # won't work, but restarting a small download is inexpensive.
    return tracker_file_name, start_byte

  download_name = dst_url.object_name
  if component_num is None:
    tracker_file_type = TrackerFileType.DOWNLOAD
  else:
    tracker_file_type = TrackerFileType.DOWNLOAD_COMPONENT
    download_name += ' component %d' % component_num

  tracker_file_name = GetTrackerFilePath(dst_url, tracker_file_type,
                                         api_selector,
                                         component_num=component_num)
  # Check to see if we already have a matching tracker file.
  try:
    with open(tracker_file_name, 'r') as tracker_file:
      if tracker_file_type == TrackerFileType.DOWNLOAD:
        # The tracker file holds only the etag; resume after the bytes that
        # are already on disk.
        etag_value = tracker_file.readline().rstrip('\n')
        if etag_value == src_obj_metadata.etag:
          return tracker_file_name, existing_file_size
      elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
        # The tracker file holds JSON with the etag, generation and the
        # recorded start byte for this component.
        component_data = json.loads(tracker_file.read())
        if (component_data['etag'] == src_obj_metadata.etag and
            component_data['generation'] == src_obj_metadata.generation):
          return tracker_file_name, component_data['download_start_byte']

    logger.warn('Tracker file doesn\'t match for download of %s. Restarting '
                'download from scratch.' % download_name)

  except IOError as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if e.errno != errno.ENOENT:
      logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting '
                  'download from scratch.' % (tracker_file_name, str(e)))
  except (ValueError, KeyError, TypeError) as e:
    # The tracker file is not valid JSON, or is valid JSON without the
    # expected structure/keys. Treat it as unusable and start over.
    logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting '
                'download from scratch.' % (tracker_file_name, str(e)))

  # There wasn't a matching tracker file, so create one and then start the
  # download from scratch.
  if tracker_file_type == TrackerFileType.DOWNLOAD:
    _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag)
  elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
    WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      start_byte)
  return tracker_file_name, start_byte
424
425
def WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      current_file_pos):
  """Updates or creates a download component tracker file on disk.

  The file content is a JSON object with the source object's etag and
  generation and the current file position stored as 'download_start_byte'.

  Args:
    tracker_file_name: The name of the tracker file.
    src_obj_metadata: Metadata for the source object. Must include etag.
    current_file_pos: The current position in the file.
  """
  serialized_state = json.dumps({
      'etag': src_obj_metadata.etag,
      'generation': src_obj_metadata.generation,
      'download_start_byte': current_file_pos,
  })
  _WriteTrackerFile(tracker_file_name, serialized_state)
440
441
442def _WriteTrackerFile(tracker_file_name, data):
443  """Creates a tracker file, storing the input data."""
444  try:
445    with os.fdopen(os.open(tracker_file_name,
446                           os.O_WRONLY | os.O_CREAT, 0600), 'w') as tf:
447      tf.write(data)
448    return False
449  except (IOError, OSError) as e:
450    raise RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
451
452
def RaiseUnwritableTrackerFileException(tracker_file_name, error_str):
  """Raises a CommandException reporting a failed tracker file write.

  Args:
    tracker_file_name: Path of the tracker file that could not be written.
    error_str: Description of the underlying error.

  Raises:
    CommandException: always.
  """
  message = TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT % (tracker_file_name,
                                                      error_str)
  raise CommandException(message)
457