• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Library for handling batch HTTP requests for apitools."""
18
19import collections
20import email.generator as generator
21import email.mime.multipart as mime_multipart
22import email.mime.nonmultipart as mime_nonmultipart
23import email.parser as email_parser
24import itertools
25import time
26import uuid
27
28import six
29from six.moves import http_client
30from six.moves import urllib_parse
31from six.moves import range  # pylint: disable=redefined-builtin
32
33from apitools.base.py import exceptions
34from apitools.base.py import http_wrapper
35
# Names exported by `from <this module> import *`. Only BatchApiRequest is
# listed; BatchHttpRequest and RequestResponseAndHandler are still reachable
# by explicit attribute access but are not part of the star-export.
__all__ = [
    'BatchApiRequest',
]
39
40
class RequestResponseAndHandler(
        collections.namedtuple('RequestResponseAndHandler',
                               'request response handler')):

    """Immutable record tying one batched HTTP request to its outcome.

    BatchHttpRequest stores one of these per sub-request. It starts with
    `response` set to None and swaps in the parsed reply via `_replace`
    once the batch has been executed.

    Attributes:
      request: The http_wrapper.Request that was (or will be) sent.
      response: The http_wrapper.Response parsed out of the batch reply,
        or None before the batch has been executed.
      handler: Optional callable invoked as handler(response, exception).
        `exception` is an exceptions.HttpError when the response status
        is 300 or above, and None otherwise.
    """
57
58
class BatchApiRequest(object):
    """Batches multiple api requests into a single request.

    Usage: call Add() once per API call, then Execute() with an http
    object. Execute() sends every pending call, optionally split into
    several HTTP batch requests. It retries calls whose status code is
    retryable and returns the list of ApiCall objects.
    """

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: The value returned by service.ProcessHttpResponse for
              the final, non-retryable HTTP response. It is None if an
              error occurred or no terminal response has arrived yet.
          exception: The exceptions.HttpError reported for the most recent
              response, or None.
        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: An iterable of integer HTTP codes that can
                  be retried. 401 (UNAUTHORIZED) is always added.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.
            """
            # Accept any iterable of codes; list concatenation used to
            # reject tuples with a TypeError. 401 is always retryable so
            # that Execute() can refresh credentials and try again.
            self.__retryable_codes = (
                frozenset(retryable_codes) | {http_client.UNAUTHORIZED})
            # Most recent raw response delivered to HandleResponse.
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if the latest response was reported with an exception."""
            return self.exception is not None

        @property
        def response(self):
            """Processed response of a successful terminal request, or None."""
            return self.__response

        @property
        def exception(self):
            """Exception reported with the latest response, or None."""
            return self.__exception

        @property
        def authorization_failed(self):
            """True if the latest HTTP response had status 401."""
            return (self.__http_response is not None and
                    self.__http_response.status_code ==
                    http_client.UNAUTHORIZED)

        @property
        def terminal_state(self):
            """True once a response with a non-retryable status arrived."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: exceptions.HttpError object if an error
                  occurred, otherwise None.
            """
            self.__http_response = http_response
            self.__exception = exception
            # Only a final, error-free response is converted into the
            # service's response message; retryable ones are left for the
            # next attempt.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None,
                 response_encoding=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch'.
          retryable_codes: A list of integer HTTP codes that can be retried.
          response_encoding: The encoding type of response content.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'
        self.response_encoding = response_encoding

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string naming the desired method of `service`; it is
              passed to service.GetMethodConfig and service.GetUploadConfig.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              service.PrepareHttpRequest.

        Returns:
          None
        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5,
                max_batch_size=None, batch_request_callback=None):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              attempts.
          max_retries: Maximum number of send attempts; the first send
              counts as one. Requests that have not reached a terminal
              state by then keep their last response or exception.
          max_batch_size: int, if specified requests will be split in batches
              of given size. None or 0 means a single batch.
          batch_request_callback: function of (http_response, exception) passed
              to BatchHttpRequest which will be run on any given results.

        Returns:
          List of ApiCalls.

        Raises:
          ValueError: if max_batch_size is negative.
        """
        if max_batch_size is not None and max_batch_size < 0:
            # A negative step would make range() below empty, so nothing
            # would be sent while the loop still slept between attempts.
            raise ValueError(
                'max_batch_size must be non-negative, got %r' %
                (max_batch_size,))

        requests = [request for request in self.api_requests
                    if not request.terminal_state]
        if not requests:
            # Nothing to send. Without this guard batch_size would be 0 and
            # range(0, 0, 0) would raise ValueError.
            return self.api_requests
        batch_size = max_batch_size or len(requests)

        for attempt in range(max_retries):
            if attempt:
                time.sleep(sleep_between_polls)

            for start in range(0, len(requests), batch_size):
                chunk = requests[start:start + batch_size]

                # One HTTP batch request per chunk of incomplete requests.
                batch_http_request = BatchHttpRequest(
                    batch_url=self.batch_url,
                    callback=batch_request_callback,
                    response_encoding=self.response_encoding
                )
                for request in chunk:
                    batch_http_request.Add(
                        request.http_request, request.HandleResponse)
                batch_http_request.Execute(http)

                # If http.request exposes credentials and any call in this
                # chunk got a 401, refresh them so the retry can succeed.
                if hasattr(http.request, 'credentials'):
                    if any(request.authorization_failed for request in chunk):
                        http.request.credentials.refresh(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests
                        if not request.terminal_state]
            if not requests:
                break

        return self.api_requests
