//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/client_channel/subchannel_stream_client.h"

#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stdio.h>

#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"
#include "src/core/util/time_precise.h"

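// Retry backoff parameters: the first retry is delayed 1 second, each
// subsequent delay grows by 1.6x with 20% jitter, and delays are capped at
// 120 seconds.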
#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
#define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2

namespace grpc_core {

using ::grpc_event_engine::experimental::EventEngine;

//
// SubchannelStreamClient
//

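// The constructor starts the first stream attempt immediately via
// StartCall().  A hypothetical usage sketch (the handler type and tracer
// name are illustrative, not from this file): the owner supplies a
// CallEventHandler and holds the client through an OrphanablePtr, e.g.
//
//   OrphanablePtr<SubchannelStreamClient> client =
//       MakeOrphanable<SubchannelStreamClient>(
//           std::move(connected_subchannel), interested_parties,
//           std::make_unique<MyHealthEventHandler>(), "health_check_client");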
SubchannelStreamClient::SubchannelStreamClient(
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
    : InternallyRefCounted<SubchannelStreamClient>(tracer),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      tracer_(tracer),
      call_allocator_(MakeRefCounted<CallArenaAllocator>(
          connected_subchannel_->args()
              .GetObject<ResourceQuota>()
              ->memory_quota()
              ->CreateMemoryAllocator(
                  (tracer != nullptr) ? tracer : "SubchannelStreamClient"),
          1024)),
      event_handler_(std::move(event_handler)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))),
      event_engine_(connected_subchannel_->args().GetObject<EventEngine>()) {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this << ": created SubchannelStreamClient";
  }
  StartCall();
}

SubchannelStreamClient::~SubchannelStreamClient() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": destroying SubchannelStreamClient";
  }
}

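// Called when the owner drops its ref: releases the event handler and any
// in-flight call state, cancels a pending retry timer, and drops the ref
// held since construction.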
void SubchannelStreamClient::Orphan() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient shutting down";
  }
  {
    MutexLock lock(&mu_);
    event_handler_.reset();
    call_state_.reset();
    if (retry_timer_handle_.has_value()) {
      event_engine_->Cancel(*retry_timer_handle_);
      retry_timer_handle_.reset();
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void SubchannelStreamClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void SubchannelStreamClient::StartCallLocked() {
  if (event_handler_ == nullptr) return;
  CHECK(call_state_ == nullptr);
  event_handler_->OnCallStartLocked(this);
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient created CallState "
              << call_state_.get();
  }
  call_state_->StartCallLocked();
}

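// Schedules a retry after the backoff delay and notifies the event handler
// that the timer has started.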
void SubchannelStreamClient::StartRetryTimerLocked() {
  if (event_handler_ != nullptr) {
    event_handler_->OnRetryTimerStartLocked(this);
  }
  const Duration timeout = retry_backoff_.NextAttemptDelay();
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient health check call lost...";
    if (timeout > Duration::Zero()) {
      LOG(INFO) << tracer_ << " " << this << ": ... will retry in "
                << timeout.millis() << "ms.";
    } else {
      LOG(INFO) << tracer_ << " " << this << ": ... retrying immediately.";
    }
  }
  retry_timer_handle_ = event_engine_->RunAfter(
      timeout, [self = Ref(DEBUG_LOCATION, "health_retry_timer")]() mutable {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        self->OnRetryTimer();
        self.reset(DEBUG_LOCATION, "health_retry_timer");
      });
}

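// Retry timer callback: restarts the call only if we have not been shut
// down, the timer was not cancelled, and no call is already in flight.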
void SubchannelStreamClient::OnRetryTimer() {
  MutexLock lock(&mu_);
  if (event_handler_ != nullptr && retry_timer_handle_.has_value() &&
      call_state_ == nullptr) {
    if (GPR_UNLIKELY(tracer_ != nullptr)) {
      LOG(INFO) << tracer_ << " " << this
                << ": SubchannelStreamClient restarting health check call";
    }
    StartCallLocked();
  }
  retry_timer_handle_.reset();
}

//
// SubchannelStreamClient::CallState
//

SubchannelStreamClient::CallState::CallState(
    RefCountedPtr<SubchannelStreamClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : subchannel_stream_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(subchannel_stream_client_->call_allocator_->MakeArena()) {}

SubchannelStreamClient::CallState::~CallState() {
  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
    LOG(INFO) << subchannel_stream_client_->tracer_ << " "
              << subchannel_stream_client_.get()
              << ": SubchannelStreamClient destroying CallState " << this;
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
}

void SubchannelStreamClient::CallState::Orphan() {
  call_combiner_.Cancel(absl::CancelledError());
  Cancel();
}

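// Creates the underlying SubchannelCall and starts two batches: one
// combining the send ops with recv_initial_metadata and recv_message, and a
// separate batch for recv_trailing_metadata, which signals the end of the
// call.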
void SubchannelStreamClient::CallState::StartCallLocked() {
  SubchannelCall::Args args = {
      subchannel_stream_client_->connected_subchannel_,
      &pollent_,
      Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
      gpr_get_cycle_counter(),  // start_time
      Timestamp::InfFuture(),   // deadline
      arena_.get(),
      &call_combiner_,
  };
  grpc_error_handle error;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (!error.ok() || subchannel_stream_client_->event_handler_ == nullptr) {
    LOG(ERROR) << "SubchannelStreamClient " << subchannel_stream_client_.get()
               << " CallState " << this << ": error creating "
               << "stream on subchannel (" << StatusToString(error)
               << "); will retry";
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  send_initial_metadata_.Set(
      HttpPathMetadata(),
      subchannel_stream_client_->event_handler_->GetPathLocked());
  CHECK(error.ok());
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  send_message_.Append(Slice(
      subchannel_stream_client_->event_handler_->EncodeSendMessageLocked()));
  payload_.send_message.send_message = &send_message_;
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

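// Transport batches must be started from inside the call combiner, so
// StartBatch() bounces each batch into the combiner via this closure.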
void SubchannelStreamClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           absl::OkStatus(), "start_subchannel_batch");
}

void SubchannelStreamClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  delete self;
}

void SubchannelStreamClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void SubchannelStreamClient::CallState::StartCancel(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = absl::CancelledError();
  self->call_->StartTransportStreamOpBatch(batch);
}

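// Cancels the call at most once: the compare-and-swap ensures that only the
// first caller (Orphan() or a message-parse failure) starts the cancel
// batch.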
void SubchannelStreamClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        absl::OkStatus(), "health_cancel");
  }
}

void SubchannelStreamClient::CallState::OnComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  self->send_initial_metadata_.Clear();
  self->send_trailing_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void SubchannelStreamClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  self->recv_initial_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

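// Handles a newly received message: passes the payload to the event handler
// for parsing, then re-arms recv_message to wait for the next message on the
// stream.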
void SubchannelStreamClient::CallState::RecvMessageReady() {
  if (!recv_message_.has_value()) {
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  // Report payload.
  {
    MutexLock lock(&subchannel_stream_client_->mu_);
    if (subchannel_stream_client_->event_handler_ != nullptr) {
      absl::Status status =
          subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
              subchannel_stream_client_.get(), recv_message_->JoinIntoString());
      if (!status.ok()) {
        if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
          LOG(INFO) << subchannel_stream_client_->tracer_ << " "
                    << subchannel_stream_client_.get()
                    << ": SubchannelStreamClient CallState " << this
                    << ": failed to parse response message: " << status;
        }
        Cancel();
      }
    }
  }
  seen_response_.store(true, std::memory_order_release);
  recv_message_.reset();
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

void SubchannelStreamClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  self->RecvMessageReady();
}

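// Final callback for the call: derives the terminal status (preferring the
// transport error, if any), reports it to the event handler, and decides
// whether to retry.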
void SubchannelStreamClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status =
      self->recv_trailing_metadata_.get(GrpcStatusMetadata())
          .value_or(GRPC_STATUS_UNKNOWN);
  if (!error.ok()) {
    grpc_error_get_status(error, Timestamp::InfFuture(), &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  }
  if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
    LOG(INFO) << self->subchannel_stream_client_->tracer_ << " "
              << self->subchannel_stream_client_.get()
              << ": SubchannelStreamClient CallState " << self
              << ": health watch failed with status " << status;
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // Report call end.
  MutexLock lock(&self->subchannel_stream_client_->mu_);
  if (self->subchannel_stream_client_->event_handler_ != nullptr) {
    self->subchannel_stream_client_->event_handler_
        ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(),
                                          status);
  }
  // For status UNIMPLEMENTED, give up and assume always healthy.
  self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
}

void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == subchannel_stream_client_->call_state_.get()) {
    subchannel_stream_client_->call_state_.reset();
    if (retry) {
      CHECK(subchannel_stream_client_->event_handler_ != nullptr);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        subchannel_stream_client_->retry_backoff_.Reset();
        subchannel_stream_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        subchannel_stream_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core