• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2011 Google Inc. All Rights Reserved.
3# Copyright 2011, Nexenta Systems Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Helper functions for copy functionality."""
17
18from __future__ import absolute_import
19
20import base64
21from collections import namedtuple
22import csv
23import datetime
24import errno
25import gzip
26from hashlib import md5
27import json
28import logging
29import mimetypes
30from operator import attrgetter
31import os
32import pickle
33import random
34import re
35import shutil
36import stat
37import subprocess
38import tempfile
39import textwrap
40import time
41import traceback
42
43from boto import config
44import crcmod
45
46import gslib
47from gslib.cloud_api import ArgumentException
48from gslib.cloud_api import CloudApi
49from gslib.cloud_api import NotFoundException
50from gslib.cloud_api import PreconditionException
51from gslib.cloud_api import Preconditions
52from gslib.cloud_api import ResumableDownloadException
53from gslib.cloud_api import ResumableUploadAbortException
54from gslib.cloud_api import ResumableUploadException
55from gslib.cloud_api import ResumableUploadStartOverException
56from gslib.cloud_api_helper import GetDownloadSerializationData
57from gslib.commands.compose import MAX_COMPOSE_ARITY
58from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
59from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
60from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
61from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
62from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
63from gslib.cs_api_map import ApiSelector
64from gslib.daisy_chain_wrapper import DaisyChainWrapper
65from gslib.exception import CommandException
66from gslib.exception import HashMismatchException
67from gslib.file_part import FilePart
68from gslib.hashing_helper import Base64EncodeHash
69from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
70from gslib.hashing_helper import CalculateHashesFromContents
71from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
72from gslib.hashing_helper import CHECK_HASH_NEVER
73from gslib.hashing_helper import ConcatCrc32c
74from gslib.hashing_helper import GetDownloadHashAlgs
75from gslib.hashing_helper import GetUploadHashAlgs
76from gslib.hashing_helper import HashingFileUploadWrapper
77from gslib.parallelism_framework_util import AtomicDict
78from gslib.progress_callback import ConstructAnnounceText
79from gslib.progress_callback import FileProgressCallbackHandler
80from gslib.progress_callback import ProgressCallbackWithBackoff
81from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
82from gslib.storage_url import ContainsWildcard
83from gslib.storage_url import StorageUrlFromString
84from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
85from gslib.tracker_file import DeleteDownloadTrackerFiles
86from gslib.tracker_file import DeleteTrackerFile
87from gslib.tracker_file import GetTrackerFilePath
88from gslib.tracker_file import RaiseUnwritableTrackerFileException
89from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
90from gslib.tracker_file import TrackerFileType
91from gslib.tracker_file import WriteDownloadComponentTrackerFile
92from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
93from gslib.translation_helper import CopyObjectMetadata
94from gslib.translation_helper import DEFAULT_CONTENT_TYPE
95from gslib.translation_helper import GenerationFromUrlAndString
96from gslib.translation_helper import ObjectMetadataFromHeaders
97from gslib.translation_helper import PreconditionsFromHeaders
98from gslib.translation_helper import S3MarkerAclFromObjectMetadata
99from gslib.util import CheckFreeSpace
100from gslib.util import CheckMultiprocessingAvailableAndInit
101from gslib.util import CreateLock
102from gslib.util import DEFAULT_FILE_BUFFER_SIZE
103from gslib.util import DivideAndCeil
104from gslib.util import GetCloudApiInstance
105from gslib.util import GetFileSize
106from gslib.util import GetJsonResumableChunkSize
107from gslib.util import GetMaxRetryDelay
108from gslib.util import GetNumRetries
109from gslib.util import GetStreamFromFileUrl
110from gslib.util import HumanReadableToBytes
111from gslib.util import IS_WINDOWS
112from gslib.util import IsCloudSubdirPlaceholder
113from gslib.util import MakeHumanReadable
114from gslib.util import MIN_SIZE_COMPUTE_LOGGING
115from gslib.util import ResumableThreshold
116from gslib.util import TEN_MIB
117from gslib.util import UsingCrcmodExtension
118from gslib.util import UTF8
119from gslib.wildcard_iterator import CreateWildcardIterator
120
121# pylint: disable=g-import-not-at-top
122if IS_WINDOWS:
123  import msvcrt
124
# copy_helper_opts is declared as a module-level global because namedtuple
# isn't aware of assigning to a class member, which breaks the pickling done
# by multiprocessing. For details see
# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
# pylint: disable=global-at-module-level
global global_copy_helper_opts

# Map of local files currently opened for write, kept in memory. It makes sure
# that two writes to the same file (for example, because the user specified
# two identical source URLs) happen serially rather than concurrently.
global open_files_map, open_files_lock
if CheckMultiprocessingAvailableAndInit().is_available:
  open_files_map = AtomicDict(manager=gslib.util.manager)
else:
  open_files_map = AtomicDict()

# Multiple processes are not allowed on Windows, so a process-safe lock would
# be unnecessary there.
open_files_lock = CreateLock()
143
144# For debugging purposes; if True, files and objects that fail hash validation
145# will be saved with the below suffix appended.
146_RENAME_ON_HASH_MISMATCH = False
147_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'
148
149PARALLEL_UPLOAD_TEMP_NAMESPACE = (
150    u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
151
152PARALLEL_UPLOAD_STATIC_SALT = u"""
153PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
154The theory is that no user will have prepended this to the front of
155one of their object names and then done an MD5 hash of the name, and
156then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
157name. Note that there will be no problems with object name length since we
158hash the original name.
159"""
160
161# When uploading a file, get the following fields in the response for
162# filling in command output and manifests.
163UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size']
164
# Argument bundle handed to command.Apply() for the parallel composite upload
# case (and only that case). A plain content_type string is carried instead of
# a full apitools Object() because apitools objects are not picklable.
#   filename: String name of file.
#   file_start: Start byte of file (may be in the middle of a file for
#       partitioned files).
#   file_length: Length of upload (may not be the entire length of a file for
#       partitioned files).
#   src_url: FileUrl describing the source file.
#   dst_url: CloudUrl describing the destination component file.
#   canned_acl: canned_acl to apply to the uploaded file/component.
#   content_type: Content-type for the final object; used to set the
#       content-type of both the components and the final object.
#   tracker_file: Tracker file for this component.
#   tracker_file_lock: Tracker file lock for tracker file(s).
PerformParallelUploadFileToObjectArgs = namedtuple(
    'PerformParallelUploadFileToObjectArgs',
    ['filename', 'file_start', 'file_length', 'src_url', 'dst_url',
     'canned_acl', 'content_type', 'tracker_file', 'tracker_file_lock'])

# Arguments describing one component of a sliced object download.
PerformSlicedDownloadObjectToFileArgs = namedtuple(
    'PerformSlicedDownloadObjectToFileArgs',
    ['component_num', 'src_url', 'src_obj_metadata', 'dst_url',
     'download_file_name', 'start_byte', 'end_byte'])

# Per-component values reported back from a sliced download.
PerformSlicedDownloadReturnValues = namedtuple(
    'PerformSlicedDownloadReturnValues',
    ['component_num', 'crc32c', 'bytes_transferred', 'server_encoding'])

# Object name and generation pair, as recorded in a tracker file.
ObjectFromTracker = namedtuple('ObjectFromTracker',
                               ['object_name', 'generation'])
197
198# TODO: Refactor this file to be less cumbersome. In particular, some of the
199# different paths (e.g., uploading a file to an object vs. downloading an
200# object to a file) could be split into separate files.
201
# Size of each chunk processed while zipping/unzipping gzip files.
GZIP_CHUNK_SIZE = 8192

# How many bytes to wait for before a sliced download component tracker file
# is updated.
TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB

# 150 MiB.
PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 ** 2

# Files larger than 5 GiB need special Multipart upload logic on S3, which is
# currently not implemented here.
S3_MAX_UPLOAD_SIZE = 5 * 1024 ** 3

# TODO: Create a multiprocessing framework value allocator, then use it instead
# of a dict.
global suggested_sliced_transfers, suggested_sliced_transfers_lock
if CheckMultiprocessingAvailableAndInit().is_available:
  suggested_sliced_transfers = AtomicDict(manager=gslib.util.manager)
else:
  suggested_sliced_transfers = AtomicDict()
suggested_sliced_transfers_lock = CreateLock()
222
223
class FileConcurrencySkipError(Exception):
  """Signals that a file is skipped because of a concurrent, duplicate copy."""
  pass
226
227
228def _RmExceptionHandler(cls, e):
229  """Simple exception handler to allow post-completion status."""
230  cls.logger.error(str(e))
231
232
233def _ParallelCopyExceptionHandler(cls, e):
234  """Simple exception handler to allow post-completion status."""
235  cls.logger.error(str(e))
236  cls.op_failure_count += 1
237  cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
238                   traceback.format_exc())
239
240
def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
  """Uploads one component of a parallel composite upload (for use with Apply).

  Args:
    cls: Calling Command class.
    args: PerformParallelUploadFileToObjectArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    The value returned by _UploadFileToObject for this component. Its element
    at index 2 is the component that gets appended to the parallel upload
    tracker file.
  """
  file_part = FilePart(args.filename, args.file_start, args.file_length)
  api = GetCloudApiInstance(cls, thread_state=thread_state)
  with file_part:
    # Carry over the content type, if one was provided.
    component_metadata = apitools_messages.Object(
        name=args.dst_url.object_name,
        bucket=args.dst_url.bucket_name,
        contentType=args.content_type)

    try:
      if global_copy_helper_opts.canned_acl:
        # JSON has no canned ACL support, so upload/copy operations are
        # forced onto the XML API; the previous preference is restored below.
        saved_prefer_api = api.prefer_api
        api.prefer_api = ApiSelector.XML
      # No preconditions (None) are passed on purpose: the many precautions
      # taken with component names make collisions effectively impossible,
      # and specifying preconditions would just allow us to reach a state in
      # which uploads always fail on retries.
      upload_result = _UploadFileToObject(
          args.src_url, file_part, args.file_length, args.dst_url,
          component_metadata, None, api, cls.logger, cls,
          _ParallelCopyExceptionHandler, gzip_exts=None,
          allow_splitting=False)
    finally:
      if global_copy_helper_opts.canned_acl:
        api.prefer_api = saved_prefer_api

  _AppendComponentTrackerToParallelUploadTrackerFile(
      args.tracker_file, upload_result[2], args.tracker_file_lock)
  return upload_result
285
286
# Immutable bundle of the copy-related options.
CopyHelperOpts = namedtuple(
    'CopyHelperOpts',
    'perform_mv no_clobber daisy_chain read_args_from_stdin print_ver '
    'use_manifest preserve_acl canned_acl skip_unsupported_objects '
    'test_callback_file')
298
299
# pylint: disable=global-variable-undefined
def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
                         read_args_from_stdin=False, print_ver=False,
                         use_manifest=False, preserve_acl=False,
                         canned_acl=None, skip_unsupported_objects=False,
                         test_callback_file=None):
  """Builds the CopyHelperOpts tuple and stores it as the module-wide options.

  The tuple is the union of options needed by CopyHelper and by any
  copy-related functionality in CpCommand, RsyncCommand, or the Command class.
  Each argument is stored unchanged in the field of the same name.

  Returns:
    The newly created CopyHelperOpts, which is also assigned to the
    module-level global_copy_helper_opts.
  """
  global global_copy_helper_opts
  option_values = {
      'perform_mv': perform_mv,
      'no_clobber': no_clobber,
      'daisy_chain': daisy_chain,
      'read_args_from_stdin': read_args_from_stdin,
      'print_ver': print_ver,
      'use_manifest': use_manifest,
      'preserve_acl': preserve_acl,
      'canned_acl': canned_acl,
      'skip_unsupported_objects': skip_unsupported_objects,
      'test_callback_file': test_callback_file,
  }
  global_copy_helper_opts = CopyHelperOpts(**option_values)
  return global_copy_helper_opts
322
323
def GetCopyHelperOpts():
  """Returns the namedtuple currently holding the CopyHelper options.

  This reads the module-level global_copy_helper_opts, which is assigned by
  CreateCopyHelperOpts.
  """
  # Reading a module global needs no `global` declaration.
  return global_copy_helper_opts
330
331
def _SelectDownloadStrategy(dst_url):
  """Picks the download strategy appropriate for the destination.

  Args:
    dst_url: Destination StorageUrl.

  Returns:
    gsutil Cloud API DownloadStrategy: ONE_SHOT when the destination is a file
    URL naming os.devnull or a character device, RESUMABLE otherwise.
  """
  if not dst_url.IsFileUrl():
    return CloudApi.DownloadStrategy.RESUMABLE

  # The devnull name is compared explicitly because os.stat doesn't work on
  # 'nul' in Windows.
  special_destination = dst_url.object_name == os.devnull
  try:
    if stat.S_ISCHR(os.stat(dst_url.object_name).st_mode):
      special_destination = True
  except OSError:
    # A destination that cannot be stat'ed is simply not treated as a
    # character device.
    pass

  if special_destination:
    return CloudApi.DownloadStrategy.ONE_SHOT
  return CloudApi.DownloadStrategy.RESUMABLE
357
358
359def _GetUploadTrackerData(tracker_file_name, logger):
360  """Reads tracker data from an upload tracker file if it exists.
361
362  Args:
363    tracker_file_name: Tracker file name for this upload.
364    logger: for outputting log messages.
365
366  Returns:
367    Serialization data if the tracker file already exists (resume existing
368    upload), None otherwise.
369  """
370  tracker_file = None
371
372  # If we already have a matching tracker file, get the serialization data
373  # so that we can resume the upload.
374  try:
375    tracker_file = open(tracker_file_name, 'r')
376    tracker_data = tracker_file.read()
377    return tracker_data
378  except IOError as e:
379    # Ignore non-existent file (happens first time a upload is attempted on an
380    # object, or when re-starting an upload after a
381    # ResumableUploadStartOverException), but warn user for other errors.
382    if e.errno != errno.ENOENT:
383      logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
384                  'upload from scratch.', tracker_file_name, e.strerror)
385  finally:
386    if tracker_file:
387      tracker_file.close()
388
389
def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
                               command_name):
  """Verifies that the destination URL names a container.

  Directories, buckets, bucket subdirs, and non-existent bucket subdirs are
  all acceptable containers.

  Args:
    exp_dst_url: Wildcard-expanded destination StorageUrl.
    have_existing_dst_container: bool indicator of whether exp_dst_url
      names a container (directory, bucket, or existing bucket subdir).
    command_name: Name of command making call. May not be the same as the
        calling class's self.command_name in the case of commands implemented
        atop other commands (like mv command).

  Raises:
    CommandException: if the URL being checked does not name a container.
  """
  if exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory():
    # A local path that is not a directory.
    is_invalid = True
  else:
    is_invalid = (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
                  and not have_existing_dst_container)

  if is_invalid:
    raise CommandException('Destination URL must name a directory, bucket, '
                           'or bucket\nsubdirectory for the multiple '
                           'source form of the %s command.' % command_name)
