• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2010 Google Inc. All Rights Reserved.
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22"""Boto translation layer for resumable uploads.
23
24See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
25for details.
26
27Resumable uploads will retry interrupted uploads, resuming at the byte
28count completed by the last upload attempt. If too many retries happen with
29no progress (per configurable num_retries param), the upload will be
30aborted in the current process.
31
32Unlike the boto implementation of resumable upload handler, this class does
33not directly interact with tracker files.
34
35Originally Google wrote and contributed this code to the boto project,
36then copied that code back into gsutil on the release of gsutil 4.0 which
37supports both boto and non-boto codepaths for resumable uploads.  Any bug
38fixes made to this file should also be integrated to resumable_upload_handler.py
39in boto, where applicable.
40
41TODO: gsutil-beta: Add a similar comment to the boto code.
42"""
43
44from __future__ import absolute_import
45
46import errno
47import httplib
48import random
49import re
50import socket
51import time
52import urlparse
53from boto import UserAgent
54from boto.connection import AWSAuthConnection
55from boto.exception import ResumableTransferDisposition
56from boto.exception import ResumableUploadException
57from gslib.exception import InvalidUrlError
58from gslib.util import GetMaxRetryDelay
59from gslib.util import GetNumRetries
60from gslib.util import XML_PROGRESS_CALLBACKS
61
62
class BotoResumableUpload(object):
  """Upload helper class for resumable uploads via boto."""

  # Chunk size (bytes) read from the source file per send() call.
  BUFFER_SIZE = 8192
  # Transient network errors that the SendFile retry loop treats as
  # retryable (as opposed to ResumableUploadException dispositions).
  RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                          socket.gaierror)

  # (start, end) response indicating service has nothing (upload protocol uses
  # inclusive numbering).
  SERVICE_HAS_NOTHING = (0, -1)
