1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/ext/otel/otel_test_library.h"
20
21 #include <grpcpp/grpcpp.h>
22
23 #include <atomic>
24 #include <memory>
25
26 #include "absl/functional/any_invocable.h"
27 #include "gmock/gmock.h"
28 #include "gtest/gtest.h"
29 #include "opentelemetry/metrics/provider.h"
30 #include "opentelemetry/sdk/metrics/export/metric_producer.h"
31 #include "opentelemetry/sdk/metrics/meter_provider.h"
32 #include "opentelemetry/sdk/metrics/metric_reader.h"
33 #include "src/core/config/core_configuration.h"
34 #include "src/core/lib/channel/promise_based_filter.h"
35 #include "src/core/telemetry/call_tracer.h"
36 #include "src/core/util/notification.h"
37 #include "test/core/test_util/fake_stats_plugin.h"
38 #include "test/core/test_util/test_config.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41
42 namespace grpc {
43 namespace testing {
44
// Channel arg key under which the test publishes a pointer to the map of
// optional labels that AddLabelsFilter injects into each call attempt.
#define GRPC_ARG_LABELS_TO_INJECT "grpc.testing.labels_to_inject"
46
47 // A subchannel filter that adds the service labels for test to the
48 // CallAttemptTracer in a call.
49 class AddLabelsFilter : public grpc_core::ChannelFilter {
50 public:
51 static const grpc_channel_filter kFilter;
52
TypeName()53 static absl::string_view TypeName() { return "add_service_labels_filter"; }
54
AddLabelsFilter(std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,grpc_core::RefCountedStringValue> labels_to_inject)55 explicit AddLabelsFilter(
56 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
57 grpc_core::RefCountedStringValue>
58 labels_to_inject)
59 : labels_to_inject_(std::move(labels_to_inject)) {}
60
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)61 static absl::StatusOr<std::unique_ptr<AddLabelsFilter>> Create(
62 const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
63 return absl::make_unique<AddLabelsFilter>(
64 *args.GetPointer<std::map<
65 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
66 grpc_core::RefCountedStringValue>>(GRPC_ARG_LABELS_TO_INJECT));
67 }
68
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)69 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
70 grpc_core::CallArgs call_args,
71 grpc_core::NextPromiseFactory next_promise_factory) override {
72 using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
73 auto* call_tracer = grpc_core::GetContext<CallAttemptTracer>();
74 EXPECT_NE(call_tracer, nullptr);
75 for (const auto& pair : labels_to_inject_) {
76 call_tracer->SetOptionalLabel(pair.first, pair.second);
77 }
78 return next_promise_factory(std::move(call_args));
79 }
80
81 private:
82 const std::map<
83 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
84 grpc_core::RefCountedStringValue>
85 labels_to_inject_;
86 };
87
// Filter vtable for AddLabelsFilter, instantiated as a client-endpoint
// promise-based filter. Init() registers it for GRPC_CLIENT_SUBCHANNEL stacks.
const grpc_channel_filter AddLabelsFilter::kFilter =
    grpc_core::MakePromiseBasedFilter<AddLabelsFilter,
                                      grpc_core::FilterEndpoint::kClient>();