414
415
416def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
417                                     have_existing_dest_subdir,
418                                     src_url_names_container,
419                                     recursion_requested):
420  """Checks whether dst_url should be treated as a bucket "sub-directory".
421
422  The decision about whether something constitutes a bucket "sub-directory"
423  depends on whether there are multiple sources in this request and whether
424  there is an existing bucket subdirectory. For example, when running the
425  command:
426    gsutil cp file gs://bucket/abc
427  if there's no existing gs://bucket/abc bucket subdirectory we should copy
428  file to the object gs://bucket/abc. In contrast, if
429  there's an existing gs://bucket/abc bucket subdirectory we should copy
430  file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
431  exists, when running the command:
432    gsutil cp file1 file2 gs://bucket/abc
433  we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
434  Finally, for recursive copies, if the source is a container then we should
435  copy to a container as the target.  For example, when running the command:
436    gsutil cp -r dir1 gs://bucket/dir2
437  we should copy the subtree of dir1 to gs://bucket/dir2.
438
439  Note that we don't disallow naming a bucket "sub-directory" where there's
440  already an object at that URL. For example it's legitimate (albeit
441  confusing) to have an object called gs://bucket/dir and
442  then run the command
443  gsutil cp file1 file2 gs://bucket/dir
444  Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
445  and gs://bucket/dir/file2.
446
447  Args:
448    have_multiple_srcs: Bool indicator of whether this is a multi-source
449        operation.
450    dst_url: StorageUrl to check.
451    have_existing_dest_subdir: bool indicator whether dest is an existing
452      subdirectory.
453    src_url_names_container: bool indicator of whether the source URL
454      is a container.
455    recursion_requested: True if a recursive operation has been requested.
456
457  Returns:
458    bool indicator.
459  """
460  if have_existing_dest_subdir:
461    return True
462  if dst_url.IsCloudUrl():
463    return (have_multiple_srcs or
464            (src_url_names_container and recursion_requested))
465
466
467def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
468                                  have_existing_dest_subdir, dst_url,
469                                  recursion_requested):
470  """Checks that dst_url names a single file/object after wildcard expansion.
471
472  It is possible that an object path might name a bucket sub-directory.
473
474  Args:
475    have_multiple_srcs: Bool indicator of whether this is a multi-source
476        operation.
477    have_existing_dest_subdir: bool indicator whether dest is an existing
478      subdirectory.
479    dst_url: StorageUrl to check.
480    recursion_requested: True if a recursive operation has been requested.
481
482  Returns:
483    bool indicator.
484  """
485  if recursion_requested:
486    return False
487  if dst_url.IsFileUrl():
488    return not dst_url.IsDirectory()
489  else:  # dst_url.IsCloudUrl()
490    return (not have_multiple_srcs and
491            not have_existing_dest_subdir and
492            dst_url.IsObject())
493
494
def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
                    have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
                    recursion_requested):
  """Builds the destination URL for one exp_src_url/exp_dst_url pair.

  The naming rules are context-dependent and mimic Linux cp and mv behavior.

  Args:
    src_url: Source StorageUrl to be copied.
    exp_src_url: Single StorageUrl from wildcard expansion of src_url.
    src_url_names_container: True if src_url names a container (including the
        case of a wildcard-named bucket subdir (like gs://bucket/abc,
        where gs://bucket/abc/* matched some objects).
    have_multiple_srcs: True if this is a multi-source request. This can be
        true if src_url wildcard-expanded to multiple URLs or if there were
        multiple source URLs in the request.
    exp_dst_url: the expanded StorageUrl requested for the cp destination.
        Final written path is constructed from this plus a context-dependent
        variant of src_url.
    have_existing_dest_subdir: bool indicator whether dest is an existing
      subdirectory.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    StorageUrl to use for copy.

  Raises:
    CommandException if destination object name not specified for
    source and source is a stream.
  """
  # One file or object copied onto one file or object: nothing to construct.
  if _ShouldTreatDstUrlAsSingleton(
      have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
      recursion_requested):
    return exp_dst_url

  # A stream source has no name to append, so the destination is used as is.
  if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
    if have_existing_dest_subdir:
      raise CommandException('Destination object name needed when '
                             'source is a stream')
    return exp_dst_url

  if not (recursion_requested or have_multiple_srcs):
    # One file or object copied into a subdirectory: the final component of
    # exp_src_url is appended to exp_dst_url.
    final_src_component = exp_src_url.object_name.rpartition(
        src_url.delim)[-1]
    dst_base = exp_dst_url.url_string.rstrip(exp_dst_url.delim)
    return StorageUrlFromString(
        '%s%s%s' % (dst_base, exp_dst_url.delim, final_src_component))

  # From here on, multiple sources are being copied to a directory, bucket,
  # or bucket "sub-directory".

  # Make sure exp_dst_url ends with the delimiter for a multi-source copy or a
  # copy to a directory. The directory check is a special case so that
  #   gsutil cp gs://bucket/obj dir
  # becomes file://dir/ rather than file://dir (the latter would create the
  # file "dirobj").
  # Note: both have_multiple_srcs and src_url_names_container must be
  # consulted, because src_url could be a bucket containing a single object,
  # named as gs://bucket.
  wants_trailing_delim = (
      have_multiple_srcs or src_url_names_container or
      (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
  if (wants_trailing_delim and
      not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
    exp_dst_url = StorageUrlFromString(
        '%s%s' % (exp_dst_url.url_string, exp_dst_url.delim))

  # Matching the naming behavior of local Linux cp and mv depends on several
  # factors: whether the destination is a container, how many sources there
  # are, and whether the mv command is in use.
  # 1. For "mv" with a non-existent destination subdir, the rename happens at
  #    the level of the src subdir, instead of appending that subdir beneath
  #    the dst subdir as copying does. For example:
  #      gsutil rm -r gs://bucket
  #      gsutil cp -r dir1 gs://bucket
  #      gsutil cp -r dir2 gs://bucket/subdir1
  #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
  #    would, under cp naming behavior, produce paths like:
  #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
  #    whereas mv naming behavior should produce:
  #      gs://bucket/subdir2/dir2/.svn/all-wcprops
  # 2. Copying from directories, buckets, or bucket subdirs should mirror the
  #    source directory hierarchy. For example:
  #      gsutil cp dir1/dir2 gs://bucket
  #    should create the object gs://bucket/dir2/file2 (assuming dir1/dir2
  #    contains file2).
  #    For consistency with Linux cp there is one more wrinkle with subdirs:
  #    the resulting names depend on whether the destination subdirectory
  #    exists. If gs://bucket/subdir exists, the command:
  #      gsutil cp -r dir1/dir2 gs://bucket/subdir
  #    should create objects named like gs://bucket/subdir/dir2/a/b/c. If
  #    gs://bucket/subdir does not exist, the same command should create
  #    objects named like gs://bucket/subdir/a/b/c.
  # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
  #    should name the results by the final source file name component.
  #    Example:
  #      gsutil cp dir1/*.txt gs://bucket
  #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
  #    assuming dir1 contains f1.txt and f2.txt.

  recursive_move_to_new_subdir = bool(
      global_copy_helper_opts.perform_mv and recursion_requested
      and src_url_names_container and not have_existing_dest_subdir)

  if recursive_move_to_new_subdir:
    # Case 1. Naming rules for bucket subdir mv. src_url is lined up against
    # its expansion to find the base on which the new name is built. For
    # example, running the command:
    #   gsutil mv gs://bucket/abcd gs://bucket/xyz
    # when processing exp_src_url=gs://bucket/abcd/123
    # the tail of exp_src_url should become /123
    # Note: mv.py code disallows wildcard specification of source URL.
    src_tail = exp_src_url.url_string[len(src_url.url_string):]
    dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
                              src_tail.strip('/'))

  elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
                                    exp_dst_url.IsDirectory()):
    # Case 2.  Container copy to a destination other than a file.
    # dst_key_name is the subpath of exp_src_url past where src_url ends.
    # For example, for src_url=gs://bucket/ and
    # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
    # src_subdir/obj.
    src_parent_path = GetPathBeforeFinalDir(src_url)
    relative_name = exp_src_url.versionless_url_string[len(src_parent_path):]
    dst_key_name = relative_name.lstrip(src_url.delim)
    # When dst_url is a non-existent subdir, the leading path component is
    # dropped.
    if not have_existing_dest_subdir:
      dst_key_name = dst_key_name.partition(src_url.delim)[-1]
    # Special case: src_url was a directory named with '.' or './', so that
    # a command like:
    #   gsutil cp -r . gs://dest
    # produces obj names of the form gs://dest/abc instead of
    # gs://dest/./abc.
    if dst_key_name.startswith('.%s' % os.sep):
      dst_key_name = dst_key_name[2:]

  else:
    # Case 3.
    dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]

  if (not recursive_move_to_new_subdir and (
      exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
          have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
          src_url_names_container, recursion_requested))):
    # Prefix the key with the destination's own object name, joined by
    # exactly one delimiter (or by nothing when there is no object name).
    dst_object_name = exp_dst_url.object_name
    if dst_object_name and dst_object_name.endswith(exp_dst_url.delim):
      name_prefix = dst_object_name.rstrip(exp_dst_url.delim)
      joiner = exp_dst_url.delim
    elif dst_object_name:
      name_prefix = dst_object_name
      joiner = exp_dst_url.delim
    else:
      name_prefix = ''
      joiner = ''
    dst_key_name = '%s%s%s' % (name_prefix, joiner, dst_key_name)

  # Return a clone of the destination whose name uses the destination's own
  # delimiter in place of the source's.
  result_url = exp_dst_url.Clone()
  result_url.object_name = dst_key_name.replace(src_url.delim,
                                                exp_dst_url.delim)
  return result_url
657
658
659def _CreateDigestsFromDigesters(digesters):
660  digests = {}
661  if digesters:
662    for alg in digesters:
663      digests[alg] = base64.encodestring(
664          digesters[alg].digest()).rstrip('\n')
665  return digests
666
667
def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
                                src_obj_metadata):
  """Creates a base64 CRC32C and/or MD5 digest from file_name.

  Args:
    logger: For outputting log messages.
    algs: List of algorithms to compute. Only 'md5' and 'crc32c' are
        recognized; other entries are ignored.
    file_name: File to digest.
    final_file_name: Permanent location to be used for the downloaded file
                     after validation (used for logging).
    src_obj_metadata: Metadata of source object; its size is passed to the
                      progress callback.

  Returns:
    Dict of algorithm name : base 64 encoded digest
  """
  hash_dict = {}
  if 'md5' in algs:
    hash_dict['md5'] = md5()
  if 'crc32c' in algs:
    hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
  with open(file_name, 'rb') as fp:
    CalculateHashesFromContents(
        fp, hash_dict, ProgressCallbackWithBackoff(
            src_obj_metadata.size,
            FileProgressCallbackHandler(
                ConstructAnnounceText('Hashing', final_file_name),
                logger).call))
  digests = {}
  # items() rather than iteritems(): the latter exists only on Python 2.
  for alg_name, digest in hash_dict.items():
    digests[alg_name] = Base64EncodeHash(digest.hexdigest())
  return digests
699
700
701def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
702                      dst_obj_metadata):
703  """Validates integrity of two cloud objects copied via daisy-chain.
704
705  Args:
706    logger: for outputting log messages.
707    src_url: CloudUrl for source cloud object.
708    dst_url: CloudUrl for destination cloud object.
709    src_obj_metadata: Cloud Object metadata for object being downloaded from.
710    dst_obj_metadata: Cloud Object metadata for object being uploaded to.
711
712  Raises:
713    CommandException: if cloud digests don't match local digests.
714  """
715  checked_one = False
716  download_hashes = {}
717  upload_hashes = {}
718  if src_obj_metadata.md5Hash:
719    download_hashes['md5'] = src_obj_metadata.md5Hash
720  if src_obj_metadata.crc32c:
721    download_hashes['crc32c'] = src_obj_metadata.crc32c
722  if dst_obj_metadata.md5Hash:
723    upload_hashes['md5'] = dst_obj_metadata.md5Hash
724  if dst_obj_metadata.crc32c:
725    upload_hashes['crc32c'] = dst_obj_metadata.crc32c
726
727  for alg, upload_b64_digest in upload_hashes.iteritems():
728    if alg not in download_hashes:
729      continue
730
731    download_b64_digest = download_hashes[alg]
732    logger.debug(
733        'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
734        dst_url, download_b64_digest, upload_b64_digest)
735    if download_b64_digest != upload_b64_digest:
736      raise HashMismatchException(
737          '%s signature for source object (%s) doesn\'t match '
738          'destination object digest (%s). Object (%s) will be deleted.' % (
739              alg, download_b64_digest, upload_b64_digest, dst_url))
740    checked_one = True
741  if not checked_one:
742    # One known way this can currently happen is when downloading objects larger
743    # than 5 GiB from S3 (for which the etag is not an MD5).
744    logger.warn(
745        'WARNING: Found no hashes to validate object downloaded from %s and '
746        'uploaded to %s. Integrity cannot be assured without hashes.',
747        src_url, dst_url)
748
749
750def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
751                 is_upload=False):
752  """Validates integrity by comparing cloud digest to local digest.
753
754  Args:
755    logger: for outputting log messages.
756    obj_url: CloudUrl for cloud object.
757    obj_metadata: Cloud Object being downloaded from or uploaded to.
758    file_name: Local file name on disk being downloaded to or uploaded from
759               (used only for logging).
760    digests: Computed Digests for the object.
761    is_upload: If true, comparing for an uploaded object (controls logging).
762
763  Raises:
764    CommandException: if cloud digests don't match local digests.
765  """
766  local_hashes = digests
767  cloud_hashes = {}
768  if obj_metadata.md5Hash:
769    cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
770  if obj_metadata.crc32c:
771    cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')
772
773  checked_one = False
774  for alg in local_hashes:
775    if alg not in cloud_hashes:
776      continue
777
778    local_b64_digest = local_hashes[alg]
779    cloud_b64_digest = cloud_hashes[alg]
780    logger.debug(
781        'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
782        local_b64_digest, cloud_b64_digest)
783    if local_b64_digest != cloud_b64_digest:
784
785      raise HashMismatchException(
786          '%s signature computed for local file (%s) doesn\'t match '
787          'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
788              alg, local_b64_digest, cloud_b64_digest,
789              'Cloud object' if is_upload else 'Local file',
790              obj_url if is_upload else file_name))
791    checked_one = True
792  if not checked_one:
793    if is_upload:
794      logger.warn(
795          'WARNING: Found no hashes to validate object uploaded to %s. '
796          'Integrity cannot be assured without hashes.', obj_url)
797    else:
798    # One known way this can currently happen is when downloading objects larger
799    # than 5 GB from S3 (for which the etag is not an MD5).
800      logger.warn(
801          'WARNING: Found no hashes to validate object downloaded to %s. '
802          'Integrity cannot be assured without hashes.', file_name)
803
804
def IsNoClobberServerException(e):
  """Reports whether a failed copy was the server refusing to clobber a file.

  This is the failure seen when a precondition was supplied specifically so
  that an existing destination would not be overwritten.

  Args:
    e: The Exception that was generated by a failed copy operation

  Returns:
    bool indicator - True indicates that the server did attempt to clobber
        an existing file.
  """
  # A failed precondition is always a no-clobber refusal.
  if isinstance(e, PreconditionException):
    return True
  # Resumable uploads surface the same condition as a 412 in the message.
  return isinstance(e, ResumableUploadException) and '412' in e.message
