• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/surface/client_call.h"
16 
17 #include <grpc/compression.h>
18 #include <grpc/grpc.h>
19 
20 #include "absl/status/status.h"
21 #include "src/core/lib/resource_quota/arena.h"
22 #include "src/core/lib/transport/metadata.h"
23 #include "src/core/util/debug_location.h"
24 #include "test/core/call/batch_builder.h"
25 #include "test/core/call/yodel/yodel_test.h"
26 
27 namespace grpc_core {
28 
29 namespace {
30 const absl::string_view kDefaultPath = "/foo/bar";
31 }
32 
33 class ClientCallTest : public YodelTest {
34  protected:
35   using YodelTest::YodelTest;
36 
37   class CallOptions {
38    public:
path() const39     Slice path() const { return path_.Copy(); }
authority() const40     absl::optional<Slice> authority() const {
41       return authority_.has_value() ? absl::optional<Slice>(authority_->Copy())
42                                     : absl::nullopt;
43     }
registered_method() const44     bool registered_method() const { return registered_method_; }
timeout() const45     Duration timeout() const { return timeout_; }
compression_options() const46     grpc_compression_options compression_options() const {
47       return compression_options_;
48     }
49 
SetTimeout(Duration timeout)50     CallOptions& SetTimeout(Duration timeout) {
51       timeout_ = timeout;
52       return *this;
53     }
54 
55    private:
56     Slice path_ = Slice::FromCopiedString(kDefaultPath);
57     absl::optional<Slice> authority_;
58     bool registered_method_ = false;
59     Duration timeout_ = Duration::Infinity();
60     grpc_compression_options compression_options_ = {
61         1,
62         {0, GRPC_COMPRESS_LEVEL_NONE},
63         {0, GRPC_COMPRESS_NONE},
64     };
65   };
66 
InitCall(const CallOptions & options)67   grpc_call* InitCall(const CallOptions& options) {
68     CHECK_EQ(call_, nullptr);
69     auto arena = SimpleArenaAllocator()->MakeArena();
70     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
71         event_engine().get());
72     call_ = MakeClientCall(
73         nullptr, 0, cq_, options.path(), options.authority(),
74         options.registered_method(), options.timeout() + Timestamp::Now(),
75         options.compression_options(), std::move(arena), destination_);
76     return call_;
77   }
78 
NewBatch(int tag)79   BatchBuilder NewBatch(int tag) {
80     return BatchBuilder(call_, cq_verifier_.get(), tag);
81   }
82 
83   // Pull in CqVerifier types for ergonomics
84   using ExpectedResult = CqVerifier::ExpectedResult;
85   using Maybe = CqVerifier::Maybe;
86   using PerformAction = CqVerifier::PerformAction;
87   using MaybePerformAction = CqVerifier::MaybePerformAction;
88   using AnyStatus = CqVerifier::AnyStatus;
Expect(int tag,ExpectedResult result,SourceLocation whence={})89   void Expect(int tag, ExpectedResult result, SourceLocation whence = {}) {
90     expectations_++;
91     cq_verifier_->Expect(CqVerifier::tag(tag), std::move(result), whence);
92   }
93 
TickThroughCqExpectations(absl::optional<Duration> timeout=absl::nullopt,SourceLocation whence={})94   void TickThroughCqExpectations(
95       absl::optional<Duration> timeout = absl::nullopt,
96       SourceLocation whence = {}) {
97     if (expectations_ == 0) {
98       cq_verifier_->VerifyEmpty(timeout.value_or(Duration::Seconds(1)), whence);
99       return;
100     }
101     expectations_ = 0;
102     cq_verifier_->Verify(
103         timeout.value_or(g_yodel_fuzzing ? Duration::Minutes(5)
104                                          : Duration::Seconds(10)),
105         whence);
106   }
107 
handler()108   CallHandler& handler() {
109     CHECK(handler_.has_value());
110     return *handler_;
111   }
112 
113  private:
114   class TestCallDestination final : public UnstartedCallDestination {
115    public:
TestCallDestination(ClientCallTest * test)116     explicit TestCallDestination(ClientCallTest* test) : test_(test) {}
117 
Orphaned()118     void Orphaned() override {}
StartCall(UnstartedCallHandler handler)119     void StartCall(UnstartedCallHandler handler) override {
120       CHECK(!test_->handler_.has_value());
121       test_->handler_.emplace(handler.StartCall());
122     }
123 
124    private:
125     ClientCallTest* const test_;
126   };
127 
InitTest()128   void InitTest() override {
129     cq_ = grpc_completion_queue_create_for_next(nullptr);
130     cq_verifier_ = absl::make_unique<CqVerifier>(
131         cq_, CqVerifier::FailUsingGprCrash,
132         [this](
133             grpc_event_engine::experimental::EventEngine::Duration max_step) {
134           event_engine()->Tick(max_step);
135         });
136   }
137 
Shutdown()138   void Shutdown() override {
139     if (call_ != nullptr) {
140       grpc_call_unref(call_);
141     }
142     handler_.reset();
143     grpc_completion_queue_shutdown(cq_);
144     auto ev = grpc_completion_queue_next(
145         cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
146     CHECK_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
147     grpc_completion_queue_destroy(cq_);
148   }
149 
150   grpc_completion_queue* cq_ = nullptr;
151   grpc_call* call_ = nullptr;
152   RefCountedPtr<TestCallDestination> destination_ =
153       MakeRefCounted<TestCallDestination>(this);
154   absl::optional<CallHandler> handler_;
155   std::unique_ptr<CqVerifier> cq_verifier_;
156   int expectations_ = 0;
157 };
158 
159 #define CLIENT_CALL_TEST(name) YODEL_TEST(ClientCallTest, name)
160 
CLIENT_CALL_TEST(NoOp)161 CLIENT_CALL_TEST(NoOp) { InitCall(CallOptions()); }
162 
CLIENT_CALL_TEST(SendInitialMetadata)163 CLIENT_CALL_TEST(SendInitialMetadata) {
164   InitCall(CallOptions());
165   NewBatch(1).SendInitialMetadata({
166       {"foo", "bar"},
167   });
168   Expect(1, true);
169   TickThroughCqExpectations();
170   SpawnTestSeq(
171       handler(), "pull-initial-metadata",
172       [this]() { return handler().PullClientInitialMetadata(); },
173       [](ValueOrFailure<ClientMetadataHandle> md) {
174         CHECK(md.ok());
175         CHECK_NE((*md)->get_pointer(HttpPathMetadata()), nullptr);
176         EXPECT_EQ((*md)->get_pointer(HttpPathMetadata())->as_string_view(),
177                   kDefaultPath);
178         std::string buffer;
179         auto r = (*md)->GetStringValue("foo", &buffer);
180         EXPECT_EQ(r, "bar");
181         return Immediate(Empty{});
182       });
183   WaitForAllPendingWork();
184 }
185 
CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterCancellation)186 CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterCancellation) {
187   InitCall(CallOptions());
188   IncomingStatusOnClient status;
189   NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
190   SpawnTestSeq(
191       handler(), "pull-initial-metadata",
192       [this]() { return handler().PullClientInitialMetadata(); },
193       [this](ValueOrFailure<ClientMetadataHandle> md) {
194         CHECK(md.ok());
195         EXPECT_EQ((*md)->get_pointer(HttpPathMetadata())->as_string_view(),
196                   kDefaultPath);
197         handler().PushServerTrailingMetadata(
198             ServerMetadataFromStatus(GRPC_STATUS_INTERNAL, "test error"));
199         return Immediate(Empty{});
200       });
201   Expect(1, true);
202   TickThroughCqExpectations();
203   EXPECT_EQ(status.status(), GRPC_STATUS_INTERNAL);
204   EXPECT_EQ(status.message(), "test error");
205   WaitForAllPendingWork();
206 }
207 
CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterTimeout)208 CLIENT_CALL_TEST(SendInitialMetadataAndReceiveStatusAfterTimeout) {
209   auto start = Timestamp::Now();
210   InitCall(CallOptions().SetTimeout(Duration::Seconds(1)));
211   IncomingStatusOnClient status;
212   NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
213   Expect(1, true);
214   TickThroughCqExpectations();
215   EXPECT_EQ(status.status(), GRPC_STATUS_DEADLINE_EXCEEDED);
216   ExecCtx::Get()->InvalidateNow();
217   auto now = Timestamp::Now();
218   EXPECT_GE(now - start, Duration::Seconds(1)) << GRPC_DUMP_ARGS(now, start);
219   EXPECT_LE(now - start,
220             g_yodel_fuzzing ? Duration::Minutes(10) : Duration::Seconds(5))
221       << GRPC_DUMP_ARGS(now, start);
222   WaitForAllPendingWork();
223 }
224 
CLIENT_CALL_TEST(CancelBeforeInvoke1)225 CLIENT_CALL_TEST(CancelBeforeInvoke1) {
226   grpc_call_cancel(InitCall(CallOptions()), nullptr);
227   IncomingStatusOnClient status;
228   NewBatch(1).RecvStatusOnClient(status);
229   Expect(1, true);
230   TickThroughCqExpectations();
231   EXPECT_EQ(status.status(), GRPC_STATUS_CANCELLED);
232 }
233 
CLIENT_CALL_TEST(CancelBeforeInvoke2)234 CLIENT_CALL_TEST(CancelBeforeInvoke2) {
235   grpc_call_cancel(InitCall(CallOptions()), nullptr);
236   IncomingStatusOnClient status;
237   NewBatch(1).RecvStatusOnClient(status).SendInitialMetadata({});
238   Expect(1, true);
239   TickThroughCqExpectations();
240   EXPECT_EQ(status.status(), GRPC_STATUS_CANCELLED);
241 }
242 
CLIENT_CALL_TEST(NegativeDeadline)243 CLIENT_CALL_TEST(NegativeDeadline) {
244   auto start = Timestamp::Now();
245   InitCall(CallOptions().SetTimeout(Duration::Seconds(-1)));
246   IncomingStatusOnClient status;
247   NewBatch(1).SendInitialMetadata({}).RecvStatusOnClient(status);
248   Expect(1, true);
249   TickThroughCqExpectations();
250   EXPECT_EQ(status.status(), GRPC_STATUS_DEADLINE_EXCEEDED);
251   auto now = Timestamp::Now();
252   EXPECT_LE(now - start, Duration::Milliseconds(100))
253       << GRPC_DUMP_ARGS(now, start);
254   WaitForAllPendingWork();
255 }
256 
257 }  // namespace grpc_core
258