• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16import mock
17import pytest
18
19try:
20    import grpc  # noqa: F401
21except ImportError:
22    pytest.skip("No GRPC", allow_module_level=True)
23
24from google.api_core import exceptions
25from google.api_core import operation_async
26from google.api_core import operations_v1
27from google.api_core import retry_async
28from google.longrunning import operations_pb2
29from google.protobuf import struct_pb2
30from google.rpc import code_pb2
31from google.rpc import status_pb2
32
# Default operation name given to protos built by make_operation_proto();
# tests assert against it to confirm the future wraps the expected operation.
TEST_OPERATION_NAME = "test/operation"
34
35
def make_operation_proto(
    name=TEST_OPERATION_NAME, metadata=None, response=None, error=None, **kwargs
):
    """Build an ``operations_pb2.Operation`` message for use in tests.

    Args:
        name: Value for the operation's ``name`` field.
        metadata: Optional message packed into the ``metadata`` field.
        response: Optional message packed into the ``response`` field.
        error: Optional message copied into the ``error`` field.
        **kwargs: Extra fields forwarded to the ``Operation`` constructor
            (tests in this module pass ``done=True``).

    Returns:
        The populated ``Operation`` message.
    """
    proto = operations_pb2.Operation(name=name, **kwargs)

    # Only touch a field when a payload was supplied for it, so omitted
    # payloads leave the corresponding field unset.
    for field_name, payload in (("metadata", metadata), ("response", response)):
        if payload is not None:
            getattr(proto, field_name).Pack(payload)

    if error is not None:
        proto.error.CopyFrom(error)

    return proto
51
52
def make_operation_future(client_operations_responses=None):
    """Create an ``AsyncOperation`` backed by mocked refresh/cancel callables.

    Args:
        client_operations_responses: Sequence of operation protos. The first
            element seeds the future and the whole sequence is used as the
            ``side_effect`` of the refresh mock. When omitted, a single proto
            from ``make_operation_proto()`` is used.

    Returns:
        A ``(operation_future, refresh, cancel)`` tuple.
    """
    responses = client_operations_responses
    if responses is None:
        responses = [make_operation_proto()]

    refresh = mock.AsyncMock(spec=["__call__"], side_effect=responses)
    # Keep the responses reachable from the mock so tests can compare them.
    refresh.responses = responses
    cancel = mock.AsyncMock(spec=["__call__"])

    future = operation_async.AsyncOperation(
        responses[0],
        refresh,
        cancel,
        result_type=struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
    )
    return future, refresh, cancel
69
70
@pytest.mark.asyncio
async def test_constructor():
    """A freshly built future exposes its seed operation and is running."""
    future, refresh, _ = make_operation_future()
    seed_operation = refresh.responses[0]

    assert future.operation == seed_operation
    assert future.operation.done is False
    assert future.operation.name == TEST_OPERATION_NAME
    assert future.metadata is None

    is_running = await future.running()
    assert is_running
80
81
def test_metadata():
    """``future.metadata`` equals the message packed into the operation."""
    expected_metadata = struct_pb2.Struct()
    operation_proto = make_operation_proto(metadata=expected_metadata)

    future = make_operation_future([operation_proto])[0]

    assert future.metadata == expected_metadata
89
90
@pytest.mark.asyncio
async def test_cancellation():
    """Cancelling succeeds once; a repeated cancel is rejected."""
    pending_operation = make_operation_proto()
    # Second response indicates that the operation was cancelled.
    cancelled_operation = make_operation_proto(
        done=True, error=status_pb2.Status(code=code_pb2.CANCELLED)
    )
    future, _, cancel = make_operation_future(
        [pending_operation, cancelled_operation]
    )

    first_attempt = await future.cancel()
    assert first_attempt
    was_cancelled = await future.cancelled()
    assert was_cancelled
    cancel.assert_called_once_with()

    # Cancelling twice should have no effect: the call reports failure and
    # the cancel mock is still recorded as called exactly once.
    second_attempt = await future.cancel()
    assert not second_attempt
    cancel.assert_called_once_with()