820
821
def CheckForDirFileConflict(exp_src_url, dst_url):
  """Raises if a local file/directory clash makes the copy impossible.

     A clash exists when a directory is present in the local file system where
     a file needs to go, or vice versa. Example: if the file "./x" exists and
     you try to do:
       gsutil cp gs://mybucket/x/y .
     the request can't succeed because it requires a directory where
     the file x exists.

     No corresponding restriction is enforced for buckets, because the flat
     namespace semantics for buckets doesn't prohibit such cases the way
     hierarchical file systems do. For example, if a bucket contains an object
     called gs://bucket/dir and then you run the command:
       gsutil cp file1 file2 gs://bucket/dir
     you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
     gs://bucket/dir/file2.

  Args:
    exp_src_url: Expanded source StorageUrl.
    dst_url: Destination StorageUrl.

  Raises:
    CommandException: if errors encountered.
  """
  # Only file destination URLs can have this problem.
  if dst_url.IsCloudUrl():
    return

  target_path = dst_url.object_name
  parent_dir = os.path.dirname(target_path)

  if os.path.isfile(parent_dir):
    # A regular file sits where the destination's parent directory must be.
    raise CommandException(
        'Cannot retrieve %s because a file exists where a directory '
        'needs to be created (%s).' % (exp_src_url.url_string, parent_dir))

  if os.path.isdir(target_path):
    # A directory sits where the destination file itself must be written.
    raise CommandException(
        'Cannot retrieve %s because a directory exists (%s) where the '
        'file needs to be created.' % (exp_src_url.url_string, target_path))
860
861
def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
                   dst_bucket_url, random_prefix, tracker_file,
                   tracker_file_lock):
  """Partitions a file into FilePart objects to be uploaded and later composed.

  These objects, when composed, will match the original file. This entails
  splitting the file into parts, naming and forming a destination URL for each
  part, and also providing the PerformParallelUploadFileToObjectArgs
  corresponding to each part.

  Args:
    fp: The file object to be partitioned.
    file_size: The size of fp, in bytes.
    src_url: Source FileUrl from the original command.
    content_type: content type for the component and final objects.
    canned_acl: The user-provided canned_acl, if applicable.
    dst_bucket_url: CloudUrl for the destination bucket
    random_prefix: The randomly-generated prefix used to prevent collisions
                   among the temporary component names.
    tracker_file: The path to the parallel composite upload tracker file.
    tracker_file_lock: The lock protecting access to the tracker file.

  Returns:
    dst_args: Dict mapping each temporary component object name to the
              PerformParallelUploadFileToObjectArgs for uploading that
              component.
  """
  parallel_composite_upload_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'parallel_composite_upload_component_size',
                 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
  (num_components, component_size) = _GetPartitionInfo(
      file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)

  # "Salt" the object name with something a user is very unlikely to have
  # used in an object name, then hash the extended name to make sure
  # we don't run into problems with name length. Using a deterministic
  # naming scheme for the temporary components allows users to take
  # advantage of resumable uploads for each component.
  # The digest depends only on the file name, so it is computed once here
  # rather than once per component.
  encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
  digest = md5(encoded_name).hexdigest()
  temp_name_prefix = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
                      digest + '_')

  dst_args = {}  # Arguments to create commands and pass to subprocesses.
  last_index = num_components - 1
  for i in range(num_components):
    temp_file_name = temp_name_prefix + str(i)
    tmp_dst_url = dst_bucket_url.Clone()
    tmp_dst_url.object_name = temp_file_name

    offset = i * component_size
    if i < last_index:
      # Every component except possibly the last is the same size.
      file_part_length = component_size
    else:
      # The last component just gets all of the remaining bytes.
      file_part_length = file_size - offset
    dst_args[temp_file_name] = PerformParallelUploadFileToObjectArgs(
        fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
        content_type, tracker_file, tracker_file_lock)

  return dst_args
