• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/oob_backend_metric.h"
18 
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/slice.h>
21 #include <grpc/status.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/time.h>
25 #include <string.h>
26 
#include <algorithm>
#include <memory>
#include <set>
#include <utility>
#include <vector>
31 
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/strings/string_view.h"
36 #include "google/protobuf/duration.upb.h"
37 #include "src/core/channelz/channel_trace.h"
38 #include "src/core/client_channel/subchannel.h"
39 #include "src/core/client_channel/subchannel_stream_client.h"
40 #include "src/core/lib/debug/trace.h"
41 #include "src/core/lib/iomgr/closure.h"
42 #include "src/core/lib/iomgr/error.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/iomgr_fwd.h"
45 #include "src/core/lib/iomgr/pollset_set.h"
46 #include "src/core/lib/slice/slice.h"
47 #include "src/core/load_balancing/backend_metric_parser.h"
48 #include "src/core/load_balancing/oob_backend_metric_internal.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/memory.h"
51 #include "src/core/util/orphanable.h"
52 #include "src/core/util/ref_counted_ptr.h"
53 #include "src/core/util/sync.h"
54 #include "src/core/util/time.h"
55 #include "upb/mem/arena.hpp"
56 #include "xds/service/orca/v3/orca.upb.h"
57 
58 namespace grpc_core {
59 
60 //
61 // OrcaProducer::ConnectivityWatcher
62 //
63 
64 class OrcaProducer::ConnectivityWatcher final
65     : public Subchannel::ConnectivityStateWatcherInterface {
66  public:
ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)67   explicit ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)
68       : producer_(std::move(producer)),
69         interested_parties_(grpc_pollset_set_create()) {}
70 
~ConnectivityWatcher()71   ~ConnectivityWatcher() override {
72     grpc_pollset_set_destroy(interested_parties_);
73   }
74 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status &)75   void OnConnectivityStateChange(
76       RefCountedPtr<ConnectivityStateWatcherInterface> self,
77       grpc_connectivity_state state, const absl::Status&) override {
78     producer_->OnConnectivityStateChange(state);
79     self.reset();
80   }
81 
interested_parties()82   grpc_pollset_set* interested_parties() override {
83     return interested_parties_;
84   }
85 
86  private:
87   WeakRefCountedPtr<OrcaProducer> producer_;
88   grpc_pollset_set* interested_parties_;
89 };
90 
91 //
92 // OrcaProducer::OrcaStreamEventHandler
93 //
94 
95 class OrcaProducer::OrcaStreamEventHandler final
96     : public SubchannelStreamClient::CallEventHandler {
97  public:
OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,Duration report_interval)98   OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,
99                          Duration report_interval)
100       : producer_(std::move(producer)), report_interval_(report_interval) {}
101 
GetPathLocked()102   Slice GetPathLocked() override {
103     return Slice::FromStaticString(
104         "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics");
105   }
106 
OnCallStartLocked(SubchannelStreamClient *)107   void OnCallStartLocked(SubchannelStreamClient* /*client*/) override {}
108 
OnRetryTimerStartLocked(SubchannelStreamClient *)109   void OnRetryTimerStartLocked(SubchannelStreamClient* /*client*/) override {}
110 
EncodeSendMessageLocked()111   grpc_slice EncodeSendMessageLocked() override {
112     upb::Arena arena;
113     xds_service_orca_v3_OrcaLoadReportRequest* request =
114         xds_service_orca_v3_OrcaLoadReportRequest_new(arena.ptr());
115     gpr_timespec timespec = report_interval_.as_timespec();
116     auto* report_interval =
117         xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval(
118             request, arena.ptr());
119     google_protobuf_Duration_set_seconds(report_interval, timespec.tv_sec);
120     google_protobuf_Duration_set_nanos(report_interval, timespec.tv_nsec);
121     size_t buf_length;
122     char* buf = xds_service_orca_v3_OrcaLoadReportRequest_serialize(
123         request, arena.ptr(), &buf_length);
124     grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
125     memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
126     return request_slice;
127   }
128 
RecvMessageReadyLocked(SubchannelStreamClient *,absl::string_view serialized_message)129   absl::Status RecvMessageReadyLocked(
130       SubchannelStreamClient* /*client*/,
131       absl::string_view serialized_message) override {
132     auto* allocator = new BackendMetricAllocator(producer_);
133     auto* backend_metric_data =
134         ParseBackendMetricData(serialized_message, allocator);
135     if (backend_metric_data == nullptr) {
136       delete allocator;
137       return absl::InvalidArgumentError("unable to parse Orca response");
138     }
139     allocator->AsyncNotifyWatchersAndDelete();
140     return absl::OkStatus();
141   }
142 
RecvTrailingMetadataReadyLocked(SubchannelStreamClient *,grpc_status_code status)143   void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* /*client*/,
144                                        grpc_status_code status) override {
145     if (status == GRPC_STATUS_UNIMPLEMENTED) {
146       static const char kErrorMessage[] =
147           "Orca stream returned UNIMPLEMENTED; disabling";
148       LOG(ERROR) << kErrorMessage;
149       auto* channelz_node = producer_->subchannel_->channelz_node();
150       if (channelz_node != nullptr) {
151         channelz_node->AddTraceEvent(
152             channelz::ChannelTrace::Error,
153             grpc_slice_from_static_string(kErrorMessage));
154       }
155     }
156   }
157 
158  private:
159   // This class acts as storage for the parsed backend metric data.  It
160   // is injected into ParseBackendMetricData() as an allocator that
161   // returns internal storage.  It then also acts as a place to hold
162   // onto the data during an async hop into the ExecCtx before sending
163   // notifications, which avoids lock inversion problems due to
164   // acquiring producer_->mu_ while holding the lock from inside of
165   // SubchannelStreamClient.
166   class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
167    public:
BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)168     explicit BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)
169         : producer_(std::move(producer)) {}
170 
AllocateBackendMetricData()171     BackendMetricData* AllocateBackendMetricData() override {
172       return &backend_metric_data_;
173     }
174 
AllocateString(size_t size)175     char* AllocateString(size_t size) override {
176       char* string = static_cast<char*>(gpr_malloc(size));
177       string_storage_.emplace_back(string);
178       return string;
179     }
180 
181     // Notifies watchers asynchronously and then deletes the
182     // BackendMetricAllocator object.
AsyncNotifyWatchersAndDelete()183     void AsyncNotifyWatchersAndDelete() {
184       GRPC_CLOSURE_INIT(&closure_, NotifyWatchersInExecCtx, this, nullptr);
185       ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
186     }
187 
188    private:
NotifyWatchersInExecCtx(void * arg,grpc_error_handle)189     static void NotifyWatchersInExecCtx(void* arg,
190                                         grpc_error_handle /*error*/) {
191       auto* self = static_cast<BackendMetricAllocator*>(arg);
192       self->producer_->NotifyWatchers(self->backend_metric_data_);
193       delete self;
194     }
195 
196     WeakRefCountedPtr<OrcaProducer> producer_;
197     BackendMetricData backend_metric_data_;
198     std::vector<UniquePtr<char>> string_storage_;
199     grpc_closure closure_;
200   };
201 
202   WeakRefCountedPtr<OrcaProducer> producer_;
203   const Duration report_interval_;
204 };
205 
206 //
207 // OrcaProducer
208 //
209 
Start(RefCountedPtr<Subchannel> subchannel)210 void OrcaProducer::Start(RefCountedPtr<Subchannel> subchannel) {
211   subchannel_ = std::move(subchannel);
212   connected_subchannel_ = subchannel_->connected_subchannel();
213   auto connectivity_watcher =
214       MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<OrcaProducer>());
215   connectivity_watcher_ = connectivity_watcher.get();
216   subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
217 }
218 
Orphaned()219 void OrcaProducer::Orphaned() {
220   {
221     MutexLock lock(&mu_);
222     stream_client_.reset();
223   }
224   CHECK(subchannel_ != nullptr);  // Should not be called before Start().
225   subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
226   subchannel_->RemoveDataProducer(this);
227 }
228 
AddWatcher(OrcaWatcher * watcher)229 void OrcaProducer::AddWatcher(OrcaWatcher* watcher) {
230   MutexLock lock(&mu_);
231   watchers_.insert(watcher);
232   Duration watcher_interval = watcher->report_interval();
233   if (watcher_interval < report_interval_) {
234     report_interval_ = watcher_interval;
235     stream_client_.reset();
236     MaybeStartStreamLocked();
237   }
238 }
239 
RemoveWatcher(OrcaWatcher * watcher)240 void OrcaProducer::RemoveWatcher(OrcaWatcher* watcher) {
241   MutexLock lock(&mu_);
242   watchers_.erase(watcher);
243   if (watchers_.empty()) {
244     stream_client_.reset();
245     return;
246   }
247   Duration new_interval = GetMinIntervalLocked();
248   if (new_interval < report_interval_) {
249     report_interval_ = new_interval;
250     stream_client_.reset();
251     MaybeStartStreamLocked();
252   }
253 }
254 
GetMinIntervalLocked() const255 Duration OrcaProducer::GetMinIntervalLocked() const {
256   Duration duration = Duration::Infinity();
257   for (OrcaWatcher* watcher : watchers_) {
258     Duration watcher_interval = watcher->report_interval();
259     if (watcher_interval < duration) duration = watcher_interval;
260   }
261   return duration;
262 }
263 
MaybeStartStreamLocked()264 void OrcaProducer::MaybeStartStreamLocked() {
265   if (connected_subchannel_ == nullptr) return;
266   stream_client_ = MakeOrphanable<SubchannelStreamClient>(
267       connected_subchannel_, subchannel_->pollset_set(),
268       std::make_unique<OrcaStreamEventHandler>(
269           WeakRefAsSubclass<OrcaProducer>(), report_interval_),
270       GRPC_TRACE_FLAG_ENABLED(orca_client) ? "OrcaClient" : nullptr);
271 }
272 
NotifyWatchers(const BackendMetricData & backend_metric_data)273 void OrcaProducer::NotifyWatchers(
274     const BackendMetricData& backend_metric_data) {
275   GRPC_TRACE_LOG(orca_client, INFO)
276       << "OrcaProducer " << this << ": reporting backend metrics to watchers";
277   MutexLock lock(&mu_);
278   for (OrcaWatcher* watcher : watchers_) {
279     watcher->watcher()->OnBackendMetricReport(backend_metric_data);
280   }
281 }
282 
OnConnectivityStateChange(grpc_connectivity_state state)283 void OrcaProducer::OnConnectivityStateChange(grpc_connectivity_state state) {
284   MutexLock lock(&mu_);
285   if (state == GRPC_CHANNEL_READY) {
286     connected_subchannel_ = subchannel_->connected_subchannel();
287     if (!watchers_.empty()) MaybeStartStreamLocked();
288   } else {
289     connected_subchannel_.reset();
290     stream_client_.reset();
291   }
292 }
293 
294 //
295 // OrcaWatcher
296 //
297 
~OrcaWatcher()298 OrcaWatcher::~OrcaWatcher() {
299   if (producer_ != nullptr) producer_->RemoveWatcher(this);
300 }
301 
SetSubchannel(Subchannel * subchannel)302 void OrcaWatcher::SetSubchannel(Subchannel* subchannel) {
303   bool created = false;
304   // Check if our producer is already registered with the subchannel.
305   // If not, create a new one.
306   subchannel->GetOrAddDataProducer(
307       OrcaProducer::Type(), [&](Subchannel::DataProducerInterface** producer) {
308         if (*producer != nullptr) {
309           producer_ =
310               (*producer)->RefIfNonZero().TakeAsSubclass<OrcaProducer>();
311         }
312         if (producer_ == nullptr) {
313           producer_ = MakeRefCounted<OrcaProducer>();
314           *producer = producer_.get();
315           created = true;
316         }
317       });
318   // If we just created the producer, start it.
319   // This needs to be done outside of the lambda passed to
320   // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
321   // subchannel lock while already holding it.
322   if (created) producer_->Start(subchannel->Ref());
323   // Register ourself with the producer.
324   producer_->AddWatcher(this);
325 }
326 
327 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeOobBackendMetricWatcher(Duration report_interval,std::unique_ptr<OobBackendMetricWatcher> watcher)328 MakeOobBackendMetricWatcher(Duration report_interval,
329                             std::unique_ptr<OobBackendMetricWatcher> watcher) {
330   return std::make_unique<OrcaWatcher>(report_interval, std::move(watcher));
331 }
332 
333 }  // namespace grpc_core
334