/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include <stdint.h>
#include <stdio.h>

#include "src/core/ext/filters/client_channel/health/health_check_client.h"

#include "upb/upb.hpp"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/proto/grpc/health/v1/health.upb.h"

#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
#define HEALTH_CHECK_RECONNECT_JITTER 0.2
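// With the settings above, the retry backoff used in StartRetryTimerLocked()
// nominally starts at 1 second and grows by 1.6x per consecutive failure
// (1s, 1.6s, ~2.6s, ~4.1s, ...), capped at 120 seconds, with roughly +/-20%
// jitter applied to each delay.  These are rough figures; the exact sequence
// is whatever the BackOff implementation produces.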
namespace grpc_core {

TraceFlag grpc_health_check_client_trace(false, "health_check_client");

//
// HealthCheckClient
//

HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    : InternallyRefCounted<HealthCheckClient>(
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}

HealthCheckClient::~HealthCheckClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
  }
}

void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
                                        const char* reason) {
  MutexLock lock(&mu_);
  SetHealthStatusLocked(state, reason);
}

void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
                                              const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
            ConnectivityStateName(state), reason);
  }
  if (watcher_ != nullptr) {
    watcher_->Notify(state,
                     state == GRPC_CHANNEL_TRANSIENT_FAILURE
                         ? absl::Status(absl::StatusCode::kUnavailable, reason)
                         : absl::Status());
  }
}

void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    shutting_down_ = true;
    watcher_.reset();
    call_state_.reset();
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void HealthCheckClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(call_state_ == nullptr);
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}

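// Schedules a retry of the health check call after the current backoff
// delay, reporting TRANSIENT_FAILURE to the watcher in the meantime.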
void HealthCheckClient::StartRetryTimerLocked() {
  SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                        "health check call failed; will retry after backoff");
  grpc_millis next_try = retry_backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO,
              "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
              timeout);
    } else {
      gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
              this);
    }
  }
  // Ref for callback, tracked manually.
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}

void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
  HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
        gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
                self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}

//
// protobuf helpers
//

namespace {

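// Serializes a HealthCheckRequest for service_name into *send_message.
// For rough reference: with service_name "foo", the serialized protobuf
// payload is expected to be the five bytes 0x0A 0x03 'f' 'o' 'o' -- the tag
// byte for field 1 (string `service`), the length, and the UTF-8 name.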
void EncodeRequest(const std::string& service_name,
                   ManualConstructor<SliceBufferByteStream>* send_message) {
  upb::Arena arena;
  grpc_health_v1_HealthCheckRequest* request_struct =
      grpc_health_v1_HealthCheckRequest_new(arena.ptr());
  grpc_health_v1_HealthCheckRequest_set_service(
      request_struct,
      upb_strview_make(service_name.data(), service_name.size()));
  size_t buf_length;
  char* buf = grpc_health_v1_HealthCheckRequest_serialize(
      request_struct, arena.ptr(), &buf_length);
  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
  send_message->Init(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
}

// Returns true if healthy.
// If there was an error parsing the response, sets *error and returns false.
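// Any status other than SERVING (e.g. NOT_SERVING or SERVICE_UNKNOWN) is
// treated as unhealthy.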
bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
  // If message is empty, assume unhealthy.
  if (slice_buffer->length == 0) {
    *error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
    return false;
  }
  // Concatenate the slices to form a single string.
  std::unique_ptr<uint8_t> recv_message_deleter;
  uint8_t* recv_message;
  if (slice_buffer->count == 1) {
    recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
  } else {
    recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
    recv_message_deleter.reset(recv_message);
    size_t offset = 0;
    for (size_t i = 0; i < slice_buffer->count; ++i) {
      memcpy(recv_message + offset,
             GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
             GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
      offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
    }
  }
  // Deserialize message.
  upb::Arena arena;
  grpc_health_v1_HealthCheckResponse* response_struct =
      grpc_health_v1_HealthCheckResponse_parse(
          reinterpret_cast<char*>(recv_message), slice_buffer->length,
          arena.ptr());
  if (response_struct == nullptr) {
    // Can't parse message; assume unhealthy.
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "cannot parse health check response");
    return false;
  }
  int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
  return status == grpc_health_v1_HealthCheckResponse_SERVING;
}

}  // namespace

//
// HealthCheckClient::CallState
//

HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_) {}

HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
  arena_->Destroy();
}

void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}

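// Starts the health-check call on the subchannel.  The first batch carries
// send_initial_metadata, send_message, send_trailing_metadata,
// recv_initial_metadata, and recv_message; recv_trailing_metadata is sent in
// a separate batch so that its completion marks the end of the call.  The
// on_complete, recv_initial_metadata_ready, and recv_message_ready callbacks
// each take their own ref on the SubchannelCall, while
// recv_trailing_metadata_ready reuses the call's initial ref.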
void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error_handle error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this,
            grpc_error_std_string(error).c_str());
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  grpc_metadata_batch_init(&send_initial_metadata_);
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  grpc_metadata_batch_init(&send_trailing_metadata_);
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  grpc_metadata_batch_init(&recv_initial_metadata_);
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  grpc_metadata_batch_init(&recv_trailing_metadata_);
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

void HealthCheckClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}

void HealthCheckClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  delete self;
}

void HealthCheckClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}

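// Sends a cancel_stream batch at most once, whether cancellation is
// triggered by Orphan() or by a failure while reading a response message.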
void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
                                       MemoryOrder::ACQUIRE)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}

void HealthCheckClient::CallState::OnComplete(void* arg,
                                              grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  grpc_metadata_batch_destroy(&self->send_initial_metadata_);
  grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void HealthCheckClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

void HealthCheckClient::CallState::DoneReadingRecvMessage(
    grpc_error_handle error) {
  recv_message_.reset();
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    Cancel();
    grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
  const grpc_connectivity_state state =
      healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
  health_check_client_->SetHealthStatus(
      state, error == GRPC_ERROR_NONE && !healthy
                 ? "backend unhealthy"
                 : grpc_error_std_string(error).c_str());
  seen_response_.Store(true, MemoryOrder::RELEASE);
  grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
  grpc_slice slice;
  grpc_error_handle error = recv_message_->Pull(&slice);
  if (error == GRPC_ERROR_NONE) {
    grpc_slice_buffer_add(&recv_message_buffer_, slice);
  }
  return error;
}

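// Drains the byte stream: Next() returns true when a slice can be pulled
// synchronously; otherwise it arranges for recv_message_ready_ (set to
// OnByteStreamNext) to be invoked once more data is available.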
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error_handle error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}

void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}

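// Called when the recv_message op completes.  A null recv_message_ means no
// message was received (e.g. the stream has ended); in that case we just
// drop this callback's ref and let recv_trailing_metadata_ready handle the
// end of the call.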
void HealthCheckClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}

void HealthCheckClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  } else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
    status = grpc_get_status_code_from_metadata(
        self->recv_trailing_metadata_.idx.named.grpc_status->md);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthCheckClient %p CallState %p: health watch failed with "
            "status %d",
            self->health_check_client_.get(), self, status);
  }
  // Clean up.
  grpc_metadata_batch_destroy(&self->recv_trailing_metadata_);
  // For status UNIMPLEMENTED, give up and assume always healthy.
  bool retry = true;
  if (status == GRPC_STATUS_UNIMPLEMENTED) {
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    if (self->health_check_client_->channelz_node_ != nullptr) {
      self->health_check_client_->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
                                                kErrorMessage);
    retry = false;
  }
  MutexLock lock(&self->health_check_client_->mu_);
  self->CallEndedLocked(retry);
}

void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core