924
925
def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
                               canned_acl, file_size, preconditions, gsutil_api,
                               command_obj, copy_exception_handler):
  """Performs a parallel composite upload of a local file to a cloud object.

  The file is split into components, the components that are not already
  present are uploaded in parallel, all components are composed into the
  destination object, and the temporary component objects are then deleted.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).

  Returns:
    Elapsed upload time, uploaded Object with generation, crc32c, and size
    fields populated.

  Raises:
    CommandException: if some components were not uploaded successfully.
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)

  # Consult the tracker file to learn which components, if any, a previous
  # run already uploaded.
  tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
                                    api_selector, src_url)
  tracker_file_lock = CreateLock()
  random_prefix, existing_components = _ParseParallelUploadTrackerFile(
      tracker_file, tracker_file_lock)

  # (Re)write the initial tracker file for this upload.
  _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                   existing_components, tracker_file_lock)

  # Compute the full set of components that make up the file.
  dst_args = _PartitionFile(
      fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
      dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)

  (components_to_upload, existing_components, existing_objects_to_delete) = (
      FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
                               gsutil_api))

  # Upload, in parallel, every part that is not already present as a
  # temporary object.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject, components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)
  components = [cp_result[2] for cp_result in cp_results] + existing_components

  if len(components) != len(dst_args):
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  # All components are present, so they can be composed. The numeric suffix
  # after the last '_' in each name gives the composition order.
  def _ComponentIndex(component):
    return int(component.object_name.rpartition('_')[2])
  components = sorted(components, key=_ComponentIndex)

  request_components = []
  for component_url in components:
    source_entry = (
        apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
            name=component_url.object_name))
    if component_url.HasGeneration():
      source_entry.generation = long(component_url.generation)
    request_components.append(source_entry)

  composed_object = gsutil_api.ComposeObject(
      request_components, dst_obj_metadata, preconditions=preconditions,
      provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])

  try:
    # Delete only things known to have been uploaded successfully (rather
    # than everything we attempted to create) so that no preexisting object
    # is removed, except for those uploaded by a previous, failed run that
    # have since changed (but still have an old generation lying around).
    objects_to_delete = components + existing_objects_to_delete
    command_obj.Apply(
        _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
        arg_checker=gslib.command.DummyArgChecker,
        parallel_operations_override=True)
  except Exception:  # pylint: disable=broad-except
    # A failed delete must not fail the whole command: the copy succeeded
    # iff the compose call succeeded, so this is reduced to a warning.
    logging.warning(
        'Failed to delete some of the following temporary objects:\n' +
        '\n'.join(dst_args))
  finally:
    with tracker_file_lock:
      if os.path.exists(tracker_file):
        os.unlink(tracker_file)

  return time.time() - start_time, composed_object
1038
1039
def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
                                     file_size, canned_acl=None):
  """Determines whether parallel composite upload strategy should be used.

  As a side effect, when parallel composite uploads are disabled (configured
  threshold of 0) but the file is at least
  PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD bytes, a note suggesting the feature
  is logged; the 'suggested' flag in suggested_sliced_transfers keeps the note
  from being logged again.

  Args:
    logger: for outputting log messages.
    allow_splitting: If false, then this function returns false.
    src_url: FileUrl corresponding to a local file.
    dst_url: CloudUrl corresponding to destination cloud object.
    file_size: The size of the source file, in bytes.
    canned_acl: Canned ACL to apply to destination object, if any.

  Returns:
    True iff a parallel upload should be performed on the source file.
  """
  # Name the module-level state this function actually reads and mutates.
  global suggested_sliced_transfers, suggested_sliced_transfers_lock
  parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'parallel_composite_upload_threshold',
      DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))

  all_factors_but_size = (
      allow_splitting  # Don't split the pieces multiple times.
      and not src_url.IsStream()  # We can't partition streams.
      and dst_url.scheme == 'gs'  # Compose is only for gs.
      and not canned_acl)  # TODO: Implement canned ACL support for compose.

  # Since parallel composite uploads are disabled by default, make user aware of
  # them.
  # TODO: Once compiled crcmod is being distributed by major Linux distributions
  # remove this check.
  if (all_factors_but_size and parallel_composite_upload_threshold == 0
      and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
    # The lock guards the check-then-set of the 'suggested' flag.
    with suggested_sliced_transfers_lock:
      if not suggested_sliced_transfers.get('suggested'):
        logger.info('\n'.join(textwrap.wrap(
            '==> NOTE: You are uploading one or more large file(s), which '
            'would run significantly faster if you enable parallel composite '
            'uploads. This feature can be enabled by editing the '
            '"parallel_composite_upload_threshold" value in your .boto '
            'configuration file. However, note that if you do this you and any '
            'users that download such composite files will need to have a '
            'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
        suggested_sliced_transfers['suggested'] = True

  return (all_factors_but_size
          and parallel_composite_upload_threshold > 0
          and file_size >= parallel_composite_upload_threshold)
1087
1088
def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
                         treat_nonexistent_object_as_subdir=False):
  """Resolves url_str (expanding any wildcard) to a single StorageUrl.

  Args:
    url_str: String representation of requested url.
    gsutil_api: gsutil Cloud API instance to use.
    debug: debug level to use (for iterators).
    project_id: project ID to use (for iterators).
    treat_nonexistent_object_as_subdir: indicates if should treat a non-existent
                                        object as a subdir.

  Returns:
      (exp_url, have_existing_dst_container)
      where exp_url is a StorageUrl
      and have_existing_dst_container is a bool indicating whether
      exp_url names an existing directory, bucket, or bucket subdirectory.
      In the case where we match a subdirectory AND an object, the
      object is returned.

  Raises:
    CommandException: if url_str matched more than 1 URL.
  """
  # Wildcarded URL: it must expand to exactly one result.
  if ContainsWildcard(url_str):
    matches = list(CreateWildcardIterator(
        url_str, gsutil_api, debug=debug, project_id=project_id))
    if len(matches) != 1:
      raise CommandException(
          'Destination (%s) must match exactly 1 URL' % url_str)
    only_match = matches[0]
    # A result is an OBJECT, PREFIX, or BUCKET; the latter two represent
    # directories.
    return (StorageUrlFromString(only_match.url_string),
            not only_match.IsObject())

  # Non-wildcarded URL.
  storage_url = StorageUrlFromString(url_str)
  if storage_url.IsFileUrl():
    return (storage_url, storage_url.IsDirectory())

  # From here on this is a cloud URL. Buckets are always containers. For
  # object/prefix URLs, four cases mark the destination as a cloud
  # subdirectory, which is always considered an existing container; checking
  # them gives Unix-like destination folder semantics.
  # Case 1: the URL is itself a subdirectory placeholder.
  if storage_url.IsBucket() or IsCloudSubdirPlaceholder(storage_url):
    return (storage_url, True)

  # HTTP call: an eventually consistent listing that covers a matching
  # prefix, a _$folder$ placeholder, and the empty-listing case at once.
  # Listing object names conserves HTTP calls in the common case, but if many
  # existing objects have the target URL as an exact prefix the listing may
  # be paginated and span several calls. If that becomes common, the listing
  # could be heuristically abandoned after the first page and the _$folder$
  # object queried directly using GetObjectMetadata.
  listing = gsutil_api.ListObjects(
      storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
      provider=storage_url.scheme, fields=['prefixes', 'items/name'])
  found_any = False
  for entry in listing:
    found_any = True
    # Case 2: a prefix matches the destination URL.
    # Case 3: a placeholder object named destination + _$folder$ exists.
    if (entry.datatype == CloudApi.CsObjectOrPrefixType.PREFIX or
        (entry.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
         entry.data.name == storage_url.object_name + '_$folder$')):
      return (storage_url, True)

  # Case 4: nothing matched at all, and nonexistent objects should be
  # treated as subdirectories.
  return (storage_url, (not found_any) and treat_nonexistent_object_as_subdir)
1174
1175
def FixWindowsNaming(src_url, dst_url):
  """Converts backslash-delimited destination paths to cloud-style paths.

  Operates on the destination URL built by ConstructDstUrl().

  Args:
    src_url: Source StorageUrl to be copied.
    dst_url: The destination StorageUrl built by ConstructDstUrl().

  Returns:
    StorageUrl to use for copy.
  """
  # Translation only applies when a backslash-delimited local source is being
  # copied to a cloud destination.
  needs_translation = (src_url.IsFileUrl() and src_url.delim == '\\'
                       and dst_url.IsCloudUrl())
  if not needs_translation:
    return dst_url
  return StorageUrlFromString(dst_url.url_string.replace('\\', '/'))
1193
1194
def SrcDstSame(src_url, dst_url):
  """Reports whether src_url and dst_url name the same object or file.

  Hard and symbolic links are not taken into account.

  Args:
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.

  Returns:
    Bool indicator.
  """
  both_local = src_url.IsFileUrl() and dst_url.IsFileUrl()
  if not both_local:
    # At least one side is not a file URL: compare URL string and generation.
    same_string = src_url.url_string == dst_url.url_string
    return same_string and src_url.generation == dst_url.generation

  # Normalize so that e.g. a/b/./c and a/b/c compare equal.
  normalized_src = os.path.normpath(src_url.object_name)
  normalized_dst = os.path.normpath(dst_url.object_name)
  return normalized_src == normalized_dst
1215
1216
1217def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
1218  """Logs copy operation, including Content-Type if appropriate.
1219
1220  Args:
1221    logger: logger instance to use for output.
1222    src_url: Source StorageUrl.
1223    dst_url: Destination StorageUrl.
1224    dst_obj_metadata: Object-specific metadata that should be overidden during
1225                      the copy.
1226  """
1227  if (dst_url.IsCloudUrl() and dst_obj_metadata and
1228      dst_obj_metadata.contentType):
1229    content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType
1230  else:
1231    content_type_msg = ''
1232  if src_url.IsFileUrl() and src_url.IsStream():
1233    logger.info('Copying from <STDIN>%s...', content_type_msg)
1234  else:
1235    logger.info('Copying %s%s...', src_url.url_string, content_type_msg)
1236
1237
1238# pylint: disable=undefined-variable
def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
                            dst_obj_metadata, preconditions, gsutil_api,
                            logger):
  """Copies one cloud object to another without leaving the cloud.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata for source object; must include etag and size.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    logger: logging.Logger for log message output.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  copy_started = time.time()

  announce_text = ConstructAnnounceText('Copying', dst_url.url_string)
  progress_callback = FileProgressCallbackHandler(announce_text, logger).call
  # When a test callback file is configured, the callback unpickled from it
  # replaces the default progress callback.
  callback_file = global_copy_helper_opts.test_callback_file
  if callback_file:
    with open(callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call

  copied_obj = gsutil_api.CopyObject(
      src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, progress_callback=progress_callback,
      provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
  elapsed_time = time.time() - copy_started

  # Report the destination URL together with the generation just created.
  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(result_url,
                                                     copied_obj.generation)

  return (elapsed_time, src_obj_metadata.size, result_url, copied_obj.md5Hash)
1282
1283
1284def _SetContentTypeFromFile(src_url, dst_obj_metadata):
1285  """Detects and sets Content-Type if src_url names a local file.
1286
1287  Args:
1288    src_url: Source StorageUrl.
1289    dst_obj_metadata: Object-specific metadata that should be overidden during
1290                     the copy.
1291  """
1292  # contentType == '' if user requested default type.
1293  if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
1294      and not src_url.IsStream()):
1295    # Only do content type recognition if src_url is a file. Object-to-object
1296    # copies with no -h Content-Type specified re-use the content type of the
1297    # source object.
1298    object_name = src_url.object_name
1299    content_type = None
1300    # Streams (denoted by '-') are expected to be 'application/octet-stream'
1301    # and 'file' would partially consume them.
1302    if object_name != '-':
1303      if config.getbool('GSUtil', 'use_magicfile', False):
1304        p = subprocess.Popen(['file', '--mime-type', object_name],
1305                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1306        output, error = p.communicate()
1307        p.stdout.close()
1308        p.stderr.close()
1309        if p.returncode != 0 or error:
1310          raise CommandException(
1311              'Encountered error running "file --mime-type %s" '
1312              '(returncode=%d).\n%s' % (object_name, p.returncode, error))
1313        # Parse output by removing line delimiter and splitting on last ":
1314        content_type = output.rstrip().rpartition(': ')[2]
1315      else:
1316        content_type = mimetypes.guess_type(object_name)[0]
1317    if not content_type:
1318      content_type = DEFAULT_CONTENT_TYPE
1319    dst_obj_metadata.contentType = content_type
1320
1321
1322# pylint: disable=undefined-variable
def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
                                    src_obj_size, dst_url, dst_obj_metadata,
                                    preconditions, gsutil_api, logger):
  """Performs a single-shot (non-resumable) upload of a file or stream.

  Args:
    src_url: Source StorageUrl to upload.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: For outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  announce_text = ConstructAnnounceText('Uploading', dst_url.url_string)
  progress_callback = FileProgressCallbackHandler(announce_text, logger).call
  # When a test callback file is configured, the callback unpickled from it
  # replaces the default progress callback.
  callback_file = global_copy_helper_opts.test_callback_file
  if callback_file:
    with open(callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call
  start_time = time.time()

  # Keyword arguments common to the streaming and the sized upload calls.
  upload_kwargs = dict(
      object_metadata=dst_obj_metadata,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, progress_callback=progress_callback,
      provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)

  if src_url.IsStream():
    # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
    uploaded_object = gsutil_api.UploadObjectStreaming(
        src_obj_filestream, **upload_kwargs)
  else:
    # Only the non-streaming call is told the source size.
    uploaded_object = gsutil_api.UploadObject(
        src_obj_filestream, size=src_obj_size, **upload_kwargs)

  return time.time() - start_time, uploaded_object
1366
1367
1368# pylint: disable=undefined-variable
def _UploadFileToObjectResumable(src_url, src_obj_filestream,
                                 src_obj_size, dst_url, dst_obj_metadata,
                                 preconditions, gsutil_api, logger):
  """Uploads the file using a resumable strategy.

  Args:
    src_url: Source FileUrl to upload.  Must not be a stream.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: for outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.

  Raises:
    ResumableUploadStartOverException: re-raised once the number of start-over
        attempts reaches GetNumRetries().
    ResumableUploadAbortException: re-raised as-is; the tracker file is
        deleted first.
    NotFoundException: propagated from the bucket lookup performed before
        restarting an upload.
  """
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD,
      gsutil_api.GetApiSelector(provider=dst_url.scheme))

  def _UploadTrackerCallback(serialization_data):
    """Creates a new tracker file for starting an upload from scratch.

    This function is called by the gsutil Cloud API implementation and the
    serialization data is implementation-specific.

    Args:
      serialization_data: Serialization data used in resuming the upload.
    """
    try:
      with open(tracker_file_name, 'w') as tracker_file:
        tracker_file.write(str(serialization_data))
    except IOError as e:
      RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)

  # This contains the upload URL, which will uniquely identify the
  # destination object.
  tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
  if tracker_data:
    logger.info(
        'Resuming upload for %s', src_url.url_string)

  retryable = True

  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  if global_copy_helper_opts.test_callback_file:
    # Test hook: replaces the progress callback with an unpickled object read
    # from the configured file.
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call

  start_time = time.time()
  num_startover_attempts = 0
  # This loop causes us to retry when the resumable upload failed in a way that
  # requires starting over with a new upload ID. Retries within a single upload
  # ID within the current process are handled in
  # gsutil_api.UploadObjectResumable, and retries within a single upload ID
  # spanning processes happens if an exception occurs not caught below (which
  # will leave the tracker file in place, and cause the upload ID to be reused
  # the next time the user runs gsutil and attempts the same upload).
  while retryable:
    try:
      uploaded_object = gsutil_api.UploadObjectResumable(
          src_obj_filestream, object_metadata=dst_obj_metadata,
          canned_acl=global_copy_helper_opts.canned_acl,
          preconditions=preconditions, provider=dst_url.scheme,
          size=src_obj_size, serialization_data=tracker_data,
          fields=UPLOAD_RETURN_FIELDS,
          tracker_callback=_UploadTrackerCallback,
          progress_callback=progress_callback)
      retryable = False
    except ResumableUploadStartOverException as e:
      # This can happen, for example, if the server sends a 410 response code.
      # In that case the current resumable upload ID can't be reused, so delete
      # the tracker file and try again up to max retries.
      num_startover_attempts += 1
      retryable = (num_startover_attempts < GetNumRetries())
      if not retryable:
        raise

      # If the server sends a 404 response code, then the upload should only
      # be restarted if it was the object (and not the bucket) that was missing.
      # A missing bucket surfaces here as NotFoundException and aborts the
      # retry loop.
      try:
        gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
      except NotFoundException:
        raise

      logger.info('Restarting upload from scratch after exception %s', e)
      DeleteTrackerFile(tracker_file_name)
      tracker_data = None
      src_obj_filestream.seek(0)
      # Reset the progress callback handler.
      progress_callback = FileProgressCallbackHandler(
          ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
      logger.info('\n'.join(textwrap.wrap(
          'Resumable upload of %s failed with a response code indicating we '
          'need to start over with a new resumable upload ID. Backing off '
          'and retrying.' % src_url.url_string)))
      # Randomized exponential backoff, capped at the configured maximum.
      time.sleep(min(random.random() * (2 ** num_startover_attempts),
                     GetMaxRetryDelay()))
    except ResumableUploadAbortException:
      retryable = False
      raise
    finally:
      # The tracker file is only removed once no further attempt will be made
      # with this upload (success, abort, or retries exhausted).
      if not retryable:
        DeleteTrackerFile(tracker_file_name)

  end_time = time.time()
  elapsed_time = end_time - start_time

  return (elapsed_time, uploaded_object)
1486
1487
def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
  """Compresses a to-be-uploaded local file to save bandwidth.

  The source stream is gzipped into a newly created temporary file. If the
  compression does not complete (for example because there is not enough temp
  space), the temporary file is removed before the error propagates.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file - will be consumed
                        and closed.
    src_obj_size: Size of the source file.
    logger: for outputting log messages.

  Returns:
    StorageUrl path to compressed file, compressed file size.

  Raises:
    CommandException: if the free space reported for the temp location is
        less than twice the source size.
  """
  # TODO: Compress using a streaming model as opposed to all at once here.
  if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
    logger.info(
        'Compressing %s (to tmp)...', src_url)
  (gzip_fh, gzip_path) = tempfile.mkstemp()
  gzip_fp = None
  succeeded = False
  try:
    try:
      # Check for temp space. Assume the compressed object is at most 2x
      # the size of the object (normally should compress to smaller than
      # the object)
      if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
        raise CommandException('Inadequate temp space available to compress '
                               '%s. See the CHANGING TEMP DIRECTORIES section '
                               'of "gsutil help cp" for more info.' % src_url)
      gzip_fp = gzip.open(gzip_path, 'wb')
      data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
      while data:
        gzip_fp.write(data)
        data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
    finally:
      if gzip_fp:
        gzip_fp.close()
      os.close(gzip_fh)
      src_obj_filestream.close()
    succeeded = True
  finally:
    if not succeeded:
      # mkstemp leaves deletion to the caller; don't leak the temp file when
      # compression fails. Cleanup is best-effort so that the original error
      # is the one that propagates.
      try:
        os.unlink(gzip_path)
      except OSError as e:
        logger.debug('Could not delete temp file %s: %s', gzip_path, e)
  gzip_size = os.path.getsize(gzip_path)
  return StorageUrlFromString(gzip_path), gzip_size
1527
1528
def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
                        dst_url, dst_obj_metadata, preconditions, gsutil_api,
                        logger, command_obj, copy_exception_handler,
                        gzip_exts=None, allow_splitting=True):
  """Uploads a local file to an object.

  Depending on the source and configuration, the upload is performed as a
  parallel composite upload, a non-resumable upload, or a resumable upload.
  For uploads that are not parallel composite, the hashes computed while
  uploading are checked against the uploaded object afterwards.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file to be read and closed.
    src_obj_size: Size of the source file.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Metadata to be applied to the destination object.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: For handling copy exceptions during Apply.
    gzip_exts: List of file extensions to gzip prior to upload, if any.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for an parallel composite upload.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
    HashMismatchException: re-raised after the uploaded object has been
        deleted (and, if _RENAME_ON_HASH_MISMATCH is set, copied aside first).
  """
  # NOTE(review): a falsy dst_obj_metadata still reaches the attribute
  # assignment below; presumably callers always pass a metadata object --
  # confirm.
  if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
    content_language = config.get_value('GSUtil', 'content_language')
    if content_language:
      dst_obj_metadata.contentLanguage = content_language

  fname_parts = src_url.object_name.split('.')
  upload_url = src_url
  upload_stream = src_obj_filestream
  upload_size = src_obj_size
  zipped_file = False
  if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
    # From here on, upload the compressed temp file instead of the source.
    upload_url, upload_size = _CompressFileForUpload(
        src_url, src_obj_filestream, src_obj_size, logger)
    upload_stream = open(upload_url.object_name, 'rb')
    dst_obj_metadata.contentEncoding = 'gzip'
    zipped_file = True

  elapsed_time = None
  uploaded_object = None
  hash_algs = GetUploadHashAlgs()
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  parallel_composite_upload = _ShouldDoParallelCompositeUpload(
      logger, allow_splitting, upload_url, dst_url, src_obj_size,
      canned_acl=global_copy_helper_opts.canned_acl)

  if (src_url.IsStream() and
      gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
    orig_stream = upload_stream
    # Add limited seekable properties to the stream via buffering.
    upload_stream = ResumableStreamingJsonUploadWrapper(
        orig_stream, GetJsonResumableChunkSize())

  # Truthiness (rather than len()) keeps this consistent with the
  # "hash_algs or {}" guard above when no hash algorithms are available.
  if not parallel_composite_upload and hash_algs:
    # Parallel composite uploads calculate hashes per-component in subsequent
    # calls to this function, but the composition of the final object is a
    # cloud-only operation.
    wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
                                                  hash_algs, upload_url, logger)
  else:
    wrapped_filestream = upload_stream

  try:
    if parallel_composite_upload:
      elapsed_time, uploaded_object = _DoParallelCompositeUpload(
          upload_stream, upload_url, dst_url, dst_obj_metadata,
          global_copy_helper_opts.canned_acl, upload_size, preconditions,
          gsutil_api, command_obj, copy_exception_handler)
    elif upload_size < ResumableThreshold() or src_url.IsStream():
      elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)
    else:
      elapsed_time, uploaded_object = _UploadFileToObjectResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)

  finally:
    try:
      # In the gzip case, this is the gzip stream.  _CompressFileForUpload will
      # have already closed the original source stream.
      # Close before deleting: Windows refuses to remove a file that is still
      # open.
      upload_stream.close()
    finally:
      if zipped_file:
        try:
          os.unlink(upload_url.object_name)
        # Windows sometimes complains the temp file is locked when you try to
        # delete it.
        except Exception:  # pylint: disable=broad-except
          logger.warning(
              'Could not delete %s. This can occur in Windows because the '
              'temporary file is still locked.', upload_url.object_name)

  if not parallel_composite_upload:
    try:
      digests = _CreateDigestsFromDigesters(digesters)
      _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
                   digests, is_upload=True)
    except HashMismatchException:
      if _RENAME_ON_HASH_MISMATCH:
        # Keep a copy of the corrupted object under a suffixed name before it
        # is deleted below.
        corrupted_obj_metadata = apitools_messages.Object(
            name=dst_obj_metadata.name,
            bucket=dst_obj_metadata.bucket,
            etag=uploaded_object.etag)
        dst_obj_metadata.name = (dst_url.object_name +
                                 _RENAME_ON_HASH_MISMATCH_SUFFIX)
        gsutil_api.CopyObject(corrupted_obj_metadata,
                              dst_obj_metadata, provider=dst_url.scheme)
      # If the digest doesn't match, delete the object.
      gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                              generation=uploaded_object.generation,
                              provider=dst_url.scheme)
      raise

  result_url = dst_url.Clone()

  result_url.generation = uploaded_object.generation
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (elapsed_time, uploaded_object.size, result_url,
          uploaded_object.md5Hash)
1657
1658
def _GetDownloadFile(dst_url, src_obj_metadata, logger):
  """Creates a new download file, and deletes the file that will be replaced.

  Names and creates a temporary file for this download. Also, if there is an
  existing file at the path where this file will be placed after the download
  is completed, that file will be deleted.

  Args:
    dst_url: Destination FileUrl.
    src_obj_metadata: Metadata from the source object.
    logger: for outputting log messages.

  Returns:
    (download_file_name, need_to_unzip)
    download_file_name: The name of the temporary file to which the object will
                        be downloaded.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.

  Raises:
    OSError: if the destination directory cannot be created for any reason
        other than it already existing.
  """
  dir_name = os.path.dirname(dst_url.object_name)
  if dir_name and not os.path.exists(dir_name):
    # Do dir creation in try block so can ignore case where dir already
    # exists. This is needed to avoid a race condition when running gsutil
    # -m cp.
    try:
      os.makedirs(dir_name)
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise

  need_to_unzip = False
  # For gzipped objects download to a temp file and unzip. For the XML API,
  # this represents the result of a HEAD request. For the JSON API, this is
  # the stored encoding which the service may not respect. However, if the
  # server sends decompressed bytes for a file that is stored compressed
  # (double compressed case), there is no way we can validate the hash and
  # we will fail our hash check for the object.
  if (src_obj_metadata.contentEncoding and
      src_obj_metadata.contentEncoding.lower().endswith('gzip')):
    need_to_unzip = True
    download_file_name = _GetDownloadTempZipFileName(dst_url)
    logger.info(
        'Downloading to temp gzip filename %s', download_file_name)
  else:
    download_file_name = _GetDownloadTempFileName(dst_url)

  # If a file exists at the permanent destination (where the file will be moved
  # after the download is completed), delete it here to reduce disk space
  # requirements.
  if os.path.exists(dst_url.object_name):
    os.unlink(dst_url.object_name)

  # Downloads open the temporary download file in r+b mode, which requires it
  # to already exist, so we create it here if it doesn't exist already.
  # Append mode creates the file without truncating any existing contents.
  fp = open(download_file_name, 'ab')
  fp.close()
  return download_file_name, need_to_unzip
1716
1717
def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
                            allow_splitting, logger):
  """Determines whether the sliced download strategy should be used.

  As a side effect, logs a one-time suggestion to install compiled crcmod when
  a large object will not be sliced, compiled crcmod is unavailable, and hash
  checking has not been disabled.

  Args:
    download_strategy: CloudApi download strategy.
    src_obj_metadata: Metadata from the source object.
    allow_splitting: If false, then this function returns false.
    logger: logging.Logger for log message output.

  Returns:
    True iff a sliced download should be performed on the source file.
  """
  sliced_object_download_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'sliced_object_download_threshold',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))

  max_components = config.getint(
      'GSUtil', 'sliced_object_download_max_components',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)

  # Don't use sliced download if it will prevent us from performing an
  # integrity check.
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
  hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER

  # bool() normalizes the result: the chained "and" could otherwise yield a
  # non-boolean falsy operand (such as a missing crc32c value).
  use_slice = bool(
      allow_splitting
      and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
      and max_components > 1
      and hashing_okay
      and sliced_object_download_threshold > 0
      and src_obj_metadata.size >= sliced_object_download_threshold)

  if (not use_slice
      and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
      and not UsingCrcmodExtension(crcmod)
      and check_hashes_config != CHECK_HASH_NEVER):
    # Only print the suggestion once; the flag is shared, so guard it with the
    # lock.
    with suggested_sliced_transfers_lock:
      if not suggested_sliced_transfers.get('suggested'):
        logger.info('\n'.join(textwrap.wrap(
            '==> NOTE: You are downloading one or more large file(s), which '
            'would run significantly faster if you enabled sliced object '
            'downloads. This feature is enabled by default but requires that '
            'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
        suggested_sliced_transfers['suggested'] = True

  return use_slice
1768
1769
def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
  """Downloads one component of a sliced download; used as an Apply function.

  Args:
    cls: Calling Command class.
    args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    PerformSlicedDownloadReturnValues named-tuple filled with:
    component_num: The component number for this download.
    crc32c: CRC32C hash value (integer) of the downloaded bytes, or None if no
            crc32c digester was in use.
    bytes_transferred: The number of bytes transferred, potentially less
                       than the component size if the download was resumed.
    server_encoding: The encoding value reported by the resumable download
                     helper for this component.
  """
  api = GetCloudApiInstance(cls, thread_state=thread_state)
  algs = GetDownloadHashAlgs(
      cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)

  # Instantiate one fresh digester per selected hash algorithm.
  digesters = {}
  for alg_name in algs or {}:
    digesters[alg_name] = algs[alg_name]()

  download_result = _DownloadObjectToFileResumable(
      args.src_url, args.src_obj_metadata, args.dst_url,
      args.download_file_name, api, cls.logger, digesters,
      component_num=args.component_num, start_byte=args.start_byte,
      end_byte=args.end_byte)
  bytes_transferred, server_encoding = download_result

  if 'crc32c' in digesters:
    component_crc32c = digesters['crc32c'].crcValue
  else:
    component_crc32c = None

  return PerformSlicedDownloadReturnValues(
      args.component_num, component_crc32c, bytes_transferred,
      server_encoding)
1803
1804
def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
                                        download_file_name, logger,
                                        api_selector, num_components):
  """Maintains sliced download tracker files in order to permit resumability.

  Reads or creates a sliced download tracker file representing this object
  download. Upon an attempt at cross-process resumption, the contents of the
  sliced download tracker file are verified to make sure a resumption is
  possible and appropriate. In the case that a resumption should not be
  attempted, existing component tracker files are deleted (to prevent child
  processes from attempting resumption), and a new sliced download tracker
  file is created.

  Args:
    src_obj_metadata: Metadata from the source object. Must include etag and
                      generation.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for the download.
    logger: for outputting log messages.
    api_selector: The Cloud API implementation used.
    num_components: The number of components to perform this download with.
  """
  assert src_obj_metadata.etag
  # Both handles must be bound before the try block: the finally clause below
  # inspects them even when the very first open() fails.
  fp = None
  tracker_file = None

  # Only can happen if the resumable threshold is set higher than the
  # parallel transfer threshold.
  if src_obj_metadata.size < ResumableThreshold():
    return

  tracker_file_name = GetTrackerFilePath(dst_url,
                                         TrackerFileType.SLICED_DOWNLOAD,
                                         api_selector)

  # Check to see if we should attempt resuming the download.
  try:
    fp = open(download_file_name, 'rb')
    existing_file_size = GetFileSize(fp)
    # A parallel resumption should be attempted only if the destination file
    # size is exactly the same as the source size and the tracker file matches.
    if existing_file_size == src_obj_metadata.size:
      tracker_file = open(tracker_file_name, 'r')
      tracker_file_data = json.load(tracker_file)
      if (tracker_file_data['etag'] == src_obj_metadata.etag and
          tracker_file_data['generation'] == src_obj_metadata.generation and
          tracker_file_data['num_components'] == num_components):
        # Tracker file matches this download; leave everything in place so
        # the download can be resumed.
        return
      else:
        tracker_file.close()
        logger.warning('Sliced download tracker file doesn\'t match for '
                       'download of %s. Restarting download from scratch.' %
                       dst_url.object_name)

  except (IOError, ValueError) as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if isinstance(e, ValueError) or e.errno != errno.ENOENT:
      logger.warning('Couldn\'t read sliced download tracker file (%s): %s. '
                     'Restarting download from scratch.' %
                     (tracker_file_name, str(e)))
  finally:
    if fp:
      fp.close()
    if tracker_file:
      tracker_file.close()

  # Delete component tracker files to guarantee download starts from scratch.
  DeleteDownloadTrackerFiles(dst_url, api_selector)

  # Create a new sliced download tracker file to represent this download.
  try:
    with open(tracker_file_name, 'w') as tracker_file:
      tracker_file_data = {'etag': src_obj_metadata.etag,
                           'generation': src_obj_metadata.generation,
                           'num_components': num_components}
      tracker_file.write(json.dumps(tracker_file_data))
  except IOError as e:
    RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
1883
1884
class SlicedDownloadFileWrapper(object):
  """File-object wrapper that records sliced download progress.

  Each thread of a sliced object download should hand GetObjectMedia a
  SlicedDownloadFileWrapper instead of the raw file object. Writes go straight
  through to the wrapped file, and the component's download tracker file is
  refreshed periodically so that the component can be resumed.
  """

  def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
               end_byte):
    """Initializes the SlicedDownloadFileWrapper.

    Args:
      fp: The already-open file object to be used for writing in
          GetObjectMedia. Data will be written to file starting at the current
          seek position.
      tracker_file_name: The name of the tracker file for this component.
      src_obj_metadata: Metadata from the source object. Must include etag and
                        generation.
      start_byte: The first byte to be downloaded for this parallel component.
      end_byte: The last byte to be downloaded for this parallel component.
    """
    self._orig_fp = fp
    self._tracker_file_name = tracker_file_name
    self._src_obj_metadata = src_obj_metadata
    self._start_byte = start_byte
    self._end_byte = end_byte
    # Position recorded by the most recent tracker file update, if any.
    self._last_tracker_file_byte = None

  def write(self, data):  # pylint: disable=invalid-name
    """Writes data to the wrapped file and updates the tracker file if due."""
    pos_before = self._orig_fp.tell()
    # The write must stay inside this component's byte range.
    assert (self._start_byte <= pos_before and
            pos_before + len(data) <= self._end_byte + 1)

    self._orig_fp.write(data)
    pos_after = self._orig_fp.tell()

    threshold = TRACKERFILE_UPDATE_THRESHOLD
    last_recorded = self._last_tracker_file_byte
    if last_recorded is None:
      # Nothing has been recorded yet for this wrapper.
      update_due = True
    elif pos_after - last_recorded > threshold:
      # Enough new bytes since the last recorded position.
      update_due = True
    else:
      # Always record the position once the component is complete.
      update_due = (pos_after == self._end_byte + 1)

    if not update_due:
      return
    WriteDownloadComponentTrackerFile(
        self._tracker_file_name, self._src_obj_metadata, pos_after)
    self._last_tracker_file_byte = pos_after

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks the wrapped file, treating the component end as end-of-file."""
    if whence != os.SEEK_END:
      self._orig_fp.seek(offset, whence)
    else:
      # "End" means one past the last byte of this component, not the end of
      # the underlying file.
      self._orig_fp.seek(offset + self._end_byte + 1)
    assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current position of the wrapped file."""
    return self._orig_fp.tell()

  def flush(self):  # pylint: disable=invalid-name
    """Flushes the wrapped file."""
    self._orig_fp.flush()

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped file, if there is one."""
    if not self._orig_fp:
      return
    self._orig_fp.close()
1948
1949
def _PartitionObject(src_url, src_obj_metadata, dst_url,
                     download_file_name):
  """Splits an object into byte-range components for a sliced download.

  Every component covers a contiguous byte range that is inclusive at both
  ends. Taken together the ranges cover the whole object without overlapping.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for the download.

  Returns:
    (components_to_download, component_lengths)
    components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
                            to be used in Apply for the sliced download.
    component_lengths: A list with the length in bytes of each component, in
                       the same order as components_to_download.
  """
  configured_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'sliced_object_download_component_size',
                 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))

  max_components = config.getint(
      'GSUtil', 'sliced_object_download_max_components',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)

  num_components, component_size = _GetPartitionInfo(
      src_obj_metadata.size, max_components, configured_component_size)

  # Work out the inclusive (first, last) byte range of every component; the
  # final component is clipped to the last byte of the object.
  byte_ranges = []
  for component_num in range(num_components):
    first = component_num * component_size
    last = min((component_num + 1) * (component_size) - 1,
               src_obj_metadata.size - 1)
    byte_ranges.append((first, last))

  component_lengths = [last - first + 1 for (first, last) in byte_ranges]
  components_to_download = [
      PerformSlicedDownloadObjectToFileArgs(
          component_num, src_url, src_obj_metadata, dst_url,
          download_file_name, first, last)
      for component_num, (first, last) in enumerate(byte_ranges)]
  return components_to_download, component_lengths