91
MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest * test,grpc_core::Duration interval,int iterations,std::function<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> predicate)92 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::MetricsCollectorThread(
93 OpenTelemetryPluginEnd2EndTest* test, grpc_core::Duration interval,
94 int iterations,
95 std::function<
96 bool(const absl::flat_hash_map<
97 std::string,
98 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
99 predicate)
100 : test_(test),
101 interval_(interval),
102 iterations_(iterations),
103 predicate_(std::move(predicate)),
104 thread_(&MetricsCollectorThread::Run, this) {}
105
106 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::
~MetricsCollectorThread()107 ~MetricsCollectorThread() {
108 if (!finished_) {
109 thread_.join();
110 }
111 }
112
Run()113 void OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Run() {
114 int i = 0;
115 while (i++ < iterations_ || (iterations_ == -1 && !finished_)) {
116 auto data_points = test_->ReadCurrentMetricsData(predicate_);
117 for (auto data : data_points) {
118 auto iter = data_points_.find(data.first);
119 if (iter == data_points_.end()) {
120 data_points_[data.first] = std::move(data.second);
121 } else {
122 for (auto point : data.second) {
123 iter->second.push_back(std::move(point));
124 }
125 }
126 }
127 absl::SleepFor(absl::Milliseconds(interval_.millis()));
128 }
129 }
130
131 const OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::ResultType&
Stop()132 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Stop() {
133 finished_ = true;
134 thread_.join();
135 return data_points_;
136 }
137
Init(Options config)138 void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
139 grpc_core::CoreConfiguration::Reset();
140 ChannelArguments channel_args;
141 if (!config.labels_to_inject.empty()) {
142 labels_to_inject_ = std::move(config.labels_to_inject);
143 grpc_core::CoreConfiguration::RegisterBuilder(
144 [](grpc_core::CoreConfiguration::Builder* builder) mutable {
145 builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
146 &AddLabelsFilter::kFilter);
147 });
148 channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
149 }
150 if (!config.service_config.empty()) {
151 channel_args.SetString(GRPC_ARG_SERVICE_CONFIG, config.service_config);
152 }
153 grpc_init();
154 grpc::ServerBuilder builder;
155 int port;
156 // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
157 builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
158 &port);
159 builder.RegisterService(&service_);
160 for (auto& per_server_stats_plugin : config.per_server_stats_plugins) {
161 per_server_stats_plugin->AddToServerBuilder(&builder);
162 }
163 server_ = builder.BuildAndStart();
164 ASSERT_NE(nullptr, server_);
165 ASSERT_NE(0, port);
166 server_address_ = absl::StrCat("localhost:", port);
167 canonical_server_address_ = absl::StrCat("dns:///", server_address_);
168 for (auto& per_channel_stats_plugin : config.per_channel_stats_plugins) {
169 per_channel_stats_plugin->AddToChannelArguments(&channel_args);
170 }
171 reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
172
173 auto channel = grpc::CreateCustomChannel(
174 server_address_, grpc::InsecureChannelCredentials(), channel_args);
175 stub_ = EchoTestService::NewStub(channel);
176 generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
177 }
178
TearDown()179 void OpenTelemetryPluginEnd2EndTest::TearDown() {
180 server_->Shutdown();
181 grpc_shutdown_blocking();
182 grpc_core::ServerCallTracerFactory::TestOnlyReset();
183 grpc_core::GlobalStatsPluginRegistryTestPeer::
184 ResetGlobalStatsPluginRegistry();
185 }
186
ResetStub(std::shared_ptr<Channel> channel)187 void OpenTelemetryPluginEnd2EndTest::ResetStub(
188 std::shared_ptr<Channel> channel) {
189 stub_ = EchoTestService::NewStub(channel);
190 generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
191 }
192
SendRPC()193 void OpenTelemetryPluginEnd2EndTest::SendRPC() {
194 EchoRequest request;
195 request.set_message("foo");
196 EchoResponse response;
197 grpc::ClientContext context;
198 grpc::Status status = stub_->Echo(&context, request, &response);
199 }
200
SendGenericRPC()201 void OpenTelemetryPluginEnd2EndTest::SendGenericRPC() {
202 grpc::ClientContext context;
203 EchoRequest request;
204 std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
205 ByteBuffer recv_buf;
206 grpc_core::Notification notify;
207 generic_stub_->UnaryCall(&context, absl::StrCat("/", kGenericMethodName),
208 StubOptions(), send_buf.get(), &recv_buf,
209 [&](grpc::Status /*s*/) { notify.Notify(); });
210 notify.WaitForNotificationWithTimeout(absl::Seconds(5));
211 }
212
213 absl::flat_hash_map<
214 std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(absl::AnyInvocable<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> continue_predicate,opentelemetry::sdk::metrics::MetricReader * reader)215 OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
216 absl::AnyInvocable<
217 bool(const absl::flat_hash_map<
218 std::string,
219 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
220 continue_predicate,
221 opentelemetry::sdk::metrics::MetricReader* reader) {
222 if (reader == nullptr) {
223 reader = reader_.get();
224 }
225 absl::flat_hash_map<
226 std::string,
227 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
228 data;
229 auto deadline = absl::Now() + absl::Seconds(5);
230 do {
231 reader->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
232 for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
233 rm.scope_metric_data_) {
234 for (const opentelemetry::sdk::metrics::MetricData& md :
235 smd.metric_data_) {
236 for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
237 md.point_data_attr_) {
238 data[md.instrument_descriptor.name_].push_back(dp);
239 }
240 }
241 }
242 return true;
243 });
244 } while (continue_predicate(data) && deadline > absl::Now());
245 return data;
246 }
247
248 std::pair<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>,
249 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>>
BuildOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)250 OpenTelemetryPluginEnd2EndTest::BuildOpenTelemetryPlugin(
251 OpenTelemetryPluginEnd2EndTest::Options options) {
252 grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
253 auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
254 auto plugin = ot_builder.Build();
255 EXPECT_TRUE(plugin.ok());
256 return {*plugin, reader};
257 }
258
259 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
BuildAndRegisterOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)260 OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
261 OpenTelemetryPluginEnd2EndTest::Options options) {
262 grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
263 absl::Status expected_status;
264 if (!options.use_meter_provider) {
265 expected_status =
266 absl::InvalidArgumentError("Need to configure a valid meter provider.");
267 }
268 auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
269 EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), expected_status);
270 return reader;
271 }
272
273 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
ConfigureOTBuilder(OpenTelemetryPluginEnd2EndTest::Options options,grpc::internal::OpenTelemetryPluginBuilderImpl * ot_builder)274 OpenTelemetryPluginEnd2EndTest::ConfigureOTBuilder(
275 OpenTelemetryPluginEnd2EndTest::Options options,
276 grpc::internal::OpenTelemetryPluginBuilderImpl* ot_builder) {
277 // We are resetting the MeterProvider and OpenTelemetry plugin at the start
278 // of each test to avoid test results from one test carrying over to another
279 // test. (Some measurements can get arbitrarily delayed.)
280 auto meter_provider =
281 std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(
282 std::make_unique<opentelemetry::sdk::metrics::ViewRegistry>(),
283 *options.resource);
284 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader =
285 std::make_shared<grpc::testing::MockMetricReader>();
286 meter_provider->AddMetricReader(reader);
287 ot_builder->DisableAllMetrics();
288 ot_builder->EnableMetrics(options.metric_names);
289 if (options.use_meter_provider) {
290 auto meter_provider =
291 std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
292 reader = std::make_shared<grpc::testing::MockMetricReader>();
293 meter_provider->AddMetricReader(reader);
294 ot_builder->SetMeterProvider(std::move(meter_provider));
295 }
296 ot_builder->SetChannelScopeFilter(std::move(options.channel_scope_filter));
297 ot_builder->SetServerSelector(std::move(options.server_selector));
298 ot_builder->SetTargetAttributeFilter(
299 std::move(options.target_attribute_filter));
300 ot_builder->SetGenericMethodAttributeFilter(
301 std::move(options.generic_method_attribute_filter));
302 for (auto& option : options.plugin_options) {
303 ot_builder->AddPluginOption(std::move(option));
304 }
305 for (auto& optional_label_key : options.optional_label_keys) {
306 ot_builder->AddOptionalLabel(optional_label_key);
307 }
308 return reader;
309 }
310
311 } // namespace testing
312 } // namespace grpc
313