• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/ext/otel/otel_test_library.h"
20 
21 #include <grpcpp/grpcpp.h>
22 
23 #include <atomic>
24 #include <memory>
25 
26 #include "absl/functional/any_invocable.h"
27 #include "gmock/gmock.h"
28 #include "gtest/gtest.h"
29 #include "opentelemetry/metrics/provider.h"
30 #include "opentelemetry/sdk/metrics/export/metric_producer.h"
31 #include "opentelemetry/sdk/metrics/meter_provider.h"
32 #include "opentelemetry/sdk/metrics/metric_reader.h"
33 #include "src/core/config/core_configuration.h"
34 #include "src/core/lib/channel/promise_based_filter.h"
35 #include "src/core/telemetry/call_tracer.h"
36 #include "src/core/util/notification.h"
37 #include "test/core/test_util/fake_stats_plugin.h"
38 #include "test/core/test_util/test_config.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 
42 namespace grpc {
43 namespace testing {
44 
45 #define GRPC_ARG_LABELS_TO_INJECT "grpc.testing.labels_to_inject"
46 
47 // A subchannel filter that adds the service labels for test to the
48 // CallAttemptTracer in a call.
49 class AddLabelsFilter : public grpc_core::ChannelFilter {
50  public:
51   static const grpc_channel_filter kFilter;
52 
TypeName()53   static absl::string_view TypeName() { return "add_service_labels_filter"; }
54 
AddLabelsFilter(std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,grpc_core::RefCountedStringValue> labels_to_inject)55   explicit AddLabelsFilter(
56       std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
57                grpc_core::RefCountedStringValue>
58           labels_to_inject)
59       : labels_to_inject_(std::move(labels_to_inject)) {}
60 
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)61   static absl::StatusOr<std::unique_ptr<AddLabelsFilter>> Create(
62       const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
63     return absl::make_unique<AddLabelsFilter>(
64         *args.GetPointer<std::map<
65              grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
66              grpc_core::RefCountedStringValue>>(GRPC_ARG_LABELS_TO_INJECT));
67   }
68 
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)69   grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
70       grpc_core::CallArgs call_args,
71       grpc_core::NextPromiseFactory next_promise_factory) override {
72     using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
73     auto* call_tracer = grpc_core::GetContext<CallAttemptTracer>();
74     EXPECT_NE(call_tracer, nullptr);
75     for (const auto& pair : labels_to_inject_) {
76       call_tracer->SetOptionalLabel(pair.first, pair.second);
77     }
78     return next_promise_factory(std::move(call_args));
79   }
80 
81  private:
82   const std::map<
83       grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
84       grpc_core::RefCountedStringValue>
85       labels_to_inject_;
86 };
87 
88 const grpc_channel_filter AddLabelsFilter::kFilter =
89     grpc_core::MakePromiseBasedFilter<AddLabelsFilter,
90                                       grpc_core::FilterEndpoint::kClient>();
91 
MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest * test,grpc_core::Duration interval,int iterations,std::function<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> predicate)92 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::MetricsCollectorThread(
93     OpenTelemetryPluginEnd2EndTest* test, grpc_core::Duration interval,
94     int iterations,
95     std::function<
96         bool(const absl::flat_hash_map<
97              std::string,
98              std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
99         predicate)
100     : test_(test),
101       interval_(interval),
102       iterations_(iterations),
103       predicate_(std::move(predicate)),
104       thread_(&MetricsCollectorThread::Run, this) {}
105 
106 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::
~MetricsCollectorThread()107     ~MetricsCollectorThread() {
108   if (!finished_) {
109     thread_.join();
110   }
111 }
112 
Run()113 void OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Run() {
114   int i = 0;
115   while (i++ < iterations_ || (iterations_ == -1 && !finished_)) {
116     auto data_points = test_->ReadCurrentMetricsData(predicate_);
117     for (auto data : data_points) {
118       auto iter = data_points_.find(data.first);
119       if (iter == data_points_.end()) {
120         data_points_[data.first] = std::move(data.second);
121       } else {
122         for (auto point : data.second) {
123           iter->second.push_back(std::move(point));
124         }
125       }
126     }
127     absl::SleepFor(absl::Milliseconds(interval_.millis()));
128   }
129 }
130 
131 const OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::ResultType&
Stop()132 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Stop() {
133   finished_ = true;
134   thread_.join();
135   return data_points_;
136 }
137 
Init(Options config)138 void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
139   grpc_core::CoreConfiguration::Reset();
140   ChannelArguments channel_args;
141   if (!config.labels_to_inject.empty()) {
142     labels_to_inject_ = std::move(config.labels_to_inject);
143     grpc_core::CoreConfiguration::RegisterBuilder(
144         [](grpc_core::CoreConfiguration::Builder* builder) mutable {
145           builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
146                                                   &AddLabelsFilter::kFilter);
147         });
148     channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
149   }
150   if (!config.service_config.empty()) {
151     channel_args.SetString(GRPC_ARG_SERVICE_CONFIG, config.service_config);
152   }
153   grpc_init();
154   grpc::ServerBuilder builder;
155   int port;
156   // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
157   builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
158                            &port);
159   builder.RegisterService(&service_);
160   for (auto& per_server_stats_plugin : config.per_server_stats_plugins) {
161     per_server_stats_plugin->AddToServerBuilder(&builder);
162   }
163   server_ = builder.BuildAndStart();
164   ASSERT_NE(nullptr, server_);
165   ASSERT_NE(0, port);
166   server_address_ = absl::StrCat("localhost:", port);
167   canonical_server_address_ = absl::StrCat("dns:///", server_address_);
168   for (auto& per_channel_stats_plugin : config.per_channel_stats_plugins) {
169     per_channel_stats_plugin->AddToChannelArguments(&channel_args);
170   }
171   reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
172 
173   auto channel = grpc::CreateCustomChannel(
174       server_address_, grpc::InsecureChannelCredentials(), channel_args);
175   stub_ = EchoTestService::NewStub(channel);
176   generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
177 }
178 
TearDown()179 void OpenTelemetryPluginEnd2EndTest::TearDown() {
180   server_->Shutdown();
181   grpc_shutdown_blocking();
182   grpc_core::ServerCallTracerFactory::TestOnlyReset();
183   grpc_core::GlobalStatsPluginRegistryTestPeer::
184       ResetGlobalStatsPluginRegistry();
185 }
186 
ResetStub(std::shared_ptr<Channel> channel)187 void OpenTelemetryPluginEnd2EndTest::ResetStub(
188     std::shared_ptr<Channel> channel) {
189   stub_ = EchoTestService::NewStub(channel);
190   generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
191 }
192 
SendRPC()193 void OpenTelemetryPluginEnd2EndTest::SendRPC() {
194   EchoRequest request;
195   request.set_message("foo");
196   EchoResponse response;
197   grpc::ClientContext context;
198   grpc::Status status = stub_->Echo(&context, request, &response);
199 }
200 
SendGenericRPC()201 void OpenTelemetryPluginEnd2EndTest::SendGenericRPC() {
202   grpc::ClientContext context;
203   EchoRequest request;
204   std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
205   ByteBuffer recv_buf;
206   grpc_core::Notification notify;
207   generic_stub_->UnaryCall(&context, absl::StrCat("/", kGenericMethodName),
208                            StubOptions(), send_buf.get(), &recv_buf,
209                            [&](grpc::Status /*s*/) { notify.Notify(); });
210   notify.WaitForNotificationWithTimeout(absl::Seconds(5));
211 }
212 
213 absl::flat_hash_map<
214     std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(absl::AnyInvocable<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> continue_predicate,opentelemetry::sdk::metrics::MetricReader * reader)215 OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
216     absl::AnyInvocable<
217         bool(const absl::flat_hash_map<
218              std::string,
219              std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
220         continue_predicate,
221     opentelemetry::sdk::metrics::MetricReader* reader) {
222   if (reader == nullptr) {
223     reader = reader_.get();
224   }
225   absl::flat_hash_map<
226       std::string,
227       std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
228       data;
229   auto deadline = absl::Now() + absl::Seconds(5);
230   do {
231     reader->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
232       for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
233            rm.scope_metric_data_) {
234         for (const opentelemetry::sdk::metrics::MetricData& md :
235              smd.metric_data_) {
236           for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
237                md.point_data_attr_) {
238             data[md.instrument_descriptor.name_].push_back(dp);
239           }
240         }
241       }
242       return true;
243     });
244   } while (continue_predicate(data) && deadline > absl::Now());
245   return data;
246 }
247 
248 std::pair<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>,
249           std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>>
BuildOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)250 OpenTelemetryPluginEnd2EndTest::BuildOpenTelemetryPlugin(
251     OpenTelemetryPluginEnd2EndTest::Options options) {
252   grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
253   auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
254   auto plugin = ot_builder.Build();
255   EXPECT_TRUE(plugin.ok());
256   return {*plugin, reader};
257 }
258 
259 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
BuildAndRegisterOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)260 OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
261     OpenTelemetryPluginEnd2EndTest::Options options) {
262   grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
263   absl::Status expected_status;
264   if (!options.use_meter_provider) {
265     expected_status =
266         absl::InvalidArgumentError("Need to configure a valid meter provider.");
267   }
268   auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
269   EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), expected_status);
270   return reader;
271 }
272 
273 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
ConfigureOTBuilder(OpenTelemetryPluginEnd2EndTest::Options options,grpc::internal::OpenTelemetryPluginBuilderImpl * ot_builder)274 OpenTelemetryPluginEnd2EndTest::ConfigureOTBuilder(
275     OpenTelemetryPluginEnd2EndTest::Options options,
276     grpc::internal::OpenTelemetryPluginBuilderImpl* ot_builder) {
277   // We are resetting the MeterProvider and OpenTelemetry plugin at the start
278   // of each test to avoid test results from one test carrying over to another
279   // test. (Some measurements can get arbitrarily delayed.)
280   auto meter_provider =
281       std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(
282           std::make_unique<opentelemetry::sdk::metrics::ViewRegistry>(),
283           *options.resource);
284   std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader =
285       std::make_shared<grpc::testing::MockMetricReader>();
286   meter_provider->AddMetricReader(reader);
287   ot_builder->DisableAllMetrics();
288   ot_builder->EnableMetrics(options.metric_names);
289   if (options.use_meter_provider) {
290     auto meter_provider =
291         std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
292     reader = std::make_shared<grpc::testing::MockMetricReader>();
293     meter_provider->AddMetricReader(reader);
294     ot_builder->SetMeterProvider(std::move(meter_provider));
295   }
296   ot_builder->SetChannelScopeFilter(std::move(options.channel_scope_filter));
297   ot_builder->SetServerSelector(std::move(options.server_selector));
298   ot_builder->SetTargetAttributeFilter(
299       std::move(options.target_attribute_filter));
300   ot_builder->SetGenericMethodAttributeFilter(
301       std::move(options.generic_method_attribute_filter));
302   for (auto& option : options.plugin_options) {
303     ot_builder->AddPluginOption(std::move(option));
304   }
305   for (auto& optional_label_key : options.optional_label_keys) {
306     ot_builder->AddOptionalLabel(optional_label_key);
307   }
308   return reader;
309 }
310 
311 }  // namespace testing
312 }  // namespace grpc
313