1990
1991
def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
                      command_obj, logger, copy_exception_handler,
                      api_selector):
  """Downloads a cloud object to a local file using sliced download.

  The object is partitioned into byte ranges, and the ranges are then
  downloaded in parallel through command_obj.Apply.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download.
    command_obj: command object whose Apply runs the component downloads.
    logger: for outputting log messages.
    copy_exception_handler: For handling copy exceptions during Apply.
    api_selector: The Cloud API implementation used.

  Returns:
    (bytes_transferred, crc32c)
    bytes_transferred: Number of bytes transferred from server this call.
    crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
            crc32c hashing wasn't performed.

  Raises:
    CommandException: if fewer results than components come back, or if a
        component reports a gzip encoding that the object metadata does not
        declare.
  """
  components_to_download, component_lengths = _PartitionObject(
      src_url, src_obj_metadata, dst_url, download_file_name)
  num_components = len(components_to_download)

  _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
                                      download_file_name, logger,
                                      api_selector, num_components)

  # Resize the download file so each child process can seek to its start byte.
  with open(download_file_name, 'ab') as download_fp:
    download_fp.truncate(src_obj_metadata.size)

  cp_results = command_obj.Apply(
      _PerformSlicedDownloadObjectToFile, components_to_download,
      copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)

  if len(cp_results) < num_components:
    raise CommandException(
        'Some components of %s were not downloaded successfully. '
        'Please retry this download.' % dst_url.object_name)

  # Crc32c hashes have to be concatenated in the correct order.
  ordered_results = sorted(cp_results, key=attrgetter('component_num'))
  crc32c = ordered_results[0].crc32c
  if crc32c is not None:
    # Fold every later component's hash into the running value.
    later_components = zip(ordered_results[1:num_components],
                           component_lengths[1:num_components])
    for component_result, component_length in later_components:
      crc32c = ConcatCrc32c(crc32c, component_result.crc32c, component_length)

  expect_gzip = (src_obj_metadata.contentEncoding and
                 src_obj_metadata.contentEncoding.lower().endswith('gzip'))
  bytes_transferred = 0
  for component_result in ordered_results:
    bytes_transferred += component_result.bytes_transferred
    encoding = component_result.server_encoding
    server_gzip = encoding and encoding.lower().endswith('gzip')
    # If the server gzipped any components on the fly, we will have no chance of
    # properly reconstructing the file.
    if server_gzip and not expect_gzip:
      raise CommandException(
          'Download of %s failed because the server sent back data with an '
          'unexpected encoding.' % dst_url.object_name)

  return bytes_transferred, crc32c
