1#!/usr/bin/env python 2# 3# Copyright 2015 Google Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Upload and download support for apitools.""" 18from __future__ import print_function 19 20import email.generator as email_generator 21import email.mime.multipart as mime_multipart 22import email.mime.nonmultipart as mime_nonmultipart 23import io 24import json 25import mimetypes 26import os 27import threading 28 29import six 30from six.moves import http_client 31 32from apitools.base.py import buffered_stream 33from apitools.base.py import exceptions 34from apitools.base.py import http_wrapper 35from apitools.base.py import stream_slice 36from apitools.base.py import util 37 38__all__ = [ 39 'Download', 40 'Upload', 41 'RESUMABLE_UPLOAD', 42 'SIMPLE_UPLOAD', 43 'DownloadProgressPrinter', 44 'DownloadCompletePrinter', 45 'UploadProgressPrinter', 46 'UploadCompletePrinter', 47] 48 49_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 50SIMPLE_UPLOAD = 'simple' 51RESUMABLE_UPLOAD = 'resumable' 52 53 54def DownloadProgressPrinter(response, unused_download): 55 """Print download progress based on response.""" 56 if 'content-range' in response.info: 57 print('Received %s' % response.info['content-range']) 58 else: 59 print('Received %d bytes' % response.length) 60 61 62def DownloadCompletePrinter(unused_response, unused_download): 63 """Print information about a completed download.""" 64 print('Download complete') 65 66 67def UploadProgressPrinter(response, unused_upload): 68 """Print upload progress based on response.""" 69 print('Sent %s' % response.info['range']) 70 71 72def UploadCompletePrinter(unused_response, unused_upload): 73 """Print information about a completed upload.""" 74 print('Upload complete') 75 76 77class _Transfer(object): 78 79 """Generic bits common to Uploads and Downloads.""" 80 81 def __init__(self, stream, close_stream=False, chunksize=None, 82 auto_transfer=True, http=None, num_retries=5): 83 self.__bytes_http = None 84 self.__close_stream = close_stream 85 self.__http = http 86 self.__stream = stream 87 self.__url = None 88 89 self.__num_retries = 5 90 # Let the @property do validation 91 self.num_retries = num_retries 92 93 self.retry_func = ( 94 http_wrapper.HandleExceptionsAndRebuildHttpConnections) 95 self.auto_transfer = auto_transfer 96 self.chunksize = chunksize or 1048576 97 98 def __repr__(self): 99 return str(self) 100 101 @property 102 def close_stream(self): 103 return self.__close_stream 104 105 @property 106 def http(self): 107 return self.__http 108 109 @property 110 def bytes_http(self): 111 return self.__bytes_http or self.http 112 113 @bytes_http.setter 114 def bytes_http(self, value): 115 self.__bytes_http = value 116 117 @property 118 def num_retries(self): 119 return self.__num_retries 120 121 @num_retries.setter 122 def num_retries(self, value): 123 util.Typecheck(value, six.integer_types) 124 if value < 0: 125 raise exceptions.InvalidDataError( 126 'Cannot have negative value for num_retries') 127 self.__num_retries = value 128 129 @property 130 def stream(self): 131 return self.__stream 132 133 @property 134 def url(self): 135 return self.__url 136 137 def _Initialize(self, http, url): 138 """Initialize this download by setting self.http and self.url. 139 140 We want the user to be able to override self.http by having set 141 the value in the constructor; in that case, we ignore the provided 142 http. 143 144 Args: 145 http: An httplib2.Http instance or None. 146 url: The url for this transfer. 147 148 Returns: 149 None. Initializes self. 150 """ 151 self.EnsureUninitialized() 152 if self.http is None: 153 self.__http = http or http_wrapper.GetHttp() 154 self.__url = url 155 156 @property 157 def initialized(self): 158 return self.url is not None and self.http is not None 159 160 @property 161 def _type_name(self): 162 return type(self).__name__ 163 164 def EnsureInitialized(self): 165 if not self.initialized: 166 raise exceptions.TransferInvalidError( 167 'Cannot use uninitialized %s', self._type_name) 168 169 def EnsureUninitialized(self): 170 if self.initialized: 171 raise exceptions.TransferInvalidError( 172 'Cannot re-initialize %s', self._type_name) 173 174 def __del__(self): 175 if self.__close_stream: 176 self.__stream.close() 177 178 def _ExecuteCallback(self, callback, response): 179 # TODO(craigcitro): Push these into a queue. 180 if callback is not None: 181 threading.Thread(target=callback, args=(response, self)).start() 182 183 184class Download(_Transfer): 185 186 """Data for a single download. 187 188 Public attributes: 189 chunksize: default chunksize to use for transfers. 190 """ 191 _ACCEPTABLE_STATUSES = set(( 192 http_client.OK, 193 http_client.NO_CONTENT, 194 http_client.PARTIAL_CONTENT, 195 http_client.REQUESTED_RANGE_NOT_SATISFIABLE, 196 )) 197 _REQUIRED_SERIALIZATION_KEYS = set(( 198 'auto_transfer', 'progress', 'total_size', 'url')) 199 200 def __init__(self, stream, progress_callback=None, finish_callback=None, 201 **kwds): 202 total_size = kwds.pop('total_size', None) 203 super(Download, self).__init__(stream, **kwds) 204 self.__initial_response = None 205 self.__progress = 0 206 self.__total_size = total_size 207 self.__encoding = None 208 209 self.progress_callback = progress_callback 210 self.finish_callback = finish_callback 211 212 @property 213 def progress(self): 214 return self.__progress 215 216 @property 217 def encoding(self): 218 return self.__encoding 219 220 @classmethod 221 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): 222 """Create a new download object from a filename.""" 223 path = os.path.expanduser(filename) 224 if os.path.exists(path) and not overwrite: 225 raise exceptions.InvalidUserInputError( 226 'File %s exists and overwrite not specified' % path) 227 return cls(open(path, 'wb'), close_stream=True, 228 auto_transfer=auto_transfer, **kwds) 229 230 @classmethod 231 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): 232 """Create a new Download object from a stream.""" 233 return cls(stream, auto_transfer=auto_transfer, total_size=total_size, 234 **kwds) 235 236 @classmethod 237 def FromData(cls, stream, json_data, http=None, auto_transfer=None, 238 **kwds): 239 """Create a new Download object from a stream and serialized data.""" 240 info = json.loads(json_data) 241 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 242 if missing_keys: 243 raise exceptions.InvalidDataError( 244 'Invalid serialization data, missing keys: %s' % ( 245 ', '.join(missing_keys))) 246 download = cls.FromStream(stream, **kwds) 247 if auto_transfer is not None: 248 download.auto_transfer = auto_transfer 249 else: 250 download.auto_transfer = info['auto_transfer'] 251 setattr(download, '_Download__progress', info['progress']) 252 setattr(download, '_Download__total_size', info['total_size']) 253 download._Initialize( # pylint: disable=protected-access 254 http, info['url']) 255 return download 256 257 @property 258 def serialization_data(self): 259 self.EnsureInitialized() 260 return { 261 'auto_transfer': self.auto_transfer, 262 'progress': self.progress, 263 'total_size': self.total_size, 264 'url': self.url, 265 } 266 267 @property 268 def total_size(self): 269 return self.__total_size 270 271 def __str__(self): 272 if not self.initialized: 273 return 'Download (uninitialized)' 274 return 'Download with %d/%s bytes transferred from url %s' % ( 275 self.progress, self.total_size, self.url) 276 277 def ConfigureRequest(self, http_request, url_builder): 278 url_builder.query_params['alt'] = 'media' 279 # TODO(craigcitro): We need to send range requests because by 280 # default httplib2 stores entire reponses in memory. Override 281 # httplib2's download method (as gsutil does) so that this is not 282 # necessary. 283 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) 284 285 def __SetTotal(self, info): 286 if 'content-range' in info: 287 _, _, total = info['content-range'].rpartition('/') 288 if total != '*': 289 self.__total_size = int(total) 290 # Note "total_size is None" means we don't know it; if no size 291 # info was returned on our initial range request, that means we 292 # have a 0-byte file. (That last statement has been verified 293 # empirically, but is not clearly documented anywhere.) 294 if self.total_size is None: 295 self.__total_size = 0 296 297 def InitializeDownload(self, http_request, http=None, client=None): 298 """Initialize this download by making a request. 299 300 Args: 301 http_request: The HttpRequest to use to initialize this download. 302 http: The httplib2.Http instance for this request. 303 client: If provided, let this client process the final URL before 304 sending any additional requests. If client is provided and 305 http is not, client.http will be used instead. 306 """ 307 self.EnsureUninitialized() 308 if http is None and client is None: 309 raise exceptions.UserError('Must provide client or http.') 310 http = http or client.http 311 if client is not None: 312 http_request.url = client.FinalizeTransferUrl(http_request.url) 313 url = http_request.url 314 if self.auto_transfer: 315 end_byte = self.__ComputeEndByte(0) 316 self.__SetRangeHeader(http_request, 0, end_byte) 317 response = http_wrapper.MakeRequest( 318 self.bytes_http or http, http_request) 319 if response.status_code not in self._ACCEPTABLE_STATUSES: 320 raise exceptions.HttpError.FromResponse(response) 321 self.__initial_response = response 322 self.__SetTotal(response.info) 323 url = response.info.get('content-location', response.request_url) 324 if client is not None: 325 url = client.FinalizeTransferUrl(url) 326 self._Initialize(http, url) 327 # Unless the user has requested otherwise, we want to just 328 # go ahead and pump the bytes now. 329 if self.auto_transfer: 330 self.StreamInChunks() 331 332 def __NormalizeStartEnd(self, start, end=None): 333 if end is not None: 334 if start < 0: 335 raise exceptions.TransferInvalidError( 336 'Cannot have end index with negative start index') 337 elif start >= self.total_size: 338 raise exceptions.TransferInvalidError( 339 'Cannot have start index greater than total size') 340 end = min(end, self.total_size - 1) 341 if end < start: 342 raise exceptions.TransferInvalidError( 343 'Range requested with end[%s] < start[%s]' % (end, start)) 344 return start, end 345 else: 346 if start < 0: 347 start = max(0, start + self.total_size) 348 return start, self.total_size - 1 349 350 def __SetRangeHeader(self, request, start, end=None): 351 if start < 0: 352 request.headers['range'] = 'bytes=%d' % start 353 elif end is None: 354 request.headers['range'] = 'bytes=%d-' % start 355 else: 356 request.headers['range'] = 'bytes=%d-%d' % (start, end) 357 358 def __ComputeEndByte(self, start, end=None, use_chunks=True): 359 """Compute the last byte to fetch for this request. 360 361 This is all based on the HTTP spec for Range and 362 Content-Range. 363 364 Note that this is potentially confusing in several ways: 365 * the value for the last byte is 0-based, eg "fetch 10 bytes 366 from the beginning" would return 9 here. 367 * if we have no information about size, and don't want to 368 use the chunksize, we'll return None. 369 See the tests for more examples. 370 371 Args: 372 start: byte to start at. 373 end: (int or None, default: None) Suggested last byte. 374 use_chunks: (bool, default: True) If False, ignore self.chunksize. 375 376 Returns: 377 Last byte to use in a Range header, or None. 378 379 """ 380 end_byte = end 381 382 if start < 0 and not self.total_size: 383 return end_byte 384 385 if use_chunks: 386 alternate = start + self.chunksize - 1 387 if end_byte is not None: 388 end_byte = min(end_byte, alternate) 389 else: 390 end_byte = alternate 391 392 if self.total_size: 393 alternate = self.total_size - 1 394 if end_byte is not None: 395 end_byte = min(end_byte, alternate) 396 else: 397 end_byte = alternate 398 399 return end_byte 400 401 def __GetChunk(self, start, end, additional_headers=None): 402 """Retrieve a chunk, and return the full response.""" 403 self.EnsureInitialized() 404 request = http_wrapper.Request(url=self.url) 405 self.__SetRangeHeader(request, start, end=end) 406 if additional_headers is not None: 407 request.headers.update(additional_headers) 408 return http_wrapper.MakeRequest( 409 self.bytes_http, request, retry_func=self.retry_func, 410 retries=self.num_retries) 411 412 def __ProcessResponse(self, response): 413 """Process response (by updating self and writing to self.stream).""" 414 if response.status_code not in self._ACCEPTABLE_STATUSES: 415 # We distinguish errors that mean we made a mistake in setting 416 # up the transfer versus something we should attempt again. 417 if response.status_code in (http_client.FORBIDDEN, 418 http_client.NOT_FOUND): 419 raise exceptions.HttpError.FromResponse(response) 420 else: 421 raise exceptions.TransferRetryError(response.content) 422 if response.status_code in (http_client.OK, 423 http_client.PARTIAL_CONTENT): 424 self.stream.write(response.content) 425 self.__progress += response.length 426 if response.info and 'content-encoding' in response.info: 427 # TODO(craigcitro): Handle the case where this changes over a 428 # download. 429 self.__encoding = response.info['content-encoding'] 430 elif response.status_code == http_client.NO_CONTENT: 431 # It's important to write something to the stream for the case 432 # of a 0-byte download to a file, as otherwise python won't 433 # create the file. 434 self.stream.write('') 435 return response 436 437 def GetRange(self, start, end=None, additional_headers=None, 438 use_chunks=True): 439 """Retrieve a given byte range from this download, inclusive. 440 441 Range must be of one of these three forms: 442 * 0 <= start, end = None: Fetch from start to the end of the file. 443 * 0 <= start <= end: Fetch the bytes from start to end. 444 * start < 0, end = None: Fetch the last -start bytes of the file. 445 446 (These variations correspond to those described in the HTTP 1.1 447 protocol for range headers in RFC 2616, sec. 14.35.1.) 448 449 Args: 450 start: (int) Where to start fetching bytes. (See above.) 451 end: (int, optional) Where to stop fetching bytes. (See above.) 452 additional_headers: (bool, optional) Any additional headers to 453 pass with the request. 454 use_chunks: (bool, default: True) If False, ignore self.chunksize 455 and fetch this range in a single request. 456 457 Returns: 458 None. Streams bytes into self.stream. 459 """ 460 self.EnsureInitialized() 461 progress_end_normalized = False 462 if self.total_size is not None: 463 progress, end_byte = self.__NormalizeStartEnd(start, end) 464 progress_end_normalized = True 465 else: 466 progress = start 467 end_byte = end 468 while (not progress_end_normalized or end_byte is None or 469 progress <= end_byte): 470 end_byte = self.__ComputeEndByte(progress, end=end_byte, 471 use_chunks=use_chunks) 472 response = self.__GetChunk(progress, end_byte, 473 additional_headers=additional_headers) 474 if not progress_end_normalized: 475 self.__SetTotal(response.info) 476 progress, end_byte = self.__NormalizeStartEnd(start, end) 477 progress_end_normalized = True 478 response = self.__ProcessResponse(response) 479 progress += response.length 480 if response.length == 0: 481 raise exceptions.TransferRetryError( 482 'Zero bytes unexpectedly returned in download response') 483 484 def StreamInChunks(self, callback=None, finish_callback=None, 485 additional_headers=None): 486 """Stream the entire download in chunks.""" 487 self.StreamMedia(callback=callback, finish_callback=finish_callback, 488 additional_headers=additional_headers, 489 use_chunks=True) 490 491 def StreamMedia(self, callback=None, finish_callback=None, 492 additional_headers=None, use_chunks=True): 493 """Stream the entire download. 494 495 Args: 496 callback: (default: None) Callback to call as each chunk is 497 completed. 498 finish_callback: (default: None) Callback to call when the 499 download is complete. 500 additional_headers: (default: None) Additional headers to 501 include in fetching bytes. 502 use_chunks: (bool, default: True) If False, ignore self.chunksize 503 and stream this download in a single request. 504 505 Returns: 506 None. Streams bytes into self.stream. 507 """ 508 callback = callback or self.progress_callback 509 finish_callback = finish_callback or self.finish_callback 510 511 self.EnsureInitialized() 512 while True: 513 if self.__initial_response is not None: 514 response = self.__initial_response 515 self.__initial_response = None 516 else: 517 end_byte = self.__ComputeEndByte(self.progress, 518 use_chunks=use_chunks) 519 response = self.__GetChunk( 520 self.progress, end_byte, 521 additional_headers=additional_headers) 522 if self.total_size is None: 523 self.__SetTotal(response.info) 524 response = self.__ProcessResponse(response) 525 self._ExecuteCallback(callback, response) 526 if (response.status_code == http_client.OK or 527 self.progress >= self.total_size): 528 break 529 self._ExecuteCallback(finish_callback, response) 530 531 532class Upload(_Transfer): 533 534 """Data for a single Upload. 535 536 Fields: 537 stream: The stream to upload. 538 mime_type: MIME type of the upload. 539 total_size: (optional) Total upload size for the stream. 540 close_stream: (default: False) Whether or not we should close the 541 stream when finished with the upload. 542 auto_transfer: (default: True) If True, stream all bytes as soon as 543 the upload is created. 544 """ 545 _REQUIRED_SERIALIZATION_KEYS = set(( 546 'auto_transfer', 'mime_type', 'total_size', 'url')) 547 548 def __init__(self, stream, mime_type, total_size=None, http=None, 549 close_stream=False, chunksize=None, auto_transfer=True, 550 progress_callback=None, finish_callback=None, 551 **kwds): 552 super(Upload, self).__init__( 553 stream, close_stream=close_stream, chunksize=chunksize, 554 auto_transfer=auto_transfer, http=http, **kwds) 555 self.__complete = False 556 self.__final_response = None 557 self.__mime_type = mime_type 558 self.__progress = 0 559 self.__server_chunk_granularity = None 560 self.__strategy = None 561 self.__total_size = None 562 563 self.progress_callback = progress_callback 564 self.finish_callback = finish_callback 565 self.total_size = total_size 566 567 @property 568 def progress(self): 569 return self.__progress 570 571 @classmethod 572 def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds): 573 """Create a new Upload object from a filename.""" 574 path = os.path.expanduser(filename) 575 if not os.path.exists(path): 576 raise exceptions.NotFoundError('Could not find file %s' % path) 577 if not mime_type: 578 mime_type, _ = mimetypes.guess_type(path) 579 if mime_type is None: 580 raise exceptions.InvalidUserInputError( 581 'Could not guess mime type for %s' % path) 582 size = os.stat(path).st_size 583 return cls(open(path, 'rb'), mime_type, total_size=size, 584 close_stream=True, auto_transfer=auto_transfer, **kwds) 585 586 @classmethod 587 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True, 588 **kwds): 589 """Create a new Upload object from a stream.""" 590 if mime_type is None: 591 raise exceptions.InvalidUserInputError( 592 'No mime_type specified for stream') 593 return cls(stream, mime_type, total_size=total_size, 594 close_stream=False, auto_transfer=auto_transfer, **kwds) 595 596 @classmethod 597 def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds): 598 """Create a new Upload of stream from serialized json_data and http.""" 599 info = json.loads(json_data) 600 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 601 if missing_keys: 602 raise exceptions.InvalidDataError( 603 'Invalid serialization data, missing keys: %s' % ( 604 ', '.join(missing_keys))) 605 if 'total_size' in kwds: 606 raise exceptions.InvalidUserInputError( 607 'Cannot override total_size on serialized Upload') 608 upload = cls.FromStream(stream, info['mime_type'], 609 total_size=info.get('total_size'), **kwds) 610 if isinstance(stream, io.IOBase) and not stream.seekable(): 611 raise exceptions.InvalidUserInputError( 612 'Cannot restart resumable upload on non-seekable stream') 613 if auto_transfer is not None: 614 upload.auto_transfer = auto_transfer 615 else: 616 upload.auto_transfer = info['auto_transfer'] 617 upload.strategy = RESUMABLE_UPLOAD 618 upload._Initialize( # pylint: disable=protected-access 619 http, info['url']) 620 upload.RefreshResumableUploadState() 621 upload.EnsureInitialized() 622 if upload.auto_transfer: 623 upload.StreamInChunks() 624 return upload 625 626 @property 627 def serialization_data(self): 628 self.EnsureInitialized() 629 if self.strategy != RESUMABLE_UPLOAD: 630 raise exceptions.InvalidDataError( 631 'Serialization only supported for resumable uploads') 632 return { 633 'auto_transfer': self.auto_transfer, 634 'mime_type': self.mime_type, 635 'total_size': self.total_size, 636 'url': self.url, 637 } 638 639 @property 640 def complete(self): 641 return self.__complete 642 643 @property 644 def mime_type(self): 645 return self.__mime_type 646 647 def __str__(self): 648 if not self.initialized: 649 return 'Upload (uninitialized)' 650 return 'Upload with %d/%s bytes transferred for url %s' % ( 651 self.progress, self.total_size or '???', self.url) 652 653 @property 654 def strategy(self): 655 return self.__strategy 656 657 @strategy.setter 658 def strategy(self, value): 659 if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): 660 raise exceptions.UserError(( 661 'Invalid value "%s" for upload strategy, must be one of ' 662 '"simple" or "resumable".') % value) 663 self.__strategy = value 664 665 @property 666 def total_size(self): 667 return self.__total_size 668 669 @total_size.setter 670 def total_size(self, value): 671 self.EnsureUninitialized() 672 self.__total_size = value 673 674 def __SetDefaultUploadStrategy(self, upload_config, http_request): 675 """Determine and set the default upload strategy for this upload. 676 677 We generally prefer simple or multipart, unless we're forced to 678 use resumable. This happens when any of (1) the upload is too 679 large, (2) the simple endpoint doesn't support multipart requests 680 and we have metadata, or (3) there is no simple upload endpoint. 681 682 Args: 683 upload_config: Configuration for the upload endpoint. 684 http_request: The associated http request. 685 686 Returns: 687 None. 688 """ 689 if upload_config.resumable_path is None: 690 self.strategy = SIMPLE_UPLOAD 691 if self.strategy is not None: 692 return 693 strategy = SIMPLE_UPLOAD 694 if (self.total_size is not None and 695 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): 696 strategy = RESUMABLE_UPLOAD 697 if http_request.body and not upload_config.simple_multipart: 698 strategy = RESUMABLE_UPLOAD 699 if not upload_config.simple_path: 700 strategy = RESUMABLE_UPLOAD 701 self.strategy = strategy 702 703 def ConfigureRequest(self, upload_config, http_request, url_builder): 704 """Configure the request and url for this upload.""" 705 # Validate total_size vs. max_size 706 if (self.total_size and upload_config.max_size and 707 self.total_size > upload_config.max_size): 708 raise exceptions.InvalidUserInputError( 709 'Upload too big: %s larger than max size %s' % ( 710 self.total_size, upload_config.max_size)) 711 # Validate mime type 712 if not util.AcceptableMimeType(upload_config.accept, self.mime_type): 713 raise exceptions.InvalidUserInputError( 714 'MIME type %s does not match any accepted MIME ranges %s' % ( 715 self.mime_type, upload_config.accept)) 716 717 self.__SetDefaultUploadStrategy(upload_config, http_request) 718 if self.strategy == SIMPLE_UPLOAD: 719 url_builder.relative_path = upload_config.simple_path 720 if http_request.body: 721 url_builder.query_params['uploadType'] = 'multipart' 722 self.__ConfigureMultipartRequest(http_request) 723 else: 724 url_builder.query_params['uploadType'] = 'media' 725 self.__ConfigureMediaRequest(http_request) 726 else: 727 url_builder.relative_path = upload_config.resumable_path 728 url_builder.query_params['uploadType'] = 'resumable' 729 self.__ConfigureResumableRequest(http_request) 730 731 def __ConfigureMediaRequest(self, http_request): 732 """Configure http_request as a simple request for this upload.""" 733 http_request.headers['content-type'] = self.mime_type 734 http_request.body = self.stream.read() 735 http_request.loggable_body = '<media body>' 736 737 def __ConfigureMultipartRequest(self, http_request): 738 """Configure http_request as a multipart request for this upload.""" 739 # This is a multipart/related upload. 740 msg_root = mime_multipart.MIMEMultipart('related') 741 # msg_root should not write out its own headers 742 setattr(msg_root, '_write_headers', lambda self: None) 743 744 # attach the body as one part 745 msg = mime_nonmultipart.MIMENonMultipart( 746 *http_request.headers['content-type'].split('/')) 747 msg.set_payload(http_request.body) 748 msg_root.attach(msg) 749 750 # attach the media as the second part 751 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) 752 msg['Content-Transfer-Encoding'] = 'binary' 753 msg.set_payload(self.stream.read()) 754 msg_root.attach(msg) 755 756 # NOTE: We encode the body, but can't use 757 # `email.message.Message.as_string` because it prepends 758 # `> ` to `From ` lines. 759 fp = six.BytesIO() 760 if six.PY3: 761 generator_class = email_generator.BytesGenerator 762 else: 763 generator_class = email_generator.Generator 764 g = generator_class(fp, mangle_from_=False) 765 g.flatten(msg_root, unixfrom=False) 766 http_request.body = fp.getvalue() 767 768 multipart_boundary = msg_root.get_boundary() 769 http_request.headers['content-type'] = ( 770 'multipart/related; boundary=%r' % multipart_boundary) 771 if isinstance(multipart_boundary, six.text_type): 772 multipart_boundary = multipart_boundary.encode('ascii') 773 774 body_components = http_request.body.split(multipart_boundary) 775 headers, _, _ = body_components[-2].partition(b'\n\n') 776 body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--']) 777 http_request.loggable_body = multipart_boundary.join(body_components) 778 779 def __ConfigureResumableRequest(self, http_request): 780 http_request.headers['X-Upload-Content-Type'] = self.mime_type 781 if self.total_size is not None: 782 http_request.headers[ 783 'X-Upload-Content-Length'] = str(self.total_size) 784 785 def RefreshResumableUploadState(self): 786 """Talk to the server and refresh the state of this resumable upload. 787 788 Returns: 789 Response if the upload is complete. 790 """ 791 if self.strategy != RESUMABLE_UPLOAD: 792 return 793 self.EnsureInitialized() 794 refresh_request = http_wrapper.Request( 795 url=self.url, http_method='PUT', 796 headers={'Content-Range': 'bytes */*'}) 797 refresh_response = http_wrapper.MakeRequest( 798 self.http, refresh_request, redirections=0, 799 retries=self.num_retries) 800 range_header = self._GetRangeHeaderFromResponse(refresh_response) 801 if refresh_response.status_code in (http_client.OK, 802 http_client.CREATED): 803 self.__complete = True 804 self.__progress = self.total_size 805 self.stream.seek(self.progress) 806 # If we're finished, the refresh response will contain the metadata 807 # originally requested. Cache it so it can be returned in 808 # StreamInChunks. 809 self.__final_response = refresh_response 810 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: 811 if range_header is None: 812 self.__progress = 0 813 else: 814 self.__progress = self.__GetLastByte(range_header) + 1 815 self.stream.seek(self.progress) 816 else: 817 raise exceptions.HttpError.FromResponse(refresh_response) 818 819 def _GetRangeHeaderFromResponse(self, response): 820 return response.info.get('Range', response.info.get('range')) 821 822 def InitializeUpload(self, http_request, http=None, client=None): 823 """Initialize this upload from the given http_request.""" 824 if self.strategy is None: 825 raise exceptions.UserError( 826 'No upload strategy set; did you call ConfigureRequest?') 827 if http is None and client is None: 828 raise exceptions.UserError('Must provide client or http.') 829 if self.strategy != RESUMABLE_UPLOAD: 830 return 831 http = http or client.http 832 if client is not None: 833 http_request.url = client.FinalizeTransferUrl(http_request.url) 834 self.EnsureUninitialized() 835 http_response = http_wrapper.MakeRequest(http, http_request, 836 retries=self.num_retries) 837 if http_response.status_code != http_client.OK: 838 raise exceptions.HttpError.FromResponse(http_response) 839 840 self.__server_chunk_granularity = http_response.info.get( 841 'X-Goog-Upload-Chunk-Granularity') 842 url = http_response.info['location'] 843 if client is not None: 844 url = client.FinalizeTransferUrl(url) 845 self._Initialize(http, url) 846 847 # Unless the user has requested otherwise, we want to just 848 # go ahead and pump the bytes now. 849 if self.auto_transfer: 850 return self.StreamInChunks() 851 return http_response 852 853 def __GetLastByte(self, range_header): 854 _, _, end = range_header.partition('-') 855 # TODO(craigcitro): Validate start == 0? 856 return int(end) 857 858 def __ValidateChunksize(self, chunksize=None): 859 if self.__server_chunk_granularity is None: 860 return 861 chunksize = chunksize or self.chunksize 862 if chunksize % self.__server_chunk_granularity: 863 raise exceptions.ConfigurationValueError( 864 'Server requires chunksize to be a multiple of %d', 865 self.__server_chunk_granularity) 866 867 def __StreamMedia(self, callback=None, finish_callback=None, 868 additional_headers=None, use_chunks=True): 869 """Helper function for StreamMedia / StreamInChunks.""" 870 if self.strategy != RESUMABLE_UPLOAD: 871 raise exceptions.InvalidUserInputError( 872 'Cannot stream non-resumable upload') 873 callback = callback or self.progress_callback 874 finish_callback = finish_callback or self.finish_callback 875 # final_response is set if we resumed an already-completed upload. 876 response = self.__final_response 877 send_func = self.__SendChunk if use_chunks else self.__SendMediaBody 878 if use_chunks: 879 self.__ValidateChunksize(self.chunksize) 880 self.EnsureInitialized() 881 while not self.complete: 882 response = send_func(self.stream.tell(), 883 additional_headers=additional_headers) 884 if response.status_code in (http_client.OK, http_client.CREATED): 885 self.__complete = True 886 break 887 self.__progress = self.__GetLastByte(response.info['range']) 888 if self.progress + 1 != self.stream.tell(): 889 # TODO(craigcitro): Add a better way to recover here. 890 raise exceptions.CommunicationError( 891 'Failed to transfer all bytes in chunk, upload paused at ' 892 'byte %d' % self.progress) 893 self._ExecuteCallback(callback, response) 894 if self.__complete and hasattr(self.stream, 'seek'): 895 current_pos = self.stream.tell() 896 self.stream.seek(0, os.SEEK_END) 897 end_pos = self.stream.tell() 898 self.stream.seek(current_pos) 899 if current_pos != end_pos: 900 raise exceptions.TransferInvalidError( 901 'Upload complete with %s additional bytes left in stream' % 902 (int(end_pos) - int(current_pos))) 903 self._ExecuteCallback(finish_callback, response) 904 return response 905 906 def StreamMedia(self, callback=None, finish_callback=None, 907 additional_headers=None): 908 """Send this resumable upload in a single request. 909 910 Args: 911 callback: Progress callback function with inputs 912 (http_wrapper.Response, transfer.Upload) 913 finish_callback: Final callback function with inputs 914 (http_wrapper.Response, transfer.Upload) 915 additional_headers: Dict of headers to include with the upload 916 http_wrapper.Request. 917 918 Returns: 919 http_wrapper.Response of final response. 920 """ 921 return self.__StreamMedia( 922 callback=callback, finish_callback=finish_callback, 923 additional_headers=additional_headers, use_chunks=False) 924 925 def StreamInChunks(self, callback=None, finish_callback=None, 926 additional_headers=None): 927 """Send this (resumable) upload in chunks.""" 928 return self.__StreamMedia( 929 callback=callback, finish_callback=finish_callback, 930 additional_headers=additional_headers) 931 932 def __SendMediaRequest(self, request, end): 933 """Request helper function for SendMediaBody & SendChunk.""" 934 response = http_wrapper.MakeRequest( 935 self.bytes_http, request, retry_func=self.retry_func, 936 retries=self.num_retries) 937 if response.status_code not in (http_client.OK, http_client.CREATED, 938 http_wrapper.RESUME_INCOMPLETE): 939 # We want to reset our state to wherever the server left us 940 # before this failed request, and then raise. 941 self.RefreshResumableUploadState() 942 raise exceptions.HttpError.FromResponse(response) 943 if response.status_code == http_wrapper.RESUME_INCOMPLETE: 944 last_byte = self.__GetLastByte( 945 self._GetRangeHeaderFromResponse(response)) 946 if last_byte + 1 != end: 947 self.stream.seek(last_byte) 948 return response 949 950 def __SendMediaBody(self, start, additional_headers=None): 951 """Send the entire media stream in a single request.""" 952 self.EnsureInitialized() 953 if self.total_size is None: 954 raise exceptions.TransferInvalidError( 955 'Total size must be known for SendMediaBody') 956 body_stream = stream_slice.StreamSlice( 957 self.stream, self.total_size - start) 958 959 request = http_wrapper.Request(url=self.url, http_method='PUT', 960 body=body_stream) 961 request.headers['Content-Type'] = self.mime_type 962 if start == self.total_size: 963 # End of an upload with 0 bytes left to send; just finalize. 964 range_string = 'bytes */%s' % self.total_size 965 else: 966 range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1, 967 self.total_size) 968 969 request.headers['Content-Range'] = range_string 970 if additional_headers: 971 request.headers.update(additional_headers) 972 973 return self.__SendMediaRequest(request, self.total_size) 974 975 def __SendChunk(self, start, additional_headers=None): 976 """Send the specified chunk.""" 977 self.EnsureInitialized() 978 no_log_body = self.total_size is None 979 if self.total_size is None: 980 # For the streaming resumable case, we need to detect when 981 # we're at the end of the stream. 982 body_stream = buffered_stream.BufferedStream( 983 self.stream, start, self.chunksize) 984 end = body_stream.stream_end_position 985 if body_stream.stream_exhausted: 986 self.__total_size = end 987 # TODO: Here, change body_stream from a stream to a string object, 988 # which means reading a chunk into memory. This works around 989 # https://code.google.com/p/httplib2/issues/detail?id=176 which can 990 # cause httplib2 to skip bytes on 401's for file objects. 991 # Rework this solution to be more general. 992 body_stream = body_stream.read(self.chunksize) 993 else: 994 end = min(start + self.chunksize, self.total_size) 995 body_stream = stream_slice.StreamSlice(self.stream, end - start) 996 # TODO(craigcitro): Think about clearer errors on "no data in 997 # stream". 998 request = http_wrapper.Request(url=self.url, http_method='PUT', 999 body=body_stream) 1000 request.headers['Content-Type'] = self.mime_type 1001 if no_log_body: 1002 # Disable logging of streaming body. 1003 # TODO: Remove no_log_body and rework as part of a larger logs 1004 # refactor. 1005 request.loggable_body = '<media body>' 1006 if self.total_size is None: 1007 # Streaming resumable upload case, unknown total size. 1008 range_string = 'bytes %s-%s/*' % (start, end - 1) 1009 elif end == start: 1010 # End of an upload with 0 bytes left to send; just finalize. 1011 range_string = 'bytes */%s' % self.total_size 1012 else: 1013 # Normal resumable upload case with known sizes. 1014 range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size) 1015 1016 request.headers['Content-Range'] = range_string 1017 if additional_headers: 1018 request.headers.update(additional_headers) 1019 1020 return self.__SendMediaRequest(request, end) 1021