1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/client_call.h"
16
17 #include <grpc/compression.h>
18 #include <grpc/grpc.h>
19
20 #include "absl/status/status.h"
21 #include "src/core/lib/resource_quota/arena.h"
22 #include "src/core/lib/transport/metadata.h"
23 #include "src/core/util/debug_location.h"
24 #include "test/core/call/batch_builder.h"
25 #include "test/core/call/yodel/yodel_test.h"
26
27 namespace grpc_core {
28
29 namespace {
30 const absl::string_view kDefaultPath = "/foo/bar";
31 }
32
33 class ClientCallTest : public YodelTest {
34 protected:
35 using YodelTest::YodelTest;
36
37 class CallOptions {
38 public:
path() const39 Slice path() const { return path_.Copy(); }
authority() const40 absl::optional<Slice> authority() const {
41 return authority_.has_value() ? absl::optional<Slice>(authority_->Copy())
42 : absl::nullopt;
43 }
registered_method() const44 bool registered_method() const { return registered_method_; }
timeout() const45 Duration timeout() const { return timeout_; }
compression_options() const46 grpc_compression_options compression_options() const {
47 return compression_options_;
48 }
49
SetTimeout(Duration timeout)50 CallOptions& SetTimeout(Duration timeout) {
51 timeout_ = timeout;
52 return *this;
53 }
54
55 private:
56 Slice path_ = Slice::FromCopiedString(kDefaultPath);
57 absl::optional<Slice> authority_;
58 bool registered_method_ = false;
59 Duration timeout_ = Duration::Infinity();
60 grpc_compression_options compression_options_ = {
61 1,
62 {0, GRPC_COMPRESS_LEVEL_NONE},
63 {0, GRPC_COMPRESS_NONE},
64 };
65 };
66
InitCall(const CallOptions & options)67 grpc_call* InitCall(const CallOptions& options) {
68 CHECK_EQ(call_, nullptr);
69 auto arena = SimpleArenaAllocator()->MakeArena();
70 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
71 event_engine().get());
72 call_ = MakeClientCall(
73 nullptr, 0, cq_, options.path(), options.authority(),
74 options.registered_method(), options.timeout() + Timestamp::Now(),
75 options.compression_options(), std::move(arena), destination_);
76 return call_;
77 }
78
NewBatch(int tag)79 BatchBuilder NewBatch(int tag) {
80 return BatchBuilder(call_, cq_verifier_.get(), tag);
81 }
82
83 // Pull in CqVerifier types for ergonomics
84 using ExpectedResult = CqVerifier::ExpectedResult;
85 using Maybe = CqVerifier::Maybe;
86 using PerformAction = CqVerifier::PerformAction;
87 using MaybePerformAction = CqVerifier::MaybePerformAction;
88 using AnyStatus = CqVerifier::AnyStatus;
Expect(int tag,ExpectedResult result,SourceLocation whence={})89 void Expect(int tag, ExpectedResult result, SourceLocation whence = {}) {
90 expectations_++;
91 cq_verifier_->Expect(CqVerifier::tag(tag), std::move(result), whence);
92 }
93
TickThroughCqExpectations(absl::optional<Duration> timeout=absl::nullopt,SourceLocation whence={})94 void TickThroughCqExpectations(
95 absl::optional<Duration> timeout = absl::nullopt,
96 SourceLocation whence = {}) {
97 if (expectations_ == 0) {
98 cq_verifier_->VerifyEmpty(timeout.value_or(Duration::Seconds(1)), whence);
99 return;
100 }
101 expectations_ = 0;
102 cq_verifier_->Verify(
103 timeout.value_or(g_yodel_fuzzing ? Duration::Minutes(5)
104 : Duration::Seconds(10)),
105 whence);
106 }
107
handler()108 CallHandler& handler() {
109 CHECK(handler_.has_value());
110 return *handler_;
111 }
112
113 private:
114 class TestCallDestination final : public UnstartedCallDestination {
115 public:
TestCallDestination(ClientCallTest * test)116 explicit TestCallDestination(ClientCallTest* test) : test_(test) {}
117
Orphaned()118 void Orphaned() override {}
StartCall(UnstartedCallHandler handler)119 void StartCall(UnstartedCallHandler handler) override {
120 CHECK(!test_->handler_.has_value());
121 test_->handler_.emplace(handler.StartCall());
122 }
123
124 private:
125 ClientCallTest* const test_;
126 };
127
InitTest()128 void InitTest() override {
129 cq_ = grpc_completion_queue_create_for_next(nullptr);
130 cq_verifier_ = absl::make_unique<CqVerifier>(
131 cq_, CqVerifier::FailUsingGprCrash,
132 [this](
133 grpc_event_engine::experimental::EventEngine::Duration max_step) {
134 event_engine()->Tick(max_step);
135 });
136 }
137
Shutdown()138 void Shutdown() override {
139 if (call_ != nullptr) {
140 grpc_call_unref(call_);
141 }
142 handler_.reset();
143 grpc_completion_queue_shutdown(cq_);
144 auto ev = grpc_completion_queue_next(
145 cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
146 CHECK_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
147 grpc_completion_queue_destroy(cq_);
148 }
149
150 grpc_completion_queue* cq_ = nullptr;
151 grpc_call* call_ = nullptr;
152 RefCountedPtr<TestCallDestination> destination_ =
153 MakeRefCounted<TestCallDestination>(this);
154 absl::optional<CallHandler> handler_;
155 std::unique_ptr<CqVerifier> cq_verifier_;
156 int expectations_ = 0;
157 };
158
159 #define CLIENT_CALL_TEST(name) YODEL_TEST(ClientCallTest, name)
160
CLIENT_CALL_TEST(NoOp)161 CLIENT_CALL_TEST(NoOp) { InitCall(CallOptions()); }
162
CLIENT_CALL_TEST(SendInitialMetadata)163 CLIENT_CALL_TEST(SendInitialMetadata) {
164 InitCall(CallOptions());
165 NewBatch(1).SendInitialMetadata({
166 {"foo", "bar"},
167 });
168 Expect(1, true);
169 TickThroughCqExpectations();
170 SpawnTestSeq(
171 handler(), "pull-initial-metadata",
172 [this]() { return handler().PullClientInitialMetadata(); },
173 [](ValueOrFailure<ClientMetadataHandle> md) {
174 CHECK(md.ok());
175 CHECK_NE((*md)->get_pointer(HttpPathMetadata()), nullptr);
176 EXPECT_EQ((*md)->get_pointer(HttpPathMetadata())->as_string_view(),
177 kDefaultPath);
178 std::string buffer;
179 auto r = (*md)->GetStringValue("foo", &buffer);
180 EXPECT_EQ(r, "bar");
181 return Immediate(Empty{});
182 });
183 WaitForAllPendingWork();
184 }
185
CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterCancellation)186 CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterCancellation) {
187 InitCall(CallOptions());
188 IncomingStatusOnClient status;
189 NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
190 SpawnTestSeq(
191 handler(), "pull-initial-metadata",
192 [this]() { return handler().PullClientInitialMetadata(); },
193 [this](ValueOrFailure<ClientMetadataHandle> md) {
194 CHECK(md.ok());
195 EXPECT_EQ((*md)->get_pointer(HttpPathMetadata())->as_string_view(),
196 kDefaultPath);
197 handler().PushServerTrailingMetadata(
198 ServerMetadataFromStatus(GRPC_STATUS_INTERNAL, "test error"));
199 return Immediate(Empty{});
200 });
201 Expect(1, true);
202 TickThroughCqExpectations();
203 EXPECT_EQ(status.status(), GRPC_STATUS_INTERNAL);
204 EXPECT_EQ(status.message(), "test error");
205 WaitForAllPendingWork();
206 }
207
CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterTimeout)208 CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterTimeout) {
209 auto start = Timestamp::Now();
210 InitCall(CallOptions().SetTimeout(Duration::Seconds(1)));
211 IncomingStatusOnClient status;
212 NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
213 Expect(1, true);
214 TickThroughCqExpectations();
215 EXPECT_EQ(status.status(), GRPC_STATUS_DEADLINE_EXCEEDED);
216 ExecCtx::Get()->InvalidateNow();
217 auto now = Timestamp::Now();
218 EXPECT_GE(now - start, Duration::Seconds(1)) << GRPC_DUMP_ARGS(now, start);
219 EXPECT_LE(now - start,
220 g_yodel_fuzzing ? Duration::Minutes(10) : Duration::Seconds(5))
221 << GRPC_DUMP_ARGS(now, start);
222 WaitForAllPendingWork();
223 }
224
CLIENT_CALL_TEST(CancelBeforeInvoke1)225 CLIENT_CALL_TEST(CancelBeforeInvoke1) {
226 grpc_call_cancel(InitCall(CallOptions()), nullptr);
227 IncomingStatusOnClient status;
228 NewBatch(1).RecvStatusOnClient(status);
229 Expect(1, true);
230 TickThroughCqExpectations();
231 EXPECT_EQ(status.status(), GRPC_STATUS_CANCELLED);
232 }
233
CLIENT_CALL_TEST(CancelBeforeInvoke2)234 CLIENT_CALL_TEST(CancelBeforeInvoke2) {
235 grpc_call_cancel(InitCall(CallOptions()), nullptr);
236 IncomingStatusOnClient status;
237 NewBatch(1).RecvStatusOnClient(status).SendInitialMetadata({});
238 Expect(1, true);
239 TickThroughCqExpectations();
240 EXPECT_EQ(status.status(), GRPC_STATUS_CANCELLED);
241 }
242
CLIENT_CALL_TEST(NegativeDeadline)243 CLIENT_CALL_TEST(NegativeDeadline) {
244 auto start = Timestamp::Now();
245 InitCall(CallOptions().SetTimeout(Duration::Seconds(-1)));
246 IncomingStatusOnClient status;
247 NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
248 Expect(1, true);
249 TickThroughCqExpectations();
250 EXPECT_EQ(status.status(), GRPC_STATUS_DEADLINE_EXCEEDED);
251 auto now = Timestamp::Now();
252 EXPECT_LE(now - start, Duration::Milliseconds(100))
253 << GRPC_DUMP_ARGS(now, start);
254 WaitForAllPendingWork();
255 }
256
257 } // namespace grpc_core
258