2061
2062
def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
                                   download_file_name, gsutil_api, logger,
                                   digesters, component_num=None, start_byte=0,
                                   end_byte=None):
  """Downloads an object to a local file using the resumable strategy.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download. It is
                        opened in 'r+b' mode, so it must already exist.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    digesters: Digesters corresponding to the hash algorithms that will be used
               for validation.
    component_num: Which component of a sliced download this call is for, or
                   None if this is not a sliced download.
    start_byte: The first byte of a byte range for a sliced download.
    end_byte: The last byte of a byte range for a sliced download. When None,
              the last byte of the object (size - 1) is used.

  Returns:
    (bytes_transferred, server_encoding)
    bytes_transferred: Number of bytes transferred from server this call.
    server_encoding: Content-encoding string if it was detected that the server
                     sent encoded bytes during transfer, None otherwise.

  Raises:
    CommandException: if the resume point returned by
                      ReadOrCreateDownloadTrackerFile lies outside
                      [start_byte, end_byte + 1]. The tracker file is deleted
                      before raising.
    ResumableDownloadException: re-raised, after logging a warning, when it
                                escapes from the download.
  """
  if end_byte is None:
    end_byte = src_obj_metadata.size - 1
  download_size = end_byte - start_byte + 1

  is_sliced = component_num is not None
  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  server_encoding = None

  # Used for logging
  download_name = dst_url.object_name
  if is_sliced:
    download_name += ' component %d' % component_num

  # Bind fp before entering the try block. If open() raises, the finally
  # clause would otherwise hit an unbound local and mask the real I/O error.
  fp = None
  try:
    fp = open(download_file_name, 'r+b')
    fp.seek(start_byte)
    existing_file_size = GetFileSize(fp)

    tracker_file_name, download_start_byte = (
        ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
                                        api_selector, start_byte,
                                        existing_file_size, component_num))

    if download_start_byte < start_byte or download_start_byte > end_byte + 1:
      DeleteTrackerFile(tracker_file_name)
      raise CommandException(
          'Resumable download start point for %s is not in the correct byte '
          'range. Deleting tracker file, so if you re-try this download it '
          'will start from scratch' % download_name)

    # A resume point one past end_byte means every byte is already on disk.
    download_complete = (download_start_byte == start_byte + download_size)
    resuming = (download_start_byte != start_byte) and not download_complete
    if resuming:
      logger.info('Resuming download for %s', download_name)
    elif download_complete:
      logger.info(
          'Download already complete for %s, skipping download but '
          'will run integrity checks.', download_name)

    # This is used for resuming downloads, but also for passing the mediaLink
    # and size into the download for new downloads so that we can avoid
    # making an extra HTTP call.
    serialization_data = GetDownloadSerializationData(
        src_obj_metadata, progress=download_start_byte)

    if resuming or download_complete:
      # Catch up our digester with the hash data.
      # NOTE(review): this reads from the current file position, which is
      # start_byte only if GetFileSize leaves the position untouched - confirm.
      bytes_digested = 0
      total_bytes_to_digest = download_start_byte - start_byte
      hash_callback = ProgressCallbackWithBackoff(
          total_bytes_to_digest,
          FileProgressCallbackHandler(
              ConstructAnnounceText('Hashing',
                                    dst_url.url_string), logger).call)

      while bytes_digested < total_bytes_to_digest:
        bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
                            total_bytes_to_digest - bytes_digested)
        data = fp.read(bytes_to_read)
        # Advance by the requested amount (not len(data)) so a short read
        # cannot leave this loop spinning forever.
        bytes_digested += bytes_to_read
        for alg_name in digesters:
          digesters[alg_name].update(data)
        hash_callback.Progress(len(data))

    elif not is_sliced:
      # Delete file contents and start entire object download from scratch.
      fp.truncate(0)

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string), logger,
        start_byte, download_size).call

    # Test hook: replace the progress callback with a pickled one.
    if global_copy_helper_opts.test_callback_file:
      with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
        progress_callback = pickle.loads(test_fp.read()).call

    if is_sliced and src_obj_metadata.size >= ResumableThreshold():
      fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
                                     start_byte, end_byte)

    # TODO: With gzip encoding (which may occur on-the-fly and not be part of
    # the object's metadata), when we request a range to resume, it's possible
    # that the server will just resend the entire object, which means our
    # caught-up hash will be incorrect.  We recalculate the hash on
    # the local file in the case of a failed gzip hash anyway, but it would
    # be better if we actively detected this case.
    if not download_complete:
      fp.seek(download_start_byte)
      server_encoding = gsutil_api.GetObjectMedia(
          src_url.bucket_name, src_url.object_name, fp,
          start_byte=download_start_byte, end_byte=end_byte,
          generation=src_url.generation, object_size=src_obj_metadata.size,
          download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
          provider=src_url.scheme, serialization_data=serialization_data,
          digesters=digesters, progress_callback=progress_callback)

  except ResumableDownloadException as e:
    logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
                   e.reason, download_name)
    raise
  finally:
    if fp:
      fp.close()

  # Only the bytes requested in this call count, not those already on disk.
  bytes_transferred = end_byte - download_start_byte + 1
  return bytes_transferred, server_encoding
2197
2198
def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
                                      download_file_name, gsutil_api, logger,
                                      digesters):
  """Downloads an object to a local file using the non-resumable strategy.

  The whole object is fetched with a single ONE_SHOT request and written to
  download_file_name, overwriting any existing content.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    digesters: Digesters corresponding to the hash algorithms that will be used
               for validation.
  Returns:
    (bytes_transferred, server_encoding)
    bytes_transferred: Number of bytes transferred from server this call.
                       This is always reported as the full object size.
    server_encoding: Content-encoding string if it was detected that the server
                     sent encoded bytes during transfer, None otherwise.
  """
  # Bind fp before entering the try block. If open() raises, the finally
  # clause would otherwise hit an unbound local and mask the real I/O error.
  fp = None
  try:
    # Open in binary mode, as the resumable path does with 'r+b'. Text mode
    # would translate line endings on Windows and corrupt object bytes.
    fp = open(download_file_name, 'wb')

    # This is used to pass the mediaLink and the size into the download so that
    # we can avoid making an extra HTTP call.
    serialization_data = GetDownloadSerializationData(src_obj_metadata)

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string), logger).call

    # Test hook: replace the progress callback with a pickled one.
    if global_copy_helper_opts.test_callback_file:
      with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
        progress_callback = pickle.loads(test_fp.read()).call

    server_encoding = gsutil_api.GetObjectMedia(
        src_url.bucket_name, src_url.object_name, fp,
        generation=src_url.generation, object_size=src_obj_metadata.size,
        download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
        provider=src_url.scheme, serialization_data=serialization_data,
        digesters=digesters, progress_callback=progress_callback)
  finally:
    if fp:
      fp.close()

  return src_obj_metadata.size, server_encoding
2244
2245
def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                          gsutil_api, logger, command_obj,
                          copy_exception_handler, allow_splitting=True):
  """Downloads an object to a local file.

  Picks a download strategy (sliced, one-shot or resumable), runs it, and then
  hands the result to _ValidateAndCompleteDownload.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in sliced downloads.
    copy_exception_handler: For handling copy exceptions during Apply.
    allow_splitting: Whether or not to allow sliced download.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
    excludes initial GET. When the destination name ends with the URL
    delimiter the download is skipped and (0, 0, dst_url, '') is returned.

  Raises:
    FileConcurrencySkipError: if this download is already in progress.
    CommandException: if other errors encountered.
  """
  global open_files_map, open_files_lock
  if dst_url.object_name.endswith(dst_url.delim):
    logger.warning('\n'.join(textwrap.wrap(
        'Skipping attempt to download to filename ending with slash (%s). This '
        'typically happens when using gsutil to download from a subdirectory '
        'created by the Cloud Console (https://cloud.google.com/console)'
        % dst_url.object_name)))
    return (0, 0, dst_url, '')

  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  download_strategy = _SelectDownloadStrategy(dst_url)
  sliced_download = _ShouldDoSlicedDownload(
      download_strategy, src_obj_metadata, allow_splitting, logger)

  download_file_name, need_to_unzip = _GetDownloadFile(
      dst_url, src_obj_metadata, logger)

  # Ensure another process/thread is not already writing to this file.
  with open_files_lock:
    if open_files_map.get(download_file_name, False):
      raise FileConcurrencySkipError
    open_files_map[download_file_name] = True

  # Set up hash digesters.
  # MD5 is not considered for sliced downloads; in that case only the crc32c
  # value returned by _DoSlicedDownload is injected into the digester below.
  consider_md5 = src_obj_metadata.md5Hash and not sliced_download
  hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
                                  consider_crc32c=src_obj_metadata.crc32c)
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  # Tracks whether the server used a gzip encoding.
  server_encoding = None
  # A zero-byte object needs no transfer, only validation.
  download_complete = (src_obj_metadata.size == 0)
  bytes_transferred = 0

  start_time = time.time()
  if not download_complete:
    if sliced_download:
      (bytes_transferred, crc32c) = (
          _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
                            download_file_name, command_obj, logger,
                            copy_exception_handler, api_selector))
      if 'crc32c' in digesters:
        digesters['crc32c'].crcValue = crc32c
    elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
      (bytes_transferred, server_encoding) = (
          _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
                                            download_file_name, gsutil_api,
                                            logger, digesters))
    elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
      (bytes_transferred, server_encoding) = (
          _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
                                         download_file_name, gsutil_api, logger,
                                         digesters))
    else:
      # The trailing space matters: the two literals are concatenated.
      raise CommandException('Invalid download strategy %s chosen for '
                             'file %s' % (download_strategy,
                                          download_file_name))
  end_time = time.time()

  server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
  local_md5 = _ValidateAndCompleteDownload(
      logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
      digesters, hash_algs, download_file_name, api_selector, bytes_transferred)

  # NOTE(review): the in-use marker is only cleared on success; an exception
  # raised above leaves download_file_name marked as open. Confirm that this
  # is intended before wrapping the download in try/finally.
  with open_files_lock:
    open_files_map.delete(download_file_name)

  return (end_time - start_time, bytes_transferred, dst_url, local_md5)
2336
2337
def _GetDownloadTempZipFileName(dst_url):
  """Builds the temporary file name used while a download is still gzipped.

  Args:
    dst_url: Destination URL; only its object_name attribute is read.

  Returns:
    The destination object name with '_.gztmp' appended.
  """
  destination_name = dst_url.object_name
  return '%s_.gztmp' % destination_name
2341
2342
def _GetDownloadTempFileName(dst_url):
  """Builds the temporary file name used for an uncompressed download.

  Args:
    dst_url: Destination URL; only its object_name attribute is read.

  Returns:
    The destination object name with '_.gstmp' appended.
  """
  destination_name = dst_url.object_name
  return '%s_.gstmp' % destination_name
2346
2347
def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
                                 need_to_unzip, server_gzip, digesters,
                                 hash_algs, download_file_name,
                                 api_selector, bytes_transferred):
  """Validates and performs necessary operations on a downloaded file.

  Validates the integrity of the downloaded file using hash_algs. If the file
  was compressed (temporarily), the file will be decompressed. Then, if the
  integrity of the file was successfully validated, the file will be moved
  from its temporary download location to its permanent location on disk.

  Args:
    logger: For outputting log messages.
    src_url: StorageUrl for the source object.
    src_obj_metadata: Metadata for the source object, potentially containing
                      hash values.
    dst_url: StorageUrl describing the destination file.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.
    server_gzip: If true, the server gzipped the bytes (regardless of whether
                 the object metadata claimed it was gzipped).
    digesters: dict of {string, hash digester} that contains up-to-date digests
               computed during the download. If a digester for a particular
               algorithm is None, an up-to-date digest is not available and the
               hash must be recomputed from the local file.
    hash_algs: dict of {string, hash algorithm} that can be used if digesters
               don't have up-to-date digests.
    download_file_name: Temporary file name that was used for download.
    api_selector: The Cloud API implementation used (used tracker file naming).
    bytes_transferred: Number of bytes downloaded (used for logging).

  Returns:
    An MD5 of the local file, if one was calculated as part of the integrity
    check; None otherwise.

  Raises:
    HashMismatchException: if the local hashes do not match the source object
                           and the mismatch cannot be attributed to gzip
                           encoding. The downloaded file is renamed or
                           deleted before this is raised.
  """
  final_file_name = dst_url.object_name
  file_name = download_file_name

  # If we get a digester with a None algorithm, the underlying
  # implementation failed to calculate a digest, so we will need to
  # calculate one from scratch.
  digesters_succeeded = all(digesters[alg] for alg in digesters)

  if digesters_succeeded:
    local_hashes = _CreateDigestsFromDigesters(digesters)
  else:
    local_hashes = _CreateDigestsFromLocalFile(
        logger, hash_algs, file_name, final_file_name, src_obj_metadata)

  digest_verified = True
  hash_invalid_exception = None
  try:
    _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
                 local_hashes)
    DeleteDownloadTrackerFiles(dst_url, api_selector)
  except HashMismatchException as e:
    # If an non-gzipped object gets sent with gzip content encoding, the hash
    # we calculate will match the gzipped bytes, not the original object. Thus,
    # we'll need to calculate and check it after unzipping.
    if server_gzip:
      logger.debug(
          'Hash did not match but server gzipped the content, will '
          'recalculate.')
      digest_verified = False
    elif api_selector == ApiSelector.XML:
      logger.debug(
          'Hash did not match but server may have gzipped the content, will '
          'recalculate.')
      # Save off the exception in case this isn't a gzipped file.
      hash_invalid_exception = e
      digest_verified = False
    else:
      DeleteDownloadTrackerFiles(dst_url, api_selector)
      if _RENAME_ON_HASH_MISMATCH:
        os.rename(file_name,
                  final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
      else:
        os.unlink(file_name)
      raise

  if need_to_unzip or server_gzip:
    # Log that we're uncompressing if the file is big enough that
    # decompressing would make it look like the transfer "stalled" at the end.
    if bytes_transferred > TEN_MIB:
      logger.info(
          'Uncompressing temporarily gzipped file to %s...', final_file_name)

    gzip_fp = None
    try:
      # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
      # suffix.
      gzip_fp = gzip.open(file_name, 'rb')
      with open(final_file_name, 'wb') as f_out:
        data = gzip_fp.read(GZIP_CHUNK_SIZE)
        while data:
          f_out.write(data)
          data = gzip_fp.read(GZIP_CHUNK_SIZE)
    except IOError as e:
      # In the XML case where we don't know if the file was gzipped, raise
      # the original hash exception if we find that it wasn't.
      if 'Not a gzipped file' in str(e) and hash_invalid_exception:
        # Linter improperly thinks we're raising None despite the above check.
        # pylint: disable=raising-bad-type
        raise hash_invalid_exception
      # NOTE(review): any other IOError (for example a failed write) is
      # swallowed here and the temporary file is still deleted below. Confirm
      # whether this best-effort behavior is intended.
    finally:
      if gzip_fp:
        gzip_fp.close()

    os.unlink(file_name)
    file_name = final_file_name

  if not digest_verified:
    try:
      # Recalculate hashes on the unzipped local file.
      local_hashes = _CreateDigestsFromLocalFile(
          logger, hash_algs, file_name, final_file_name, src_obj_metadata)
      _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
                   local_hashes)
      DeleteDownloadTrackerFiles(dst_url, api_selector)
    except HashMismatchException:
      DeleteDownloadTrackerFiles(dst_url, api_selector)
      if _RENAME_ON_HASH_MISMATCH:
        os.rename(file_name,
                  file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
      else:
        os.unlink(file_name)
      raise

  if file_name != final_file_name:
    # Data is still in a temporary file, so move it to a permanent location.
    if os.path.exists(final_file_name):
      os.unlink(final_file_name)
    os.rename(file_name,
              final_file_name)

  if 'md5' in local_hashes:
    return local_hashes['md5']
2489
2490
def _CopyFileToFile(src_url, dst_url):
  """Copies a local file to a local file.

  Missing parent directories of the destination are created first. The
  destination is closed before its size is measured so that buffered bytes
  are included in the reported size.

  Args:
    src_url: Source FileUrl.
    dst_url: Destination FileUrl.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5=None).

  Raises:
    CommandException: if errors encountered.
  """
  src_fp = GetStreamFromFileUrl(src_url)
  try:
    dir_name = os.path.dirname(dst_url.object_name)
    if dir_name and not os.path.exists(dir_name):
      try:
        os.makedirs(dir_name)
      except OSError as e:
        # Another thread or process may have created the directory between
        # the existence check and makedirs; only that case is tolerated.
        if e.errno != errno.EEXIST:
          raise
    with open(dst_url.object_name, 'wb') as dst_fp:
      start_time = time.time()
      shutil.copyfileobj(src_fp, dst_fp)
    # Taken after the file is closed so the flush is part of the elapsed time.
    end_time = time.time()
  finally:
    # A stream source is not ours to close.
    # NOTE(review): assumes GetStreamFromFileUrl opens a fresh handle for
    # non-stream URLs - confirm.
    if not src_url.IsStream():
      src_fp.close()
  return (end_time - start_time, os.path.getsize(dst_url.object_name),
          dst_url, None)
2513
2514
def _DummyTrackerCallback(_):
  """No-op tracker callback: ignores its argument and returns None."""
  return None
2517
2518
2519# pylint: disable=undefined-variable
def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
                                dst_obj_metadata, preconditions, gsutil_api,
                                logger):
  """Copies from src_url to dst_url in "daisy chain" mode.

  The source object is read through a DaisyChainWrapper which is handed to
  the upload call as its file object. See -D OPTION documentation about what
  daisy chain mode is.

  Args:
    src_url: Source CloudUrl
    src_obj_metadata: Metadata from source object
    dst_url: Destination CloudUrl
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: For outputting log messages.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    NotImplementedError: if ACL preservation is requested across providers.
    HashMismatchException: if the uploaded object's hashes do not match the
                           source; the uploaded object is deleted first.
    CommandException: if errors encountered.
  """
  # We don't attempt to preserve ACLs across providers because
  # GCS and S3 support different ACLs and disjoint principals.
  if global_copy_helper_opts.preserve_acl:
    if src_url.scheme != dst_url.scheme:
      raise NotImplementedError(
          'Cross-provider cp -p not supported')
  else:
    dst_obj_metadata.acl = []

  # Don't use callbacks for downloads on the daisy chain wrapper because
  # upload callbacks will output progress, but respect test hooks if present.
  download_callback = None
  test_callback_path = global_copy_helper_opts.test_callback_file
  if test_callback_path:
    with open(test_callback_path, 'rb') as test_fp:
      download_callback = pickle.loads(test_fp.read()).call

  start_time = time.time()
  daisy_chain_fp = DaisyChainWrapper(src_url, src_obj_metadata.size,
                                     gsutil_api,
                                     progress_callback=download_callback)

  # Keyword arguments shared by both upload flavors.
  shared_upload_args = {
      'object_metadata': dst_obj_metadata,
      'canned_acl': global_copy_helper_opts.canned_acl,
      'preconditions': preconditions,
      'provider': dst_url.scheme,
      'fields': UPLOAD_RETURN_FIELDS,
      'size': src_obj_metadata.size,
  }
  if src_obj_metadata.size == 0:
    # Resumable uploads of size 0 are not supported.
    uploaded_object = gsutil_api.UploadObject(
        daisy_chain_fp, **shared_upload_args)
  else:
    # TODO: Support process-break resumes. This will resume across connection
    # breaks and server errors, but the tracker callback is a no-op so this
    # won't resume across gsutil runs.
    # TODO: Test retries via test_callback_file.
    upload_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Uploading', dst_url.url_string),
        logger).call
    uploaded_object = gsutil_api.UploadObjectResumable(
        daisy_chain_fp, progress_callback=upload_callback,
        tracker_callback=_DummyTrackerCallback, **shared_upload_args)
  end_time = time.time()

  try:
    _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
                      uploaded_object)
  except HashMismatchException:
    if _RENAME_ON_HASH_MISMATCH:
      # Copy the bad upload to a suffixed name before it is deleted below.
      bad_upload_metadata = apitools_messages.Object(
          name=dst_obj_metadata.name,
          bucket=dst_obj_metadata.bucket,
          etag=uploaded_object.etag)
      dst_obj_metadata.name = (dst_url.object_name +
                               _RENAME_ON_HASH_MISMATCH_SUFFIX)
      gsutil_api.CopyObject(bad_upload_metadata,
                            dst_obj_metadata, provider=dst_url.scheme)
    # If the digest doesn't match, delete the object.
    gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                            generation=uploaded_object.generation,
                            provider=dst_url.scheme)
    raise

  versioned_dst_url = dst_url.Clone()
  versioned_dst_url.generation = GenerationFromUrlAndString(
      versioned_dst_url, uploaded_object.generation)

  elapsed_time = end_time - start_time
  return (elapsed_time, src_obj_metadata.size, versioned_dst_url,
          uploaded_object.md5Hash)
