• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""AsyncIO iterators for paging through paged API methods.
16
17These iterators simplify the process of paging through API responses
18where the request takes a page token and the response is a list of results with
19a token for the next page. See `list pagination`_ in the Google API Style Guide
20for more details.
21
22.. _list pagination:
23    https://cloud.google.com/apis/design/design_patterns#list_pagination
24
API clients that have methods that follow the list pagination pattern can
return an :class:`.AsyncIterator`::
27
28    >>> results_iterator = await client.list_resources()
29
You can walk your way through items and call off the search early if
you find what you're looking for (resulting in possibly fewer requests)::
32
33    >>> async for resource in results_iterator:
34    ...     print(resource.name)
35    ...     if not resource.is_valid:
36    ...         break
37
38At any point, you may check the number of items consumed by referencing the
39``num_results`` property of the iterator::
40
41    >>> async for my_item in results_iterator:
42    ...     if results_iterator.num_results >= 10:
43    ...         break
44
45When iterating, not every new item will send a request to the server.
46To iterate based on each page of items (where a page corresponds to
47a request)::
48
49    >>> async for page in results_iterator.pages:
50    ...     print('=' * 20)
    ...     print('    Page number: {:d}'.format(results_iterator.page_number))
52    ...     print('  Items in page: {:d}'.format(page.num_items))
53    ...     print('     First item: {!r}'.format(next(page)))
54    ...     print('Items remaining: {:d}'.format(page.remaining))
    ...     print('Next page token: {}'.format(results_iterator.next_page_token))