109
110
@pytest.mark.asyncio
async def test_result():
    """Awaiting ``result()`` yields the response carried by the done proto."""
    expected_result = struct_pb2.Struct()
    pending_operation = make_operation_proto()
    # Second operation response includes the result.
    finished_operation = make_operation_proto(done=True, response=expected_result)
    future = make_operation_future([pending_operation, finished_operation])[0]

    actual_result = await future.result()

    assert actual_result == expected_result
    is_done = await future.done()
    assert is_done
125
126
@pytest.mark.asyncio
async def test_done_w_retry():
    """``done(retry=...)`` forwards the retry object to the refresh callable."""
    predicate = retry_async.if_exception_type(exceptions.TooManyRequests)
    test_retry = retry_async.AsyncRetry(predicate=predicate)

    expected_result = struct_pb2.Struct()
    pending_operation = make_operation_proto()
    # Second operation response includes the result.
    finished_operation = make_operation_proto(done=True, response=expected_result)
    future, refresh, _ = make_operation_future(
        [pending_operation, finished_operation]
    )

    await future.done(retry=test_retry)

    refresh.assert_called_once_with(retry=test_retry)
142
143
@pytest.mark.asyncio
async def test_exception():
    """An operation finishing with an error surfaces it via ``exception()``."""
    expected_exception = status_pb2.Status(message="meep")
    pending_operation = make_operation_proto()
    # Second operation response includes the error.
    failed_operation = make_operation_proto(done=True, error=expected_exception)
    future = make_operation_future([pending_operation, failed_operation])[0]

    exception = await future.exception()

    rendered = "{!r}".format(exception)
    assert expected_exception.message in rendered
157
158
@mock.patch("asyncio.sleep", autospec=True)
@pytest.mark.asyncio
async def test_unexpected_result(unused_sleep):
    """A done operation with neither error nor response is reported as such."""
    pending_operation = make_operation_proto()
    # Second operation response is done, but has no error or response.
    empty_done_operation = make_operation_proto(done=True)
    future = make_operation_future([pending_operation, empty_done_operation])[0]

    exception = await future.exception()

    rendered = "{!r}".format(exception)
    assert "Unexpected state" in rendered
172
173
def test_from_gapic():
    """``from_gapic`` builds a future wired to the given operations client.

    Checks that the result/metadata types and the operation proto are stored
    on the future, that the wrapped operation is reported as done, and that
    the gRPC metadata is bound as a keyword argument on both the refresh and
    the cancel callables.
    """
    operation_proto = make_operation_proto(done=True)
    operations_client = mock.create_autospec(
        operations_v1.OperationsClient, instance=True
    )

    future = operation_async.from_gapic(
        operation_proto,
        operations_client,
        struct_pb2.Struct,
        metadata_type=struct_pb2.Struct,
        grpc_metadata=[("x-goog-request-params", "foo")],
    )

    assert future._result_type == struct_pb2.Struct
    assert future._metadata_type == struct_pb2.Struct
    assert future.operation.name == TEST_OPERATION_NAME
    # ``future.done`` is a method (other tests in this module call and await
    # it), so asserting the bare attribute is always truthy and can never
    # fail. Check the done flag on the wrapped operation proto instead.
    assert future.operation.done is True
    assert future._refresh.keywords["metadata"] == [("x-goog-request-params", "foo")]
    assert future._cancel.keywords["metadata"] == [("x-goog-request-params", "foo")]
194
195
def test_deserialize():
    """``AsyncOperation.deserialize`` round-trips a serialized operation."""
    original = make_operation_proto(name="foobarbaz")

    restored = operation_async.AsyncOperation.deserialize(
        original.SerializeToString()
    )

    assert original.name == restored.name
    assert type(original) is type(restored)
202