/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include <stdint.h>
#include <stdio.h>

#include "src/core/ext/filters/client_channel/health/health_check_client.h"

#include "upb/upb.hpp"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/proto/grpc/health/v1/health.upb.h"

#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
#define HEALTH_CHECK_RECONNECT_JITTER 0.2

namespace grpc_core {

TraceFlag grpc_health_check_client_trace(false, "health_check_client");

//
// HealthCheckClient
//

HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    : InternallyRefCounted<HealthCheckClient>(
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}

HealthCheckClient::~HealthCheckClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
  }
}

void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
                                        const char* reason) {
  MutexLock lock(&mu_);
  SetHealthStatusLocked(state, reason);
}

void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
                                              const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
            ConnectivityStateName(state), reason);
  }
  if (watcher_ != nullptr) {
    watcher_->Notify(state,
                     state == GRPC_CHANNEL_TRANSIENT_FAILURE
                         ? absl::Status(absl::StatusCode::kUnavailable, reason)
                         : absl::Status());
  }
}

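// Called when the owner releases its ref: shuts down the health checking
// call and cancels any pending retry timer.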
void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    shutting_down_ = true;
    watcher_.reset();
    call_state_.reset();
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void HealthCheckClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(call_state_ == nullptr);
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}

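// Reports TRANSIENT_FAILURE and schedules a retry of the health check call
// after the backoff period.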
void HealthCheckClient::StartRetryTimerLocked() {
  SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                        "health check call failed; will retry after backoff");
  grpc_millis next_try = retry_backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO,
              "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
              timeout);
    } else {
      gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
              this);
    }
  }
  // Ref for callback, tracked manually.
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}

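// Retry timer callback: restarts the health check call unless we have been
// shut down or the timer was cancelled.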
void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
  HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
        gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
                self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}

//
// protobuf helpers
//

namespace {

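// Serializes a grpc.health.v1.HealthCheckRequest for service_name and
// stores the result in *send_message.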
void EncodeRequest(const std::string& service_name,
                   ManualConstructor<SliceBufferByteStream>* send_message) {
  upb::Arena arena;
  grpc_health_v1_HealthCheckRequest* request_struct =
      grpc_health_v1_HealthCheckRequest_new(arena.ptr());
  grpc_health_v1_HealthCheckRequest_set_service(
      request_struct,
      upb_strview_make(service_name.data(), service_name.size()));
  size_t buf_length;
  char* buf = grpc_health_v1_HealthCheckRequest_serialize(
      request_struct, arena.ptr(), &buf_length);
  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
  send_message->Init(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
}

// Returns true if healthy.
// If there was an error parsing the response, sets *error and returns false.
bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
  // If message is empty, assume unhealthy.
  if (slice_buffer->length == 0) {
    *error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
    return false;
  }
  // Concatenate the slices to form a single string.
  std::unique_ptr<uint8_t> recv_message_deleter;
  uint8_t* recv_message;
  if (slice_buffer->count == 1) {
    recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
  } else {
    recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
    recv_message_deleter.reset(recv_message);
    size_t offset = 0;
    for (size_t i = 0; i < slice_buffer->count; ++i) {
      memcpy(recv_message + offset,
             GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
             GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
      offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
    }
  }
  // Deserialize message.
  upb::Arena arena;
  grpc_health_v1_HealthCheckResponse* response_struct =
      grpc_health_v1_HealthCheckResponse_parse(
          reinterpret_cast<char*>(recv_message), slice_buffer->length,
          arena.ptr());
  if (response_struct == nullptr) {
    // Can't parse message; assume unhealthy.
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "cannot parse health check response");
    return false;
  }
  int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
  return status == grpc_health_v1_HealthCheckResponse_SERVING;
}

}  // namespace

//
// HealthCheckClient::CallState
//

HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_) {}

HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
  arena_->Destroy();
}

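// Shuts down the call: signals cancellation to the call combiner and then
// cancels the call itself.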
void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}

void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error_handle error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this,
            grpc_error_std_string(error).c_str());
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  grpc_metadata_batch_init(&send_initial_metadata_);
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  grpc_metadata_batch_init(&send_trailing_metadata_);
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  grpc_metadata_batch_init(&recv_initial_metadata_);
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  grpc_metadata_batch_init(&recv_trailing_metadata_);
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

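// Runs in the call combiner: starts the batch on the subchannel call.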
void HealthCheckClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

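// Hops into the call combiner before starting the batch on the subchannel
// call.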
void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}

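// Invoked once the call stack has been destroyed; frees this CallState.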
void HealthCheckClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  delete self;
}

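// Called when the cancel_stream batch completes; releases the ref taken for
// cancellation.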
void HealthCheckClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

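// Runs in the call combiner: sends a cancel_stream batch down the call.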
void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}

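// Cancels the call.  The compare-and-swap on cancelled_ ensures that
// cancellation is initiated at most once.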
void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
                                       MemoryOrder::ACQUIRE)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}

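// Called when the batch's send ops complete; destroys the send metadata
// batches and releases the ref taken for this callback.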
void HealthCheckClient::CallState::OnComplete(void* arg,
                                              grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  grpc_metadata_batch_destroy(&self->send_initial_metadata_);
  grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

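// We don't need anything from the initial metadata; just destroy it and
// release the ref taken for this callback.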
void HealthCheckClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

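// Called once a complete response message has been read (or reading failed).
// On success, decodes the response, reports the resulting health status, and
// starts another recv_message op to wait for the next response on the stream.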
void HealthCheckClient::CallState::DoneReadingRecvMessage(
    grpc_error_handle error) {
  recv_message_.reset();
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    Cancel();
    grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
  const grpc_connectivity_state state =
      healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
  health_check_client_->SetHealthStatus(
      state, error == GRPC_ERROR_NONE && !healthy
                 ? "backend unhealthy"
                 : grpc_error_std_string(error).c_str());
  seen_response_.Store(true, MemoryOrder::RELEASE);
  grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

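// Pulls the next slice out of the byte stream into recv_message_buffer_.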
grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
  grpc_slice slice;
  grpc_error_handle error = recv_message_->Pull(&slice);
  if (error == GRPC_ERROR_NONE) {
    grpc_slice_buffer_add(&recv_message_buffer_, slice);
  }
  return error;
}

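// Reads slices from the byte stream as long as data is available
// synchronously; if a read goes asynchronous, OnByteStreamNext() resumes
// reading when the data arrives.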
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error_handle error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}

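// Called when more data becomes available after a byte stream read went
// asynchronous.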
void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}

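// Called when a recv_message op completes.  If a message was received,
// starts draining it from the byte stream.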
void HealthCheckClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}

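// Called when the call ends.  Extracts the call status and decides whether
// to retry or, on UNIMPLEMENTED, to disable health checking.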
void HealthCheckClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  } else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
    status = grpc_get_status_code_from_metadata(
        self->recv_trailing_metadata_.idx.named.grpc_status->md);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthCheckClient %p CallState %p: health watch failed with "
            "status %d",
            self->health_check_client_.get(), self, status);
  }
  // Clean up.
  grpc_metadata_batch_destroy(&self->recv_trailing_metadata_);
  // For status UNIMPLEMENTED, give up and assume always healthy.
  bool retry = true;
  if (status == GRPC_STATUS_UNIMPLEMENTED) {
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    if (self->health_check_client_->channelz_node_ != nullptr) {
      self->health_check_client_->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
                                                kErrorMessage);
    retry = false;
  }
  MutexLock lock(&self->health_check_client_->mu_);
  self->CallEndedLocked(retry);
}

void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core