1#!/usr/bin/env python 2# 3# Copyright 2015 Google Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Upload and download support for apitools.""" 18from __future__ import print_function 19 20import email.generator as email_generator 21import email.mime.multipart as mime_multipart 22import email.mime.nonmultipart as mime_nonmultipart 23import io 24import json 25import mimetypes 26import os 27import threading 28 29import six 30from six.moves import http_client 31 32from apitools.base.py import buffered_stream 33from apitools.base.py import compression 34from apitools.base.py import exceptions 35from apitools.base.py import http_wrapper 36from apitools.base.py import stream_slice 37from apitools.base.py import util 38 39__all__ = [ 40 'Download', 41 'Upload', 42 'RESUMABLE_UPLOAD', 43 'SIMPLE_UPLOAD', 44 'DownloadProgressPrinter', 45 'DownloadCompletePrinter', 46 'UploadProgressPrinter', 47 'UploadCompletePrinter', 48] 49 50_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 51SIMPLE_UPLOAD = 'simple' 52RESUMABLE_UPLOAD = 'resumable' 53 54 55def DownloadProgressPrinter(response, unused_download): 56 """Print download progress based on response.""" 57 if 'content-range' in response.info: 58 print('Received %s' % response.info['content-range']) 59 else: 60 print('Received %d bytes' % response.length) 61 62 63def DownloadCompletePrinter(unused_response, unused_download): 64 """Print information about a completed download.""" 65 print('Download complete') 66 67 68def UploadProgressPrinter(response, unused_upload): 69 """Print upload progress based on response.""" 70 print('Sent %s' % response.info['range']) 71 72 73def UploadCompletePrinter(unused_response, unused_upload): 74 """Print information about a completed upload.""" 75 print('Upload complete') 76 77 78class _Transfer(object): 79 80 """Generic bits common to Uploads and Downloads.""" 81 82 def __init__(self, stream, close_stream=False, chunksize=None, 83 auto_transfer=True, http=None, num_retries=5): 84 self.__bytes_http = None 85 self.__close_stream = close_stream 86 self.__http = http 87 self.__stream = stream 88 self.__url = None 89 90 self.__num_retries = 5 91 # Let the @property do validation 92 self.num_retries = num_retries 93 94 self.retry_func = ( 95 http_wrapper.HandleExceptionsAndRebuildHttpConnections) 96 self.auto_transfer = auto_transfer 97 self.chunksize = chunksize or 1048576 98 99 def __repr__(self): 100 return str(self) 101 102 @property 103 def close_stream(self): 104 return self.__close_stream 105 106 @property 107 def http(self): 108 return self.__http 109 110 @property 111 def bytes_http(self): 112 return self.__bytes_http or self.http 113 114 @bytes_http.setter 115 def bytes_http(self, value): 116 self.__bytes_http = value 117 118 @property 119 def num_retries(self): 120 return self.__num_retries 121 122 @num_retries.setter 123 def num_retries(self, value): 124 util.Typecheck(value, six.integer_types) 125 if value < 0: 126 raise exceptions.InvalidDataError( 127 'Cannot have negative value for num_retries') 128 self.__num_retries = value 129 130 @property 131 def stream(self): 132 return self.__stream 133 134 @property 135 def url(self): 136 return self.__url 137 138 def _Initialize(self, http, url): 139 """Initialize this download by setting self.http and self.url. 140 141 We want the user to be able to override self.http by having set 142 the value in the constructor; in that case, we ignore the provided 143 http. 144 145 Args: 146 http: An httplib2.Http instance or None. 147 url: The url for this transfer. 148 149 Returns: 150 None. Initializes self. 151 """ 152 self.EnsureUninitialized() 153 if self.http is None: 154 self.__http = http or http_wrapper.GetHttp() 155 self.__url = url 156 157 @property 158 def initialized(self): 159 return self.url is not None and self.http is not None 160 161 @property 162 def _type_name(self): 163 return type(self).__name__ 164 165 def EnsureInitialized(self): 166 if not self.initialized: 167 raise exceptions.TransferInvalidError( 168 'Cannot use uninitialized %s' % self._type_name) 169 170 def EnsureUninitialized(self): 171 if self.initialized: 172 raise exceptions.TransferInvalidError( 173 'Cannot re-initialize %s' % self._type_name) 174 175 def __del__(self): 176 if self.__close_stream: 177 self.__stream.close() 178 179 def _ExecuteCallback(self, callback, response): 180 # TODO(craigcitro): Push these into a queue. 181 if callback is not None: 182 threading.Thread(target=callback, args=(response, self)).start() 183 184 185class Download(_Transfer): 186 187 """Data for a single download. 188 189 Public attributes: 190 chunksize: default chunksize to use for transfers. 191 """ 192 _ACCEPTABLE_STATUSES = set(( 193 http_client.OK, 194 http_client.NO_CONTENT, 195 http_client.PARTIAL_CONTENT, 196 http_client.REQUESTED_RANGE_NOT_SATISFIABLE, 197 )) 198 _REQUIRED_SERIALIZATION_KEYS = set(( 199 'auto_transfer', 'progress', 'total_size', 'url')) 200 201 def __init__(self, stream, progress_callback=None, finish_callback=None, 202 **kwds): 203 total_size = kwds.pop('total_size', None) 204 super(Download, self).__init__(stream, **kwds) 205 self.__initial_response = None 206 self.__progress = 0 207 self.__total_size = total_size 208 self.__encoding = None 209 210 self.progress_callback = progress_callback 211 self.finish_callback = finish_callback 212 213 @property 214 def progress(self): 215 return self.__progress 216 217 @property 218 def encoding(self): 219 return self.__encoding 220 221 @classmethod 222 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): 223 """Create a new download object from a filename.""" 224 path = os.path.expanduser(filename) 225 if os.path.exists(path) and not overwrite: 226 raise exceptions.InvalidUserInputError( 227 'File %s exists and overwrite not specified' % path) 228 return cls(open(path, 'wb'), close_stream=True, 229 auto_transfer=auto_transfer, **kwds) 230 231 @classmethod 232 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): 233 """Create a new Download object from a stream.""" 234 return cls(stream, auto_transfer=auto_transfer, total_size=total_size, 235 **kwds) 236 237 @classmethod 238 def FromData(cls, stream, json_data, http=None, auto_transfer=None, 239 **kwds): 240 """Create a new Download object from a stream and serialized data.""" 241 info = json.loads(json_data) 242 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 243 if missing_keys: 244 raise exceptions.InvalidDataError( 245 'Invalid serialization data, missing keys: %s' % ( 246 ', '.join(missing_keys))) 247 download = cls.FromStream(stream, **kwds) 248 if auto_transfer is not None: 249 download.auto_transfer = auto_transfer 250 else: 251 download.auto_transfer = info['auto_transfer'] 252 setattr(download, '_Download__progress', info['progress']) 253 setattr(download, '_Download__total_size', info['total_size']) 254 download._Initialize( # pylint: disable=protected-access 255 http, info['url']) 256 return download 257 258 @property 259 def serialization_data(self): 260 self.EnsureInitialized() 261 return { 262 'auto_transfer': self.auto_transfer, 263 'progress': self.progress, 264 'total_size': self.total_size, 265 'url': self.url, 266 } 267 268 @property 269 def total_size(self): 270 return self.__total_size 271 272 def __str__(self): 273 if not self.initialized: 274 return 'Download (uninitialized)' 275 return 'Download with %d/%s bytes transferred from url %s' % ( 276 self.progress, self.total_size, self.url) 277 278 def ConfigureRequest(self, http_request, url_builder): 279 url_builder.query_params['alt'] = 'media' 280 # TODO(craigcitro): We need to send range requests because by 281 # default httplib2 stores entire reponses in memory. Override 282 # httplib2's download method (as gsutil does) so that this is not 283 # necessary. 284 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) 285 286 def __SetTotal(self, info): 287 """Sets the total size based off info if possible otherwise 0.""" 288 if 'content-range' in info: 289 _, _, total = info['content-range'].rpartition('/') 290 if total != '*': 291 self.__total_size = int(total) 292 # Note "total_size is None" means we don't know it; if no size 293 # info was returned on our initial range request, that means we 294 # have a 0-byte file. (That last statement has been verified 295 # empirically, but is not clearly documented anywhere.) 296 if self.total_size is None: 297 self.__total_size = 0 298 299 def InitializeDownload(self, http_request, http=None, client=None): 300 """Initialize this download by making a request. 301 302 Args: 303 http_request: The HttpRequest to use to initialize this download. 304 http: The httplib2.Http instance for this request. 305 client: If provided, let this client process the final URL before 306 sending any additional requests. If client is provided and 307 http is not, client.http will be used instead. 308 """ 309 self.EnsureUninitialized() 310 if http is None and client is None: 311 raise exceptions.UserError('Must provide client or http.') 312 http = http or client.http 313 if client is not None: 314 http_request.url = client.FinalizeTransferUrl(http_request.url) 315 url = http_request.url 316 if self.auto_transfer: 317 end_byte = self.__ComputeEndByte(0) 318 self.__SetRangeHeader(http_request, 0, end_byte) 319 response = http_wrapper.MakeRequest( 320 self.bytes_http or http, http_request) 321 if response.status_code not in self._ACCEPTABLE_STATUSES: 322 raise exceptions.HttpError.FromResponse(response) 323 self.__initial_response = response 324 self.__SetTotal(response.info) 325 url = response.info.get('content-location', response.request_url) 326 if client is not None: 327 url = client.FinalizeTransferUrl(url) 328 self._Initialize(http, url) 329 # Unless the user has requested otherwise, we want to just 330 # go ahead and pump the bytes now. 331 if self.auto_transfer: 332 self.StreamInChunks() 333 334 def __NormalizeStartEnd(self, start, end=None): 335 """Normalizes start and end values based on total size.""" 336 if end is not None: 337 if start < 0: 338 raise exceptions.TransferInvalidError( 339 'Cannot have end index with negative start index ' + 340 '[start=%d, end=%d]' % (start, end)) 341 elif start >= self.total_size: 342 raise exceptions.TransferInvalidError( 343 'Cannot have start index greater than total size ' + 344 '[start=%d, total_size=%d]' % (start, self.total_size)) 345 end = min(end, self.total_size - 1) 346 if end < start: 347 raise exceptions.TransferInvalidError( 348 'Range requested with end[%s] < start[%s]' % (end, start)) 349 return start, end 350 else: 351 if start < 0: 352 start = max(0, start + self.total_size) 353 return start, self.total_size - 1 354 355 def __SetRangeHeader(self, request, start, end=None): 356 if start < 0: 357 request.headers['range'] = 'bytes=%d' % start 358 elif end is None or end < start: 359 request.headers['range'] = 'bytes=%d-' % start 360 else: 361 request.headers['range'] = 'bytes=%d-%d' % (start, end) 362 363 def __ComputeEndByte(self, start, end=None, use_chunks=True): 364 """Compute the last byte to fetch for this request. 365 366 This is all based on the HTTP spec for Range and 367 Content-Range. 368 369 Note that this is potentially confusing in several ways: 370 * the value for the last byte is 0-based, eg "fetch 10 bytes 371 from the beginning" would return 9 here. 372 * if we have no information about size, and don't want to 373 use the chunksize, we'll return None. 374 See the tests for more examples. 375 376 Args: 377 start: byte to start at. 378 end: (int or None, default: None) Suggested last byte. 379 use_chunks: (bool, default: True) If False, ignore self.chunksize. 380 381 Returns: 382 Last byte to use in a Range header, or None. 383 384 """ 385 end_byte = end 386 387 if start < 0 and not self.total_size: 388 return end_byte 389 390 if use_chunks: 391 alternate = start + self.chunksize - 1 392 if end_byte is not None: 393 end_byte = min(end_byte, alternate) 394 else: 395 end_byte = alternate 396 397 if self.total_size: 398 alternate = self.total_size - 1 399 if end_byte is not None: 400 end_byte = min(end_byte, alternate) 401 else: 402 end_byte = alternate 403 404 return end_byte 405 406 def __GetChunk(self, start, end, additional_headers=None): 407 """Retrieve a chunk, and return the full response.""" 408 self.EnsureInitialized() 409 request = http_wrapper.Request(url=self.url) 410 self.__SetRangeHeader(request, start, end=end) 411 if additional_headers is not None: 412 request.headers.update(additional_headers) 413 return http_wrapper.MakeRequest( 414 self.bytes_http, request, retry_func=self.retry_func, 415 retries=self.num_retries) 416 417 def __ProcessResponse(self, response): 418 """Process response (by updating self and writing to self.stream).""" 419 if response.status_code not in self._ACCEPTABLE_STATUSES: 420 # We distinguish errors that mean we made a mistake in setting 421 # up the transfer versus something we should attempt again. 422 if response.status_code in (http_client.FORBIDDEN, 423 http_client.NOT_FOUND): 424 raise exceptions.HttpError.FromResponse(response) 425 else: 426 raise exceptions.TransferRetryError(response.content) 427 if response.status_code in (http_client.OK, 428 http_client.PARTIAL_CONTENT): 429 try: 430 self.stream.write(six.ensure_binary(response.content)) 431 except TypeError: 432 self.stream.write(six.ensure_text(response.content)) 433 self.__progress += response.length 434 if response.info and 'content-encoding' in response.info: 435 # TODO(craigcitro): Handle the case where this changes over a 436 # download. 437 self.__encoding = response.info['content-encoding'] 438 elif response.status_code == http_client.NO_CONTENT: 439 # It's important to write something to the stream for the case 440 # of a 0-byte download to a file, as otherwise python won't 441 # create the file. 442 self.stream.write('') 443 return response 444 445 def GetRange(self, start, end=None, additional_headers=None, 446 use_chunks=True): 447 """Retrieve a given byte range from this download, inclusive. 448 449 Range must be of one of these three forms: 450 * 0 <= start, end = None: Fetch from start to the end of the file. 451 * 0 <= start <= end: Fetch the bytes from start to end. 452 * start < 0, end = None: Fetch the last -start bytes of the file. 453 454 (These variations correspond to those described in the HTTP 1.1 455 protocol for range headers in RFC 2616, sec. 14.35.1.) 456 457 Args: 458 start: (int) Where to start fetching bytes. (See above.) 459 end: (int, optional) Where to stop fetching bytes. (See above.) 460 additional_headers: (bool, optional) Any additional headers to 461 pass with the request. 462 use_chunks: (bool, default: True) If False, ignore self.chunksize 463 and fetch this range in a single request. 464 465 Returns: 466 None. Streams bytes into self.stream. 467 """ 468 self.EnsureInitialized() 469 progress_end_normalized = False 470 if self.total_size is not None: 471 progress, end_byte = self.__NormalizeStartEnd(start, end) 472 progress_end_normalized = True 473 else: 474 progress = start 475 end_byte = end 476 while (not progress_end_normalized or end_byte is None or 477 progress <= end_byte): 478 end_byte = self.__ComputeEndByte(progress, end=end_byte, 479 use_chunks=use_chunks) 480 response = self.__GetChunk(progress, end_byte, 481 additional_headers=additional_headers) 482 if not progress_end_normalized: 483 self.__SetTotal(response.info) 484 progress, end_byte = self.__NormalizeStartEnd(start, end) 485 progress_end_normalized = True 486 response = self.__ProcessResponse(response) 487 progress += response.length 488 if response.length == 0: 489 if response.status_code == http_client.OK: 490 # There can legitimately be no Content-Length header sent 491 # in some cases (e.g., when there's a Transfer-Encoding 492 # header) and if this was a 200 response (as opposed to 493 # 206 Partial Content) we know we're done now without 494 # looping further on received length. 495 return 496 raise exceptions.TransferRetryError( 497 'Zero bytes unexpectedly returned in download response') 498 499 def StreamInChunks(self, callback=None, finish_callback=None, 500 additional_headers=None): 501 """Stream the entire download in chunks.""" 502 self.StreamMedia(callback=callback, finish_callback=finish_callback, 503 additional_headers=additional_headers, 504 use_chunks=True) 505 506 def StreamMedia(self, callback=None, finish_callback=None, 507 additional_headers=None, use_chunks=True): 508 """Stream the entire download. 509 510 Args: 511 callback: (default: None) Callback to call as each chunk is 512 completed. 513 finish_callback: (default: None) Callback to call when the 514 download is complete. 515 additional_headers: (default: None) Additional headers to 516 include in fetching bytes. 517 use_chunks: (bool, default: True) If False, ignore self.chunksize 518 and stream this download in a single request. 519 520 Returns: 521 None. Streams bytes into self.stream. 522 """ 523 callback = callback or self.progress_callback 524 finish_callback = finish_callback or self.finish_callback 525 526 self.EnsureInitialized() 527 while True: 528 if self.__initial_response is not None: 529 response = self.__initial_response 530 self.__initial_response = None 531 else: 532 end_byte = self.__ComputeEndByte(self.progress, 533 use_chunks=use_chunks) 534 response = self.__GetChunk( 535 self.progress, end_byte, 536 additional_headers=additional_headers) 537 if self.total_size is None: 538 self.__SetTotal(response.info) 539 response = self.__ProcessResponse(response) 540 self._ExecuteCallback(callback, response) 541 if (response.status_code == http_client.OK or 542 self.progress >= self.total_size): 543 break 544 self._ExecuteCallback(finish_callback, response) 545 546 547if six.PY3: 548 class MultipartBytesGenerator(email_generator.BytesGenerator): 549 """Generates a bytes Message object tree for multipart messages 550 551 This is a BytesGenerator that has been modified to not attempt line 552 termination character modification in the bytes payload. Known to 553 work with the compat32 policy only. It may work on others, but not 554 tested. The outfp object must accept bytes in its write method. 555 """ 556 def _handle_text(self, msg): 557 # If the string has surrogates the original source was bytes, so 558 # just write it back out. 559 if msg._payload is None: 560 return 561 self.write(msg._payload) 562 563 # Default body handler 564 _writeBody = _handle_text 565 566 567class Upload(_Transfer): 568 569 """Data for a single Upload. 570 571 Fields: 572 stream: The stream to upload. 573 mime_type: MIME type of the upload. 574 total_size: (optional) Total upload size for the stream. 575 close_stream: (default: False) Whether or not we should close the 576 stream when finished with the upload. 577 auto_transfer: (default: True) If True, stream all bytes as soon as 578 the upload is created. 579 """ 580 _REQUIRED_SERIALIZATION_KEYS = set(( 581 'auto_transfer', 'mime_type', 'total_size', 'url')) 582 583 def __init__(self, stream, mime_type, total_size=None, http=None, 584 close_stream=False, chunksize=None, auto_transfer=True, 585 progress_callback=None, finish_callback=None, 586 gzip_encoded=False, **kwds): 587 super(Upload, self).__init__( 588 stream, close_stream=close_stream, chunksize=chunksize, 589 auto_transfer=auto_transfer, http=http, **kwds) 590 self.__complete = False 591 self.__final_response = None 592 self.__mime_type = mime_type 593 self.__progress = 0 594 self.__server_chunk_granularity = None 595 self.__strategy = None 596 self.__total_size = None 597 self.__gzip_encoded = gzip_encoded 598 599 self.progress_callback = progress_callback 600 self.finish_callback = finish_callback 601 self.total_size = total_size 602 603 @property 604 def progress(self): 605 return self.__progress 606 607 @classmethod 608 def FromFile(cls, filename, mime_type=None, auto_transfer=True, 609 gzip_encoded=False, **kwds): 610 """Create a new Upload object from a filename.""" 611 path = os.path.expanduser(filename) 612 if not os.path.exists(path): 613 raise exceptions.NotFoundError('Could not find file %s' % path) 614 if not mime_type: 615 mime_type, _ = mimetypes.guess_type(path) 616 if mime_type is None: 617 raise exceptions.InvalidUserInputError( 618 'Could not guess mime type for %s' % path) 619 size = os.stat(path).st_size 620 return cls(open(path, 'rb'), mime_type, total_size=size, 621 close_stream=True, auto_transfer=auto_transfer, 622 gzip_encoded=gzip_encoded, **kwds) 623 624 @classmethod 625 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True, 626 gzip_encoded=False, **kwds): 627 """Create a new Upload object from a stream.""" 628 if mime_type is None: 629 raise exceptions.InvalidUserInputError( 630 'No mime_type specified for stream') 631 return cls(stream, mime_type, total_size=total_size, 632 close_stream=False, auto_transfer=auto_transfer, 633 gzip_encoded=gzip_encoded, **kwds) 634 635 @classmethod 636 def FromData(cls, stream, json_data, http, auto_transfer=None, 637 gzip_encoded=False, **kwds): 638 """Create a new Upload of stream from serialized json_data and http.""" 639 info = json.loads(json_data) 640 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 641 if missing_keys: 642 raise exceptions.InvalidDataError( 643 'Invalid serialization data, missing keys: %s' % ( 644 ', '.join(missing_keys))) 645 if 'total_size' in kwds: 646 raise exceptions.InvalidUserInputError( 647 'Cannot override total_size on serialized Upload') 648 upload = cls.FromStream(stream, info['mime_type'], 649 total_size=info.get('total_size'), 650 gzip_encoded=gzip_encoded, **kwds) 651 if isinstance(stream, io.IOBase) and not stream.seekable(): 652 raise exceptions.InvalidUserInputError( 653 'Cannot restart resumable upload on non-seekable stream') 654 if auto_transfer is not None: 655 upload.auto_transfer = auto_transfer 656 else: 657 upload.auto_transfer = info['auto_transfer'] 658 upload.strategy = RESUMABLE_UPLOAD 659 upload._Initialize( # pylint: disable=protected-access 660 http, info['url']) 661 upload.RefreshResumableUploadState() 662 upload.EnsureInitialized() 663 if upload.auto_transfer: 664 upload.StreamInChunks() 665 return upload 666 667 @property 668 def serialization_data(self): 669 self.EnsureInitialized() 670 if self.strategy != RESUMABLE_UPLOAD: 671 raise exceptions.InvalidDataError( 672 'Serialization only supported for resumable uploads') 673 return { 674 'auto_transfer': self.auto_transfer, 675 'mime_type': self.mime_type, 676 'total_size': self.total_size, 677 'url': self.url, 678 } 679 680 @property 681 def complete(self): 682 return self.__complete 683 684 @property 685 def mime_type(self): 686 return self.__mime_type 687 688 def __str__(self): 689 if not self.initialized: 690 return 'Upload (uninitialized)' 691 return 'Upload with %d/%s bytes transferred for url %s' % ( 692 self.progress, self.total_size or '???', self.url) 693 694 @property 695 def strategy(self): 696 return self.__strategy 697 698 @strategy.setter 699 def strategy(self, value): 700 if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): 701 raise exceptions.UserError(( 702 'Invalid value "%s" for upload strategy, must be one of ' 703 '"simple" or "resumable".') % value) 704 self.__strategy = value 705 706 @property 707 def total_size(self): 708 return self.__total_size 709 710 @total_size.setter 711 def total_size(self, value): 712 self.EnsureUninitialized() 713 self.__total_size = value 714 715 def __SetDefaultUploadStrategy(self, upload_config, http_request): 716 """Determine and set the default upload strategy for this upload. 717 718 We generally prefer simple or multipart, unless we're forced to 719 use resumable. This happens when any of (1) the upload is too 720 large, (2) the simple endpoint doesn't support multipart requests 721 and we have metadata, or (3) there is no simple upload endpoint. 722 723 Args: 724 upload_config: Configuration for the upload endpoint. 725 http_request: The associated http request. 726 727 Returns: 728 None. 729 """ 730 if upload_config.resumable_path is None: 731 self.strategy = SIMPLE_UPLOAD 732 if self.strategy is not None: 733 return 734 strategy = SIMPLE_UPLOAD 735 if (self.total_size is not None and 736 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): 737 strategy = RESUMABLE_UPLOAD 738 if http_request.body and not upload_config.simple_multipart: 739 strategy = RESUMABLE_UPLOAD 740 if not upload_config.simple_path: 741 strategy = RESUMABLE_UPLOAD 742 self.strategy = strategy 743 744 def ConfigureRequest(self, upload_config, http_request, url_builder): 745 """Configure the request and url for this upload.""" 746 # Validate total_size vs. max_size 747 if (self.total_size and upload_config.max_size and 748 self.total_size > upload_config.max_size): 749 raise exceptions.InvalidUserInputError( 750 'Upload too big: %s larger than max size %s' % ( 751 self.total_size, upload_config.max_size)) 752 # Validate mime type 753 if not util.AcceptableMimeType(upload_config.accept, self.mime_type): 754 raise exceptions.InvalidUserInputError( 755 'MIME type %s does not match any accepted MIME ranges %s' % ( 756 self.mime_type, upload_config.accept)) 757 758 self.__SetDefaultUploadStrategy(upload_config, http_request) 759 if self.strategy == SIMPLE_UPLOAD: 760 url_builder.relative_path = upload_config.simple_path 761 if http_request.body: 762 url_builder.query_params['uploadType'] = 'multipart' 763 self.__ConfigureMultipartRequest(http_request) 764 else: 765 url_builder.query_params['uploadType'] = 'media' 766 self.__ConfigureMediaRequest(http_request) 767 # Once the entire body is written, compress the body if configured 768 # to. Both multipart and media request uploads will read the 769 # entire stream into memory, which means full compression is also 770 # safe to perform. Because the strategy is set to SIMPLE_UPLOAD, 771 # StreamInChunks throws an exception, meaning double compression 772 # cannot happen. 773 if self.__gzip_encoded: 774 http_request.headers['Content-Encoding'] = 'gzip' 775 # Turn the body into a stream so that we can compress it, then 776 # read the compressed bytes. In the event of a retry (e.g. if 777 # our access token has expired), we need to be able to re-read 778 # the body, which we can't do with a stream. So, we consume the 779 # bytes from the stream now and store them in a re-readable 780 # bytes container. 781 http_request.body = ( 782 compression.CompressStream( 783 six.BytesIO(http_request.body))[0].read()) 784 else: 785 url_builder.relative_path = upload_config.resumable_path 786 url_builder.query_params['uploadType'] = 'resumable' 787 self.__ConfigureResumableRequest(http_request) 788 789 def __ConfigureMediaRequest(self, http_request): 790 """Configure http_request as a simple request for this upload.""" 791 http_request.headers['content-type'] = self.mime_type 792 http_request.body = self.stream.read() 793 http_request.loggable_body = '<media body>' 794 795 def __ConfigureMultipartRequest(self, http_request): 796 """Configure http_request as a multipart request for this upload.""" 797 # This is a multipart/related upload. 798 msg_root = mime_multipart.MIMEMultipart('related') 799 # msg_root should not write out its own headers 800 setattr(msg_root, '_write_headers', lambda self: None) 801 802 # attach the body as one part 803 msg = mime_nonmultipart.MIMENonMultipart( 804 *http_request.headers['content-type'].split('/')) 805 msg.set_payload(http_request.body) 806 msg_root.attach(msg) 807 808 # attach the media as the second part 809 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) 810 msg['Content-Transfer-Encoding'] = 'binary' 811 msg.set_payload(self.stream.read()) 812 msg_root.attach(msg) 813 814 # NOTE: We encode the body, but can't use 815 # `email.message.Message.as_string` because it prepends 816 # `> ` to `From ` lines. 817 fp = six.BytesIO() 818 if six.PY3: 819 generator_class = MultipartBytesGenerator 820 else: 821 generator_class = email_generator.Generator 822 g = generator_class(fp, mangle_from_=False) 823 g.flatten(msg_root, unixfrom=False) 824 http_request.body = fp.getvalue() 825 826 multipart_boundary = msg_root.get_boundary() 827 http_request.headers['content-type'] = ( 828 'multipart/related; boundary=%r' % multipart_boundary) 829 if isinstance(multipart_boundary, six.text_type): 830 multipart_boundary = multipart_boundary.encode('ascii') 831 832 body_components = http_request.body.split(multipart_boundary) 833 headers, _, _ = body_components[-2].partition(b'\n\n') 834 body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--']) 835 http_request.loggable_body = multipart_boundary.join(body_components) 836 837 def __ConfigureResumableRequest(self, http_request): 838 http_request.headers['X-Upload-Content-Type'] = self.mime_type 839 if self.total_size is not None: 840 http_request.headers[ 841 'X-Upload-Content-Length'] = str(self.total_size) 842 843 def RefreshResumableUploadState(self): 844 """Talk to the server and refresh the state of this resumable upload. 845 846 Returns: 847 Response if the upload is complete. 848 """ 849 if self.strategy != RESUMABLE_UPLOAD: 850 return 851 self.EnsureInitialized() 852 refresh_request = http_wrapper.Request( 853 url=self.url, http_method='PUT', 854 headers={'Content-Range': 'bytes */*'}) 855 refresh_response = http_wrapper.MakeRequest( 856 self.http, refresh_request, redirections=0, 857 retries=self.num_retries) 858 range_header = self._GetRangeHeaderFromResponse(refresh_response) 859 if refresh_response.status_code in (http_client.OK, 860 http_client.CREATED): 861 self.__complete = True 862 self.__progress = self.total_size 863 self.stream.seek(self.progress) 864 # If we're finished, the refresh response will contain the metadata 865 # originally requested. Cache it so it can be returned in 866 # StreamInChunks. 867 self.__final_response = refresh_response 868 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: 869 if range_header is None: 870 self.__progress = 0 871 else: 872 self.__progress = self.__GetLastByte(range_header) + 1 873 self.stream.seek(self.progress) 874 else: 875 raise exceptions.HttpError.FromResponse(refresh_response) 876 877 def _GetRangeHeaderFromResponse(self, response): 878 return response.info.get('Range', response.info.get('range')) 879 880 def InitializeUpload(self, http_request, http=None, client=None): 881 """Initialize this upload from the given http_request.""" 882 if self.strategy is None: 883 raise exceptions.UserError( 884 'No upload strategy set; did you call ConfigureRequest?') 885 if http is None and client is None: 886 raise exceptions.UserError('Must provide client or http.') 887 if self.strategy != RESUMABLE_UPLOAD: 888 return 889 http = http or client.http 890 if client is not None: 891 http_request.url = client.FinalizeTransferUrl(http_request.url) 892 self.EnsureUninitialized() 893 http_response = http_wrapper.MakeRequest(http, http_request, 894 retries=self.num_retries) 895 if http_response.status_code != http_client.OK: 896 raise exceptions.HttpError.FromResponse(http_response) 897 898 self.__server_chunk_granularity = http_response.info.get( 899 'X-Goog-Upload-Chunk-Granularity') 900 url = http_response.info['location'] 901 if client is not None: 902 url = client.FinalizeTransferUrl(url) 903 self._Initialize(http, url) 904 905 # Unless the user has requested otherwise, we want to just 906 # go ahead and pump the bytes now. 907 if self.auto_transfer: 908 return self.StreamInChunks() 909 return http_response 910 911 def __GetLastByte(self, range_header): 912 _, _, end = range_header.partition('-') 913 # TODO(craigcitro): Validate start == 0? 914 return int(end) 915 916 def __ValidateChunksize(self, chunksize=None): 917 if self.__server_chunk_granularity is None: 918 return 919 chunksize = chunksize or self.chunksize 920 if chunksize % self.__server_chunk_granularity: 921 raise exceptions.ConfigurationValueError( 922 'Server requires chunksize to be a multiple of %d' % 923 self.__server_chunk_granularity) 924 925 def __IsRetryable(self, response): 926 return (response.status_code >= 500 or 927 response.status_code == http_wrapper.TOO_MANY_REQUESTS or 928 response.retry_after) 929 930 def __StreamMedia(self, callback=None, finish_callback=None, 931 additional_headers=None, use_chunks=True): 932 """Helper function for StreamMedia / StreamInChunks.""" 933 if self.strategy != RESUMABLE_UPLOAD: 934 raise exceptions.InvalidUserInputError( 935 'Cannot stream non-resumable upload') 936 callback = callback or self.progress_callback 937 finish_callback = finish_callback or self.finish_callback 938 # final_response is set if we resumed an already-completed upload. 939 response = self.__final_response 940 941 def CallSendChunk(start): 942 return self.__SendChunk( 943 start, additional_headers=additional_headers) 944 945 def CallSendMediaBody(start): 946 return self.__SendMediaBody( 947 start, additional_headers=additional_headers) 948 949 send_func = CallSendChunk if use_chunks else CallSendMediaBody 950 if not use_chunks and self.__gzip_encoded: 951 raise exceptions.InvalidUserInputError( 952 'Cannot gzip encode non-chunked upload') 953 if use_chunks: 954 self.__ValidateChunksize(self.chunksize) 955 self.EnsureInitialized() 956 while not self.complete: 957 response = send_func(self.stream.tell()) 958 if response.status_code in (http_client.OK, http_client.CREATED): 959 self.__complete = True 960 break 961 if response.status_code not in ( 962 http_client.OK, http_client.CREATED, 963 http_wrapper.RESUME_INCOMPLETE): 964 # Only raise an exception if the error is something we can't 965 # recover from. 966 if (self.strategy != RESUMABLE_UPLOAD or 967 not self.__IsRetryable(response)): 968 raise exceptions.HttpError.FromResponse(response) 969 # We want to reset our state to wherever the server left us 970 # before this failed request, and then raise. 971 self.RefreshResumableUploadState() 972 973 self._ExecuteCallback(callback, response) 974 continue 975 976 self.__progress = self.__GetLastByte( 977 self._GetRangeHeaderFromResponse(response)) 978 if self.progress + 1 != self.stream.tell(): 979 # TODO(craigcitro): Add a better way to recover here. 980 raise exceptions.CommunicationError( 981 'Failed to transfer all bytes in chunk, upload paused at ' 982 'byte %d' % self.progress) 983 self._ExecuteCallback(callback, response) 984 if self.__complete and hasattr(self.stream, 'seek'): 985 current_pos = self.stream.tell() 986 self.stream.seek(0, os.SEEK_END) 987 end_pos = self.stream.tell() 988 self.stream.seek(current_pos) 989 if current_pos != end_pos: 990 raise exceptions.TransferInvalidError( 991 'Upload complete with %s additional bytes left in stream' % 992 (int(end_pos) - int(current_pos))) 993 self._ExecuteCallback(finish_callback, response) 994 return response 995 996 def StreamMedia(self, callback=None, finish_callback=None, 997 additional_headers=None): 998 """Send this resumable upload in a single request. 999 1000 Args: 1001 callback: Progress callback function with inputs 1002 (http_wrapper.Response, transfer.Upload) 1003 finish_callback: Final callback function with inputs 1004 (http_wrapper.Response, transfer.Upload) 1005 additional_headers: Dict of headers to include with the upload 1006 http_wrapper.Request. 1007 1008 Returns: 1009 http_wrapper.Response of final response. 1010 """ 1011 return self.__StreamMedia( 1012 callback=callback, finish_callback=finish_callback, 1013 additional_headers=additional_headers, use_chunks=False) 1014 1015 def StreamInChunks(self, callback=None, finish_callback=None, 1016 additional_headers=None): 1017 """Send this (resumable) upload in chunks.""" 1018 return self.__StreamMedia( 1019 callback=callback, finish_callback=finish_callback, 1020 additional_headers=additional_headers) 1021 1022 def __SendMediaRequest(self, request, end): 1023 """Request helper function for SendMediaBody & SendChunk.""" 1024 def CheckResponse(response): 1025 if response is None: 1026 # Caller shouldn't call us if the response is None, 1027 # but handle anyway. 1028 raise exceptions.RequestError( 1029 'Request to url %s did not return a response.' % 1030 response.request_url) 1031 response = http_wrapper.MakeRequest( 1032 self.bytes_http, request, retry_func=self.retry_func, 1033 retries=self.num_retries, check_response_func=CheckResponse) 1034 if response.status_code == http_wrapper.RESUME_INCOMPLETE: 1035 last_byte = self.__GetLastByte( 1036 self._GetRangeHeaderFromResponse(response)) 1037 if last_byte + 1 != end: 1038 self.stream.seek(last_byte + 1) 1039 return response 1040 1041 def __SendMediaBody(self, start, additional_headers=None): 1042 """Send the entire media stream in a single request.""" 1043 self.EnsureInitialized() 1044 if self.total_size is None: 1045 raise exceptions.TransferInvalidError( 1046 'Total size must be known for SendMediaBody') 1047 body_stream = stream_slice.StreamSlice( 1048 self.stream, self.total_size - start) 1049 1050 request = http_wrapper.Request(url=self.url, http_method='PUT', 1051 body=body_stream) 1052 request.headers['Content-Type'] = self.mime_type 1053 if start == self.total_size: 1054 # End of an upload with 0 bytes left to send; just finalize. 1055 range_string = 'bytes */%s' % self.total_size 1056 else: 1057 range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1, 1058 self.total_size) 1059 1060 request.headers['Content-Range'] = range_string 1061 if additional_headers: 1062 request.headers.update(additional_headers) 1063 1064 return self.__SendMediaRequest(request, self.total_size) 1065 1066 def __SendChunk(self, start, additional_headers=None): 1067 """Send the specified chunk.""" 1068 self.EnsureInitialized() 1069 no_log_body = self.total_size is None 1070 request = http_wrapper.Request(url=self.url, http_method='PUT') 1071 if self.__gzip_encoded: 1072 request.headers['Content-Encoding'] = 'gzip' 1073 body_stream, read_length, exhausted = compression.CompressStream( 1074 self.stream, self.chunksize) 1075 end = start + read_length 1076 # If the stream length was previously unknown and the input stream 1077 # is exhausted, then we're at the end of the stream. 1078 if self.total_size is None and exhausted: 1079 self.__total_size = end 1080 elif self.total_size is None: 1081 # For the streaming resumable case, we need to detect when 1082 # we're at the end of the stream. 1083 body_stream = buffered_stream.BufferedStream( 1084 self.stream, start, self.chunksize) 1085 end = body_stream.stream_end_position 1086 if body_stream.stream_exhausted: 1087 self.__total_size = end 1088 # TODO: Here, change body_stream from a stream to a string object, 1089 # which means reading a chunk into memory. This works around 1090 # https://code.google.com/p/httplib2/issues/detail?id=176 which can 1091 # cause httplib2 to skip bytes on 401's for file objects. 1092 # Rework this solution to be more general. 1093 body_stream = body_stream.read(self.chunksize) 1094 else: 1095 end = min(start + self.chunksize, self.total_size) 1096 body_stream = stream_slice.StreamSlice(self.stream, end - start) 1097 # TODO(craigcitro): Think about clearer errors on "no data in 1098 # stream". 1099 request.body = body_stream 1100 request.headers['Content-Type'] = self.mime_type 1101 if no_log_body: 1102 # Disable logging of streaming body. 1103 # TODO: Remove no_log_body and rework as part of a larger logs 1104 # refactor. 1105 request.loggable_body = '<media body>' 1106 if self.total_size is None: 1107 # Streaming resumable upload case, unknown total size. 1108 range_string = 'bytes %s-%s/*' % (start, end - 1) 1109 elif end == start: 1110 # End of an upload with 0 bytes left to send; just finalize. 1111 range_string = 'bytes */%s' % self.total_size 1112 else: 1113 # Normal resumable upload case with known sizes. 1114 range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size) 1115 1116 request.headers['Content-Range'] = range_string 1117 if additional_headers: 1118 request.headers.update(additional_headers) 1119 1120 return self.__SendMediaRequest(request, end) 1121