#!/usr/bin/env python
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Library for handling batch HTTP requests for apitools."""

import collections
import email.generator as generator
import email.mime.multipart as mime_multipart
import email.mime.nonmultipart as mime_nonmultipart
import email.parser as email_parser
import itertools
import time
import uuid

import six
from six.moves import http_client
from six.moves import urllib_parse
from six.moves import range  # pylint: disable=redefined-builtin

from apitools.base.py import exceptions
from apitools.base.py import http_wrapper

__all__ = [
    'BatchApiRequest',
]


class RequestResponseAndHandler(collections.namedtuple(
        'RequestResponseAndHandler', ['request', 'response', 'handler'])):

    """Container for data related to completing an HTTP request.

    This contains an HTTP request, its response, and a callback for handling
    the response from the server.

    Attributes:
      request: An http_wrapper.Request object representing the HTTP request.
      response: The http_wrapper.Response object returned from the server.
      handler: A callback function accepting two arguments, response
        and exception. Response is an http_wrapper.Response object, and
        exception is an apiclient.errors.HttpError object if an error
        occurred, or otherwise None.
    """


class BatchApiRequest(object):
    """Batches multiple api requests into a single request."""

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: A http_wrapper.Response object given by the server as a
              response to the user request, or None if an error occurred.
          exception: An apiclient.errors.HttpError object if an error
              occurred, or None.

        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: A list of integer HTTP codes that can
                  be retried.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.

            """
            # UNAUTHORIZED is always retryable: BatchApiRequest.Execute
            # refreshes credentials after such a failure and tries again.
            self.__retryable_codes = list(
                set(retryable_codes + [http_client.UNAUTHORIZED]))
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if the last handled response carried an exception."""
            return self.exception is not None

        @property
        def response(self):
            """The processed response, or None if not (yet) available."""
            return self.__response

        @property
        def exception(self):
            """The exception from the last handled response, or None."""
            return self.__exception

        @property
        def authorization_failed(self):
            """Truthy if the last HTTP response had status UNAUTHORIZED."""
            return (self.__http_response and (
                self.__http_response.status_code == http_client.UNAUTHORIZED))

        @property
        def terminal_state(self):
            """True once a response with a non-retryable status was seen."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: apiclient.errors.HttpError object if an error
                  occurred.

            """
            self.__http_response = http_response
            self.__exception = exception
            # Only turn the raw HTTP response into an API response when the
            # call is finished and succeeded.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None,
                 response_encoding=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch' when not given.
          retryable_codes: A list of integer HTTP codes that can be retried.
          response_encoding: The encoding type of response content.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'
        self.response_encoding = response_encoding

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string indicating the desired method from the service.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              method.PrepareHttpRequest.

        Returns:
          None

        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5,
                max_batch_size=None, batch_request_callback=None):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              polls.
          max_retries: Max retries. Any requests that have not succeeded by
              this number of retries simply report the last response or
              exception, whatever it happened to be.
          max_batch_size: int, if specified requests will be split in batches
              of given size.
          batch_request_callback: function of (http_response, exception) passed
              to BatchHttpRequest which will be run on any given results.

        Returns:
          List of ApiCalls.
        """
        requests = [request for request in self.api_requests
                    if not request.terminal_state]
        if not requests:
            # Nothing to send. Returning here also avoids a zero batch size,
            # which would make range() below raise ValueError (zero step).
            return self.api_requests
        batch_size = max_batch_size or len(requests)

        for attempt in range(max_retries):
            if attempt:
                time.sleep(sleep_between_polls)

            for i in range(0, len(requests), batch_size):
                chunk = requests[i:i + batch_size]
                # Create a batch_http_request object and populate it with
                # incomplete requests.
                batch_http_request = BatchHttpRequest(
                    batch_url=self.batch_url,
                    callback=batch_request_callback,
                    response_encoding=self.response_encoding
                )
                for request in chunk:
                    batch_http_request.Add(
                        request.http_request, request.HandleResponse)
                batch_http_request.Execute(http)

                # Refresh credentials before the next attempt if any call
                # in this chunk was rejected as unauthorized.
                if hasattr(http.request, 'credentials'):
                    if any(request.authorization_failed
                           for request in chunk):
                        http.request.credentials.refresh(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests if not
                        request.terminal_state]
            if not requests:
                break

        return self.api_requests


class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None, response_encoding=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              apiclient.errors.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no error
              occurred.
          response_encoding: The encoding type of response content.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Response content will be decoded if this is provided.
        self.__response_encoding = response_encoding

        # Maps request id -> RequestResponseAndHandler.
        self.__request_response_handlers = {}

        # Counter yielding the next auto generated id.
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for a individual request.

        Returns:
          A Content-ID header with the id_ encoded into it. A UUID is
          prepended to the value because Content-ID headers are
          supposed to be universally unique.

        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns.

        Args:
          header: A string indicating the Content-ID header value.

        Returns:
          The extracted id value.

        Raises:
          BatchError if the header is not in the expected format.
        """
        # Both angle brackets are required; a header missing either one
        # is malformed.
        if not (header.startswith('<') and header.endswith('>')):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        if '+' not in header:
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        # The id is everything after the last '+'; the prefix is the UUID.
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct the request line from the path and query only; the host
        # is sent separately in the Host header below.
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        if not isinstance(request_line, six.text_type):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))
        # Split on the first '/' only so that a '/' inside a content-type
        # parameter does not break the unpacking.
        major, minor = request.headers.get(
            'content-type', 'application/json').split('/', 1)
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header.
        # Keep all of the other headers in `request.headers`.
        for key, value in request.headers.items():
            if key == 'content-type':
                continue
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert string into Response and content.

        Args:
          payload: Header and body string to be deserialized.

        Returns:
          A Response object
        """
        # Strip off the status line.
        status_line, payload = payload.split('\n', 1)
        _, status, _ = status_line.split(' ', 2)

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
          A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              apiclient.errors.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no errors
              occurred.

        Returns:
          None
        """
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          httplib2.HttpLib2Error if a transport error has occurred.
          apiclient.errors.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Message should not write out its own headers.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key in self.__request_response_handlers:
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)

            body = self._SerializeRequest(
                self.__request_response_handlers[key].request)
            msg.set_payload(body)
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        content = response.content
        if isinstance(content, bytes) and self.__response_encoding:
            content = response.content.decode(self.__response_encoding)

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            response = self._DeserializeResponse(part.get_payload())

            # Disable protected access because namedtuple._replace(...)
            # is not actually meant to be protected.
            # pylint: disable=protected-access
            self.__request_response_handlers[request_id] = (
                self.__request_response_handlers[request_id]._replace(
                    response=response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format, or if the server
          reply contained no part for one of the batched requests.
        """

        self._Execute(http)

        for key in self.__request_response_handlers:
            entry = self.__request_response_handlers[key]
            response = entry.response
            callback = entry.handler

            if response is None:
                # The server reply had no part for this request; report it
                # as a malformed batch response rather than failing with an
                # AttributeError on None below.
                raise exceptions.BatchError(
                    'No response received for batch request id: %s' % key)

            exception = None

            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)