56    ====================
57        Page number: 1
58      Items in page: 1
59         First item: <MyItemClass at 0x7f1d3cccf690>
60    Items remaining: 0
61    Next page token: eav1OzQB0OM8rLdGXOEsyQWSG
62    ====================
63        Page number: 2
64      Items in page: 19
65         First item: <MyItemClass at 0x7f1d3cccffd0>
66    Items remaining: 18
67    Next page token: None
68"""
69
70import abc
71
72from google.api_core.page_iterator import Page
73
74
75def _item_to_value_identity(iterator, item):
76    """An item to value transformer that returns the item un-changed."""
77    # pylint: disable=unused-argument
78    # We are conforming to the interface defined by Iterator.
79    return item
80
81
class AsyncIterator(abc.ABC):
    """Abstract base for asynchronously walking API list responses.

    Concrete subclasses implement :meth:`_next_page`. This class builds
    page-by-page iteration (:attr:`pages`) and item-by-item iteration
    (``async for``) on top of it, and keeps the progress counters up to date.
    An instance can be consumed only once, through one of those two routes.

    Args:
        client(google.cloud.client.Client): The API client.
        item_to_value (Callable[google.api_core.page_iterator_async.AsyncIterator, Any]):
            Callable to convert an item from the type in the raw API response
            into the native object. Will be called with the iterator and a
            single item.
        page_token (str): A token identifying a page in a result set to start
            fetching results from.
        max_results (int): The maximum number of results to fetch.
    """

    def __init__(
        self,
        client,
        item_to_value=_item_to_value_identity,
        page_token=None,
        max_results=None,
    ):
        # Configuration supplied by the caller.
        self.client = client
        """Optional[Any]: The client that created this iterator."""
        self.item_to_value = item_to_value
        """Callable[Iterator, Any]: Callable to convert an item from the type
            in the raw API response into the native object. Will be called with
            the iterator and a
            single item.
        """
        self.max_results = max_results
        """int: The maximum number of results to fetch."""

        # Progress counters; these are updated while the iterator is consumed.
        self.page_number = 0
        """int: The current page of results."""
        self.next_page_token = page_token
        """str: The token for the next page of results. If this is set before
            the iterator starts, it effectively offsets the iterator to a
            specific starting point."""
        self.num_results = 0
        """int: The total number of results fetched so far."""

        # Single-use bookkeeping: ``_started`` flips once iteration begins and
        # the private slot caches the item generator driven by ``__anext__``.
        self._started = False
        self.__active_aiterator = None

    @property
    def pages(self):
        """Async iterator over the pages of the response.

        Accessing this property marks the iterator as started.

        Returns:
            types.AsyncGeneratorType[google.api_core.page_iterator.Page]: An
                async generator of page instances.

        Raises:
            ValueError: If the iterator has already been started.
        """
        already_started = self._started
        if already_started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        # Page-wise consumption counts results a whole page at a time.
        return self._page_aiter(increment=True)

    async def _items_aiter(self):
        """Async generator yielding every item of every page, in order."""
        page_stream = self._page_aiter(increment=False)
        async for current_page in page_stream:
            for element in current_page:
                # Item-wise consumption counts results one at a time.
                self.num_results += 1
                yield element

    def __aiter__(self):
        """Begin item-by-item iteration.

        Returns:
            types.AsyncGeneratorType[Any]: An async generator of items from
                the API.

        Raises:
            ValueError: If the iterator has already been started.
        """
        already_started = self._started
        if already_started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        return self._items_aiter()

    async def __anext__(self):
        """Return the next item, starting item iteration on the first call.

        The item generator is created lazily via :meth:`__aiter__` and then
        reused for every subsequent call.
        """
        item_stream = self.__active_aiterator
        if item_stream is None:
            item_stream = self.__active_aiterator = self.__aiter__()
        return await item_stream.__anext__()

    async def _page_aiter(self, increment):
        """Async generator of pages of API responses.

        Args:
            increment (bool): Whether ``num_results`` should grow by each
                page's item count here. Page iteration passes ``True``; item
                iteration passes ``False`` because it counts per item itself.

        Yields:
            Page: each page of items from the API.
        """
        while True:
            current_page = await self._next_page()
            if current_page is None:
                # ``_next_page`` signals exhaustion with ``None``.
                return
            self.page_number += 1
            if increment:
                self.num_results += current_page.num_items
            yield current_page

    @abc.abstractmethod
    async def _next_page(self):
        """Fetch the next page of results.

        The base implementation does no work; subclasses must override it and
        return the next :class:`Page`.

        Raises:
            NotImplementedError: Always, this method is abstract.
        """
        raise NotImplementedError
200
201
class AsyncGRPCIterator(AsyncIterator):
    """A generic class for iterating through gRPC list responses.

    .. note:: The class does not take a ``page_token`` argument because it can
        just be specified in the ``request``.

    Args:
        client (google.cloud.client.Client): The API client. This is unused by
            this class, but kept to satisfy the :class:`AsyncIterator`
            interface.
        method (Callable[protobuf.Message]): A bound gRPC method that should
            take a single message for the request. Its result is awaited.
        request (protobuf.Message): The request message. The page token field
            of this message is overwritten in place as pages are fetched.
        items_field (str): The field in the response message that has the
            items for the page.
        item_to_value (Callable[AsyncGRPCIterator, Any]): Callable to convert
            an item from the type in the raw response into a native object.
            Will be called with the iterator and a single item.
        request_token_field (str): The field in the request message used to
            specify the page token.
        response_token_field (str): The field in the response message that has
            the token for the next page.
        max_results (int): The maximum number of results to fetch.

    .. autoattribute:: pages
    """

    _DEFAULT_REQUEST_TOKEN_FIELD = "page_token"
    _DEFAULT_RESPONSE_TOKEN_FIELD = "next_page_token"

    def __init__(
        self,
        client,
        method,
        request,
        items_field,
        item_to_value=_item_to_value_identity,
        request_token_field=_DEFAULT_REQUEST_TOKEN_FIELD,
        response_token_field=_DEFAULT_RESPONSE_TOKEN_FIELD,
        max_results=None,
    ):
        # No ``page_token`` is forwarded: a starting token, if any, is already
        # part of ``request``.
        super().__init__(client, item_to_value, max_results=max_results)
        self._method = method
        self._request = request
        self._items_field = items_field
        self._request_token_field = request_token_field
        self._response_token_field = response_token_field

    async def _next_page(self):
        """Get the next page in the iterator.

        Returns:
            Page: The next page in the iterator or :data:`None` if
                there are no pages left.
        """
        if not self._has_next_page():
            return None

        # Only touch the request once a token is known, so the first call is
        # sent with whatever token the caller put in the request.
        if self.next_page_token is not None:
            setattr(self._request, self._request_token_field, self.next_page_token)

        response = await self._method(self._request)

        # Remember the continuation token before handing the page back.
        self.next_page_token = getattr(response, self._response_token_field)
        items = getattr(response, self._items_field)
        return Page(self, items, self.item_to_value, raw_page=response)

    def _has_next_page(self):
        """Determines whether or not there are more pages with results.

        The first page is always requested. After that, paging stops once
        ``num_results`` has reached ``max_results`` (when a limit is set) or
        when the last response carried no next-page token.

        Returns:
            bool: Whether the iterator has more pages.
        """
        if self.page_number == 0:
            return True

        if self.max_results is not None and self.num_results >= self.max_results:
            return False

        # Note: intentionally a falsy check instead of a None check. The RPC
        # can return an empty string indicating no more pages.
        return bool(self.next_page_token)
286