1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
20
21 #include <google/protobuf/repeated_ptr_field.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/support/status.h>
25 #include <inttypes.h>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/cpp/server/load_reporter/constants.h"
30
31 // IWYU pragma: no_include "google/protobuf/duration.pb.h"
32
33 namespace grpc {
34 namespace load_reporter {
35
Run(bool ok)36 void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
37 CHECK(handler_function_ != nullptr);
38 CHECK_NE(handler_, nullptr);
39 handler_function_(std::move(handler_), ok);
40 }
41
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)42 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
43 std::unique_ptr<ServerCompletionQueue> cq)
44 : cq_(std::move(cq)) {
45 thread_ =
46 std::make_unique<grpc_core::Thread>("server_load_reporting", Work, this);
47 std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
48 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
49 cpu_stats_provider = std::make_unique<CpuStatsProviderDefaultImpl>();
50 #endif
51 load_reporter_ = std::make_unique<LoadReporter>(
52 kFeedbackSampleWindowSeconds,
53 std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
54 std::move(cpu_stats_provider));
55 }
56
LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
  // We will reach here after the server starts shutting down.
  shutdown_ = true;
  {
    // Shut the queue down under the lock so concurrent schedulers observe
    // shutdown_ before posting new alarms/tags against a dying queue.
    grpc_core::MutexLock lock(&cq_shutdown_mu_);
    cq_->Shutdown();
  }
  // Cancel any pending fetch-and-sample alarm so its tag drains promptly.
  if (next_fetch_and_sample_alarm_ != nullptr) {
    next_fetch_and_sample_alarm_->Cancel();
  }
  // Wait for the worker thread to drain the completion queue and exit.
  thread_->Join();
}
69
ScheduleNextFetchAndSample()70 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
71 auto next_fetch_and_sample_time =
72 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
73 gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
74 GPR_TIMESPAN));
75 {
76 grpc_core::MutexLock lock(&cq_shutdown_mu_);
77 if (shutdown_) return;
78 // TODO(juanlishen): Improve the Alarm implementation to reuse a single
79 // instance for multiple events.
80 next_fetch_and_sample_alarm_ = std::make_unique<Alarm>();
81 next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
82 this);
83 }
84 VLOG(2) << "[LRS " << this << "] Next fetch-and-sample scheduled.";
85 }
86
FetchAndSample(bool ok)87 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
88 if (!ok) {
89 LOG(INFO) << "[LRS " << this << "] Fetch-and-sample is stopped.";
90 return;
91 }
92 VLOG(2) << "[LRS " << this << "] Starting a fetch-and-sample...";
93 load_reporter_->FetchAndSample();
94 ScheduleNextFetchAndSample();
95 }
96
// Worker-thread entry point: kicks off the periodic fetch-and-sample cycle
// and the first call handler, then drives the completion queue until it
// shuts down.
void LoadReporterAsyncServiceImpl::Work(void* arg) {
  LoadReporterAsyncServiceImpl* service =
      static_cast<LoadReporterAsyncServiceImpl*>(arg);
  // Start the fetch-and-sample cycle immediately.
  service->FetchAndSample(true /* ok */);
  // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
  // to figure out why cq is not ready after service starts.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                               gpr_time_from_seconds(1, GPR_TIMESPAN)));
  // Post the first handler so an incoming ReportLoad call can be accepted.
  ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
                                    service->load_reporter_.get());
  void* tag;
  bool ok;
  while (true) {
    if (!service->cq_->Next(&tag, &ok)) {
      // The completion queue is shutting down.
      CHECK(service->shutdown_);
      break;
    }
    if (tag == service) {
      // The service pointer is the tag used for fetch-and-sample alarms.
      service->FetchAndSample(ok);
    } else {
      // Every other tag is a CallableTag owned by some handler.
      auto* next_step = static_cast<CallableTag*>(tag);
      next_step->Run(ok);
    }
  }
}
123
StartThread()124 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
125
// Creates a handler for the next incoming ReportLoad call and registers it
// with the completion queue. The handler keeps itself alive through the
// shared_ptr copies stored in its CallableTags, and is destroyed once all
// of its pending tags have drained.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
    ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
    LoadReporter* load_reporter) {
  std::shared_ptr<ReportLoadHandler> handler =
      std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
  ReportLoadHandler* p = handler.get();
  {
    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
    // Don't register anything new once the service is shutting down.
    if (service->shutdown_) return;
    // Each tag holds a shared_ptr to the handler, keeping it alive until
    // the corresponding completion-queue event pops.
    p->on_done_notified_ =
        CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
                              std::placeholders::_1, std::placeholders::_2),
                    handler);
    p->next_inbound_ =
        CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(handler));
    // AsyncNotifyWhenDone() must be registered before the call starts.
    p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
    service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
                               &p->next_inbound_);
  }
}
148
// Constructs a handler for one ReportLoad call in WAITING_FOR_DELIVERY
// state. The stream is bound to this handler's context; CreateAndStart()
// wires up the tags that drive the rest of the call lifecycle.
LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
    ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
    LoadReporter* load_reporter)
    : cq_(cq),
      service_(service),
      load_reporter_(load_reporter),
      stream_(&ctx_),
      call_status_(WAITING_FOR_DELIVERY) {}
