# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""AsyncIO iterators for paging through paged API methods.

These iterators simplify the process of paging through API responses
where the request takes a page token and the response is a list of results with
a token for the next page. See `list pagination`_ in the Google API Style Guide
for more details.

.. _list pagination:
    https://cloud.google.com/apis/design/design_patterns#list_pagination

API clients that have methods that follow the list pagination pattern can
return an :class:`.AsyncIterator`::

    >>> results_iterator = await client.list_resources()

Or you can walk your way through items and call off the search early if
you find what you're looking for (resulting in possibly fewer requests)::

    >>> async for resource in results_iterator:
    ...     print(resource.name)
    ...     if not resource.is_valid:
    ...         break

At any point, you may check the number of items consumed by referencing the
``num_results`` property of the iterator::

    >>> async for my_item in results_iterator:
    ...     if results_iterator.num_results >= 10:
    ...         break

When iterating, not every new item will send a request to the server.
To iterate based on each page of items (where a page corresponds to
a request)::

    >>> async for page in results_iterator.pages:
    ...     print('=' * 20)
    ...     print('    Page number: {:d}'.format(results_iterator.page_number))
    ...     print('  Items in page: {:d}'.format(page.num_items))
    ...     print('     First item: {!r}'.format(next(page)))
    ...     print('Items remaining: {:d}'.format(page.remaining))
    ...     print('Next page token: {}'.format(results_iterator.next_page_token))
    ====================
        Page number: 1
      Items in page: 1
         First item: <MyItemClass at 0x7f1d3cccf690>
    Items remaining: 0
    Next page token: eav1OzQB0OM8rLdGXOEsyQWSG
    ====================
        Page number: 2
      Items in page: 19
         First item: <MyItemClass at 0x7f1d3cccffd0>
    Items remaining: 18
    Next page token: None

