• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
20 
21 #include <google/protobuf/repeated_ptr_field.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/support/status.h>
25 #include <inttypes.h>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/cpp/server/load_reporter/constants.h"
30 
31 // IWYU pragma: no_include "google/protobuf/duration.pb.h"
32 
33 namespace grpc {
34 namespace load_reporter {
35 
void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
  // A tag popped from the completion queue must have both a bound callback
  // and a handler to hand ownership of.
  CHECK(handler_function_ != nullptr);
  CHECK_NE(handler_, nullptr);
  // Transfer ownership of the handler into the callback; after this call the
  // tag no longer owns it.
  handler_function_(std::move(handler_), ok);
}
41 
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)42 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
43     std::unique_ptr<ServerCompletionQueue> cq)
44     : cq_(std::move(cq)) {
45   thread_ =
46       std::make_unique<grpc_core::Thread>("server_load_reporting", Work, this);
47   std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
48 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
49   cpu_stats_provider = std::make_unique<CpuStatsProviderDefaultImpl>();
50 #endif
51   load_reporter_ = std::make_unique<LoadReporter>(
52       kFeedbackSampleWindowSeconds,
53       std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
54       std::move(cpu_stats_provider));
55 }
56 
LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
  // We will reach here after the server starts shutting down.
  shutdown_ = true;
  {
    // Take cq_shutdown_mu_ so nothing can schedule new work on the cq
    // concurrently with its shutdown.
    grpc_core::MutexLock lock(&cq_shutdown_mu_);
    cq_->Shutdown();
  }
  // Cancel any pending fetch-and-sample alarm so its tag pops promptly and
  // the worker thread can drain the queue.
  if (next_fetch_and_sample_alarm_ != nullptr) {
    next_fetch_and_sample_alarm_->Cancel();
  }
  // Wait for Work() to observe the cq shutdown and exit.
  thread_->Join();
}
69 
ScheduleNextFetchAndSample()70 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
71   auto next_fetch_and_sample_time =
72       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
73                    gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
74                                         GPR_TIMESPAN));
75   {
76     grpc_core::MutexLock lock(&cq_shutdown_mu_);
77     if (shutdown_) return;
78     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
79     // instance for multiple events.
80     next_fetch_and_sample_alarm_ = std::make_unique<Alarm>();
81     next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
82                                       this);
83   }
84   VLOG(2) << "[LRS " << this << "] Next fetch-and-sample scheduled.";
85 }
86 
FetchAndSample(bool ok)87 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
88   if (!ok) {
89     LOG(INFO) << "[LRS " << this << "] Fetch-and-sample is stopped.";
90     return;
91   }
92   VLOG(2) << "[LRS " << this << "] Starting a fetch-and-sample...";
93   load_reporter_->FetchAndSample();
94   ScheduleNextFetchAndSample();
95 }
96 
Work(void * arg)97 void LoadReporterAsyncServiceImpl::Work(void* arg) {
98   LoadReporterAsyncServiceImpl* service =
99       static_cast<LoadReporterAsyncServiceImpl*>(arg);
100   service->FetchAndSample(true /* ok */);
101   // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
102   // to figure out why cq is not ready after service starts.
103   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
104                                gpr_time_from_seconds(1, GPR_TIMESPAN)));
105   ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
106                                     service->load_reporter_.get());
107   void* tag;
108   bool ok;
109   while (true) {
110     if (!service->cq_->Next(&tag, &ok)) {
111       // The completion queue is shutting down.
112       CHECK(service->shutdown_);
113       break;
114     }
115     if (tag == service) {
116       service->FetchAndSample(ok);
117     } else {
118       auto* next_step = static_cast<CallableTag*>(tag);
119       next_step->Run(ok);
120     }
121   }
122 }
123 
StartThread()124 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
125 
// Creates a handler for the next incoming ReportLoad call. The handler keeps
// itself alive through shared_ptr copies captured inside its CallableTags, so
// it is destroyed when the last outstanding tag releases it.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
    ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
    LoadReporter* load_reporter) {
  std::shared_ptr<ReportLoadHandler> handler =
      std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
  ReportLoadHandler* p = handler.get();
  {
    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
    // If the service is shutting down, simply drop the handler; it is
    // destroyed when `handler` goes out of scope.
    if (service->shutdown_) return;
    // One self-reference is copied into the done-notification tag...
    p->on_done_notified_ =
        CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
                              std::placeholders::_1, std::placeholders::_2),
                    handler);
    // ...and the remaining reference is moved into the delivery tag.
    p->next_inbound_ =
        CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(handler));
    // AsyncNotifyWhenDone() must be registered before the call starts.
    p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
    service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
                               &p->next_inbound_);
  }
}
148 
LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
    ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
    LoadReporter* load_reporter)
    : cq_(cq),
      service_(service),
      load_reporter_(load_reporter),
      stream_(&ctx_),
      // The call has been requested but not yet matched with a client.
      call_status_(WAITING_FOR_DELIVERY) {}
