1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/status.h>
16 #include <grpc/support/time.h>
17
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <utility>
22
23 #include "absl/base/thread_annotations.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/time/time.h"
28 #include "gtest/gtest.h"
29 #include "src/core/config/core_configuration.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/channel/channel_fwd.h"
32 #include "src/core/lib/channel/channel_stack.h"
33 #include "src/core/lib/channel/promise_based_filter.h"
34 #include "src/core/lib/experiments/experiments.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/promise/arena_promise.h"
37 #include "src/core/lib/promise/context.h"
38 #include "src/core/lib/resource_quota/arena.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_buffer.h"
41 #include "src/core/lib/surface/channel_stack_type.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/transport.h"
44 #include "src/core/telemetry/call_tracer.h"
45 #include "src/core/telemetry/metrics.h"
46 #include "src/core/telemetry/tcp_tracer.h"
47 #include "src/core/util/notification.h"
48 #include "src/core/util/sync.h"
49 #include "src/core/util/time.h"
50 #include "test/core/end2end/end2end_tests.h"
51 #include "test/core/test_util/fake_stats_plugin.h"
52
53 namespace grpc_core {
54 namespace {
55
56 Mutex* g_mu;
57 CoreEnd2endTest::TestNotification* g_client_call_ended_notify;
58 CoreEnd2endTest::TestNotification* g_server_call_ended_notify;
59
60 class FakeCallTracer : public ClientCallTracer {
61 public:
62 class FakeCallAttemptTracer : public CallAttemptTracer {
63 public:
FakeCallAttemptTracer()64 FakeCallAttemptTracer() {
65 MutexLock lock(g_mu);
66 incoming_bytes_ = TransportByteSize();
67 outgoing_bytes_ = TransportByteSize();
68 }
TraceId()69 std::string TraceId() override { return ""; }
SpanId()70 std::string SpanId() override { return ""; }
IsSampled()71 bool IsSampled() override { return false; }
RecordSendInitialMetadata(grpc_metadata_batch *)72 void RecordSendInitialMetadata(
73 grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)74 void RecordSendTrailingMetadata(
75 grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)76 void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)77 void RecordSendCompressedMessage(
78 const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)79 void RecordReceivedInitialMetadata(
80 grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)81 void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)82 void RecordReceivedDecompressedMessage(
83 const SliceBuffer& /*recv_decompressed_message*/) override {}
84
RecordReceivedTrailingMetadata(absl::Status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)85 void RecordReceivedTrailingMetadata(
86 absl::Status /*status*/,
87 grpc_metadata_batch* /*recv_trailing_metadata*/,
88 const grpc_transport_stream_stats* transport_stream_stats) override {
89 if (IsCallTracerInTransportEnabled()) return;
90 TransportByteSize incoming_bytes = {
91 transport_stream_stats->incoming.framing_bytes,
92 transport_stream_stats->incoming.data_bytes,
93 transport_stream_stats->incoming.header_bytes};
94 TransportByteSize outgoing_bytes = {
95 transport_stream_stats->outgoing.framing_bytes,
96 transport_stream_stats->outgoing.data_bytes,
97 transport_stream_stats->outgoing.header_bytes};
98 MutexLock lock(g_mu);
99 incoming_bytes_ = incoming_bytes;
100 outgoing_bytes_ = outgoing_bytes;
101 }
102
RecordIncomingBytes(const TransportByteSize & transport_byte_size)103 void RecordIncomingBytes(
104 const TransportByteSize& transport_byte_size) override {
105 MutexLock lock(g_mu);
106 incoming_bytes_ += transport_byte_size;
107 }
108
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)109 void RecordOutgoingBytes(
110 const TransportByteSize& transport_byte_size) override {
111 MutexLock lock(g_mu);
112 outgoing_bytes_ += transport_byte_size;
113 }
114
RecordCancel(grpc_error_handle)115 void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()116 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
117 return nullptr;
118 }
RecordEnd(const gpr_timespec &)119 void RecordEnd(const gpr_timespec& /*latency*/) override {
120 g_client_call_ended_notify->Notify();
121 delete this;
122 }
RecordAnnotation(absl::string_view)123 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)124 void RecordAnnotation(const Annotation& /*annotation*/) override {}
125
SetOptionalLabel(OptionalLabelKey,RefCountedStringValue)126 void SetOptionalLabel(OptionalLabelKey /*key*/,
127 RefCountedStringValue /*value*/) override {}
128
incoming_bytes()129 static TransportByteSize incoming_bytes() {
130 MutexLock lock(g_mu);
131 return incoming_bytes_;
132 }
133
outgoing_bytes()134 static TransportByteSize outgoing_bytes() {
135 MutexLock lock(g_mu);
136 return outgoing_bytes_;
137 }
138
139 private:
140 static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
141 static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
142 };
143
FakeCallTracer()144 explicit FakeCallTracer() {}
~FakeCallTracer()145 ~FakeCallTracer() override {}
TraceId()146 std::string TraceId() override { return ""; }
SpanId()147 std::string SpanId() override { return ""; }
IsSampled()148 bool IsSampled() override { return false; }
149
StartNewAttempt(bool)150 FakeCallAttemptTracer* StartNewAttempt(
151 bool /*is_transparent_retry*/) override {
152 return new FakeCallAttemptTracer;
153 }
154
RecordAnnotation(absl::string_view)155 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)156 void RecordAnnotation(const Annotation& /*annotation*/) override {}
157 };
158
159 CallTracerInterface::TransportByteSize
160 FakeCallTracer::FakeCallAttemptTracer::incoming_bytes_;
161 CallTracerInterface::TransportByteSize
162 FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes_;
163
164 class FakeServerCallTracer : public ServerCallTracer {
165 public:
FakeServerCallTracer()166 FakeServerCallTracer() {
167 MutexLock lock(g_mu);
168 incoming_bytes_ = TransportByteSize();
169 outgoing_bytes_ = TransportByteSize();
170 }
~FakeServerCallTracer()171 ~FakeServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch *)172 void RecordSendInitialMetadata(
173 grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)174 void RecordSendTrailingMetadata(
175 grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)176 void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)177 void RecordSendCompressedMessage(
178 const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)179 void RecordReceivedInitialMetadata(
180 grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)181 void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)182 void RecordReceivedDecompressedMessage(
183 const SliceBuffer& /*recv_decompressed_message*/) override {}
RecordCancel(grpc_error_handle)184 void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()185 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
186 return nullptr;
187 }
RecordReceivedTrailingMetadata(grpc_metadata_batch *)188 void RecordReceivedTrailingMetadata(
189 grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
190
RecordEnd(const grpc_call_final_info * final_info)191 void RecordEnd(const grpc_call_final_info* final_info) override {
192 if (!IsCallTracerInTransportEnabled()) {
193 TransportByteSize incoming_bytes = {
194 final_info->stats.transport_stream_stats.incoming.framing_bytes,
195 final_info->stats.transport_stream_stats.incoming.data_bytes,
196 final_info->stats.transport_stream_stats.incoming.header_bytes};
197 TransportByteSize outgoing_bytes = {
198 final_info->stats.transport_stream_stats.outgoing.framing_bytes,
199 final_info->stats.transport_stream_stats.outgoing.data_bytes,
200 final_info->stats.transport_stream_stats.outgoing.header_bytes};
201 MutexLock lock(g_mu);
202 incoming_bytes_ = incoming_bytes;
203 outgoing_bytes_ = outgoing_bytes;
204 }
205 g_server_call_ended_notify->Notify();
206 }
207
RecordIncomingBytes(const TransportByteSize & transport_byte_size)208 void RecordIncomingBytes(
209 const TransportByteSize& transport_byte_size) override {
210 MutexLock lock(g_mu);
211 incoming_bytes_ += transport_byte_size;
212 }
213
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)214 void RecordOutgoingBytes(
215 const TransportByteSize& transport_byte_size) override {
216 MutexLock lock(g_mu);
217 outgoing_bytes_ += transport_byte_size;
218 }
219
RecordAnnotation(absl::string_view)220 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)221 void RecordAnnotation(const Annotation& /*annotation*/) override {}
TraceId()222 std::string TraceId() override { return ""; }
SpanId()223 std::string SpanId() override { return ""; }
IsSampled()224 bool IsSampled() override { return false; }
225
incoming_bytes()226 static TransportByteSize incoming_bytes() {
227 MutexLock lock(g_mu);
228 return incoming_bytes_;
229 }
230
outgoing_bytes()231 static TransportByteSize outgoing_bytes() {
232 MutexLock lock(g_mu);
233 return outgoing_bytes_;
234 }
235
236 private:
237 static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
238 static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
239 };
240
241 CallTracerInterface::TransportByteSize FakeServerCallTracer::incoming_bytes_;
242 CallTracerInterface::TransportByteSize FakeServerCallTracer::outgoing_bytes_;
243
244 // TODO(yijiem): figure out how to reuse FakeStatsPlugin instead of
245 // inheriting and overriding it here.
246 class NewFakeStatsPlugin : public FakeStatsPlugin {
247 public:
GetClientCallTracer(const Slice &,bool,std::shared_ptr<StatsPlugin::ScopeConfig>)248 ClientCallTracer* GetClientCallTracer(
249 const Slice& /*path*/, bool /*registered_method*/,
250 std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
251 return GetContext<Arena>()->ManagedNew<FakeCallTracer>();
252 }
GetServerCallTracer(std::shared_ptr<StatsPlugin::ScopeConfig>)253 ServerCallTracer* GetServerCallTracer(
254 std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
255 return GetContext<Arena>()->ManagedNew<FakeServerCallTracer>();
256 }
257 };
258
259 // This test verifies the HTTP2 stats on a stream
CORE_END2END_TEST(Http2FullstackSingleHopTest,StreamStats)260 CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
261 g_mu = new Mutex();
262 g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
263 g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
264 GlobalStatsPluginRegistryTestPeer::ResetGlobalStatsPluginRegistry();
265 GlobalStatsPluginRegistry::RegisterStatsPlugin(
266 std::make_shared<NewFakeStatsPlugin>());
267 auto send_from_client = RandomSlice(10);
268 auto send_from_server = RandomSlice(20);
269 IncomingStatusOnClient server_status;
270 IncomingMetadata server_initial_metadata;
271 IncomingMessage server_message;
272 IncomingMessage client_message;
273 IncomingCloseOnServer client_close;
274 {
275 auto c = NewClientCall("/foo").Timeout(Duration::Minutes(5)).Create();
276 c.NewBatch(1)
277 .SendInitialMetadata({})
278 .SendMessage(send_from_client.Ref())
279 .SendCloseFromClient()
280 .RecvInitialMetadata(server_initial_metadata)
281 .RecvMessage(server_message)
282 .RecvStatusOnClient(server_status);
283 auto s = RequestCall(101);
284 Expect(101, true);
285 Step(Duration::Minutes(1));
286 s.NewBatch(102).SendInitialMetadata({}).RecvMessage(client_message);
287 Expect(102, true);
288 Step(Duration::Minutes(1));
289 s.NewBatch(103)
290 .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
291 .SendMessage(send_from_server.Ref())
292 .RecvCloseOnServer(client_close);
293 Expect(103, true);
294 Expect(1, true);
295 Step(Duration::Minutes(1));
296 EXPECT_EQ(s.method(), "/foo");
297 }
298 EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
299 EXPECT_EQ(server_status.message(), "xyz");
300 EXPECT_FALSE(client_close.was_cancelled());
301 EXPECT_EQ(client_message.payload(), send_from_client);
302 EXPECT_EQ(server_message.payload(), send_from_server);
303 // Make sure that the calls have ended for the stats to have been collected
304 g_client_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
305 g_server_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
306
307 auto client_outgoing_transport_stats =
308 FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes();
309 auto client_incoming_transport_stats =
310 FakeCallTracer::FakeCallAttemptTracer::incoming_bytes();
311 auto server_outgoing_transport_stats = FakeServerCallTracer::outgoing_bytes();
312 auto server_incoming_transport_stats = FakeServerCallTracer::incoming_bytes();
313 EXPECT_EQ(client_outgoing_transport_stats.data_bytes,
314 send_from_client.size());
315 EXPECT_EQ(client_incoming_transport_stats.data_bytes,
316 send_from_server.size());
317 EXPECT_EQ(server_outgoing_transport_stats.data_bytes,
318 send_from_server.size());
319 EXPECT_EQ(server_incoming_transport_stats.data_bytes,
320 send_from_client.size());
321 // At the very minimum, we should have 9 bytes from initial header frame, 9
322 // bytes from data header frame, 5 bytes from the grpc header on data and 9
323 // bytes from the trailing header frame. The actual number might be more due
324 // to RST_STREAM (13 bytes) and WINDOW_UPDATE (13 bytes) frames.
325 EXPECT_GE(client_outgoing_transport_stats.framing_bytes, 32);
326 EXPECT_LE(client_outgoing_transport_stats.framing_bytes, 58);
327 EXPECT_GE(client_incoming_transport_stats.framing_bytes, 32);
328 EXPECT_LE(client_incoming_transport_stats.framing_bytes, 58);
329 EXPECT_GE(server_outgoing_transport_stats.framing_bytes, 32);
330 EXPECT_LE(server_outgoing_transport_stats.framing_bytes, 58);
331 EXPECT_GE(server_incoming_transport_stats.framing_bytes, 32);
332 EXPECT_LE(server_incoming_transport_stats.framing_bytes, 58);
333
334 delete g_client_call_ended_notify;
335 g_client_call_ended_notify = nullptr;
336 delete g_server_call_ended_notify;
337 g_server_call_ended_notify = nullptr;
338 delete g_mu;
339 g_mu = nullptr;
340 }
341
342 } // namespace
343 } // namespace grpc_core
344