//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/client_channel/subchannel_stream_client.h"

#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stdio.h>

#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"
#include "src/core/util/time_precise.h"
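// Parameters for the exponential backoff between stream attempts: start at
// 1 second, grow 1.6x per failure with 20% jitter, capped at 2 minutes.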
#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
#define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2

namespace grpc_core {

using ::grpc_event_engine::experimental::EventEngine;

//
// SubchannelStreamClient
//

SubchannelStreamClient::SubchannelStreamClient(
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
    : InternallyRefCounted<SubchannelStreamClient>(tracer),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      tracer_(tracer),
      call_allocator_(MakeRefCounted<CallArenaAllocator>(
          connected_subchannel_->args()
              .GetObject<ResourceQuota>()
              ->memory_quota()
              ->CreateMemoryAllocator(
                  (tracer != nullptr) ? tracer : "SubchannelStreamClient"),
          1024)),
      event_handler_(std::move(event_handler)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))),
      event_engine_(connected_subchannel_->args().GetObject<EventEngine>()) {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this << ": created SubchannelStreamClient";
  }
  StartCall();
}

SubchannelStreamClient::~SubchannelStreamClient() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": destroying SubchannelStreamClient";
  }
}

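// Shuts down the client: under the lock, drops the event handler, any
// in-flight call, and any pending retry timer, then releases the initial ref.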
void SubchannelStreamClient::Orphan() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient shutting down";
  }
  {
    MutexLock lock(&mu_);
    event_handler_.reset();
    call_state_.reset();
    if (retry_timer_handle_.has_value()) {
      event_engine_->Cancel(*retry_timer_handle_);
      retry_timer_handle_.reset();
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void SubchannelStreamClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void SubchannelStreamClient::StartCallLocked() {
  if (event_handler_ == nullptr) return;
  CHECK(call_state_ == nullptr);
  event_handler_->OnCallStartLocked(this);
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient created CallState "
              << call_state_.get();
  }
  call_state_->StartCallLocked();
}

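// Schedules a retry after the next backoff delay, notifying the event
// handler so it can react to the retry being scheduled.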
void SubchannelStreamClient::StartRetryTimerLocked() {
  if (event_handler_ != nullptr) {
    event_handler_->OnRetryTimerStartLocked(this);
  }
  const Duration timeout = retry_backoff_.NextAttemptDelay();
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    LOG(INFO) << tracer_ << " " << this
              << ": SubchannelStreamClient health check call lost...";
    if (timeout > Duration::Zero()) {
      LOG(INFO) << tracer_ << " " << this << ": ... will retry in "
                << timeout.millis() << "ms.";
    } else {
      LOG(INFO) << tracer_ << " " << this << ": ... retrying immediately.";
    }
  }
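  // The timer callback holds a ref, so this object stays alive until the
  // timer fires or is cancelled.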
  retry_timer_handle_ = event_engine_->RunAfter(
      timeout, [self = Ref(DEBUG_LOCATION, "health_retry_timer")]() mutable {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        self->OnRetryTimer();
        self.reset(DEBUG_LOCATION, "health_retry_timer");
      });
}

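// Timer callback. The retry_timer_handle_ check guards against a callback
// that was already in flight when Orphan() cancelled the timer.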
void SubchannelStreamClient::OnRetryTimer() {
  MutexLock lock(&mu_);
  if (event_handler_ != nullptr && retry_timer_handle_.has_value() &&
      call_state_ == nullptr) {
    if (GPR_UNLIKELY(tracer_ != nullptr)) {
      LOG(INFO) << tracer_ << " " << this
                << ": SubchannelStreamClient restarting health check call";
    }
    StartCallLocked();
  }
  retry_timer_handle_.reset();
}

//
// SubchannelStreamClient::CallState
//

SubchannelStreamClient::CallState::CallState(
    RefCountedPtr<SubchannelStreamClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : subchannel_stream_client_(std::move(health_check_client)),
      pollent_(
          grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(subchannel_stream_client_->call_allocator_->MakeArena()) {}

SubchannelStreamClient::CallState::~CallState() {
  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
    LOG(INFO) << subchannel_stream_client_->tracer_ << " "
              << subchannel_stream_client_.get()
              << ": SubchannelStreamClient destroying CallState " << this;
  }
  // Unset the call combiner cancellation closure. This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
}

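// Cancels the call: signals cancellation to closures queued on the call
// combiner and starts a cancel_stream batch on the call.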
void SubchannelStreamClient::CallState::Orphan() {
  call_combiner_.Cancel(absl::CancelledError());
  Cancel();
}

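// Creates the subchannel call and starts two batches: one carrying all of
// the send ops plus recv_initial_metadata and recv_message, and a separate
// batch for recv_trailing_metadata, whose completion marks the end of the
// call.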
void SubchannelStreamClient::CallState::StartCallLocked() {
  SubchannelCall::Args args = {
      subchannel_stream_client_->connected_subchannel_,
      &pollent_,
      Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
      gpr_get_cycle_counter(),  // start_time
      Timestamp::InfFuture(),   // deadline
      arena_.get(),
      &call_combiner_,
  };
  grpc_error_handle error;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (!error.ok() || subchannel_stream_client_->event_handler_ == nullptr) {
    LOG(ERROR) << "SubchannelStreamClient " << subchannel_stream_client_.get()
               << " CallState " << this << ": error creating "
               << "stream on subchannel (" << StatusToString(error)
               << "); will retry";
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  send_initial_metadata_.Set(
      HttpPathMetadata(),
      subchannel_stream_client_->event_handler_->GetPathLocked());
  CHECK(error.ok());
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  send_message_.Append(Slice(
      subchannel_stream_client_->event_handler_->EncodeSendMessageLocked()));
  payload_.send_message.send_message = &send_message_;
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
                        RecvInitialMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref. When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

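// Batches must be started from within the call combiner, which serializes
// all operations on the call stack.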
void SubchannelStreamClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           absl::OkStatus(), "start_subchannel_batch");
}

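// Runs once the call stack has been fully destroyed, at which point no
// more callbacks can reference this CallState, so it is safe to delete.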
void SubchannelStreamClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  delete self;
}

void SubchannelStreamClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void SubchannelStreamClient::CallState::StartCancel(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = absl::CancelledError();
  self->call_->StartTransportStreamOpBatch(batch);
}

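// The compare-and-swap on cancelled_ ensures that the cancel_stream batch
// is started at most once, even if Cancel() is invoked from multiple places.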
void SubchannelStreamClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        absl::OkStatus(), "health_cancel");
  }
}

void SubchannelStreamClient::CallState::OnComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  self->send_initial_metadata_.Clear();
  self->send_trailing_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void SubchannelStreamClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_initial_metadata_ready");
  self->recv_initial_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

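// Handles a received message: hands it to the event handler for parsing,
// then starts another recv_message batch to wait for the next message.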
void SubchannelStreamClient::CallState::RecvMessageReady() {
  if (!recv_message_.has_value()) {
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  // Report payload.
  {
    MutexLock lock(&subchannel_stream_client_->mu_);
    if (subchannel_stream_client_->event_handler_ != nullptr) {
      absl::Status status =
          subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
              subchannel_stream_client_.get(),
              recv_message_->JoinIntoString());
      if (!status.ok()) {
        if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
          LOG(INFO) << subchannel_stream_client_->tracer_ << " "
                    << subchannel_stream_client_.get()
                    << ": SubchannelStreamClient CallState " << this
                    << ": failed to parse response message: " << status;
        }
        Cancel();
      }
    }
  }
  seen_response_.store(true, std::memory_order_release);
  recv_message_.reset();
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

void SubchannelStreamClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  self->RecvMessageReady();
}

void SubchannelStreamClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status =
      self->recv_trailing_metadata_.get(GrpcStatusMetadata())
          .value_or(GRPC_STATUS_UNKNOWN);
  if (!error.ok()) {
    grpc_error_get_status(error, Timestamp::InfFuture(), &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  }
  if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
    LOG(INFO) << self->subchannel_stream_client_->tracer_ << " "
              << self->subchannel_stream_client_.get()
              << ": SubchannelStreamClient CallState " << self
              << ": health watch failed with status " << status;
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // Report call end.
  MutexLock lock(&self->subchannel_stream_client_->mu_);
  if (self->subchannel_stream_client_->event_handler_ != nullptr) {
    self->subchannel_stream_client_->event_handler_
        ->RecvTrailingMetadataReadyLocked(
            self->subchannel_stream_client_.get(), status);
  }
  // For status UNIMPLEMENTED, give up and assume always healthy.
  self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
}

void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == subchannel_stream_client_->call_state_.get()) {
    subchannel_stream_client_->call_state_.reset();
    if (retry) {
      CHECK(subchannel_stream_client_->event_handler_ != nullptr);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        subchannel_stream_client_->retry_backoff_.Reset();
        subchannel_stream_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        subchannel_stream_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core