# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import mock
import pytest

try:
    import grpc  # noqa: F401
except ImportError:
    pytest.skip("No GRPC", allow_module_level=True)

from google.api_core import exceptions
from google.api_core import operation_async
from google.api_core import operations_v1
from google.api_core import retry_async
from google.longrunning import operations_pb2
from google.protobuf import struct_pb2
from google.rpc import code_pb2
from google.rpc import status_pb2

TEST_OPERATION_NAME = "test/operation"


def make_operation_proto(
    name=TEST_OPERATION_NAME, metadata=None, response=None, error=None, **kwargs
):
    """Build an ``operations_pb2.Operation`` for use in tests.

    Args:
        name: Operation name; defaults to ``TEST_OPERATION_NAME``.
        metadata: Optional message packed into the operation's ``metadata``
            field.
        response: Optional message packed into the operation's ``response``
            field.
        error: Optional status message copied into the operation's ``error``
            field.
        **kwargs: Extra fields forwarded to the ``Operation`` constructor
            (the tests in this module use ``done=True``).

    Returns:
        The populated ``Operation`` proto.
    """
    operation_proto = operations_pb2.Operation(name=name, **kwargs)

    if metadata is not None:
        operation_proto.metadata.Pack(metadata)

    if response is not None:
        operation_proto.response.Pack(response)

    if error is not None:
        operation_proto.error.CopyFrom(error)

    return operation_proto


def make_operation_future(client_operations_responses=None):
    """Build an ``AsyncOperation`` backed by mocked refresh/cancel callables.

    Args:
        client_operations_responses: Sequence of operation protos. The first
            one seeds the future; the whole sequence is used as the
            ``side_effect`` of the refresh mock. Defaults to a single
            not-done operation.

    Returns:
        A ``(operation_future, refresh, cancel)`` tuple, where ``refresh`` and
        ``cancel`` are the ``AsyncMock`` objects handed to the future.
    """
    if client_operations_responses is None:
        client_operations_responses = [make_operation_proto()]

    refresh = mock.AsyncMock(spec=["__call__"], side_effect=client_operations_responses)
    # Keep the responses on the mock so tests can compare against them.
    refresh.responses = client_operations_responses
    cancel = mock.AsyncMock(spec=["__call__"])
    operation_future = operation_async.AsyncOperation(
        client_operations_responses[0],
        refresh,
        cancel,
        result_type=struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
    )

    return operation_future, refresh, cancel


@pytest.mark.asyncio
async def test_constructor():
    """A freshly built future exposes the seed operation and is running."""
    future, refresh, _ = make_operation_future()

    assert future.operation == refresh.responses[0]
    assert future.operation.done is False
    assert future.operation.name == TEST_OPERATION_NAME
    assert future.metadata is None
    assert await future.running()


def test_metadata():
    """``metadata`` returns the message packed into the operation proto."""
    expected_metadata = struct_pb2.Struct()
    future, _, _ = make_operation_future(
        [make_operation_proto(metadata=expected_metadata)]
    )

    assert future.metadata == expected_metadata


@pytest.mark.asyncio
async def test_cancellation():
    """``cancel()`` invokes the cancel callable once; a repeat call is a no-op."""
    responses = [
        make_operation_proto(),
        # Second response indicates that the operation was cancelled.
        make_operation_proto(
            done=True, error=status_pb2.Status(code=code_pb2.CANCELLED)
        ),
    ]
    future, _, cancel = make_operation_future(responses)

    assert await future.cancel()
    assert await future.cancelled()
    cancel.assert_called_once_with()

    # Cancelling twice should have no effect.
    assert not await future.cancel()
    cancel.assert_called_once_with()


@pytest.mark.asyncio
async def test_result():
    """``result()`` returns the response carried by the completed operation."""
    expected_result = struct_pb2.Struct()
    responses = [
        make_operation_proto(),
        # Second operation response includes the result.
        make_operation_proto(done=True, response=expected_result),
    ]
    future, _, _ = make_operation_future(responses)

    result = await future.result()

    assert result == expected_result
    assert await future.done()


@pytest.mark.asyncio
async def test_done_w_retry():
    """``done(retry=...)`` forwards the retry object to the refresh callable."""
    RETRY_PREDICATE = retry_async.if_exception_type(exceptions.TooManyRequests)
    test_retry = retry_async.AsyncRetry(predicate=RETRY_PREDICATE)

    expected_result = struct_pb2.Struct()
    responses = [
        make_operation_proto(),
        # Second operation response includes the result.
        make_operation_proto(done=True, response=expected_result),
    ]
    future, refresh, _ = make_operation_future(responses)

    await future.done(retry=test_retry)
    refresh.assert_called_once_with(retry=test_retry)


@pytest.mark.asyncio
async def test_exception():
    """``exception()`` surfaces the error message of a failed operation."""
    expected_exception = status_pb2.Status(message="meep")
    responses = [
        make_operation_proto(),
        # Second operation response includes the error.
        make_operation_proto(done=True, error=expected_exception),
    ]
    future, _, _ = make_operation_future(responses)

    exception = await future.exception()

    assert expected_exception.message in "{!r}".format(exception)


# NOTE(review): asyncio.sleep is patched, presumably so that any polling
# delay inside the future does not slow the test down -- confirm.
@mock.patch("asyncio.sleep", autospec=True)
@pytest.mark.asyncio
async def test_unexpected_result(unused_sleep):
    """A done operation with no error and no response yields an exception."""
    responses = [
        make_operation_proto(),
        # Second operation response is done, but has neither an error nor a
        # response.
        make_operation_proto(done=True),
    ]
    future, _, _ = make_operation_future(responses)

    exception = await future.exception()

    assert "Unexpected state" in "{!r}".format(exception)


def test_from_gapic():
    """``from_gapic`` wires types, operation and gRPC metadata into the future."""
    operation_proto = make_operation_proto(done=True)
    operations_client = mock.create_autospec(
        operations_v1.OperationsClient, instance=True
    )

    future = operation_async.from_gapic(
        operation_proto,
        operations_client,
        struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
        grpc_metadata=[("x-goog-request-params", "foo")],
    )

    assert future._result_type == struct_pb2.Struct
    assert future._metadata_type == struct_pb2.Struct
    assert future.operation.name == TEST_OPERATION_NAME
    # ``future.done`` is a method (it is awaited as ``future.done()`` elsewhere
    # in this module), so ``assert future.done`` would always pass. Check the
    # cached operation proto instead so the assertion can actually fail.
    assert future.operation.done is True
    assert future._refresh.keywords["metadata"] == [("x-goog-request-params", "foo")]
    assert future._cancel.keywords["metadata"] == [("x-goog-request-params", "foo")]


def test_deserialize():
    """``AsyncOperation.deserialize`` round-trips a serialized operation proto."""
    op = make_operation_proto(name="foobarbaz")
    serialized = op.SerializeToString()
    deserialized_op = operation_async.AsyncOperation.deserialize(serialized)
    assert op.name == deserialized_op.name
    assert type(op) is type(deserialized_op)