157 
// Invoked when the pending ReportLoad call is matched with a client (ok) or
// when the server shuts down before that can happen (!ok).
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (ok) {
    call_status_ = DELIVERED;
  } else {
    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
    // tag will not pop out if the call never starts (
    // https://github.com/grpc/grpc/issues/10136). So we need to manually
    // release the ownership of the handler in this case.
    CHECK_NE(on_done_notified_.ReleaseHandler(), nullptr);
  }
  if (!ok || shutdown_) {
    // The value of ok being false means that the server is shutting down.
    Shutdown(std::move(self), "OnRequestDelivered");
    return;
  }
  // Spawn a new handler instance to serve the next new client. Every handler
  // instance will deallocate itself when it's done.
  CreateAndStart(cq_, service_, load_reporter_);
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release the lock first: Shutdown() may reacquire cq_shutdown_mu_.
      lock.Release();
      Shutdown(std::move(self), "OnRequestDelivered");
      return;
    }
    // The self-reference moves into the read tag, keeping this handler alive
    // until OnReadDone() fires.
    next_inbound_ =
        CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    stream_.Read(&request_, &next_inbound_);
  }
  // LB ID is unique for each load reporting stream.
  lb_id_ = load_reporter_->GenerateLbId();
  LOG(INFO) << "[LRS " << service_
            << "] Call request delivered (lb_id_: " << lb_id_
            << ", handler: " << this
            << "). Start reading the initial request...";
}
197 
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)198 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
199     std::shared_ptr<ReportLoadHandler> self, bool ok) {
200   if (!ok || shutdown_) {
201     if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
202       // The client may have half-closed the stream or the stream is broken.
203       LOG(INFO) << "[LRS " << service_
204                 << "] Failed reading the initial request from the stream "
205                    "(lb_id_: "
206                 << lb_id_ << ", handler: " << this
207                 << ", done_notified: " << done_notified_
208                 << ", is_cancelled: " << is_cancelled_ << ").";
209     }
210     Shutdown(std::move(self), "OnReadDone");
211     return;
212   }
213   // We only receive one request, which is the initial request.
214   if (call_status_ < INITIAL_REQUEST_RECEIVED) {
215     if (!request_.has_initial_request()) {
216       Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
217     } else {
218       call_status_ = INITIAL_REQUEST_RECEIVED;
219       const auto& initial_request = request_.initial_request();
220       load_balanced_hostname_ = initial_request.load_balanced_hostname();
221       load_key_ = initial_request.load_key();
222       load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
223                                           load_key_);
224       const auto& load_report_interval = initial_request.load_report_interval();
225       load_report_interval_ms_ =
226           static_cast<unsigned long>((load_report_interval.seconds() * 1000) +
227                                      (load_report_interval.nanos() / 1000));
228       LOG(INFO) << "[LRS " << service_
229                 << "] Initial request received. Start load reporting (load "
230                    "balanced host: "
231                 << load_balanced_hostname_
232                 << ", interval: " << load_report_interval_ms_
233                 << " ms, lb_id_: " << lb_id_ << ", handler: " << this << ")...";
234       SendReport(self, true /* ok */);
235       // Expect this read to fail.
236       {
237         grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
238         if (service_->shutdown_) {
239           lock.Release();
240           Shutdown(std::move(self), "OnReadDone");
241           return;
242         }
243         next_inbound_ =
244             CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
245                                   std::placeholders::_1, std::placeholders::_2),
246                         std::move(self));
247         stream_.Read(&request_, &next_inbound_);
248       }
249     }
250   } else {
251     // Another request received! This violates the spec.
252     LOG(ERROR) << "[LRS " << service_
253                << "] Another request received (lb_id_: " << lb_id_
254                << ", handler: " << this << ").";
255     Shutdown(std::move(self), "OnReadDone+second_request");
256   }
257 }
258 
// Arms an alarm that pops load_report_interval_ms_ from now; the alarm's tag
// (next_outbound_) dispatches to SendReport() and carries the self-reference
// that keeps this handler alive until then.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok || shutdown_) {
    Shutdown(std::move(self), "ScheduleNextReport");
    return;
  }
  auto next_report_time = gpr_time_add(
      gpr_now(GPR_CLOCK_MONOTONIC),
      gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release the lock first: Shutdown() may reacquire cq_shutdown_mu_.
      lock.Release();
      Shutdown(std::move(self), "ScheduleNextReport");
      return;
    }
    next_outbound_ =
        CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    // TODO(juanlishen): Improve the Alarm implementation to reuse a single
    // instance for multiple events.
    next_report_alarm_ = std::make_unique<Alarm>();
    next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
  }
  VLOG(2) << "[LRS " << service_
          << "] Next load report scheduled (lb_id_: " << lb_id_
          << ", handler: " << this << ").";
}
288 
// Builds a LoadReportResponse (loads + feedback, plus the one-time initial
// response) and writes it to the stream; the write completion dispatches to
// ScheduleNextReport() via next_outbound_.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok || shutdown_) {
    Shutdown(std::move(self), "SendReport");
    return;
  }
  grpc::lb::v1::LoadReportResponse response;
  // Swap avoids copying the (potentially large) repeated load fields.
  auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
  response.mutable_load()->Swap(&loads);
  auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
  response.mutable_load_balancing_feedback()->Swap(&feedback);
  // The initial response metadata is attached to the first report only.
  if (call_status_ < INITIAL_RESPONSE_SENT) {
    auto initial_response = response.mutable_initial_response();
    initial_response->set_load_balancer_id(lb_id_);
    initial_response->set_implementation_id(
        grpc::lb::v1::InitialLoadReportResponse::CPP);
    initial_response->set_server_version(kVersion);
    call_status_ = INITIAL_RESPONSE_SENT;
  }
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release the lock first: Shutdown() may reacquire cq_shutdown_mu_.
      lock.Release();
      Shutdown(std::move(self), "SendReport");
      return;
    }
    next_outbound_ =
        CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    stream_.Write(response, &next_outbound_);
    LOG(INFO) << "[LRS " << service_
              << "] Sending load report (lb_id_: " << lb_id_
              << ", handler: " << this
              << ", loads count: " << response.load().size() << ")...";
  }
}
326 
// Invoked when the AsyncNotifyWhenDone() tag pops, i.e. the call is done
// (finished or cancelled) from the server's perspective.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  // The done-notification tag always completes successfully.
  CHECK(ok);
  done_notified_ = true;
  if (ctx_.IsCancelled()) {
    is_cancelled_ = true;
  }
  LOG(INFO) << "[LRS " << service_
            << "] Load reporting call is notified done (handler: " << this
            << ", is_cancelled: " << is_cancelled_ << ").";
  Shutdown(std::move(self), "OnDoneNotified");
}
339 
Shutdown(std::shared_ptr<ReportLoadHandler> self,const char * reason)340 void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
341     std::shared_ptr<ReportLoadHandler> self, const char* reason) {
342   if (!shutdown_) {
343     LOG(INFO) << "[LRS " << service_
344               << "] Shutting down the handler (lb_id_: " << lb_id_
345               << ", handler: " << this << ", reason: " << reason << ").";
346     shutdown_ = true;
347     if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
348       load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
349       next_report_alarm_->Cancel();
350     }
351   }
352   // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
353   // try to Finish() every time we are in Shutdown().
354   if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
355     grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
356     if (!service_->shutdown_) {
357       on_finish_done_ =
358           CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
359                                 std::placeholders::_1, std::placeholders::_2),
360                       std::move(self));
361       // TODO(juanlishen): Maybe add a message proto for the client to
362       // explicitly cancel the stream so that we can return OK status in such
363       // cases.
364       stream_.Finish(Status::CANCELLED, &on_finish_done_);
365       call_status_ = FINISH_CALLED;
366     }
367   }
368 }
369 
OnFinishDone(std::shared_ptr<ReportLoadHandler>,bool ok)370 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
371     // NOLINTNEXTLINE(performance-unnecessary-value-param)
372     std::shared_ptr<ReportLoadHandler> /*self*/, bool ok) {
373   if (ok) {
374     LOG(INFO) << "[LRS " << service_
375               << "] Load reporting finished (lb_id_: " << lb_id_
376               << ", handler: " << this << ").";
377   }
378 }
379 
380 }  // namespace load_reporter
381 }  // namespace grpc
382