• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/status.h>
16 #include <grpc/support/time.h>
17 
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <utility>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/time/time.h"
28 #include "gtest/gtest.h"
29 #include "src/core/config/core_configuration.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/channel/channel_fwd.h"
32 #include "src/core/lib/channel/channel_stack.h"
33 #include "src/core/lib/channel/promise_based_filter.h"
34 #include "src/core/lib/experiments/experiments.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/promise/arena_promise.h"
37 #include "src/core/lib/promise/context.h"
38 #include "src/core/lib/resource_quota/arena.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_buffer.h"
41 #include "src/core/lib/surface/channel_stack_type.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/lib/transport/transport.h"
44 #include "src/core/telemetry/call_tracer.h"
45 #include "src/core/telemetry/metrics.h"
46 #include "src/core/telemetry/tcp_tracer.h"
47 #include "src/core/util/notification.h"
48 #include "src/core/util/sync.h"
49 #include "src/core/util/time.h"
50 #include "test/core/end2end/end2end_tests.h"
51 #include "test/core/test_util/fake_stats_plugin.h"
52 
53 namespace grpc_core {
54 namespace {
55 
56 Mutex* g_mu;
57 CoreEnd2endTest::TestNotification* g_client_call_ended_notify;
58 CoreEnd2endTest::TestNotification* g_server_call_ended_notify;
59 
60 class FakeCallTracer : public ClientCallTracer {
61  public:
62   class FakeCallAttemptTracer : public CallAttemptTracer {
63    public:
FakeCallAttemptTracer()64     FakeCallAttemptTracer() {
65       MutexLock lock(g_mu);
66       incoming_bytes_ = TransportByteSize();
67       outgoing_bytes_ = TransportByteSize();
68     }
TraceId()69     std::string TraceId() override { return ""; }
SpanId()70     std::string SpanId() override { return ""; }
IsSampled()71     bool IsSampled() override { return false; }
RecordSendInitialMetadata(grpc_metadata_batch *)72     void RecordSendInitialMetadata(
73         grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)74     void RecordSendTrailingMetadata(
75         grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)76     void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)77     void RecordSendCompressedMessage(
78         const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)79     void RecordReceivedInitialMetadata(
80         grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)81     void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)82     void RecordReceivedDecompressedMessage(
83         const SliceBuffer& /*recv_decompressed_message*/) override {}
84 
RecordReceivedTrailingMetadata(absl::Status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)85     void RecordReceivedTrailingMetadata(
86         absl::Status /*status*/,
87         grpc_metadata_batch* /*recv_trailing_metadata*/,
88         const grpc_transport_stream_stats* transport_stream_stats) override {
89       if (IsCallTracerInTransportEnabled()) return;
90       TransportByteSize incoming_bytes = {
91           transport_stream_stats->incoming.framing_bytes,
92           transport_stream_stats->incoming.data_bytes,
93           transport_stream_stats->incoming.header_bytes};
94       TransportByteSize outgoing_bytes = {
95           transport_stream_stats->outgoing.framing_bytes,
96           transport_stream_stats->outgoing.data_bytes,
97           transport_stream_stats->outgoing.header_bytes};
98       MutexLock lock(g_mu);
99       incoming_bytes_ = incoming_bytes;
100       outgoing_bytes_ = outgoing_bytes;
101     }
102 
RecordIncomingBytes(const TransportByteSize & transport_byte_size)103     void RecordIncomingBytes(
104         const TransportByteSize& transport_byte_size) override {
105       MutexLock lock(g_mu);
106       incoming_bytes_ += transport_byte_size;
107     }
108 
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)109     void RecordOutgoingBytes(
110         const TransportByteSize& transport_byte_size) override {
111       MutexLock lock(g_mu);
112       outgoing_bytes_ += transport_byte_size;
113     }
114 
RecordCancel(grpc_error_handle)115     void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()116     std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
117       return nullptr;
118     }
RecordEnd(const gpr_timespec &)119     void RecordEnd(const gpr_timespec& /*latency*/) override {
120       g_client_call_ended_notify->Notify();
121       delete this;
122     }
RecordAnnotation(absl::string_view)123     void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)124     void RecordAnnotation(const Annotation& /*annotation*/) override {}
125 
SetOptionalLabel(OptionalLabelKey,RefCountedStringValue)126     void SetOptionalLabel(OptionalLabelKey /*key*/,
127                           RefCountedStringValue /*value*/) override {}
128 
incoming_bytes()129     static TransportByteSize incoming_bytes() {
130       MutexLock lock(g_mu);
131       return incoming_bytes_;
132     }
133 
outgoing_bytes()134     static TransportByteSize outgoing_bytes() {
135       MutexLock lock(g_mu);
136       return outgoing_bytes_;
137     }
138 
139    private:
140     static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
141     static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
142   };
143 
FakeCallTracer()144   explicit FakeCallTracer() {}
~FakeCallTracer()145   ~FakeCallTracer() override {}
TraceId()146   std::string TraceId() override { return ""; }
SpanId()147   std::string SpanId() override { return ""; }
IsSampled()148   bool IsSampled() override { return false; }
149 
StartNewAttempt(bool)150   FakeCallAttemptTracer* StartNewAttempt(
151       bool /*is_transparent_retry*/) override {
152     return new FakeCallAttemptTracer;
153   }
154 
RecordAnnotation(absl::string_view)155   void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)156   void RecordAnnotation(const Annotation& /*annotation*/) override {}
157 };
158 
159 CallTracerInterface::TransportByteSize
160     FakeCallTracer::FakeCallAttemptTracer::incoming_bytes_;
161 CallTracerInterface::TransportByteSize
162     FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes_;
163 
164 class FakeServerCallTracer : public ServerCallTracer {
165  public:
FakeServerCallTracer()166   FakeServerCallTracer() {
167     MutexLock lock(g_mu);
168     incoming_bytes_ = TransportByteSize();
169     outgoing_bytes_ = TransportByteSize();
170   }
~FakeServerCallTracer()171   ~FakeServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch *)172   void RecordSendInitialMetadata(
173       grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)174   void RecordSendTrailingMetadata(
175       grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)176   void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)177   void RecordSendCompressedMessage(
178       const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)179   void RecordReceivedInitialMetadata(
180       grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)181   void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)182   void RecordReceivedDecompressedMessage(
183       const SliceBuffer& /*recv_decompressed_message*/) override {}
RecordCancel(grpc_error_handle)184   void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()185   std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
186     return nullptr;
187   }
RecordReceivedTrailingMetadata(grpc_metadata_batch *)188   void RecordReceivedTrailingMetadata(
189       grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
190 
RecordEnd(const grpc_call_final_info * final_info)191   void RecordEnd(const grpc_call_final_info* final_info) override {
192     if (!IsCallTracerInTransportEnabled()) {
193       TransportByteSize incoming_bytes = {
194           final_info->stats.transport_stream_stats.incoming.framing_bytes,
195           final_info->stats.transport_stream_stats.incoming.data_bytes,
196           final_info->stats.transport_stream_stats.incoming.header_bytes};
197       TransportByteSize outgoing_bytes = {
198           final_info->stats.transport_stream_stats.outgoing.framing_bytes,
199           final_info->stats.transport_stream_stats.outgoing.data_bytes,
200           final_info->stats.transport_stream_stats.outgoing.header_bytes};
201       MutexLock lock(g_mu);
202       incoming_bytes_ = incoming_bytes;
203       outgoing_bytes_ = outgoing_bytes;
204     }
205     g_server_call_ended_notify->Notify();
206   }
207 
RecordIncomingBytes(const TransportByteSize & transport_byte_size)208   void RecordIncomingBytes(
209       const TransportByteSize& transport_byte_size) override {
210     MutexLock lock(g_mu);
211     incoming_bytes_ += transport_byte_size;
212   }
213 
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)214   void RecordOutgoingBytes(
215       const TransportByteSize& transport_byte_size) override {
216     MutexLock lock(g_mu);
217     outgoing_bytes_ += transport_byte_size;
218   }
219 
RecordAnnotation(absl::string_view)220   void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)221   void RecordAnnotation(const Annotation& /*annotation*/) override {}
TraceId()222   std::string TraceId() override { return ""; }
SpanId()223   std::string SpanId() override { return ""; }
IsSampled()224   bool IsSampled() override { return false; }
225 
incoming_bytes()226   static TransportByteSize incoming_bytes() {
227     MutexLock lock(g_mu);
228     return incoming_bytes_;
229   }
230 
outgoing_bytes()231   static TransportByteSize outgoing_bytes() {
232     MutexLock lock(g_mu);
233     return outgoing_bytes_;
234   }
235 
236  private:
237   static TransportByteSize incoming_bytes_ ABSL_GUARDED_BY(g_mu);
238   static TransportByteSize outgoing_bytes_ ABSL_GUARDED_BY(g_mu);
239 };
240 
241 CallTracerInterface::TransportByteSize FakeServerCallTracer::incoming_bytes_;
242 CallTracerInterface::TransportByteSize FakeServerCallTracer::outgoing_bytes_;
243 
244 // TODO(yijiem): figure out how to reuse FakeStatsPlugin instead of
245 // inheriting and overriding it here.
246 class NewFakeStatsPlugin : public FakeStatsPlugin {
247  public:
GetClientCallTracer(const Slice &,bool,std::shared_ptr<StatsPlugin::ScopeConfig>)248   ClientCallTracer* GetClientCallTracer(
249       const Slice& /*path*/, bool /*registered_method*/,
250       std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
251     return GetContext<Arena>()->ManagedNew<FakeCallTracer>();
252   }
GetServerCallTracer(std::shared_ptr<StatsPlugin::ScopeConfig>)253   ServerCallTracer* GetServerCallTracer(
254       std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
255     return GetContext<Arena>()->ManagedNew<FakeServerCallTracer>();
256   }
257 };
258 
259 // This test verifies the HTTP2 stats on a stream
CORE_END2END_TEST(Http2FullstackSingleHopTest,StreamStats)260 CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
261   g_mu = new Mutex();
262   g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
263   g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
264   GlobalStatsPluginRegistryTestPeer::ResetGlobalStatsPluginRegistry();
265   GlobalStatsPluginRegistry::RegisterStatsPlugin(
266       std::make_shared<NewFakeStatsPlugin>());
267   auto send_from_client = RandomSlice(10);
268   auto send_from_server = RandomSlice(20);
269   IncomingStatusOnClient server_status;
270   IncomingMetadata server_initial_metadata;
271   IncomingMessage server_message;
272   IncomingMessage client_message;
273   IncomingCloseOnServer client_close;
274   {
275     auto c = NewClientCall("/foo").Timeout(Duration::Minutes(5)).Create();
276     c.NewBatch(1)
277         .SendInitialMetadata({})
278         .SendMessage(send_from_client.Ref())
279         .SendCloseFromClient()
280         .RecvInitialMetadata(server_initial_metadata)
281         .RecvMessage(server_message)
282         .RecvStatusOnClient(server_status);
283     auto s = RequestCall(101);
284     Expect(101, true);
285     Step(Duration::Minutes(1));
286     s.NewBatch(102).SendInitialMetadata({}).RecvMessage(client_message);
287     Expect(102, true);
288     Step(Duration::Minutes(1));
289     s.NewBatch(103)
290         .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
291         .SendMessage(send_from_server.Ref())
292         .RecvCloseOnServer(client_close);
293     Expect(103, true);
294     Expect(1, true);
295     Step(Duration::Minutes(1));
296     EXPECT_EQ(s.method(), "/foo");
297   }
298   EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
299   EXPECT_EQ(server_status.message(), "xyz");
300   EXPECT_FALSE(client_close.was_cancelled());
301   EXPECT_EQ(client_message.payload(), send_from_client);
302   EXPECT_EQ(server_message.payload(), send_from_server);
303   // Make sure that the calls have ended for the stats to have been collected
304   g_client_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
305   g_server_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
306 
307   auto client_outgoing_transport_stats =
308       FakeCallTracer::FakeCallAttemptTracer::outgoing_bytes();
309   auto client_incoming_transport_stats =
310       FakeCallTracer::FakeCallAttemptTracer::incoming_bytes();
311   auto server_outgoing_transport_stats = FakeServerCallTracer::outgoing_bytes();
312   auto server_incoming_transport_stats = FakeServerCallTracer::incoming_bytes();
313   EXPECT_EQ(client_outgoing_transport_stats.data_bytes,
314             send_from_client.size());
315   EXPECT_EQ(client_incoming_transport_stats.data_bytes,
316             send_from_server.size());
317   EXPECT_EQ(server_outgoing_transport_stats.data_bytes,
318             send_from_server.size());
319   EXPECT_EQ(server_incoming_transport_stats.data_bytes,
320             send_from_client.size());
321   // At the very minimum, we should have 9 bytes from initial header frame, 9
322   // bytes from data header frame, 5 bytes from the grpc header on data and 9
323   // bytes from the trailing header frame. The actual number might be more due
324   // to RST_STREAM (13 bytes) and WINDOW_UPDATE (13 bytes) frames.
325   EXPECT_GE(client_outgoing_transport_stats.framing_bytes, 32);
326   EXPECT_LE(client_outgoing_transport_stats.framing_bytes, 58);
327   EXPECT_GE(client_incoming_transport_stats.framing_bytes, 32);
328   EXPECT_LE(client_incoming_transport_stats.framing_bytes, 58);
329   EXPECT_GE(server_outgoing_transport_stats.framing_bytes, 32);
330   EXPECT_LE(server_outgoing_transport_stats.framing_bytes, 58);
331   EXPECT_GE(server_incoming_transport_stats.framing_bytes, 32);
332   EXPECT_LE(server_incoming_transport_stats.framing_bytes, 58);
333 
334   delete g_client_call_ended_notify;
335   g_client_call_ended_notify = nullptr;
336   delete g_server_call_ended_notify;
337   g_server_call_ended_notify = nullptr;
338   delete g_mu;
339   g_mu = nullptr;
340 }
341 
342 }  // namespace
343 }  // namespace grpc_core
344