157
// Called when a new ReportLoad call has been delivered to this handler, or
// when delivery failed because the server is shutting down. On success,
// spawns the next handler and starts reading the initial request.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (ok) {
    call_status_ = DELIVERED;
  } else {
    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
    // tag will not pop out if the call never starts (
    // https://github.com/grpc/grpc/issues/10136). So we need to manually
    // release the ownership of the handler in this case.
    CHECK_NE(on_done_notified_.ReleaseHandler(), nullptr);
  }
  if (!ok || shutdown_) {
    // The value of ok being false means that the server is shutting down.
    Shutdown(std::move(self), "OnRequestDelivered");
    return;
  }
  // Spawn a new handler instance to serve the next new client. Every handler
  // instance will deallocate itself when it's done.
  CreateAndStart(cq_, service_, load_reporter_);
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release first: Shutdown() acquires the same mutex.
      lock.Release();
      Shutdown(std::move(self), "OnRequestDelivered");
      return;
    }
    // Hand this handler's ownership to the read tag and start reading the
    // initial request.
    next_inbound_ =
        CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    stream_.Read(&request_, &next_inbound_);
  }
  // LB ID is unique for each load reporting stream.
  lb_id_ = load_reporter_->GenerateLbId();
  LOG(INFO) << "[LRS " << service_
            << "] Call request delivered (lb_id_: " << lb_id_
            << ", handler: " << this
            << "). Start reading the initial request...";
}
197
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)198 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
199 std::shared_ptr<ReportLoadHandler> self, bool ok) {
200 if (!ok || shutdown_) {
201 if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
202 // The client may have half-closed the stream or the stream is broken.
203 LOG(INFO) << "[LRS " << service_
204 << "] Failed reading the initial request from the stream "
205 "(lb_id_: "
206 << lb_id_ << ", handler: " << this
207 << ", done_notified: " << done_notified_
208 << ", is_cancelled: " << is_cancelled_ << ").";
209 }
210 Shutdown(std::move(self), "OnReadDone");
211 return;
212 }
213 // We only receive one request, which is the initial request.
214 if (call_status_ < INITIAL_REQUEST_RECEIVED) {
215 if (!request_.has_initial_request()) {
216 Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
217 } else {
218 call_status_ = INITIAL_REQUEST_RECEIVED;
219 const auto& initial_request = request_.initial_request();
220 load_balanced_hostname_ = initial_request.load_balanced_hostname();
221 load_key_ = initial_request.load_key();
222 load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
223 load_key_);
224 const auto& load_report_interval = initial_request.load_report_interval();
225 load_report_interval_ms_ =
226 static_cast<unsigned long>((load_report_interval.seconds() * 1000) +
227 (load_report_interval.nanos() / 1000));
228 LOG(INFO) << "[LRS " << service_
229 << "] Initial request received. Start load reporting (load "
230 "balanced host: "
231 << load_balanced_hostname_
232 << ", interval: " << load_report_interval_ms_
233 << " ms, lb_id_: " << lb_id_ << ", handler: " << this << ")...";
234 SendReport(self, true /* ok */);
235 // Expect this read to fail.
236 {
237 grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
238 if (service_->shutdown_) {
239 lock.Release();
240 Shutdown(std::move(self), "OnReadDone");
241 return;
242 }
243 next_inbound_ =
244 CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
245 std::placeholders::_1, std::placeholders::_2),
246 std::move(self));
247 stream_.Read(&request_, &next_inbound_);
248 }
249 }
250 } else {
251 // Another request received! This violates the spec.
252 LOG(ERROR) << "[LRS " << service_
253 << "] Another request received (lb_id_: " << lb_id_
254 << ", handler: " << this << ").";
255 Shutdown(std::move(self), "OnReadDone+second_request");
256 }
257 }
258
// Arms an alarm so that SendReport() fires again after
// load_report_interval_ms_ milliseconds.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok || shutdown_) {
    Shutdown(std::move(self), "ScheduleNextReport");
    return;
  }
  auto next_report_time = gpr_time_add(
      gpr_now(GPR_CLOCK_MONOTONIC),
      gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release first: Shutdown() acquires the same mutex.
      lock.Release();
      Shutdown(std::move(self), "ScheduleNextReport");
      return;
    }
    // The alarm event triggers SendReport(); the tag keeps this handler
    // alive until it pops.
    next_outbound_ =
        CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    // TODO(juanlishen): Improve the Alarm implementation to reuse a single
    // instance for multiple events.
    next_report_alarm_ = std::make_unique<Alarm>();
    next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
  }
  VLOG(2) << "[LRS " << service_
          << "] Next load report scheduled (lb_id_: " << lb_id_
          << ", handler: " << this << ").";
}
288
// Builds one load report from the LoadReporter and writes it to the stream.
// The very first response on a stream also carries the initial-response
// metadata (LB ID, implementation, server version). The write completion
// triggers ScheduleNextReport().
void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
    std::shared_ptr<ReportLoadHandler> self, bool ok) {
  if (!ok || shutdown_) {
    Shutdown(std::move(self), "SendReport");
    return;
  }
  grpc::lb::v1::LoadReportResponse response;
  auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
  response.mutable_load()->Swap(&loads);
  auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
  response.mutable_load_balancing_feedback()->Swap(&feedback);
  if (call_status_ < INITIAL_RESPONSE_SENT) {
    // First report on this stream: attach the initial response.
    auto initial_response = response.mutable_initial_response();
    initial_response->set_load_balancer_id(lb_id_);
    initial_response->set_implementation_id(
        grpc::lb::v1::InitialLoadReportResponse::CPP);
    initial_response->set_server_version(kVersion);
    call_status_ = INITIAL_RESPONSE_SENT;
  }
  {
    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
    if (service_->shutdown_) {
      // Release first: Shutdown() acquires the same mutex.
      lock.Release();
      Shutdown(std::move(self), "SendReport");
      return;
    }
    // The write completion triggers ScheduleNextReport(); the tag keeps
    // this handler alive until it pops.
    next_outbound_ =
        CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    stream_.Write(response, &next_outbound_);
    LOG(INFO) << "[LRS " << service_
              << "] Sending load report (lb_id_: " << lb_id_
              << ", handler: " << this
              << ", loads count: " << response.load().size() << ")...";
  }
}
326
OnDoneNotified(std::shared_ptr<ReportLoadHandler> self,bool ok)327 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
328 std::shared_ptr<ReportLoadHandler> self, bool ok) {
329 CHECK(ok);
330 done_notified_ = true;
331 if (ctx_.IsCancelled()) {
332 is_cancelled_ = true;
333 }
334 LOG(INFO) << "[LRS " << service_
335 << "] Load reporting call is notified done (handler: " << this
336 << ", is_cancelled: " << is_cancelled_ << ").";
337 Shutdown(std::move(self), "OnDoneNotified");
338 }
339
// Tears the handler down. Safe to enter multiple times: the first entry
// releases the per-stream resources (stream-closed report, report alarm);
// the Finish() attempt runs on every entry because completion-queue events
// can arrive in different orders.
void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
    std::shared_ptr<ReportLoadHandler> self, const char* reason) {
  if (!shutdown_) {
    LOG(INFO) << "[LRS " << service_
              << "] Shutting down the handler (lb_id_: " << lb_id_
              << ", handler: " << this << ", reason: " << reason << ").";
    shutdown_ = true;
    if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
      // The stream was fully established, so the LoadReporter was told
      // about it and the report alarm may be armed.
      load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
      next_report_alarm_->Cancel();
    }
  }
  // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
  // try to Finish() every time we are in Shutdown().
  if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
    grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
    if (!service_->shutdown_) {
      on_finish_done_ =
          CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
                                std::placeholders::_1, std::placeholders::_2),
                      std::move(self));
      // TODO(juanlishen): Maybe add a message proto for the client to
      // explicitly cancel the stream so that we can return OK status in such
      // cases.
      stream_.Finish(Status::CANCELLED, &on_finish_done_);
      call_status_ = FINISH_CALLED;
    }
  }
}
369
OnFinishDone(std::shared_ptr<ReportLoadHandler>,bool ok)370 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
371 // NOLINTNEXTLINE(performance-unnecessary-value-param)
372 std::shared_ptr<ReportLoadHandler> /*self*/, bool ok) {
373 if (ok) {
374 LOG(INFO) << "[LRS " << service_
375 << "] Load reporting finished (lb_id_: " << lb_id_
376 << ", handler: " << this << ").";
377 }
378 }
379
380 } // namespace load_reporter
381 } // namespace grpc
382