1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_LOAD_BALANCING_OOB_BACKEND_METRIC_INTERNAL_H 18 #define GRPC_SRC_CORE_LOAD_BALANCING_OOB_BACKEND_METRIC_INTERNAL_H 19 20 #include <grpc/impl/connectivity_state.h> 21 #include <grpc/support/port_platform.h> 22 23 #include <memory> 24 #include <set> 25 #include <utility> 26 27 #include "absl/base/thread_annotations.h" 28 #include "absl/strings/string_view.h" 29 #include "src/core/client_channel/subchannel.h" 30 #include "src/core/client_channel/subchannel_interface_internal.h" 31 #include "src/core/client_channel/subchannel_stream_client.h" 32 #include "src/core/load_balancing/backend_metric_data.h" 33 #include "src/core/load_balancing/oob_backend_metric.h" 34 #include "src/core/util/orphanable.h" 35 #include "src/core/util/ref_counted_ptr.h" 36 #include "src/core/util/sync.h" 37 #include "src/core/util/time.h" 38 #include "src/core/util/unique_type_name.h" 39 40 namespace grpc_core { 41 42 class OrcaWatcher; 43 44 // This producer is registered with a subchannel. It creates a 45 // streaming ORCA call and reports the resulting backend metrics to all 46 // registered watchers. 47 class OrcaProducer final : public Subchannel::DataProducerInterface { 48 public: 49 void Start(RefCountedPtr<Subchannel> subchannel); 50 Type()51 static UniqueTypeName Type() { 52 static UniqueTypeName::Factory kFactory("orca"); 53 return kFactory.Create(); 54 } 55 type()56 UniqueTypeName type() const override { return Type(); } 57 58 // Adds and removes watchers. 59 void AddWatcher(OrcaWatcher* watcher); 60 void RemoveWatcher(OrcaWatcher* watcher); 61 62 private: 63 class ConnectivityWatcher; 64 class OrcaStreamEventHandler; 65 66 void Orphaned() override; 67 68 // Returns the minimum requested reporting interval across all watchers. 69 Duration GetMinIntervalLocked() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 70 71 // Starts a new stream if we have a connected subchannel. 72 // Called whenever the reporting interval changes or the subchannel 73 // transitions to state READY. 74 void MaybeStartStreamLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 75 76 // Handles a connectivity state change on the subchannel. 77 void OnConnectivityStateChange(grpc_connectivity_state state); 78 79 // Called to notify watchers of a new backend metric report. 80 void NotifyWatchers(const BackendMetricData& backend_metric_data); 81 82 RefCountedPtr<Subchannel> subchannel_; 83 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 84 ConnectivityWatcher* connectivity_watcher_; 85 Mutex mu_; 86 std::set<OrcaWatcher*> watchers_ ABSL_GUARDED_BY(mu_); 87 Duration report_interval_ ABSL_GUARDED_BY(mu_) = Duration::Infinity(); 88 OrphanablePtr<SubchannelStreamClient> stream_client_ ABSL_GUARDED_BY(mu_); 89 }; 90 91 // This watcher is returned to the LB policy and added to the 92 // client channel SubchannelWrapper. 93 class OrcaWatcher final : public InternalSubchannelDataWatcherInterface { 94 public: OrcaWatcher(Duration report_interval,std::unique_ptr<OobBackendMetricWatcher> watcher)95 OrcaWatcher(Duration report_interval, 96 std::unique_ptr<OobBackendMetricWatcher> watcher) 97 : report_interval_(report_interval), watcher_(std::move(watcher)) {} 98 ~OrcaWatcher() override; 99 report_interval()100 Duration report_interval() const { return report_interval_; } watcher()101 OobBackendMetricWatcher* watcher() const { return watcher_.get(); } 102 type()103 UniqueTypeName type() const override { return OrcaProducer::Type(); } 104 105 // When the client channel sees this wrapper, it will pass it the real 106 // subchannel to use. 107 void SetSubchannel(Subchannel* subchannel) override; 108 109 private: 110 const Duration report_interval_; 111 std::unique_ptr<OobBackendMetricWatcher> watcher_; 112 RefCountedPtr<OrcaProducer> producer_; 113 }; 114 115 } // namespace grpc_core 116 117 #endif // GRPC_SRC_CORE_LOAD_BALANCING_OOB_BACKEND_METRIC_INTERNAL_H 118