• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Upload and download support for apitools."""
18from __future__ import print_function
19
20import email.generator as email_generator
21import email.mime.multipart as mime_multipart
22import email.mime.nonmultipart as mime_nonmultipart
23import io
24import json
25import mimetypes
26import os
27import threading
28
29import six
30from six.moves import http_client
31
32from apitools.base.py import buffered_stream
33from apitools.base.py import exceptions
34from apitools.base.py import http_wrapper
35from apitools.base.py import stream_slice
36from apitools.base.py import util
37
# Names exported by ``from ... import *``: the two transfer classes, the
# upload-strategy constants and the ready-made progress/completion printers.
__all__ = [
    'Download', 'Upload',
    'RESUMABLE_UPLOAD', 'SIMPLE_UPLOAD',
    'DownloadProgressPrinter', 'DownloadCompletePrinter',
    'UploadProgressPrinter', 'UploadCompletePrinter',
]
48
# Uploads larger than this many bytes (5 MiB) default to the resumable
# strategy (see Upload.__SetDefaultUploadStrategy).
_RESUMABLE_UPLOAD_THRESHOLD = 5 * 1024 * 1024
# Accepted values for Upload.strategy.
SIMPLE_UPLOAD = 'simple'
RESUMABLE_UPLOAD = 'resumable'
52
53
def DownloadProgressPrinter(response, unused_download):
    """Print download progress for one received chunk to stdout.

    Shows the response's 'content-range' header when present; otherwise
    shows the response length in bytes.

    Args:
      response: Response object exposing ``info`` (a header dict) and
          ``length``.
      unused_download: The Download in progress; ignored.
    """
    headers = response.info
    if 'content-range' not in headers:
        print('Received %d bytes' % response.length)
        return
    print('Received %s' % headers['content-range'])
60
61
def DownloadCompletePrinter(unused_response, unused_download):
    """Announce on stdout that a download finished; both arguments are ignored."""
    message = 'Download complete'
    print(message)
65
66
def UploadProgressPrinter(response, unused_upload):
    """Print upload progress based on response.

    Prints the Range header the server returned, looking it up as either
    'range' or 'Range'. A resumable-upload reply without a Range header
    means the server has not persisted any bytes yet, which is how
    Upload.RefreshResumableUploadState treats it as well, so that case
    prints 'Sent 0 bytes' instead of raising KeyError.

    Args:
      response: Response object exposing ``info`` (a header dict).
      unused_upload: The Upload in progress; ignored.
    """
    info = response.info
    range_header = info.get('range', info.get('Range'))
    if range_header is None:
        print('Sent 0 bytes')
    else:
        print('Sent %s' % range_header)
70
71
def UploadCompletePrinter(unused_response, unused_upload):
    """Announce on stdout that an upload finished; both arguments are ignored."""
    message = 'Upload complete'
    print(message)
75
76
77class _Transfer(object):
78
79    """Generic bits common to Uploads and Downloads."""
80
81    def __init__(self, stream, close_stream=False, chunksize=None,
82                 auto_transfer=True, http=None, num_retries=5):
83        self.__bytes_http = None
84        self.__close_stream = close_stream
85        self.__http = http
86        self.__stream = stream
87        self.__url = None
88
89        self.__num_retries = 5
90        # Let the @property do validation
91        self.num_retries = num_retries
92
93        self.retry_func = (
94            http_wrapper.HandleExceptionsAndRebuildHttpConnections)
95        self.auto_transfer = auto_transfer
96        self.chunksize = chunksize or 1048576
97
98    def __repr__(self):
99        return str(self)
100
101    @property
102    def close_stream(self):
103        return self.__close_stream
104
105    @property
106    def http(self):
107        return self.__http
108
109    @property
110    def bytes_http(self):
111        return self.__bytes_http or self.http
112
113    @bytes_http.setter
114    def bytes_http(self, value):
115        self.__bytes_http = value
116
117    @property
118    def num_retries(self):
119        return self.__num_retries
120
121    @num_retries.setter
122    def num_retries(self, value):
123        util.Typecheck(value, six.integer_types)
124        if value < 0:
125            raise exceptions.InvalidDataError(
126                'Cannot have negative value for num_retries')
127        self.__num_retries = value
128
129    @property
130    def stream(self):
131        return self.__stream
132
133    @property
134    def url(self):
135        return self.__url
136
137    def _Initialize(self, http, url):
138        """Initialize this download by setting self.http and self.url.
139
140        We want the user to be able to override self.http by having set
141        the value in the constructor; in that case, we ignore the provided
142        http.
143
144        Args:
145          http: An httplib2.Http instance or None.
146          url: The url for this transfer.
147
148        Returns:
149          None. Initializes self.
150        """
151        self.EnsureUninitialized()
152        if self.http is None:
153            self.__http = http or http_wrapper.GetHttp()
154        self.__url = url
155
156    @property
157    def initialized(self):
158        return self.url is not None and self.http is not None
159
160    @property
161    def _type_name(self):
162        return type(self).__name__
163
164    def EnsureInitialized(self):
165        if not self.initialized:
166            raise exceptions.TransferInvalidError(
167                'Cannot use uninitialized %s', self._type_name)
168
169    def EnsureUninitialized(self):
170        if self.initialized:
171            raise exceptions.TransferInvalidError(
172                'Cannot re-initialize %s', self._type_name)
173
174    def __del__(self):
175        if self.__close_stream:
176            self.__stream.close()
177
178    def _ExecuteCallback(self, callback, response):
179        # TODO(craigcitro): Push these into a queue.
180        if callback is not None:
181            threading.Thread(target=callback, args=(response, self)).start()
182
183
class Download(_Transfer):

    """Data for a single download.

    Public attributes:
      chunksize: default chunksize to use for transfers.
      progress_callback: default per-chunk callback for StreamMedia.
      finish_callback: default completion callback for StreamMedia.
    """
    # Response statuses accepted by InitializeDownload and
    # __ProcessResponse; anything else is treated as an error.
    _ACCEPTABLE_STATUSES = set((
        http_client.OK,
        http_client.NO_CONTENT,
        http_client.PARTIAL_CONTENT,
        http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
    ))
    # Keys FromData requires; the same keys serialization_data produces.
    _REQUIRED_SERIALIZATION_KEYS = set((
        'auto_transfer', 'progress', 'total_size', 'url'))

    def __init__(self, stream, progress_callback=None, finish_callback=None,
                 **kwds):
        """Create a download that writes into stream.

        Args:
          stream: Stream that downloaded bytes are written to.
          progress_callback: (optional) Default callback for StreamMedia,
              called with (response, download) after each chunk.
          finish_callback: (optional) Default callback for StreamMedia,
              called with (response, download) when it finishes.
          **kwds: Forwarded to _Transfer, except 'total_size', which is
              consumed here as the known size of the download (or None).
        """
        total_size = kwds.pop('total_size', None)
        super(Download, self).__init__(stream, **kwds)
        self.__initial_response = None
        self.__progress = 0
        self.__total_size = total_size
        self.__encoding = None

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback

    @property
    def progress(self):
        """Bytes written to the stream so far (or restored by FromData)."""
        return self.__progress

    @property
    def encoding(self):
        """content-encoding of the last response that carried one, or None."""
        return self.__encoding

    @classmethod
    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
        """Create a new download object from a filename.

        Args:
          filename: Destination path; '~' is expanded.
          overwrite: (bool, default: False) Allow replacing an existing file.
          auto_transfer: (bool, default: True) See _Transfer.
          **kwds: Forwarded to the constructor.

        Raises:
          exceptions.InvalidUserInputError: if the file exists and
              overwrite is False.
        """
        path = os.path.expanduser(filename)
        if os.path.exists(path) and not overwrite:
            raise exceptions.InvalidUserInputError(
                'File %s exists and overwrite not specified' % path)
        return cls(open(path, 'wb'), close_stream=True,
                   auto_transfer=auto_transfer, **kwds)

    @classmethod
    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
        """Create a new Download object from a stream."""
        return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
                   **kwds)

    @classmethod
    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
                 **kwds):
        """Create a new Download object from a stream and serialized data.

        Args:
          stream: Stream to write the remaining bytes into.
          json_data: JSON object containing _REQUIRED_SERIALIZATION_KEYS,
              e.g. a json-encoded serialization_data.
          http: (optional) Http object passed to _Initialize.
          auto_transfer: (bool, optional) Overrides the serialized value
              when not None.
          **kwds: Forwarded to FromStream.

        Raises:
          exceptions.InvalidDataError: if a required key is missing.
        """
        info = json.loads(json_data)
        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
        if missing_keys:
            # Sorted so the message does not depend on set iteration order.
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(sorted(missing_keys))))
        download = cls.FromStream(stream, **kwds)
        if auto_transfer is not None:
            download.auto_transfer = auto_transfer
        else:
            download.auto_transfer = info['auto_transfer']
        setattr(download, '_Download__progress', info['progress'])
        setattr(download, '_Download__total_size', info['total_size'])
        download._Initialize(  # pylint: disable=protected-access
            http, info['url'])
        return download

    @property
    def serialization_data(self):
        """Dict of the state FromData needs to recreate this download."""
        self.EnsureInitialized()
        return {
            'auto_transfer': self.auto_transfer,
            'progress': self.progress,
            'total_size': self.total_size,
            'url': self.url,
        }

    @property
    def total_size(self):
        """Total size in bytes, or None while still unknown."""
        return self.__total_size

    def __str__(self):
        if not self.initialized:
            return 'Download (uninitialized)'
        return 'Download with %d/%s bytes transferred from url %s' % (
            self.progress, self.total_size, self.url)

    def ConfigureRequest(self, http_request, url_builder):
        """Request media (alt=media) and only the first chunk of it."""
        url_builder.query_params['alt'] = 'media'
        # TODO(craigcitro): We need to send range requests because by
        # default httplib2 stores entire reponses in memory. Override
        # httplib2's download method (as gsutil does) so that this is not
        # necessary.
        http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)

    def __SetTotal(self, info):
        """Set total_size from a content-range header, defaulting to 0."""
        if 'content-range' in info:
            _, _, total = info['content-range'].rpartition('/')
            if total != '*':
                self.__total_size = int(total)
        # Note "total_size is None" means we don't know it; if no size
        # info was returned on our initial range request, that means we
        # have a 0-byte file. (That last statement has been verified
        # empirically, but is not clearly documented anywhere.)
        if self.total_size is None:
            self.__total_size = 0

    def InitializeDownload(self, http_request, http=None, client=None):
        """Initialize this download by making a request.

        Args:
          http_request: The HttpRequest to use to initialize this download.
          http: The httplib2.Http instance for this request.
          client: If provided, let this client process the final URL before
              sending any additional requests. If client is provided and
              http is not, client.http will be used instead.

        Raises:
          exceptions.UserError: if neither http nor client is given.
          exceptions.HttpError: if the initial response (sent only when
              auto_transfer is set) has an unacceptable status.
        """
        self.EnsureUninitialized()
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        url = http_request.url
        if self.auto_transfer:
            end_byte = self.__ComputeEndByte(0)
            self.__SetRangeHeader(http_request, 0, end_byte)
            response = http_wrapper.MakeRequest(
                self.bytes_http or http, http_request)
            if response.status_code not in self._ACCEPTABLE_STATUSES:
                raise exceptions.HttpError.FromResponse(response)
            # StreamMedia consumes this response as its first chunk.
            self.__initial_response = response
            self.__SetTotal(response.info)
            url = response.info.get('content-location', response.request_url)
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)
        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            self.StreamInChunks()

    def __NormalizeStartEnd(self, start, end=None):
        """Clamp (start, end) to [0, total_size - 1]; needs total_size set.

        Raises:
          exceptions.TransferInvalidError: for a negative start with an
              end, a start at or past total_size, or end < start.
        """
        if end is not None:
            if start < 0:
                raise exceptions.TransferInvalidError(
                    'Cannot have end index with negative start index')
            elif start >= self.total_size:
                raise exceptions.TransferInvalidError(
                    'Cannot have start index greater than total size')
            end = min(end, self.total_size - 1)
            if end < start:
                raise exceptions.TransferInvalidError(
                    'Range requested with end[%s] < start[%s]' % (end, start))
            return start, end
        else:
            if start < 0:
                start = max(0, start + self.total_size)
            return start, self.total_size - 1

    def __SetRangeHeader(self, request, start, end=None):
        """Set the range header: suffix, open-ended or closed range."""
        if start < 0:
            request.headers['range'] = 'bytes=%d' % start
        elif end is None:
            request.headers['range'] = 'bytes=%d-' % start
        else:
            request.headers['range'] = 'bytes=%d-%d' % (start, end)

    def __ComputeEndByte(self, start, end=None, use_chunks=True):
        """Compute the last byte to fetch for this request.

        This is all based on the HTTP spec for Range and
        Content-Range.

        Note that this is potentially confusing in several ways:
          * the value for the last byte is 0-based, eg "fetch 10 bytes
            from the beginning" would return 9 here.
          * if we have no information about size, and don't want to
            use the chunksize, we'll return None.
        See the tests for more examples.

        Args:
          start: byte to start at.
          end: (int or None, default: None) Suggested last byte.
          use_chunks: (bool, default: True) If False, ignore self.chunksize.

        Returns:
          Last byte to use in a Range header, or None.

        """
        end_byte = end

        if start < 0 and not self.total_size:
            return end_byte

        if use_chunks:
            alternate = start + self.chunksize - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        if self.total_size:
            alternate = self.total_size - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        return end_byte

    def __GetChunk(self, start, end, additional_headers=None):
        """Retrieve a chunk, and return the full response."""
        self.EnsureInitialized()
        request = http_wrapper.Request(url=self.url)
        self.__SetRangeHeader(request, start, end=end)
        if additional_headers is not None:
            request.headers.update(additional_headers)
        return http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)

    def __ProcessResponse(self, response):
        """Process response (by updating self and writing to self.stream).

        Raises:
          exceptions.HttpError: for 403/404 responses.
          exceptions.TransferRetryError: for other unacceptable statuses.
        """
        if response.status_code not in self._ACCEPTABLE_STATUSES:
            # We distinguish errors that mean we made a mistake in setting
            # up the transfer versus something we should attempt again.
            if response.status_code in (http_client.FORBIDDEN,
                                        http_client.NOT_FOUND):
                raise exceptions.HttpError.FromResponse(response)
            else:
                raise exceptions.TransferRetryError(response.content)
        if response.status_code in (http_client.OK,
                                    http_client.PARTIAL_CONTENT):
            self.stream.write(response.content)
            self.__progress += response.length
            if response.info and 'content-encoding' in response.info:
                # TODO(craigcitro): Handle the case where this changes over a
                # download.
                self.__encoding = response.info['content-encoding']
        elif response.status_code == http_client.NO_CONTENT:
            # It's important to write something to the stream for the case
            # of a 0-byte download to a file, as otherwise python won't
            # create the file. Write bytes: the stream is binary (FromFile
            # opens it 'wb'), and a str write fails there on Python 3.
            self.stream.write(b'')
        return response

    def GetRange(self, start, end=None, additional_headers=None,
                 use_chunks=True):
        """Retrieve a given byte range from this download, inclusive.

        Range must be of one of these three forms:
        * 0 <= start, end = None: Fetch from start to the end of the file.
        * 0 <= start <= end: Fetch the bytes from start to end.
        * start < 0, end = None: Fetch the last -start bytes of the file.

        (These variations correspond to those described in the HTTP 1.1
        protocol for range headers in RFC 2616, sec. 14.35.1.)

        Args:
          start: (int) Where to start fetching bytes. (See above.)
          end: (int, optional) Where to stop fetching bytes. (See above.)
          additional_headers: (dict, optional) Any additional headers to
              pass with the request.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and fetch this range in a single request.

        Returns:
          None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferRetryError: if a response carries zero bytes.
        """
        self.EnsureInitialized()
        progress_end_normalized = False
        if self.total_size is not None:
            progress, end_byte = self.__NormalizeStartEnd(start, end)
            progress_end_normalized = True
        else:
            progress = start
            end_byte = end
        while (not progress_end_normalized or end_byte is None or
               progress <= end_byte):
            # end_byte is the last byte of the whole requested range;
            # chunk_end is the last byte of this one request. Keeping them
            # separate lets the loop continue past the first chunk.
            chunk_end = self.__ComputeEndByte(progress, end=end_byte,
                                              use_chunks=use_chunks)
            response = self.__GetChunk(progress, chunk_end,
                                       additional_headers=additional_headers)
            if not progress_end_normalized:
                self.__SetTotal(response.info)
                progress, end_byte = self.__NormalizeStartEnd(start, end)
                progress_end_normalized = True
            response = self.__ProcessResponse(response)
            progress += response.length
            if response.length == 0:
                raise exceptions.TransferRetryError(
                    'Zero bytes unexpectedly returned in download response')

    def StreamInChunks(self, callback=None, finish_callback=None,
                       additional_headers=None):
        """Stream the entire download in chunks."""
        self.StreamMedia(callback=callback, finish_callback=finish_callback,
                         additional_headers=additional_headers,
                         use_chunks=True)

    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None, use_chunks=True):
        """Stream the entire download.

        Args:
          callback: (default: None) Callback to call as each chunk is
              completed; falls back to self.progress_callback. Run on a
              new thread via _ExecuteCallback.
          finish_callback: (default: None) Callback to call when the
              download is complete; falls back to self.finish_callback.
          additional_headers: (default: None) Additional headers to
              include in fetching bytes.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and stream this download in a single request.

        Returns:
            None. Streams bytes into self.stream.
        """
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback

        self.EnsureInitialized()
        while True:
            if self.__initial_response is not None:
                # Reuse the response InitializeDownload already fetched.
                response = self.__initial_response
                self.__initial_response = None
            else:
                end_byte = self.__ComputeEndByte(self.progress,
                                                 use_chunks=use_chunks)
                response = self.__GetChunk(
                    self.progress, end_byte,
                    additional_headers=additional_headers)
            if self.total_size is None:
                self.__SetTotal(response.info)
            response = self.__ProcessResponse(response)
            self._ExecuteCallback(callback, response)
            if (response.status_code == http_client.OK or
                    self.progress >= self.total_size):
                break
        self._ExecuteCallback(finish_callback, response)
530
531
532class Upload(_Transfer):
533
534    """Data for a single Upload.
535
536    Fields:
537      stream: The stream to upload.
538      mime_type: MIME type of the upload.
539      total_size: (optional) Total upload size for the stream.
540      close_stream: (default: False) Whether or not we should close the
541          stream when finished with the upload.
542      auto_transfer: (default: True) If True, stream all bytes as soon as
543          the upload is created.
544    """
545    _REQUIRED_SERIALIZATION_KEYS = set((
546        'auto_transfer', 'mime_type', 'total_size', 'url'))
547
548    def __init__(self, stream, mime_type, total_size=None, http=None,
549                 close_stream=False, chunksize=None, auto_transfer=True,
550                 progress_callback=None, finish_callback=None,
551                 **kwds):
552        super(Upload, self).__init__(
553            stream, close_stream=close_stream, chunksize=chunksize,
554            auto_transfer=auto_transfer, http=http, **kwds)
555        self.__complete = False
556        self.__final_response = None
557        self.__mime_type = mime_type
558        self.__progress = 0
559        self.__server_chunk_granularity = None
560        self.__strategy = None
561        self.__total_size = None
562
563        self.progress_callback = progress_callback
564        self.finish_callback = finish_callback
565        self.total_size = total_size
566
567    @property
568    def progress(self):
569        return self.__progress
570
571    @classmethod
572    def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
573        """Create a new Upload object from a filename."""
574        path = os.path.expanduser(filename)
575        if not os.path.exists(path):
576            raise exceptions.NotFoundError('Could not find file %s' % path)
577        if not mime_type:
578            mime_type, _ = mimetypes.guess_type(path)
579            if mime_type is None:
580                raise exceptions.InvalidUserInputError(
581                    'Could not guess mime type for %s' % path)
582        size = os.stat(path).st_size
583        return cls(open(path, 'rb'), mime_type, total_size=size,
584                   close_stream=True, auto_transfer=auto_transfer, **kwds)
585
586    @classmethod
587    def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
588                   **kwds):
589        """Create a new Upload object from a stream."""
590        if mime_type is None:
591            raise exceptions.InvalidUserInputError(
592                'No mime_type specified for stream')
593        return cls(stream, mime_type, total_size=total_size,
594                   close_stream=False, auto_transfer=auto_transfer, **kwds)
595
596    @classmethod
597    def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
598        """Create a new Upload of stream from serialized json_data and http."""
599        info = json.loads(json_data)
600        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
601        if missing_keys:
602            raise exceptions.InvalidDataError(
603                'Invalid serialization data, missing keys: %s' % (
604                    ', '.join(missing_keys)))
605        if 'total_size' in kwds:
606            raise exceptions.InvalidUserInputError(
607                'Cannot override total_size on serialized Upload')
608        upload = cls.FromStream(stream, info['mime_type'],
609                                total_size=info.get('total_size'), **kwds)
610        if isinstance(stream, io.IOBase) and not stream.seekable():
611            raise exceptions.InvalidUserInputError(
612                'Cannot restart resumable upload on non-seekable stream')
613        if auto_transfer is not None:
614            upload.auto_transfer = auto_transfer
615        else:
616            upload.auto_transfer = info['auto_transfer']
617        upload.strategy = RESUMABLE_UPLOAD
618        upload._Initialize(  # pylint: disable=protected-access
619            http, info['url'])
620        upload.RefreshResumableUploadState()
621        upload.EnsureInitialized()
622        if upload.auto_transfer:
623            upload.StreamInChunks()
624        return upload
625
626    @property
627    def serialization_data(self):
628        self.EnsureInitialized()
629        if self.strategy != RESUMABLE_UPLOAD:
630            raise exceptions.InvalidDataError(
631                'Serialization only supported for resumable uploads')
632        return {
633            'auto_transfer': self.auto_transfer,
634            'mime_type': self.mime_type,
635            'total_size': self.total_size,
636            'url': self.url,
637        }
638
639    @property
640    def complete(self):
641        return self.__complete
642
643    @property
644    def mime_type(self):
645        return self.__mime_type
646
647    def __str__(self):
648        if not self.initialized:
649            return 'Upload (uninitialized)'
650        return 'Upload with %d/%s bytes transferred for url %s' % (
651            self.progress, self.total_size or '???', self.url)
652
653    @property
654    def strategy(self):
655        return self.__strategy
656
657    @strategy.setter
658    def strategy(self, value):
659        if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
660            raise exceptions.UserError((
661                'Invalid value "%s" for upload strategy, must be one of '
662                '"simple" or "resumable".') % value)
663        self.__strategy = value
664
665    @property
666    def total_size(self):
667        return self.__total_size
668
669    @total_size.setter
670    def total_size(self, value):
671        self.EnsureUninitialized()
672        self.__total_size = value
673
674    def __SetDefaultUploadStrategy(self, upload_config, http_request):
675        """Determine and set the default upload strategy for this upload.
676
677        We generally prefer simple or multipart, unless we're forced to
678        use resumable. This happens when any of (1) the upload is too
679        large, (2) the simple endpoint doesn't support multipart requests
680        and we have metadata, or (3) there is no simple upload endpoint.
681
682        Args:
683          upload_config: Configuration for the upload endpoint.
684          http_request: The associated http request.
685
686        Returns:
687          None.
688        """
689        if upload_config.resumable_path is None:
690            self.strategy = SIMPLE_UPLOAD
691        if self.strategy is not None:
692            return
693        strategy = SIMPLE_UPLOAD
694        if (self.total_size is not None and
695                self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
696            strategy = RESUMABLE_UPLOAD
697        if http_request.body and not upload_config.simple_multipart:
698            strategy = RESUMABLE_UPLOAD
699        if not upload_config.simple_path:
700            strategy = RESUMABLE_UPLOAD
701        self.strategy = strategy
702
703    def ConfigureRequest(self, upload_config, http_request, url_builder):
704        """Configure the request and url for this upload."""
705        # Validate total_size vs. max_size
706        if (self.total_size and upload_config.max_size and
707                self.total_size > upload_config.max_size):
708            raise exceptions.InvalidUserInputError(
709                'Upload too big: %s larger than max size %s' % (
710                    self.total_size, upload_config.max_size))
711        # Validate mime type
712        if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
713            raise exceptions.InvalidUserInputError(
714                'MIME type %s does not match any accepted MIME ranges %s' % (
715                    self.mime_type, upload_config.accept))
716
717        self.__SetDefaultUploadStrategy(upload_config, http_request)
718        if self.strategy == SIMPLE_UPLOAD:
719            url_builder.relative_path = upload_config.simple_path
720            if http_request.body:
721                url_builder.query_params['uploadType'] = 'multipart'
722                self.__ConfigureMultipartRequest(http_request)
723            else:
724                url_builder.query_params['uploadType'] = 'media'
725                self.__ConfigureMediaRequest(http_request)
726        else:
727            url_builder.relative_path = upload_config.resumable_path
728            url_builder.query_params['uploadType'] = 'resumable'
729            self.__ConfigureResumableRequest(http_request)
730
731    def __ConfigureMediaRequest(self, http_request):
732        """Configure http_request as a simple request for this upload."""
733        http_request.headers['content-type'] = self.mime_type
734        http_request.body = self.stream.read()
735        http_request.loggable_body = '<media body>'
736
737    def __ConfigureMultipartRequest(self, http_request):
738        """Configure http_request as a multipart request for this upload."""
739        # This is a multipart/related upload.
740        msg_root = mime_multipart.MIMEMultipart('related')
741        # msg_root should not write out its own headers
742        setattr(msg_root, '_write_headers', lambda self: None)
743
744        # attach the body as one part
745        msg = mime_nonmultipart.MIMENonMultipart(
746            *http_request.headers['content-type'].split('/'))
747        msg.set_payload(http_request.body)
748        msg_root.attach(msg)
749
750        # attach the media as the second part
751        msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
752        msg['Content-Transfer-Encoding'] = 'binary'
753        msg.set_payload(self.stream.read())
754        msg_root.attach(msg)
755
756        # NOTE: We encode the body, but can't use
757        #       `email.message.Message.as_string` because it prepends
758        #       `> ` to `From ` lines.
759        fp = six.BytesIO()
760        if six.PY3:
761            generator_class = email_generator.BytesGenerator
762        else:
763            generator_class = email_generator.Generator
764        g = generator_class(fp, mangle_from_=False)
765        g.flatten(msg_root, unixfrom=False)
766        http_request.body = fp.getvalue()
767
768        multipart_boundary = msg_root.get_boundary()
769        http_request.headers['content-type'] = (
770            'multipart/related; boundary=%r' % multipart_boundary)
771        if isinstance(multipart_boundary, six.text_type):
772            multipart_boundary = multipart_boundary.encode('ascii')
773
774        body_components = http_request.body.split(multipart_boundary)
775        headers, _, _ = body_components[-2].partition(b'\n\n')
776        body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--'])
777        http_request.loggable_body = multipart_boundary.join(body_components)
778
779    def __ConfigureResumableRequest(self, http_request):
780        http_request.headers['X-Upload-Content-Type'] = self.mime_type
781        if self.total_size is not None:
782            http_request.headers[
783                'X-Upload-Content-Length'] = str(self.total_size)
784
785    def RefreshResumableUploadState(self):
786        """Talk to the server and refresh the state of this resumable upload.
787
788        Returns:
789          Response if the upload is complete.
790        """
791        if self.strategy != RESUMABLE_UPLOAD:
792            return
793        self.EnsureInitialized()
794        refresh_request = http_wrapper.Request(
795            url=self.url, http_method='PUT',
796            headers={'Content-Range': 'bytes */*'})
797        refresh_response = http_wrapper.MakeRequest(
798            self.http, refresh_request, redirections=0,
799            retries=self.num_retries)
800        range_header = self._GetRangeHeaderFromResponse(refresh_response)
801        if refresh_response.status_code in (http_client.OK,
802                                            http_client.CREATED):
803            self.__complete = True
804            self.__progress = self.total_size
805            self.stream.seek(self.progress)
806            # If we're finished, the refresh response will contain the metadata
807            # originally requested. Cache it so it can be returned in
808            # StreamInChunks.
809            self.__final_response = refresh_response
810        elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
811            if range_header is None:
812                self.__progress = 0
813            else:
814                self.__progress = self.__GetLastByte(range_header) + 1
815            self.stream.seek(self.progress)
816        else:
817            raise exceptions.HttpError.FromResponse(refresh_response)
818
819    def _GetRangeHeaderFromResponse(self, response):
820        return response.info.get('Range', response.info.get('range'))
821
822    def InitializeUpload(self, http_request, http=None, client=None):
823        """Initialize this upload from the given http_request."""
824        if self.strategy is None:
825            raise exceptions.UserError(
826                'No upload strategy set; did you call ConfigureRequest?')
827        if http is None and client is None:
828            raise exceptions.UserError('Must provide client or http.')
829        if self.strategy != RESUMABLE_UPLOAD:
830            return
831        http = http or client.http
832        if client is not None:
833            http_request.url = client.FinalizeTransferUrl(http_request.url)
834        self.EnsureUninitialized()
835        http_response = http_wrapper.MakeRequest(http, http_request,
836                                                 retries=self.num_retries)
837        if http_response.status_code != http_client.OK:
838            raise exceptions.HttpError.FromResponse(http_response)
839
840        self.__server_chunk_granularity = http_response.info.get(
841            'X-Goog-Upload-Chunk-Granularity')
842        url = http_response.info['location']
843        if client is not None:
844            url = client.FinalizeTransferUrl(url)
845        self._Initialize(http, url)
846
847        # Unless the user has requested otherwise, we want to just
848        # go ahead and pump the bytes now.
849        if self.auto_transfer:
850            return self.StreamInChunks()
851        return http_response
852
853    def __GetLastByte(self, range_header):
854        _, _, end = range_header.partition('-')
855        # TODO(craigcitro): Validate start == 0?
856        return int(end)
857
858    def __ValidateChunksize(self, chunksize=None):
859        if self.__server_chunk_granularity is None:
860            return
861        chunksize = chunksize or self.chunksize
862        if chunksize % self.__server_chunk_granularity:
863            raise exceptions.ConfigurationValueError(
864                'Server requires chunksize to be a multiple of %d',
865                self.__server_chunk_granularity)
866
867    def __StreamMedia(self, callback=None, finish_callback=None,
868                      additional_headers=None, use_chunks=True):
869        """Helper function for StreamMedia / StreamInChunks."""
870        if self.strategy != RESUMABLE_UPLOAD:
871            raise exceptions.InvalidUserInputError(
872                'Cannot stream non-resumable upload')
873        callback = callback or self.progress_callback
874        finish_callback = finish_callback or self.finish_callback
875        # final_response is set if we resumed an already-completed upload.
876        response = self.__final_response
877        send_func = self.__SendChunk if use_chunks else self.__SendMediaBody
878        if use_chunks:
879            self.__ValidateChunksize(self.chunksize)
880        self.EnsureInitialized()
881        while not self.complete:
882            response = send_func(self.stream.tell(),
883                                 additional_headers=additional_headers)
884            if response.status_code in (http_client.OK, http_client.CREATED):
885                self.__complete = True
886                break
887            self.__progress = self.__GetLastByte(response.info['range'])
888            if self.progress + 1 != self.stream.tell():
889                # TODO(craigcitro): Add a better way to recover here.
890                raise exceptions.CommunicationError(
891                    'Failed to transfer all bytes in chunk, upload paused at '
892                    'byte %d' % self.progress)
893            self._ExecuteCallback(callback, response)
894        if self.__complete and hasattr(self.stream, 'seek'):
895            current_pos = self.stream.tell()
896            self.stream.seek(0, os.SEEK_END)
897            end_pos = self.stream.tell()
898            self.stream.seek(current_pos)
899            if current_pos != end_pos:
900                raise exceptions.TransferInvalidError(
901                    'Upload complete with %s additional bytes left in stream' %
902                    (int(end_pos) - int(current_pos)))
903        self._ExecuteCallback(finish_callback, response)
904        return response
905
906    def StreamMedia(self, callback=None, finish_callback=None,
907                    additional_headers=None):
908        """Send this resumable upload in a single request.
909
910        Args:
911          callback: Progress callback function with inputs
912              (http_wrapper.Response, transfer.Upload)
913          finish_callback: Final callback function with inputs
914              (http_wrapper.Response, transfer.Upload)
915          additional_headers: Dict of headers to include with the upload
916              http_wrapper.Request.
917
918        Returns:
919          http_wrapper.Response of final response.
920        """
921        return self.__StreamMedia(
922            callback=callback, finish_callback=finish_callback,
923            additional_headers=additional_headers, use_chunks=False)
924
925    def StreamInChunks(self, callback=None, finish_callback=None,
926                       additional_headers=None):
927        """Send this (resumable) upload in chunks."""
928        return self.__StreamMedia(
929            callback=callback, finish_callback=finish_callback,
930            additional_headers=additional_headers)
931
932    def __SendMediaRequest(self, request, end):
933        """Request helper function for SendMediaBody & SendChunk."""
934        response = http_wrapper.MakeRequest(
935            self.bytes_http, request, retry_func=self.retry_func,
936            retries=self.num_retries)
937        if response.status_code not in (http_client.OK, http_client.CREATED,
938                                        http_wrapper.RESUME_INCOMPLETE):
939            # We want to reset our state to wherever the server left us
940            # before this failed request, and then raise.
941            self.RefreshResumableUploadState()
942            raise exceptions.HttpError.FromResponse(response)
943        if response.status_code == http_wrapper.RESUME_INCOMPLETE:
944            last_byte = self.__GetLastByte(
945                self._GetRangeHeaderFromResponse(response))
946            if last_byte + 1 != end:
947                self.stream.seek(last_byte)
948        return response
949
950    def __SendMediaBody(self, start, additional_headers=None):
951        """Send the entire media stream in a single request."""
952        self.EnsureInitialized()
953        if self.total_size is None:
954            raise exceptions.TransferInvalidError(
955                'Total size must be known for SendMediaBody')
956        body_stream = stream_slice.StreamSlice(
957            self.stream, self.total_size - start)
958
959        request = http_wrapper.Request(url=self.url, http_method='PUT',
960                                       body=body_stream)
961        request.headers['Content-Type'] = self.mime_type
962        if start == self.total_size:
963            # End of an upload with 0 bytes left to send; just finalize.
964            range_string = 'bytes */%s' % self.total_size
965        else:
966            range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1,
967                                               self.total_size)
968
969        request.headers['Content-Range'] = range_string
970        if additional_headers:
971            request.headers.update(additional_headers)
972
973        return self.__SendMediaRequest(request, self.total_size)
974
    def __SendChunk(self, start, additional_headers=None):
        """Send the chunk of self.stream that begins at offset start.

        With a known total size, the chunk is a StreamSlice of at most
        self.chunksize bytes (clipped to the end of the upload). With an
        unknown total size (streaming upload), up to self.chunksize bytes are
        buffered and read into memory; if that exhausts the stream, the end
        position is recorded as the total size.

        Args:
          start: Stream offset of the first byte in this chunk.
          additional_headers: Optional dict of extra request headers.

        Returns:
          The http_wrapper.Response returned by __SendMediaRequest.
        """
        self.EnsureInitialized()
        # Captured before the total size may be discovered below, so bodies
        # of uploads that started with an unknown size are never logged.
        no_log_body = self.total_size is None
        if self.total_size is None:
            # For the streaming resumable case, we need to detect when
            # we're at the end of the stream.
            body_stream = buffered_stream.BufferedStream(
                self.stream, start, self.chunksize)
            end = body_stream.stream_end_position
            if body_stream.stream_exhausted:
                self.__total_size = end
            # TODO: Here, change body_stream from a stream to a string object,
            # which means reading a chunk into memory.  This works around
            # https://code.google.com/p/httplib2/issues/detail?id=176 which can
            # cause httplib2 to skip bytes on 401's for file objects.
            # Rework this solution to be more general.
            body_stream = body_stream.read(self.chunksize)
        else:
            end = min(start + self.chunksize, self.total_size)
            body_stream = stream_slice.StreamSlice(self.stream, end - start)
        # TODO(craigcitro): Think about clearer errors on "no data in
        # stream".
        request = http_wrapper.Request(url=self.url, http_method='PUT',
                                       body=body_stream)
        request.headers['Content-Type'] = self.mime_type
        if no_log_body:
            # Disable logging of streaming body.
            # TODO: Remove no_log_body and rework as part of a larger logs
            # refactor.
            request.loggable_body = '<media body>'
        # NOTE: self.total_size is deliberately re-read here. It may have just
        # been set above when the stream was exhausted (presumably the
        # total_size property reflects self.__total_size -- confirm), in which
        # case one of the known-size Content-Range forms is used.
        if self.total_size is None:
            # Streaming resumable upload case, unknown total size.
            range_string = 'bytes %s-%s/*' % (start, end - 1)
        elif end == start:
            # End of an upload with 0 bytes left to send; just finalize.
            range_string = 'bytes */%s' % self.total_size
        else:
            # Normal resumable upload case with known sizes.
            range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size)

        request.headers['Content-Range'] = range_string
        if additional_headers:
            request.headers.update(additional_headers)

        # end is exclusive: the offset one past the last byte in this chunk.
        return self.__SendMediaRequest(request, end)
1021