2612
2613
2614# pylint: disable=undefined-variable
2615# pylint: disable=too-many-statements
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
                copy_exception_handler, allow_splitting=True,
                headers=None, manifest=None, gzip_exts=None):
  """Performs copy from src_url to dst_url, handling various special cases.

  Gathers source metadata (or opens the source file), applies the size and
  no-clobber checks, and then dispatches to the download, in-the-cloud copy,
  daisy-chain copy, upload or file-to-file helper.

  Args:
    logger: for outputting log messages.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    command_obj: command object for use in Apply in parallel composite uploads
        and sliced object downloads.
    copy_exception_handler: for handling copy exceptions during Apply.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for an parallel composite upload or download.
    headers: optional headers to use for the copy operation.
    manifest: optional manifest for tracking copy operations.
    gzip_exts: List of file extensions to gzip for uploads, if any.

  Returns:
    The result of the helper that performs the copy. The download, daisy-chain
    and file-to-file helpers return (elapsed_time, bytes_transferred,
    version-specific dst_url, md5), excluding overhead like initial GET.
    Returns None when the source file cannot be opened and
    command_obj.continue_on_error is set.

  Raises:
    ItemExistsError: if no clobber flag is specified and the destination
        object already exists.
    SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
        and the source is an unsupported type.
    ArgumentException: if no clobber is combined with a generation-match
        precondition.
    CommandException: if other errors encountered.
  """
  if headers:
    dst_obj_headers = headers.copy()
  else:
    dst_obj_headers = {}

  # Create a metadata instance for each destination object so metadata
  # such as content-type can be applied per-object.
  # Initialize metadata from any headers passed in via -h.
  dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)

  if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
    preconditions = PreconditionsFromHeaders(dst_obj_headers)
  else:
    preconditions = Preconditions()

  src_obj_metadata = None
  src_obj_filestream = None
  # Only meaningful for cloud-to-cloud copies; bound here so that the
  # dispatch at the end never reads an unbound name.
  copy_in_the_cloud = False
  if src_url.IsCloudUrl():
    src_obj_fields = None
    if dst_url.IsCloudUrl():
      # For cloud or daisy chain copy, we need every copyable field.
      # If we're not modifying or overriding any of the fields, we can get
      # away without retrieving the object metadata because the copy
      # operation can succeed with just the destination bucket and object
      # name.  But if we are sending any metadata, the JSON API will expect a
      # complete object resource.  Since we want metadata like the object size
      # for our own tracking, we just get all of the metadata here.
      src_obj_fields = ['cacheControl', 'componentCount',
                        'contentDisposition', 'contentEncoding',
                        'contentLanguage', 'contentType', 'crc32c',
                        'etag', 'generation', 'md5Hash', 'mediaLink',
                        'metadata', 'metageneration', 'size']
      # We only need the ACL if we're going to preserve it.
      if global_copy_helper_opts.preserve_acl:
        src_obj_fields.append('acl')
      copy_in_the_cloud = (src_url.scheme == dst_url.scheme
                           and not global_copy_helper_opts.daisy_chain)
    else:
      # Just get the fields needed to validate the download.
      src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
                        'mediaLink', 'md5Hash', 'size', 'generation']

    if (src_url.scheme == 's3' and
        global_copy_helper_opts.skip_unsupported_objects):
      src_obj_fields.append('storageClass')

    try:
      src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
      src_obj_metadata = gsutil_api.GetObjectMetadata(
          src_url.bucket_name, src_url.object_name,
          generation=src_generation, provider=src_url.scheme,
          fields=src_obj_fields)
    except NotFoundException:
      raise CommandException(
          'NotFoundException: Could not retrieve source object %s.' %
          src_url.url_string)
    if (src_url.scheme == 's3' and
        global_copy_helper_opts.skip_unsupported_objects and
        src_obj_metadata.storageClass == 'GLACIER'):
      raise SkipGlacierError()

    src_obj_size = src_obj_metadata.size
    dst_obj_metadata.contentType = src_obj_metadata.contentType
    if global_copy_helper_opts.preserve_acl:
      dst_obj_metadata.acl = src_obj_metadata.acl
      # Special case for S3-to-S3 copy URLs using
      # global_copy_helper_opts.preserve_acl.
      # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
      # is not s3 (and thus differs from src_url).
      if src_url.scheme == 's3':
        acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
        if acl_text:
          AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
  else:
    try:
      src_obj_filestream = GetStreamFromFileUrl(src_url)
    except Exception as e:  # pylint: disable=broad-except
      if command_obj.continue_on_error:
        message = 'Error copying %s: %s' % (src_url, str(e))
        command_obj.op_failure_count += 1
        logger.error(message)
        return
      else:
        # str(e) rather than the deprecated e.message, matching the branch
        # above; e.message is empty for errno-style exceptions.
        raise CommandException('Error opening file "%s": %s.' % (src_url,
                                                                 str(e)))
    if src_url.IsStream():
      # The size of a stream is not known up front.
      src_obj_size = None
    else:
      src_obj_size = os.path.getsize(src_url.object_name)

  if global_copy_helper_opts.use_manifest:
    # Set the source size in the manifest.
    manifest.Set(src_url.url_string, 'size', src_obj_size)

  # The size is None for streams; skip the limit check rather than compare
  # None with an integer.
  # NOTE(review): src_url is a URL object, so "src_url != 's3'" looks like it
  # was meant to be "src_url.scheme != 's3'". Kept as written because
  # correcting it changes which copies are rejected here - confirm intent.
  if (dst_url.scheme == 's3' and src_obj_size is not None
      and src_obj_size > S3_MAX_UPLOAD_SIZE
      and src_url != 's3'):
    raise CommandException(
        '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
        'objects greater than %s in size require multipart uploads, which '
        'gsutil does not support.' % (src_url,
                                      MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))

  # On Windows, stdin is opened as text mode instead of binary which causes
  # problems when piping a binary file, so this switches it to binary mode.
  if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
    msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)

  if global_copy_helper_opts.no_clobber:
    # There are two checks to prevent clobbering:
    # 1) The first check is to see if the URL
    #    already exists at the destination and prevent the upload/download
    #    from happening. This is done by the exists() call.
    # 2) The second check is only relevant if we are writing to gs. We can
    #    enforce that the server only writes the object if it doesn't exist
    #    by specifying the header below. This check only happens at the
    #    server after the complete file has been uploaded. We specify this
    #    header to prevent a race condition where a destination file may
    #    be created after the first check and before the file is fully
    #    uploaded.
    # In order to save on unnecessary uploads/downloads we perform both
    # checks. However, this may come at the cost of additional HTTP calls.
    if preconditions.gen_match:
      raise ArgumentException('Specifying x-goog-if-generation-match is '
                              'not supported with cp -n')
    else:
      preconditions.gen_match = 0
    if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
      # The local file may be a partial. Check the file sizes.
      if src_obj_size == os.path.getsize(dst_url.object_name):
        raise ItemExistsError()
    elif dst_url.IsCloudUrl():
      try:
        dst_object = gsutil_api.GetObjectMetadata(
            dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
      except NotFoundException:
        dst_object = None
      if dst_object:
        raise ItemExistsError()

  if dst_url.IsCloudUrl():
    # Cloud storage API gets object and bucket name from metadata.
    dst_obj_metadata.name = dst_url.object_name
    dst_obj_metadata.bucket = dst_url.bucket_name
    if src_url.IsCloudUrl():
      # Preserve relevant metadata from the source object if it's not already
      # provided from the headers.
      CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
      src_obj_metadata.name = src_url.object_name
      src_obj_metadata.bucket = src_url.bucket_name
    else:
      _SetContentTypeFromFile(src_url, dst_obj_metadata)
  else:
    # Files don't have Cloud API metadata.
    dst_obj_metadata = None

  _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)

  if src_url.IsCloudUrl():
    if dst_url.IsFileUrl():
      return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                                   gsutil_api, logger, command_obj,
                                   copy_exception_handler,
                                   allow_splitting=allow_splitting)
    elif copy_in_the_cloud:
      return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
                                     dst_obj_metadata, preconditions,
                                     gsutil_api, logger)
    else:
      return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
                                         dst_url, dst_obj_metadata,
                                         preconditions, gsutil_api, logger)
  else:  # src_url.IsFileUrl()
    if dst_url.IsCloudUrl():
      return _UploadFileToObject(
          src_url, src_obj_filestream, src_obj_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
          copy_exception_handler, gzip_exts=gzip_exts,
          allow_splitting=allow_splitting)
    else:  # dst_url.IsFileUrl()
      return _CopyFileToFile(src_url, dst_url)
