1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/client_channel/load_balanced_call_destination.h"
16
17 #include "absl/log/log.h"
18 #include "src/core/client_channel/client_channel.h"
19 #include "src/core/client_channel/client_channel_internal.h"
20 #include "src/core/client_channel/lb_metadata.h"
21 #include "src/core/client_channel/subchannel.h"
22 #include "src/core/config/core_configuration.h"
23 #include "src/core/lib/channel/status_util.h"
24 #include "src/core/lib/promise/loop.h"
25 #include "src/core/telemetry/call_tracer.h"
26
27 namespace grpc_core {
28
29 namespace {
30
MaybeCreateCallAttemptTracer(bool is_transparent_retry)31 void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
32 auto* call_tracer = MaybeGetContext<ClientCallTracer>();
33 if (call_tracer == nullptr) return;
34 auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
35 SetContext<CallTracerInterface>(tracer);
36 }
37
38 class LbCallState : public ClientChannelLbCallState {
39 public:
Alloc(size_t size)40 void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); }
41
42 // Internal API to allow first-party LB policies to access per-call
43 // attributes set by the ConfigSelector.
GetCallAttribute(UniqueTypeName type) const44 ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
45 UniqueTypeName type) const override {
46 auto* service_config_call_data = GetContext<ServiceConfigCallData>();
47 return service_config_call_data->GetCallAttribute(type);
48 }
49
GetCallAttemptTracer() const50 ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override {
51 return GetContext<ClientCallTracer::CallAttemptTracer>();
52 }
53 };
54
55 // TODO(roth): Remove this in favor of src/core/util/match.h function once
56 // we can do that without breaking lock annotations.
57 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)58 T HandlePickResult(
59 LoadBalancingPolicy::PickResult* result,
60 std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
61 std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
62 std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
63 std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
64 auto* complete_pick =
65 absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
66 if (complete_pick != nullptr) {
67 return complete_func(complete_pick);
68 }
69 auto* queue_pick =
70 absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
71 if (queue_pick != nullptr) {
72 return queue_func(queue_pick);
73 }
74 auto* fail_pick =
75 absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
76 if (fail_pick != nullptr) {
77 return fail_func(fail_pick);
78 }
79 auto* drop_pick =
80 absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
81 CHECK(drop_pick != nullptr);
82 return drop_func(drop_pick);
83 }
84
85 // Does an LB pick for a call. Returns one of the following things:
86 // - Continue{}, meaning to queue the pick
87 // - a non-OK status, meaning to fail the call
88 // - a call destination, meaning that the pick is complete
89 // When the pick is complete, pushes client_initial_metadata onto
90 // call_initiator. Also adds the subchannel call tracker (if any) to
91 // context.
PickSubchannel(LoadBalancingPolicy::SubchannelPicker & picker,UnstartedCallHandler & unstarted_handler)92 LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> PickSubchannel(
93 LoadBalancingPolicy::SubchannelPicker& picker,
94 UnstartedCallHandler& unstarted_handler) {
95 // Perform LB pick.
96 auto& client_initial_metadata =
97 unstarted_handler.UnprocessedClientInitialMetadata();
98 LoadBalancingPolicy::PickArgs pick_args;
99 Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata());
100 CHECK(path != nullptr);
101 pick_args.path = path->as_string_view();
102 LbCallState lb_call_state;
103 pick_args.call_state = &lb_call_state;
104 LbMetadata initial_metadata(&client_initial_metadata);
105 pick_args.initial_metadata = &initial_metadata;
106 auto result = picker.Pick(pick_args);
107 // Handle result.
108 return HandlePickResult<
109 LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>>>(
110 &result,
111 // CompletePick
112 [&](LoadBalancingPolicy::PickResult::Complete* complete_pick)
113 -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
114 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
115 << "client_channel: " << GetContext<Activity>()->DebugTag()
116 << " pick succeeded: subchannel="
117 << complete_pick->subchannel.get();
118 CHECK(complete_pick->subchannel != nullptr);
119 // Grab a ref to the call destination while we're still
120 // holding the data plane mutex.
121 auto call_destination =
122 DownCast<SubchannelInterfaceWithCallDestination*>(
123 complete_pick->subchannel.get())
124 ->call_destination();
125 // If the subchannel has no call destination (e.g., if the
126 // subchannel has moved out of state READY but the LB policy hasn't
127 // yet seen that change and given us a new picker), then just
128 // queue the pick. We'll try again as soon as we get a new picker.
129 if (call_destination == nullptr) {
130 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
131 << "client_channel: " << GetContext<Activity>()->DebugTag()
132 << " returned by LB picker has no connected subchannel; queueing "
133 "pick";
134 return Continue{};
135 }
136 // If the LB policy returned a call tracker, inform it that the
137 // call is starting and add it to context, so that we can notify
138 // it when the call finishes.
139 if (complete_pick->subchannel_call_tracker != nullptr) {
140 complete_pick->subchannel_call_tracker->Start();
141 SetContext(complete_pick->subchannel_call_tracker.release());
142 }
143 // Apply metadata mutations, if any.
144 MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
145 &client_initial_metadata);
146 MaybeOverrideAuthority(std::move(complete_pick->authority_override),
147 &client_initial_metadata);
148 // Return the connected subchannel.
149 return call_destination;
150 },
151 // QueuePick
152 [&](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
153 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
154 << "client_channel: " << GetContext<Activity>()->DebugTag()
155 << " pick queued";
156 return Continue{};
157 },
158 // FailPick
159 [&](LoadBalancingPolicy::PickResult::Fail* fail_pick)
160 -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
161 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
162 << "client_channel: " << GetContext<Activity>()->DebugTag()
163 << " pick failed: " << fail_pick->status;
164 // If wait_for_ready is false, then the error indicates the RPC
165 // attempt's final status.
166 if (!unstarted_handler.UnprocessedClientInitialMetadata()
167 .GetOrCreatePointer(WaitForReady())
168 ->value) {
169 return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status),
170 "LB pick");
171 }
172 // If wait_for_ready is true, then queue to retry when we get a new
173 // picker.
174 return Continue{};
175 },
176 // DropPick
177 [&](LoadBalancingPolicy::PickResult::Drop* drop_pick)
178 -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
179 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
180 << "client_channel: " << GetContext<Activity>()->DebugTag()
181 << " pick dropped: " << drop_pick->status;
182 return grpc_error_set_int(MaybeRewriteIllegalStatusCode(
183 std::move(drop_pick->status), "LB drop"),
184 StatusIntProperty::kLbPolicyDrop, 1);
185 });
186 }
187
188 } // namespace
189
StartCall(UnstartedCallHandler unstarted_handler)190 void LoadBalancedCallDestination::StartCall(
191 UnstartedCallHandler unstarted_handler) {
192 // If there is a call tracer, create a call attempt tracer.
193 bool* is_transparent_retry_metadata =
194 unstarted_handler.UnprocessedClientInitialMetadata().get_pointer(
195 IsTransparentRetry());
196 bool is_transparent_retry = is_transparent_retry_metadata != nullptr
197 ? *is_transparent_retry_metadata
198 : false;
199 MaybeCreateCallAttemptTracer(is_transparent_retry);
200 // Spawn a promise to do the LB pick.
201 // This will eventually start the call.
202 unstarted_handler.SpawnGuardedUntilCallCompletes(
203 "lb_pick", [unstarted_handler, picker = picker_]() mutable {
204 return Map(
205 // Wait for the LB picker.
206 CheckDelayed(Loop(
207 [last_picker =
208 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(),
209 unstarted_handler, picker]() mutable {
210 return Map(
211 picker.Next(last_picker),
212 [unstarted_handler, &last_picker](
213 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
214 picker) mutable {
215 CHECK_NE(picker.get(), nullptr);
216 last_picker = std::move(picker);
217 // Returns 3 possible things:
218 // - Continue to queue the pick
219 // - non-OK status to fail the pick
220 // - a connected subchannel to complete the pick
221 return PickSubchannel(*last_picker, unstarted_handler);
222 });
223 })),
224 // Create call stack on the connected subchannel.
225 [unstarted_handler](
226 std::tuple<
227 absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>,
228 bool>
229 pick_result) {
230 auto& call_destination = std::get<0>(pick_result);
231 const bool was_queued = std::get<1>(pick_result);
232 if (!call_destination.ok()) {
233 return call_destination.status();
234 }
235 // LB pick is done, so indicate that we've committed.
236 auto* on_commit = MaybeGetContext<LbOnCommit>();
237 if (on_commit != nullptr && *on_commit != nullptr) {
238 (*on_commit)();
239 }
240 // If it was queued, add a trace annotation.
241 if (was_queued) {
242 auto* tracer =
243 MaybeGetContext<ClientCallTracer::CallAttemptTracer>();
244 if (tracer != nullptr) {
245 tracer->RecordAnnotation("Delayed LB pick complete.");
246 }
247 }
248 // Delegate to connected subchannel.
249 // TODO(ctiller): need to insert LbCallTracingFilter at the top of
250 // the stack
251 (*call_destination)->StartCall(unstarted_handler);
252 return absl::OkStatus();
253 });
254 });
255 }
256
RegisterLoadBalancedCallDestination(CoreConfiguration::Builder * builder)257 void RegisterLoadBalancedCallDestination(CoreConfiguration::Builder* builder) {
258 class LoadBalancedCallDestinationFactory final
259 : public ClientChannel::CallDestinationFactory {
260 public:
261 RefCountedPtr<UnstartedCallDestination> CreateCallDestination(
262 ClientChannel::PickerObservable picker) override {
263 return MakeRefCounted<LoadBalancedCallDestination>(std::move(picker));
264 }
265 };
266
267 builder->channel_args_preconditioning()->RegisterStage([](ChannelArgs args) {
268 return args.SetObject(
269 NoDestructSingleton<LoadBalancedCallDestinationFactory>::Get());
270 });
271 }
272
273 } // namespace grpc_core
274