1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/server_call.h"
16
17 #include <grpc/compression.h>
18 #include <grpc/grpc.h>
19
20 #include <atomic>
21
22 #include "absl/status/status.h"
23 #include "src/core/channelz/channelz.h"
24 #include "src/core/lib/promise/promise.h"
25 #include "src/core/lib/resource_quota/arena.h"
26 #include "src/core/telemetry/call_tracer.h"
27 #include "test/core/call/batch_builder.h"
28 #include "test/core/call/yodel/yodel_test.h"
29
30 namespace grpc_core {
31
32 namespace {
33 const absl::string_view kDefaultPath = "/foo/bar";
34 }
35
36 class ServerCallTest : public YodelTest {
37 protected:
38 using YodelTest::YodelTest;
39
~ServerCallTest()40 ~ServerCallTest() override {
41 grpc_metadata_array_destroy(&publish_initial_metadata_);
42 }
43
InitCall(ClientMetadataHandle client_initial_metadata)44 grpc_call* InitCall(ClientMetadataHandle client_initial_metadata) {
45 CHECK_EQ(call_, nullptr);
46 auto arena = SimpleArenaAllocator()->MakeArena();
47 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
48 event_engine().get());
49 auto call =
50 MakeCallPair(std::move(client_initial_metadata), std::move(arena));
51 call.initiator.SpawnGuarded(
52 "initial_metadata",
53 [this, handler = call.handler.StartCall()]() mutable {
54 return TrySeq(
55 handler.PullClientInitialMetadata(),
56 [this,
57 handler](ClientMetadataHandle client_initial_metadata) mutable {
58 call_.store(MakeServerCall(std::move(handler),
59 std::move(client_initial_metadata),
60 &test_server_, cq_,
61 &publish_initial_metadata_),
62 std::memory_order_release);
63 return absl::OkStatus();
64 });
65 });
66 while (true) {
67 auto* result = call_.load(std::memory_order_acquire);
68 if (result != nullptr) return result;
69 }
70 }
71
MakeClientInitialMetadata(std::initializer_list<std::pair<absl::string_view,absl::string_view>> md)72 ClientMetadataHandle MakeClientInitialMetadata(
73 std::initializer_list<std::pair<absl::string_view, absl::string_view>>
74 md) {
75 auto client_initial_metadata =
76 Arena::MakePooledForOverwrite<ClientMetadata>();
77 client_initial_metadata->Set(HttpPathMetadata(),
78 Slice::FromCopiedString(kDefaultPath));
79 for (const auto& pair : md) {
80 client_initial_metadata->Append(
81 pair.first, Slice::FromCopiedBuffer(pair.second),
82 [](absl::string_view error, const Slice&) { Crash(error); });
83 }
84 return client_initial_metadata;
85 }
86
GetClientInitialMetadata(absl::string_view key)87 absl::optional<std::string> GetClientInitialMetadata(absl::string_view key) {
88 CHECK_NE(call_.load(std::memory_order_acquire), nullptr);
89 return FindInMetadataArray(publish_initial_metadata_, key);
90 }
91
92 private:
93 class TestServer final : public ServerInterface {
94 public:
channel_args() const95 const ChannelArgs& channel_args() const override { return channel_args_; }
channelz_node() const96 channelz::ServerNode* channelz_node() const override { return nullptr; }
server_call_tracer_factory() const97 ServerCallTracerFactory* server_call_tracer_factory() const override {
98 return nullptr;
99 }
compression_options() const100 grpc_compression_options compression_options() const override {
101 return {
102 1,
103 {0, GRPC_COMPRESS_LEVEL_NONE},
104 {0, GRPC_COMPRESS_NONE},
105 };
106 }
107
108 private:
109 ChannelArgs channel_args_;
110 };
111
InitTest()112 void InitTest() override {
113 cq_ = grpc_completion_queue_create_for_next(nullptr);
114 }
115
Shutdown()116 void Shutdown() override {
117 auto* call = call_.load(std::memory_order_acquire);
118 if (call != nullptr) {
119 grpc_call_unref(call);
120 }
121 grpc_completion_queue_shutdown(cq_);
122 auto ev = grpc_completion_queue_next(
123 cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
124 CHECK_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
125 grpc_completion_queue_destroy(cq_);
126 }
127
128 grpc_completion_queue* cq_{nullptr};
129 std::atomic<grpc_call*> call_{nullptr};
130 CallInitiator call_initiator_;
131 TestServer test_server_;
132 grpc_metadata_array publish_initial_metadata_{0, 0, nullptr};
133 };
134
135 #define SERVER_CALL_TEST(name) YODEL_TEST(ServerCallTest, name)
136
SERVER_CALL_TEST(NoOp)137 SERVER_CALL_TEST(NoOp) { InitCall(MakeClientInitialMetadata({})); }
138
SERVER_CALL_TEST(InitialMetadataPassedThrough)139 SERVER_CALL_TEST(InitialMetadataPassedThrough) {
140 InitCall(MakeClientInitialMetadata({{"foo", "bar"}}));
141 EXPECT_EQ(GetClientInitialMetadata("foo"), "bar");
142 }
143
144 } // namespace grpc_core
145