2829
2830
class Manifest(object):
  """Stores the manifest items for the CpCommand class.

  The manifest is a CSV file consisting of one header row followed by one row
  per transfer result. Rows left by a previous run are read at construction
  time so that sources already recorded as 'OK' or 'skip' can be identified
  via WasSuccessful; new rows are appended as results are reported through
  SetResult.
  """

  def __init__(self, path):
    """Loads any existing manifest and makes sure the file has a header row.

    Args:
      path: Path of the manifest file. A leading '~' is expanded.

    Raises:
      CommandException: if the manifest cannot be parsed or created.
    """
    # self.items contains a dictionary of rows, keyed by source URL.
    self.items = {}
    # Maps source URL -> last 'OK'/'skip' result found in an existing manifest.
    self.manifest_filter = {}
    self.lock = CreateLock()

    self.manifest_path = os.path.expanduser(path)
    self._ParseManifest()
    self._CreateManifestFile()

  def _ParseManifest(self):
    """Load and parse a manifest file.

    This information will be used to skip any files that have a skip or OK
    status.

    Raises:
      CommandException: if the first row does not contain the 'Source' and
          'Result' headers, or if the file cannot be read.
    """
    try:
      if os.path.exists(self.manifest_path):
        with open(self.manifest_path, 'rb') as f:
          header_seen = False
          source_index = result_index = None
          for row in csv.reader(f):
            if not header_seen:
              try:
                source_index = row.index('Source')
                result_index = row.index('Result')
              except ValueError:
                # No header and thus not a valid manifest file.
                raise CommandException(
                    'Missing headers in manifest file: %s' % self.manifest_path)
              header_seen = True
              # The header row itself carries no transfer result.
              continue
            if len(row) <= max(source_index, result_index):
              # Blank or truncated row (for example, a previous run that was
              # interrupted mid-write). It records no usable result, so the
              # corresponding source is simply not filtered out.
              continue
            result = row[result_index]
            if result in ('OK', 'skip'):
              # We're always guaranteed to take the last result of a specific
              # source url.
              self.manifest_filter[row[source_index]] = result
    except IOError:
      raise CommandException('Could not parse %s' % self.manifest_path)

  def WasSuccessful(self, src):
    """Returns whether the specified src url was marked as successful.

    Args:
      src: Source URL string as recorded in the 'Source' column.

    Returns:
      True if an existing manifest recorded 'OK' or 'skip' for src.
    """
    return src in self.manifest_filter

  def _CreateManifestFile(self):
    """Writes the header row if the manifest file is missing or empty.

    Raises:
      CommandException: if the file cannot be written.
    """
    try:
      if ((not os.path.exists(self.manifest_path))
          or (os.stat(self.manifest_path).st_size == 0)):
        # Add headers to the new file.
        with open(self.manifest_path, 'wb', 1) as f:
          writer = csv.writer(f)
          writer.writerow(['Source',
                           'Destination',
                           'Start',
                           'End',
                           'Md5',
                           'UploadId',
                           'Source Size',
                           'Bytes Transferred',
                           'Result',
                           'Description'])
    except IOError:
      raise CommandException('Could not create manifest file.')

  def Set(self, url, key, value):
    """Records a single field for the in-progress row of the given url.

    Args:
      url: Source URL string identifying the row.
      key: Name of the field to set.
      value: Value to store. None leaves any existing value untouched.
    """
    if value is None:
      # In case we don't have any information to set we bail out here.
      # This is so that we don't clobber existing information.
      # To zero information pass '' instead of None.
      return
    self.items.setdefault(url, {})[key] = value

  def Initialize(self, source_url, destination_url):
    """Starts a row for source_url, recording its destination and start time.

    Args:
      source_url: Source URL string; used as the row key.
      destination_url: Destination URL string.
    """
    # Always use the source_url as the key for the item. This is unique.
    self.Set(source_url, 'source_uri', source_url)
    self.Set(source_url, 'destination_uri', destination_url)
    self.Set(source_url, 'start_time', datetime.datetime.utcnow())

  def SetResult(self, source_url, bytes_transferred, result,
                description=''):
    """Finalizes the row for source_url and appends it to the manifest file.

    Args:
      source_url: Source URL string identifying the row.
      bytes_transferred: Number of bytes transferred.
      result: Result string written to the 'Result' column.
      description: Optional free-form description of the result.
    """
    self.Set(source_url, 'bytes', bytes_transferred)
    self.Set(source_url, 'result', result)
    self.Set(source_url, 'description', description)
    self.Set(source_url, 'end_time', datetime.datetime.utcnow())
    self._WriteRowToManifestFile(source_url)
    self._RemoveItemFromManifest(source_url)

  def _WriteRowToManifestFile(self, url):
    """Writes a manifest entry to the manifest file for the url argument."""
    row_item = self.items[url]
    data = [
        str(row_item['source_uri'].encode(UTF8)),
        str(row_item['destination_uri'].encode(UTF8)),
        '%sZ' % row_item['start_time'].isoformat(),
        '%sZ' % row_item['end_time'].isoformat(),
        row_item['md5'] if 'md5' in row_item else '',
        row_item['upload_id'] if 'upload_id' in row_item else '',
        str(row_item['size']) if 'size' in row_item else '',
        str(row_item['bytes']) if 'bytes' in row_item else '',
        row_item['result'],
        row_item['description'].encode(UTF8)]

    # Acquire a lock to prevent multiple threads writing to the same file at
    # the same time. This would cause a garbled mess in the manifest file.
    with self.lock:
      # Binary append mode matches the 'wb'/'rb' modes used for the header and
      # for parsing; text mode would let the platform rewrite the csv line
      # terminator. 1 == line buffered.
      with open(self.manifest_path, 'ab', 1) as f:
        writer = csv.writer(f)
        writer.writerow(data)

  def _RemoveItemFromManifest(self, url):
    """Drops the in-memory row for url once it has been written out."""
    # Remove the item from the dictionary since we're done with it and
    # we don't want the dictionary to grow too large in memory for no good
    # reason.
    del self.items[url]
2952
2953
class ItemExistsError(Exception):
  """Signals that an item is being skipped because it already exists."""
2957
2958
class SkipUnsupportedObjectError(Exception):
  """Signals that an object is being skipped because its type is unsupported.

  Attributes:
    unsupported_type: Name of the unsupported type. This base class reports
        'Unknown'; subclasses overwrite it with a specific value.
  """

  def __init__(self):
    super(SkipUnsupportedObjectError, self).__init__()
    # Generic placeholder; more specific subclasses replace it.
    self.unsupported_type = 'Unknown'
2965
2966
class SkipGlacierError(SkipUnsupportedObjectError):
  """Signals that an object is being skipped because it is of type GLACIER."""

  def __init__(self):
    super(SkipGlacierError, self).__init__()
    # Replace the base class's 'Unknown' with the specific type name.
    self.unsupported_type = 'GLACIER'
2973
2974
def GetPathBeforeFinalDir(url):
  """Returns the part of the URL that precedes its final directory component.

  Works for file system directories, buckets, and bucket subdirectories.
  Examples: gs://bucket/dir/ yields 'gs://bucket', gs://bucket yields
  'gs://', and file://dir yields 'file://'.

  Args:
    url: StorageUrl representing a filesystem directory, cloud bucket or
         bucket subdir.

  Returns:
    String name of above-described path, sans final path separator.
  """
  delim = url.delim

  if not url.IsFileUrl():
    if url.IsBucket():
      # A bucket has nothing above it except the scheme.
      return '%s://' % url.scheme
    # Bucket subdir: drop trailing delimiters, then the last path component.
    trimmed = url.url_string.rstrip(delim)
    return trimmed.rpartition(delim)[0]

  path_after_scheme = url.url_string[len('file://'):]
  if delim not in path_after_scheme:
    # A single path component has no parent directory to report.
    return 'file://'
  parent = path_after_scheme.rstrip(delim).rpartition(delim)[0]
  return 'file://%s' % parent
3000
3001
def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Computes how to partition a file for parallel file/object transfers.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
                            max_components is infinite.
  Returns:
    A (num_components, component_size) tuple: the number of components in the
    partitioned file and the size of each component. The last component may
    be smaller when file_size is not an exact multiple of component_size.
  """
  # Count we would use with no upper limit:
  # ceil(file_size / default_component_size).
  unbounded_count = DivideAndCeil(file_size, default_component_size)

  # Clamp to [2, max_components]; the lower bound of 2 is applied last.
  bounded_above = min(unbounded_count, max_components)
  num_components = max(bounded_above, 2)

  # Spread the bytes across the chosen count:
  # ceil(file_size / num_components).
  component_size = DivideAndCeil(file_size, num_components)
  return (num_components, component_size)
3024
3025
def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
  """Deletes one temporary object; meant to be used with command.Apply.

  Args:
    cls: Passed through to GetCloudApiInstance to obtain the API instance.
    url_to_delete: URL object supplying the bucket_name, object_name,
                   generation and scheme of the object to delete.
    thread_state: Passed through to GetCloudApiInstance.
  """
  api = GetCloudApiInstance(cls, thread_state)
  try:
    target = (url_to_delete.bucket_name, url_to_delete.object_name)
    options = {
        'generation': url_to_delete.generation,
        'provider': url_to_delete.scheme,
    }
    api.DeleteObject(*target, **options)
  except NotFoundException:
    # A retry at a lower layer may have been issued after the original request
    # had already succeeded, in which case the object is gone by now. Barring
    # other errors the top-level command should still report success, so this
    # is deliberately not re-raised.
    pass
3039
3040
3041def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
3042  """Parse the tracker file from the last parallel composite upload attempt.
3043
3044  If it exists, the tracker file is of the format described in
3045  _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
3046  read, then the upload will start from the beginning.
3047
3048  Args:
3049    tracker_file: The name of the file to parse.
3050    tracker_file_lock: Lock protecting access to the tracker file.
3051
3052  Returns:
3053    random_prefix: A randomly-generated prefix to the name of the
3054                   temporary components.
3055    existing_objects: A list of ObjectFromTracker objects representing
3056                      the set of files that have already been uploaded.
3057  """
3058
3059  def GenerateRandomPrefix():
3060    return str(random.randint(1, (10 ** 10) - 1))
3061
3062  existing_objects = []
3063  try:
3064    with tracker_file_lock:
3065      with open(tracker_file, 'r') as fp:
3066        lines = fp.readlines()
3067        lines = [line.strip() for line in lines]
3068        if not lines:
3069          print('Parallel upload tracker file (%s) was invalid. '
3070                'Restarting upload from scratch.' % tracker_file)
3071          lines = [GenerateRandomPrefix()]
3072
3073  except IOError as e:
3074    # We can't read the tracker file, so generate a new random prefix.
3075    lines = [GenerateRandomPrefix()]
3076
3077    # Ignore non-existent file (happens first time an upload
3078    # is attempted on a file), but warn user for other errors.
3079    if e.errno != errno.ENOENT:
3080      # Will restart because we failed to read in the file.
3081      print('Couldn\'t read parallel upload tracker file (%s): %s. '
3082            'Restarting upload from scratch.' % (tracker_file, e.strerror))
3083
3084  # The first line contains the randomly-generated prefix.
3085  random_prefix = lines[0]
3086
3087  # The remaining lines were written in pairs to describe a single component
3088  # in the form:
3089  #   object_name (without random prefix)
3090  #   generation
3091  # Newlines are used as the delimiter because only newlines and carriage
3092  # returns are invalid characters in object names, and users can specify
3093  # a custom prefix in the config file.
3094  i = 1
3095  while i < len(lines):
3096    (name, generation) = (lines[i], lines[i+1])
3097    if not generation:
3098      # Cover the '' case.
3099      generation = None
3100    existing_objects.append(ObjectFromTracker(name, generation))
3101    i += 2
3102  return (random_prefix, existing_objects)
3103
3104
def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Records one more uploaded component at the end of a tracker file.

  The appended lines use the format described in
  _CreateParallelUploadTrackerFile.

  Args:
    tracker_file: Tracker file to append to.
    component: Component that was uploaded.
    tracker_file_lock: Thread and process-safe Lock for the tracker file.
  """
  entries = _GetParallelUploadTrackerFileLinesForComponents([component])
  # Each entry becomes its own newline-terminated line in the file.
  terminated_entries = [entry + '\n' for entry in entries]
  with tracker_file_lock:
    with open(tracker_file, 'a') as tracker_fp:
      tracker_fp.writelines(terminated_entries)
3121
3122
def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes information about components that were successfully uploaded.

  This way the upload can be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
    where N is the number of components that have been successfully uploaded.

  Any previous contents of the tracker file are replaced.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
                   uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: The lock protecting access to the tracker file.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  try:
    with tracker_file_lock:
      # Mode 'w' truncates the file on open, so the old contents are cleared
      # without a separate open/close step.
      with open(tracker_file, 'w') as f:
        f.writelines(lines)
  except IOError as e:
    # Delegate to the shared helper for reporting an unwritable tracker file.
    RaiseUnwritableTrackerFileException(tracker_file, e.strerror)
3156
3157
3158def _GetParallelUploadTrackerFileLinesForComponents(components):
3159  """Return a list of the lines for use in a parallel upload tracker file.
3160
3161  The lines represent the given components, using the format as described in
3162  _CreateParallelUploadTrackerFile.
3163
3164  Args:
3165    components: A list of ObjectFromTracker objects that were uploaded.
3166
3167  Returns:
3168    Lines describing components with their generation for outputting to the
3169    tracker file.
3170  """
3171  lines = []
3172  for component in components:
3173    generation = None
3174    generation = component.generation
3175    if not generation:
3176      generation = ''
3177    lines += [component.object_name, str(generation)]
3178  return lines
3179
3180
def FilterExistingComponents(dst_args, existing_components, bucket_url,
                             gsutil_api):
  """Decides what to do with each component of a parallel upload.

  Compares the components the current partitioning calls for against the
  components recorded as already uploaded, and sorts them into those that
  must be uploaded, those that can be reused as-is, and previously uploaded
  objects that should be deleted.

  Args:
    dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_url: CloudUrl of the bucket in which the components exist.
    gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  tracked_names = set(component.object_name
                      for component in existing_components)

  # Anything the tracker has never heard of must be uploaded.
  components_to_upload = [dst_args[name] for name in dst_args
                          if name not in tracked_names]

  # Names for which a reuse-or-reupload decision has already been made.
  decided_names = set()
  uploaded_components = []
  existing_objects_to_delete = []

  for tracked in existing_components:
    name = tracked.object_name

    if name not in dst_args or name in decided_names:
      # Either the partitioning no longer produces this name (for example the
      # component size changed), or the tracker file lists the same name more
      # than once (for example a temporary component was overwritten in a
      # versioned bucket). In both cases the tracked object is discarded.
      stale_url = bucket_url.Clone()
      stale_url.object_name = name
      stale_url.generation = tracked.generation
      existing_objects_to_delete.append(stale_url)
      continue

    decided_names.add(name)
    dst_arg = dst_args[name]

    # MD5 of the matching slice of the local file.
    # TODO: calculate MD5's in parallel when possible.
    local_part = FilePart(dst_arg.filename, dst_arg.file_start,
                          dst_arg.file_length)
    content_md5 = CalculateB64EncodedMd5FromContents(local_part)

    # MD5 of the component as it currently exists in the cloud.
    try:
      dst_url = dst_arg.dst_url
      dst_metadata = gsutil_api.GetObjectMetadata(
          dst_url.bucket_name, dst_url.object_name,
          generation=dst_url.generation, provider=dst_url.scheme,
          fields=['md5Hash', 'etag'])
      cloud_md5 = dst_metadata.md5Hash
    except Exception:  # pylint: disable=broad-except
      # It doesn't matter what went wrong: without the object's MD5 nothing
      # can be verified, so the component is simply uploaded again.
      cloud_md5 = None

    if cloud_md5 == content_md5:
      # Contents match, so the existing component can be reused.
      reusable_url = dst_arg.dst_url.Clone()
      reusable_url.generation = tracked.generation
      uploaded_components.append(reusable_url)
      continue

    components_to_upload.append(dst_arg)
    if tracked.generation:
      # Only an old object with a generation (i.e., one in a versioned
      # bucket) needs explicit deletion; otherwise the new upload just
      # overwrites it.
      outdated_url = dst_arg.dst_url.Clone()
      outdated_url.generation = tracked.generation
      existing_objects_to_delete.append(outdated_url)

  if uploaded_components:
    logging.info('Found %d existing temporary components to reuse.',
                 len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)
3276