241
242
class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None, response_encoding=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              exceptions.HttpError object if the response status was 300
              or above, or None otherwise.
          response_encoding: The encoding type of response content.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Response content will be decoded if this is provided.
        self.__response_encoding = response_encoding

        # Maps request id (str) -> RequestResponseAndHandler.
        self.__request_response_handlers = {}

        # Source of auto-generated request ids.
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for an individual request.

        Returns:
          A Content-ID header with the id encoded into it. A UUID is
          prepended to the value because Content-ID headers are
          supposed to be universally unique.
        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns: '<prefix+quoted_id>'.

        Args:
          header: A string indicating the Content-ID header value.

        Returns:
          The extracted id value.

        Raises:
          BatchError if the header is missing or not in the expected format.
        """
        # Both angle brackets and a '+' separator are required. The old
        # check used `or`, so a header missing one bracket was accepted and
        # the [1:-1] slice then silently dropped a real character.
        if (not header or not header.startswith('<') or
                not header.endswith('>') or '+' not in header):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct status line
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        if not isinstance(request_line, six.text_type):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))

        # Header names are case-insensitive, so find the content type
        # however the caller capitalized it; otherwise it would be missed
        # here and emitted a second time by the loop below.
        content_type = 'application/json'
        other_headers = []
        for key, value in request.headers.items():
            if key.lower() == 'content-type':
                content_type = value
            else:
                other_headers.append((key, value))
        # maxsplit=1 keeps parameters that themselves contain '/' intact.
        major, minor = content_type.split('/', 1)
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header; copy the rest.
        for key, value in other_headers:
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert string into Response and content.

        Args:
          payload: Header and body string to be deserialized.

        Returns:
          A Response object

        Raises:
          BatchError if the status line cannot be parsed.
        """
        # Strip off the status line, e.g. "HTTP/1.1 200 OK".
        status_line, _, payload = payload.partition('\n')
        # The reason phrase is optional, so only the first two fields are
        # required. Splitting on whitespace also drops a trailing '\r'.
        fields = status_line.split(None, 2)
        if len(fields) < 2:
            raise exceptions.BatchError(
                'Invalid status line in batch response: %s' % status_line)
        status = fields[1]

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
           A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              exceptions.HttpError object if the response status was 300
              or above, or None otherwise.

        Returns:
          None
        """
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          Transport errors from http_wrapper.MakeRequest propagate.
          exceptions.HttpError if the batch request itself fails (status
              300 or above).
          exceptions.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Suppress the multipart's own top-level headers; its Content-Type
        # (with the boundary) is sent as an HTTP header below instead.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key, entry in self.__request_response_handlers.items():
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)

            msg.set_payload(self._SerializeRequest(entry.request))
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        content = response.content
        # NOTE(review): if content is bytes and no response_encoding was
        # given, `header + content` below fails on Python 3 -- confirm that
        # callers always supply an encoding there.
        if isinstance(content, bytes) and self.__response_encoding:
            content = response.content.decode(self.__response_encoding)

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            if request_id not in self.__request_response_handlers:
                raise exceptions.BatchError(
                    'Unexpected Content-ID in batch response: %s' %
                    part['Content-ID'])
            response = self._DeserializeResponse(part.get_payload())

            # Disable protected access because namedtuple._replace(...)
            # is not actually meant to be protected.
            # pylint: disable=protected-access
            self.__request_response_handlers[request_id] = (
                self.__request_response_handlers[request_id]._replace(
                    response=response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        Each request's own handler is called first, then the global
        callback; both receive (response, exception).

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format or is missing a
          reply for one of the requests.
        """

        self._Execute(http)

        for key, entry in self.__request_response_handlers.items():
            response = entry.response
            callback = entry.handler

            if response is None:
                # The server reply had no part for this request; report a
                # malformed batch instead of an AttributeError below.
                raise exceptions.BatchError(
                    'No response received for request with id %s.' % key)

            exception = None

            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)
501                exception = exceptions.HttpError.FromResponse(response)
502
503            if callback is not None:
504                callback(response, exception)
505            if self.__callback is not None:
506                self.__callback(response, exception)
507