1 // 2 // 3 // Copyright 2023 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 20 #define GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 21 22 #include <grpc/support/port_platform.h> 23 #include <grpcpp/generic/generic_stub.h> 24 #include <grpcpp/grpcpp.h> 25 26 #include <atomic> 27 #include <thread> 28 29 #include "absl/functional/any_invocable.h" 30 #include "gmock/gmock.h" 31 #include "gtest/gtest.h" 32 #include "opentelemetry/metrics/provider.h" 33 #include "opentelemetry/sdk/metrics/meter_provider.h" 34 #include "opentelemetry/sdk/metrics/metric_reader.h" 35 #include "src/core/config/core_configuration.h" 36 #include "src/core/telemetry/call_tracer.h" 37 #include "src/cpp/ext/otel/otel_plugin.h" 38 #include "test/core/test_util/test_config.h" 39 #include "test/cpp/end2end/test_service_impl.h" 40 41 namespace grpc { 42 namespace testing { 43 44 class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader { 45 public: GetAggregationTemporality(opentelemetry::sdk::metrics::InstrumentType)46 opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality( 47 opentelemetry::sdk::metrics::InstrumentType) const noexcept override { 48 return opentelemetry::sdk::metrics::AggregationTemporality::kDelta; 49 } 50 OnForceFlush(std::chrono::microseconds)51 bool OnForceFlush(std::chrono::microseconds) noexcept override { 52 return true; 53 } 54 OnShutDown(std::chrono::microseconds)55 bool OnShutDown(std::chrono::microseconds) noexcept override { return true; } 56 OnInitialized()57 void OnInitialized() noexcept override {} 58 }; 59 60 class OpenTelemetryPluginEnd2EndTest : public ::testing::Test { 61 protected: 62 struct Options { 63 public: set_metric_namesOptions64 Options& set_metric_names(std::vector<absl::string_view> names) { 65 metric_names = std::move(names); 66 return *this; 67 } 68 set_resourceOptions69 Options& set_resource(const opentelemetry::sdk::resource::Resource& res) { 70 resource = std::make_unique<opentelemetry::sdk::resource::Resource>(res); 71 return *this; 72 } 73 set_use_meter_providerOptions74 Options& set_use_meter_provider(bool flag) { 75 use_meter_provider = flag; 76 return *this; 77 } 78 set_labels_to_injectOptions79 Options& set_labels_to_inject( 80 std::map< 81 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 82 grpc_core::RefCountedStringValue> 83 labels) { 84 labels_to_inject = std::move(labels); 85 return *this; 86 } 87 set_service_configOptions88 Options& set_service_config(std::string svc_cfg) { 89 service_config = std::move(svc_cfg); 90 return *this; 91 } 92 set_channel_scope_filterOptions93 Options& set_channel_scope_filter( 94 absl::AnyInvocable<bool( 95 const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> 96 func) { 97 channel_scope_filter = std::move(func); 98 return *this; 99 } 100 set_server_selectorOptions101 Options& set_server_selector( 102 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*channel_args*/) 103 const> 104 func) { 105 server_selector = std::move(func); 106 return *this; 107 } 108 set_target_attribute_filterOptions109 Options& set_target_attribute_filter( 110 absl::AnyInvocable<bool(absl::string_view /*target*/) const> func) { 111 target_attribute_filter = std::move(func); 112 return *this; 113 } 114 set_generic_method_attribute_filterOptions115 Options& set_generic_method_attribute_filter( 116 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> 117 func) { 118 generic_method_attribute_filter = std::move(func); 119 return *this; 120 } 121 add_plugin_optionOptions122 Options& add_plugin_option( 123 std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption> 124 option) { 125 plugin_options.push_back(std::move(option)); 126 return *this; 127 } 128 add_optional_labelOptions129 Options& add_optional_label(absl::string_view optional_label_key) { 130 optional_label_keys.emplace(optional_label_key); 131 return *this; 132 } 133 add_per_channel_stats_pluginOptions134 Options& add_per_channel_stats_plugin( 135 std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin) { 136 per_channel_stats_plugins.emplace_back(std::move(plugin)); 137 return *this; 138 } 139 add_per_server_stats_pluginOptions140 Options& add_per_server_stats_plugin( 141 std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin) { 142 per_server_stats_plugins.emplace_back(std::move(plugin)); 143 return *this; 144 } 145 146 std::vector<absl::string_view> metric_names; 147 // TODO(yashykt): opentelemetry::sdk::resource::Resource doesn't have a copy 148 // assignment operator so wrapping it in a unique_ptr till it is fixed. 149 std::unique_ptr<opentelemetry::sdk::resource::Resource> resource = 150 std::make_unique<opentelemetry::sdk::resource::Resource>( 151 opentelemetry::sdk::resource::Resource::Create({})); 152 std::unique_ptr<grpc::internal::LabelsInjector> labels_injector; 153 bool use_meter_provider = true; 154 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 155 grpc_core::RefCountedStringValue> 156 labels_to_inject; 157 std::string service_config; 158 absl::AnyInvocable<bool( 159 const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> 160 channel_scope_filter; 161 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*channel_args*/) 162 const> 163 server_selector; 164 absl::AnyInvocable<bool(absl::string_view /*target*/) const> 165 target_attribute_filter; 166 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> 167 generic_method_attribute_filter; 168 std::vector< 169 std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>> 170 plugin_options; 171 absl::flat_hash_set<absl::string_view> optional_label_keys; 172 std::vector<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>> 173 per_channel_stats_plugins; 174 std::vector<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>> 175 per_server_stats_plugins; 176 }; 177 178 class MetricsCollectorThread { 179 public: 180 using ResultType = absl::flat_hash_map< 181 std::string, 182 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>; 183 MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest* test, 184 grpc_core::Duration interval, int iterations, 185 std::function<bool(const ResultType&)> predicate); 186 ~MetricsCollectorThread(); 187 const ResultType& Stop(); 188 189 private: 190 void Run(); 191 192 OpenTelemetryPluginEnd2EndTest* test_; 193 grpc_core::Duration interval_; 194 int iterations_; 195 std::function<bool(const ResultType&)> predicate_; 196 ResultType data_points_; 197 std::atomic_bool finished_{false}; 198 std::thread thread_; 199 }; 200 201 static std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> 202 ConfigureOTBuilder( 203 OpenTelemetryPluginEnd2EndTest::Options options, 204 grpc::internal::OpenTelemetryPluginBuilderImpl* ot_builder); 205 206 // Note that we can't use SetUp() here since we want to send in parameters. 207 void Init(Options config); 208 209 void TearDown() override; 210 211 void ResetStub(std::shared_ptr<Channel> channel); 212 213 void SendRPC(); 214 void SendGenericRPC(); 215 216 std::pair<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>, 217 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>> 218 BuildOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options); 219 220 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> 221 BuildAndRegisterOpenTelemetryPlugin( 222 OpenTelemetryPluginEnd2EndTest::Options options); 223 224 absl::flat_hash_map< 225 std::string, 226 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> 227 ReadCurrentMetricsData( 228 absl::AnyInvocable< 229 bool(const absl::flat_hash_map< 230 std::string, 231 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)> 232 continue_predicate, 233 opentelemetry::sdk::metrics::MetricReader* reader = nullptr); 234 235 const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo"; 236 const absl::string_view kGenericMethodName = "foo/bar"; 237 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 238 grpc_core::RefCountedStringValue> 239 labels_to_inject_; 240 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_; 241 std::string server_address_; 242 std::string canonical_server_address_; 243 CallbackTestServiceImpl service_; 244 std::unique_ptr<grpc::Server> server_; 245 std::unique_ptr<EchoTestService::Stub> stub_; 246 std::unique_ptr<grpc::GenericStub> generic_stub_; 247 }; 248 249 } // namespace testing 250 } // namespace grpc 251 252 #endif // GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 253