73
74  def __init__(self, tracker_callback, logger,
75               resume_url=None, num_retries=None):
76    """Constructor. Instantiate once for each uploaded file.
77
78    Args:
79      tracker_callback: Callback function that takes a string argument.  Used
80                        by caller to track this upload across upload
81                        interruption.
82      logger: logging.logger instance to use for debug messages.
83      resume_url: If present, attempt to resume the upload at this URL.
84      num_retries: Number of times to retry the upload making no progress.
85                   This count resets every time we make progress, so the upload
86                   can span many more than this number of retries.
87    """
88    if resume_url:
89      self._SetUploadUrl(resume_url)
90    else:
91      self.upload_url = None
92    self.num_retries = num_retries
93    self.service_has_bytes = 0  # Byte count at last service check.
94    # Save upload_start_point in instance state so caller can find how
95    # much was transferred by this ResumableUploadHandler (across retries).
96    self.upload_start_point = None
97    self.tracker_callback = tracker_callback
98    self.logger = logger
99
100  def _SetUploadUrl(self, url):
101    """Saves URL and resets upload state.
102
103    Called when we start a new resumable upload or get a new tracker
104    URL for the upload.
105
106    Args:
107      url: URL string for the upload.
108
109    Raises InvalidUrlError if URL is syntactically invalid.
110    """
111    parse_result = urlparse.urlparse(url)
112    if (parse_result.scheme.lower() not in ['http', 'https'] or
113        not parse_result.netloc):
114      raise InvalidUrlError('Invalid upload URL (%s)' % url)
115    self.upload_url = url
116    self.upload_url_host = parse_result.netloc
117    self.upload_url_path = '%s?%s' % (
118        parse_result.path, parse_result.query)
119    self.service_has_bytes = 0
120
121  def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
122    return 'bytes %s/%s' % (range_spec, length_spec)
123
124  def _QueryServiceState(self, conn, file_length):
125    """Queries service to find out state of given upload.
126
127    Note that this method really just makes special case use of the
128    fact that the upload service always returns the current start/end
129    state whenever a PUT doesn't complete.
130
131    Args:
132      conn: HTTPConnection to use for the query.
133      file_length: Total length of the file.
134
135    Returns:
136      HTTP response from sending request.
137
138    Raises:
139      ResumableUploadException if problem querying service.
140    """
141    # Send an empty PUT so that service replies with this resumable
142    # transfer's state.
143    put_headers = {}
144    put_headers['Content-Range'] = (
145        self._BuildContentRangeHeader('*', file_length))
146    put_headers['Content-Length'] = '0'
147    return AWSAuthConnection.make_request(
148        conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
149        headers=put_headers, host=self.upload_url_host)
150
  def _QueryServicePos(self, conn, file_length):
    """Queries service to find out what bytes it currently has.

    Args:
      conn: HTTPConnection to use for the query.
      file_length: Total length of the file.

    Returns:
      (service_start, service_end), where the values are inclusive.
      For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.

    Raises:
      ResumableUploadException if problem querying service.
    """
    resp = self._QueryServiceState(conn, file_length)
    if resp.status == 200:
      # To handle the boundary condition where the service has the complete
      # file, we return (service_start, file_length-1). That way the
      # calling code can always simply read up through service_end. (If we
      # didn't handle this boundary condition here, the caller would have
      # to check whether service_end == file_length and read one fewer byte
      # in that case.)
      return (0, file_length - 1)  # Completed upload.
    if resp.status != 308:
      # This means the service didn't have any state for the given
      # upload ID, which can happen (for example) if the caller saved
      # the upload URL to a file and then tried to restart the transfer
      # after that upload ID has gone stale. In that case we need to
      # start a new transfer (and the caller will then save the new
      # upload URL to the tracker file).
      raise ResumableUploadException(
          'Got non-308 response (%s) from service state query' %
          resp.status, ResumableTransferDisposition.START_OVER)
    got_valid_response = False
    range_spec = resp.getheader('range')
    if range_spec:
      # Parse 'bytes=<from>-<to>' range_spec.
      m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
      if m:
        # Python 2 long() so arbitrarily large files don't overflow.
        service_start = long(m.group(1))
        service_end = long(m.group(2))
        got_valid_response = True
    else:
      # No Range header, which means the service does not yet have
      # any bytes. Note that the Range header uses inclusive 'from'
      # and 'to' values. Since Range 0-0 would mean that the service
      # has byte 0, omitting the Range header is used to indicate that
      # the service doesn't have any bytes.
      return self.SERVICE_HAS_NOTHING
    if not got_valid_response:
      # Range header present but unparsable: the upload state is unusable,
      # so tell the caller to start the transfer over.
      raise ResumableUploadException(
          'Couldn\'t parse upload service state query response (%s)' %
          str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
    if conn.debug >= 1:
      self.logger.debug('Service has: Range: %d - %d.', service_start,
                        service_end)
    return (service_start, service_end)
208
209  def _StartNewResumableUpload(self, key, headers=None):
210    """Starts a new resumable upload.
211
212    Args:
213      key: Boto Key representing the object to upload.
214      headers: Headers to use in the upload requests.
215
216    Raises:
217      ResumableUploadException if any errors occur.
218    """
219    conn = key.bucket.connection
220    if conn.debug >= 1:
221      self.logger.debug('Starting new resumable upload.')
222    self.service_has_bytes = 0
223
224    # Start a new resumable upload by sending a POST request with an
225    # empty body and the "X-Goog-Resumable: start" header. Include any
226    # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
227    # (and raise an exception if they tried to pass one, since it's
228    # a semantic error to specify it at this point, and if we were to
229    # include one now it would cause the service to expect that many
230    # bytes; the POST doesn't include the actual file bytes  We set
231    # the Content-Length in the subsequent PUT, based on the uploaded
232    # file size.
233    post_headers = {}
234    for k in headers:
235      if k.lower() == 'content-length':
236        raise ResumableUploadException(
237            'Attempt to specify Content-Length header (disallowed)',
238            ResumableTransferDisposition.ABORT)
239      post_headers[k] = headers[k]
240    post_headers[conn.provider.resumable_upload_header] = 'start'
241
242    resp = conn.make_request(
243        'POST', key.bucket.name, key.name, post_headers)
244    # Get upload URL from response 'Location' header.
245    body = resp.read()
246
247    # Check for various status conditions.
248    if resp.status in [429, 500, 503]:
249      # Retry after a delay.
250      raise ResumableUploadException(
251          'Got status %d from attempt to start resumable upload. '
252          'Will wait/retry' % resp.status,
253          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
254    elif resp.status != 200 and resp.status != 201:
255      raise ResumableUploadException(
256          'Got status %d from attempt to start resumable upload. '
257          'Aborting' % resp.status,
258          ResumableTransferDisposition.ABORT)
259
260    # Else we got 200 or 201 response code, indicating the resumable
261    # upload was created.
262    upload_url = resp.getheader('Location')
263    if not upload_url:
264      raise ResumableUploadException(
265          'No resumable upload URL found in resumable initiation '
266          'POST response (%s)' % body,
267          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
268    self._SetUploadUrl(upload_url)
269    self.tracker_callback(upload_url)
270
  def _UploadFileBytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, headers):
    """Attempts to upload file bytes.

    Makes a single attempt using an existing resumable upload connection.

    Args:
      conn: HTTPConnection from the boto Key.
      http_conn: Separate HTTPConnection for the transfer.
      fp: File pointer containing bytes to upload.
      file_length: Total length of the file.
      total_bytes_uploaded: The total number of bytes uploaded.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.
      headers: Headers to be used in the upload requests.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    buf = fp.read(self.BUFFER_SIZE)
    if cb:
      # The cb_count represents the number of full buffers to send between
      # cb executions.
      if num_cb > 2:
        # Python 2 integer division (floor) is intended here.
        cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
      elif num_cb < 0:
        cb_count = -1
      else:
        cb_count = 0
      i = 0
      cb(total_bytes_uploaded, file_length)

    # Build resumable upload headers for the transfer. Don't send a
    # Content-Range header if the file is 0 bytes long, because the
    # resumable upload protocol uses an *inclusive* end-range (so, sending
    # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
    put_headers = headers.copy() if headers else {}
    if file_length:
      if total_bytes_uploaded == file_length:
        # Nothing left to send; '*' range just finalizes the upload.
        range_header = self._BuildContentRangeHeader(
            '*', file_length)
      else:
        range_header = self._BuildContentRangeHeader(
            '%d-%d' % (total_bytes_uploaded, file_length - 1),
            file_length)
      put_headers['Content-Range'] = range_header
    # Set Content-Length to the total bytes we'll send with this PUT.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
    # Build the signed request via boto, but send it manually on http_conn
    # so the body can be streamed buffer-by-buffer below.
    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=None,
        headers=put_headers, host=self.upload_url_host)
    http_conn.putrequest('PUT', http_request.path)
    for k in put_headers:
      http_conn.putheader(k, put_headers[k])
    http_conn.endheaders()

    # Turn off debug on http connection so upload content isn't included
    # in debug stream.
    http_conn.set_debuglevel(0)
    while buf:
      http_conn.send(buf)
      total_bytes_uploaded += len(buf)
      if cb:
        i += 1
        if i == cb_count or cb_count == -1:
          cb(total_bytes_uploaded, file_length)
          i = 0
      buf = fp.read(self.BUFFER_SIZE)
    # Restore debug level now that the body has been sent.
    http_conn.set_debuglevel(conn.debug)
    if cb:
      # Final callback so the caller sees 100% progress.
      cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
      # Abort (and delete the tracker file) so if the user retries
      # they'll start a new resumable upload rather than potentially
      # attempting to pick back up later where we left off.
      raise ResumableUploadException(
          'File changed during upload: EOF at %d bytes of %d byte file.' %
          (total_bytes_uploaded, file_length),
          ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()
    # Restore http connection debug level.
    http_conn.set_debuglevel(conn.debug)

    if resp.status == 200:
      # Success.
      return (resp.getheader('etag'),
              resp.getheader('x-goog-generation'),
              resp.getheader('x-goog-metageneration'))
    # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
    elif resp.status in [408, 429, 500, 503]:
      disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
      # Catch all for any other error codes.
      disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)
373
  def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
                              num_cb):
    """Attempts a resumable upload.

    Args:
      key: Boto key representing object to upload.
      fp: File pointer containing upload bytes.
      file_length: Total length of the upload.
      headers: Headers to be used in upload requests.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    (service_start, service_end) = self.SERVICE_HAS_NOTHING
    conn = key.bucket.connection
    if self.upload_url:
      # Try to resume existing resumable upload.
      try:
        (service_start, service_end) = (
            self._QueryServicePos(conn, file_length))
        self.service_has_bytes = service_start
        if conn.debug >= 1:
          self.logger.debug('Resuming transfer.')
      except ResumableUploadException, e:
        if conn.debug >= 1:
          self.logger.debug('Unable to resume transfer (%s).', e.message)
        # Stale/invalid upload URL; fall back to initiating a fresh upload.
        self._StartNewResumableUpload(key, headers)
    else:
      self._StartNewResumableUpload(key, headers)

    # upload_start_point allows the code that instantiated the
    # ResumableUploadHandler to find out the point from which it started
    # uploading (e.g., so it can correctly compute throughput).
    if self.upload_start_point is None:
      self.upload_start_point = service_end

    # Ranges are inclusive, so the next byte to send is service_end + 1.
    total_bytes_uploaded = service_end + 1

    # Start reading from the file based upon the number of bytes that the
    # server has so far.
    if total_bytes_uploaded < file_length:
      fp.seek(total_bytes_uploaded)

    # NOTE(review): conn was already assigned above; this reassignment is
    # redundant but harmless.
    conn = key.bucket.connection

    # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
    # pool connections) because httplib requires a new HTTP connection per
    # transaction. (Without this, calling http_conn.getresponse() would get
    # "ResponseNotReady".)
    http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
                                         conn.is_secure)
    http_conn.set_debuglevel(conn.debug)

    # Make sure to close http_conn at end so if a local file read
    # failure occurs partway through service will terminate current upload
    # and can report that progress on next attempt.
    try:
      return self._UploadFileBytes(conn, http_conn, fp, file_length,
                                   total_bytes_uploaded, cb, num_cb,
                                   headers)
    except (ResumableUploadException, socket.error):
      # Query service state to distinguish a retryable interruption from a
      # request the service has rejected outright.
      resp = self._QueryServiceState(conn, file_length)
      if resp.status == 400:
        raise ResumableUploadException(
            'Got 400 response from service state query after failed resumable '
            'upload attempt. This can happen for various reasons, including '
            'specifying an invalid request (e.g., an invalid canned ACL) or '
            'if the file size changed between upload attempts',
            ResumableTransferDisposition.ABORT)
      else:
        raise
    finally:
      http_conn.close()
454
  def HandleResumableUploadException(self, e, debug):
    """Dispatches on the exception's disposition: re-raise or allow retry.

    Must be called from within an except block handling e; the bare
    `raise` statements below re-raise that active exception.

    Args:
      e: ResumableUploadException being handled.
      debug: debug level 0..3; messages logged at >= 1.

    Raises:
      The incoming exception for ABORT_CUR_PROCESS, ABORT and START_OVER
      dispositions. Other (retryable) dispositions fall through so the
      caller's retry loop can continue.
    """
    if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting but retaining tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.ABORT:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting and removing tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.START_OVER:
      raise
    else:
      if debug >= 1:
        self.logger.debug(
            'Caught ResumableUploadException (%s) - will retry', e.message)
472
473  def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
474                                  debug=0):
475    """Tracks the number of iterations without progress.
476
477    Performs randomized exponential backoff.
478
479    Args:
480      service_had_bytes_before_attempt: Number of bytes the service had prior
481                                       to this upload attempt.
482      debug: debug level 0..3
483    """
484    # At this point we had a re-tryable failure; see if made progress.
485    if self.service_has_bytes > service_had_bytes_before_attempt:
486      self.progress_less_iterations = 0   # If progress, reset counter.
487    else:
488      self.progress_less_iterations += 1
489
490    if self.progress_less_iterations > self.num_retries:
491      # Don't retry any longer in the current process.
492      raise ResumableUploadException(
493          'Too many resumable upload attempts failed without '
494          'progress. You might try this upload again later',
495          ResumableTransferDisposition.ABORT_CUR_PROCESS)
496
497    # Use binary exponential backoff to desynchronize client requests.
498    sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
499                          GetMaxRetryDelay())
500    if debug >= 1:
501      self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
502                        'Sleeping %3.1f seconds before re-trying',
503                        self.progress_less_iterations, sleep_time_secs)
504    time.sleep(sleep_time_secs)
505
  def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
               num_cb=XML_PROGRESS_CALLBACKS):
    """Upload a file to a key into a bucket on GS, resumable upload protocol.

    Args:
      key: `boto.s3.key.Key` or subclass representing the upload destination.
      fp: File pointer to upload
      size: Size of the file to upload.
      headers: The headers to pass along with the PUT request
      canned_acl: Optional canned ACL to apply to object.
      cb: Callback function that will be called to report progress on
          the upload.  The callback should accept two integer parameters, the
          first representing the number of bytes that have been successfully
          transmitted to GS, and the second representing the total number of
          bytes that need to be transmitted.
      num_cb: (optional) If a callback is specified with the cb parameter, this
              parameter determines the granularity of the callback by defining
              the maximum number of times the callback will be called during the
              file transfer. Providing a negative integer will cause your
              callback to be called with each buffer read.

    Raises:
      ResumableUploadException if a problem occurs during the transfer.
    """

    if not headers:
      headers = {}
    # If Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    content_type = 'Content-Type'
    if content_type in headers and headers[content_type] is None:
      del headers[content_type]

    if canned_acl:
      headers[key.provider.acl_header] = canned_acl

    headers['User-Agent'] = UserAgent

    file_length = size
    debug = key.bucket.connection.debug

    # Use num-retries from constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 5.
    if self.num_retries is None:
      self.num_retries = GetNumRetries()
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
      service_had_bytes_before_attempt = self.service_has_bytes
      try:
        # Save generation and metageneration in class state so caller
        # can find these values, for use in preconditions of future
        # operations on the uploaded object.
        (_, self.generation, self.metageneration) = (
            self._AttemptResumableUpload(key, fp, file_length,
                                         headers, cb, num_cb))

        key.generation = self.generation
        if debug >= 1:
          self.logger.debug('Resumable upload complete.')
        return
      except self.RETRYABLE_EXCEPTIONS, e:
        # Transient network error: fall through to the backoff/retry
        # bookkeeping at the bottom of the loop.
        if debug >= 1:
          self.logger.debug('Caught exception (%s)', e.__repr__())
        if isinstance(e, IOError) and e.errno == errno.EPIPE:
          # Broken pipe error causes httplib to immediately
          # close the socket (http://bugs.python.org/issue5542),
          # so we need to close the connection before we resume
          # the upload (which will cause a new connection to be
          # opened the next time an HTTP request is sent).
          key.bucket.connection.connection.close()
      except ResumableUploadException, e:
        # Re-raises for non-retryable dispositions; otherwise falls
        # through so we back off and retry below.
        self.HandleResumableUploadException(e, debug)

      self.TrackProgressLessIterations(service_had_bytes_before_attempt,
                                       debug=debug)
583