Each iterator can be consumed only once: iterating over items (``async for``)
and iterating over ``pages`` are mutually exclusive, and any second attempt
raises :exc:`ValueError`.
"""

import abc

from google.api_core.page_iterator import Page


def _item_to_value_identity(iterator, item):
    """An item to value transformer that returns the item un-changed."""
    # pylint: disable=unused-argument
    # We are conforming to the interface defined by Iterator.
    return item


class AsyncIterator(abc.ABC):
    """A generic class for iterating through API list responses.

    Subclasses must implement :meth:`_next_page`. An instance is single-use:
    consume it either item-by-item with ``async for`` or page-by-page through
    :attr:`pages`, but not both and not more than once.

    Args:
        client (google.cloud.client.Client): The API client.
        item_to_value (Callable[google.api_core.page_iterator_async.AsyncIterator, Any]):
            Callable to convert an item from the type in the raw API response
            into the native object. Will be called with the iterator and a
            single item.
        page_token (str): A token identifying a page in a result set to start
            fetching results from.
        max_results (int): The maximum number of results to fetch.
    """

    def __init__(
        self,
        client,
        item_to_value=_item_to_value_identity,
        page_token=None,
        max_results=None,
    ):
        # Set once iteration begins (via ``pages`` or ``__aiter__``) so the
        # iterator cannot be consumed twice.
        self._started = False
        # Item generator created lazily by ``__anext__`` so the iterator object
        # itself can be advanced with ``await it.__anext__()``.
        self.__active_aiterator = None

        self.client = client
        """Optional[Any]: The client that created this iterator."""
        self.item_to_value = item_to_value
        """Callable[Iterator, Any]: Callable to convert an item from the type
        in the raw API response into the native object. Will be called with
        the iterator and a single item.
        """
        self.max_results = max_results
        """int: The maximum number of results to fetch."""

        # The attributes below will change over the life of the iterator.
        self.page_number = 0
        """int: The current page of results."""
        self.next_page_token = page_token
        """str: The token for the next page of results. If this is set before
        the iterator starts, it effectively offsets the iterator to a
        specific starting point."""
        self.num_results = 0
        """int: The total number of results fetched so far."""

    @property
    def pages(self):
        """Iterator of pages in the response.

        Returns:
            types.AsyncGeneratorType[google.api_core.page_iterator.Page]: An
            async generator of page instances.

        Raises:
            ValueError: If the iterator has already been started.
        """
        if self._started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        return self._page_aiter(increment=True)

    async def _items_aiter(self):
        """Async generator yielding each item of each page, in order.

        ``num_results`` is incremented once per item handed out, rather than
        once per page, because callers may stop partway through a page.
        """
        async for page in self._page_aiter(increment=False):
            for item in page:
                self.num_results += 1
                yield item

    def __aiter__(self):
        """Iterator for each item returned.

        Returns:
            types.AsyncGeneratorType[Any]: An async generator of items from
            the API.

        Raises:
            ValueError: If the iterator has already been started.
        """
        if self._started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        return self._items_aiter()

    async def __anext__(self):
        """Return the next item, starting item iteration on the first call.

        Returns:
            Any: The next item from the API.

        Raises:
            ValueError: On the first call, if the iterator was already started
                through :attr:`pages` or ``async for``.
            StopAsyncIteration: When there are no more items.
        """
        if self.__active_aiterator is None:
            self.__active_aiterator = self.__aiter__()
        return await self.__active_aiterator.__anext__()

    async def _page_aiter(self, increment):
        """Generator of pages of API responses.

        Args:
            increment (bool): Flag indicating if the total number of results
                should be incremented on each page. This is useful since a page
                iterator will want to increment by results per page while an
                items iterator will want to increment per item.

        Yields:
            Page: each page of items from the API.
        """
        page = await self._next_page()
        while page is not None:
            self.page_number += 1
            if increment:
                self.num_results += page.num_items
            yield page
            page = await self._next_page()

    @abc.abstractmethod
    async def _next_page(self):
        """Get the next page in the iterator.

        This does nothing and is intended to be over-ridden by subclasses
        to return the next :class:`Page`, or :data:`None` when exhausted.

        Raises:
            NotImplementedError: Always, this method is abstract.
        """
        raise NotImplementedError


class AsyncGRPCIterator(AsyncIterator):
    """A generic class for iterating through gRPC list responses.

    .. note:: The class does not take a ``page_token`` argument because it can
        just be specified in the ``request``.

    .. note:: ``max_results`` is only checked before a new page is requested.
        Every item of the page that reaches or crosses the limit is still
        returned, so more than ``max_results`` items may be produced.

    Args:
        client (google.cloud.client.Client): The API client. This is unused by
            this class, but kept to satisfy the :class:`Iterator` interface.
        method (Callable[protobuf.Message]): A bound gRPC method that should
            take a single message for the request.
        request (protobuf.Message): The request message. Its page-token field
            is overwritten in place as pages are fetched.
        items_field (str): The field in the response message that has the
            items for the page.
        item_to_value (Callable[GRPCIterator, Any]): Callable to convert an
            item from the type in the raw response into a native object. Will
            be called with the iterator and a single item.
        request_token_field (str): The field in the request message used to
            specify the page token.
        response_token_field (str): The field in the response message that has
            the token for the next page.
        max_results (int): The maximum number of results to fetch.

    .. autoattribute:: pages
    """

    _DEFAULT_REQUEST_TOKEN_FIELD = "page_token"
    _DEFAULT_RESPONSE_TOKEN_FIELD = "next_page_token"

    def __init__(
        self,
        client,
        method,
        request,
        items_field,
        item_to_value=_item_to_value_identity,
        request_token_field=_DEFAULT_REQUEST_TOKEN_FIELD,
        response_token_field=_DEFAULT_RESPONSE_TOKEN_FIELD,
        max_results=None,
    ):
        super().__init__(client, item_to_value, max_results=max_results)
        self._method = method
        self._request = request
        self._items_field = items_field
        self._request_token_field = request_token_field
        self._response_token_field = response_token_field

    async def _next_page(self):
        """Get the next page in the iterator.

        Returns:
            Page: The next page in the iterator or :data:`None` if
                there are no pages left.
        """
        if not self._has_next_page():
            return None

        # Mutates the caller-supplied request message in place; any token the
        # caller put in the request is used for the first call.
        if self.next_page_token is not None:
            setattr(self._request, self._request_token_field, self.next_page_token)

        response = await self._method(self._request)

        self.next_page_token = getattr(response, self._response_token_field)
        items = getattr(response, self._items_field)
        return Page(self, items, self.item_to_value, raw_page=response)

    def _has_next_page(self):
        """Determines whether or not there are more pages with results.

        Returns:
            bool: Whether the iterator has more pages.
        """
        # The first page is always requested.
        if self.page_number == 0:
            return True

        if self.max_results is not None and self.num_results >= self.max_results:
            return False

        # Note: intentionally a falsy check instead of a None check. The RPC
        # can return an empty string indicating no more pages.
        return bool(self.next_page_token)