• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/client_channel/load_balanced_call_destination.h"
16 
17 #include "absl/log/log.h"
18 #include "src/core/client_channel/client_channel.h"
19 #include "src/core/client_channel/client_channel_internal.h"
20 #include "src/core/client_channel/lb_metadata.h"
21 #include "src/core/client_channel/subchannel.h"
22 #include "src/core/config/core_configuration.h"
23 #include "src/core/lib/channel/status_util.h"
24 #include "src/core/lib/promise/loop.h"
25 #include "src/core/telemetry/call_tracer.h"
26 
27 namespace grpc_core {
28 
29 namespace {
30 
MaybeCreateCallAttemptTracer(bool is_transparent_retry)31 void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
32   auto* call_tracer = MaybeGetContext<ClientCallTracer>();
33   if (call_tracer == nullptr) return;
34   auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
35   SetContext<CallTracerInterface>(tracer);
36 }
37 
38 class LbCallState : public ClientChannelLbCallState {
39  public:
Alloc(size_t size)40   void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); }
41 
42   // Internal API to allow first-party LB policies to access per-call
43   // attributes set by the ConfigSelector.
GetCallAttribute(UniqueTypeName type) const44   ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
45       UniqueTypeName type) const override {
46     auto* service_config_call_data = GetContext<ServiceConfigCallData>();
47     return service_config_call_data->GetCallAttribute(type);
48   }
49 
GetCallAttemptTracer() const50   ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override {
51     return GetContext<ClientCallTracer::CallAttemptTracer>();
52   }
53 };
54 
55 // TODO(roth): Remove this in favor of src/core/util/match.h function once
56 // we can do that without breaking lock annotations.
57 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)58 T HandlePickResult(
59     LoadBalancingPolicy::PickResult* result,
60     std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
61     std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
62     std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
63     std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
64   auto* complete_pick =
65       absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
66   if (complete_pick != nullptr) {
67     return complete_func(complete_pick);
68   }
69   auto* queue_pick =
70       absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
71   if (queue_pick != nullptr) {
72     return queue_func(queue_pick);
73   }
74   auto* fail_pick =
75       absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
76   if (fail_pick != nullptr) {
77     return fail_func(fail_pick);
78   }
79   auto* drop_pick =
80       absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
81   CHECK(drop_pick != nullptr);
82   return drop_func(drop_pick);
83 }
84 
85 // Does an LB pick for a call.  Returns one of the following things:
86 // - Continue{}, meaning to queue the pick
87 // - a non-OK status, meaning to fail the call
88 // - a call destination, meaning that the pick is complete
89 // When the pick is complete, pushes client_initial_metadata onto
90 // call_initiator.  Also adds the subchannel call tracker (if any) to
91 // context.
PickSubchannel(LoadBalancingPolicy::SubchannelPicker & picker,UnstartedCallHandler & unstarted_handler)92 LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> PickSubchannel(
93     LoadBalancingPolicy::SubchannelPicker& picker,
94     UnstartedCallHandler& unstarted_handler) {
95   // Perform LB pick.
96   auto& client_initial_metadata =
97       unstarted_handler.UnprocessedClientInitialMetadata();
98   LoadBalancingPolicy::PickArgs pick_args;
99   Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata());
100   CHECK(path != nullptr);
101   pick_args.path = path->as_string_view();
102   LbCallState lb_call_state;
103   pick_args.call_state = &lb_call_state;
104   LbMetadata initial_metadata(&client_initial_metadata);
105   pick_args.initial_metadata = &initial_metadata;
106   auto result = picker.Pick(pick_args);
107   // Handle result.
108   return HandlePickResult<
109       LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>>>(
110       &result,
111       // CompletePick
112       [&](LoadBalancingPolicy::PickResult::Complete* complete_pick)
113           -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
114         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
115             << "client_channel: " << GetContext<Activity>()->DebugTag()
116             << " pick succeeded: subchannel="
117             << complete_pick->subchannel.get();
118         CHECK(complete_pick->subchannel != nullptr);
119         // Grab a ref to the call destination while we're still
120         // holding the data plane mutex.
121         auto call_destination =
122             DownCast<SubchannelInterfaceWithCallDestination*>(
123                 complete_pick->subchannel.get())
124                 ->call_destination();
125         // If the subchannel has no call destination (e.g., if the
126         // subchannel has moved out of state READY but the LB policy hasn't
127         // yet seen that change and given us a new picker), then just
128         // queue the pick.  We'll try again as soon as we get a new picker.
129         if (call_destination == nullptr) {
130           GRPC_TRACE_LOG(client_channel_lb_call, INFO)
131               << "client_channel: " << GetContext<Activity>()->DebugTag()
132               << " returned by LB picker has no connected subchannel; queueing "
133                  "pick";
134           return Continue{};
135         }
136         // If the LB policy returned a call tracker, inform it that the
137         // call is starting and add it to context, so that we can notify
138         // it when the call finishes.
139         if (complete_pick->subchannel_call_tracker != nullptr) {
140           complete_pick->subchannel_call_tracker->Start();
141           SetContext(complete_pick->subchannel_call_tracker.release());
142         }
143         // Apply metadata mutations, if any.
144         MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
145                                        &client_initial_metadata);
146         MaybeOverrideAuthority(std::move(complete_pick->authority_override),
147                                &client_initial_metadata);
148         // Return the connected subchannel.
149         return call_destination;
150       },
151       // QueuePick
152       [&](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
153         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
154             << "client_channel: " << GetContext<Activity>()->DebugTag()
155             << " pick queued";
156         return Continue{};
157       },
158       // FailPick
159       [&](LoadBalancingPolicy::PickResult::Fail* fail_pick)
160           -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
161         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
162             << "client_channel: " << GetContext<Activity>()->DebugTag()
163             << " pick failed: " << fail_pick->status;
164         // If wait_for_ready is false, then the error indicates the RPC
165         // attempt's final status.
166         if (!unstarted_handler.UnprocessedClientInitialMetadata()
167                  .GetOrCreatePointer(WaitForReady())
168                  ->value) {
169           return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status),
170                                                "LB pick");
171         }
172         // If wait_for_ready is true, then queue to retry when we get a new
173         // picker.
174         return Continue{};
175       },
176       // DropPick
177       [&](LoadBalancingPolicy::PickResult::Drop* drop_pick)
178           -> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> {
179         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
180             << "client_channel: " << GetContext<Activity>()->DebugTag()
181             << " pick dropped: " << drop_pick->status;
182         return grpc_error_set_int(MaybeRewriteIllegalStatusCode(
183                                       std::move(drop_pick->status), "LB drop"),
184                                   StatusIntProperty::kLbPolicyDrop, 1);
185       });
186 }
187 
188 }  // namespace
189 
StartCall(UnstartedCallHandler unstarted_handler)190 void LoadBalancedCallDestination::StartCall(
191     UnstartedCallHandler unstarted_handler) {
192   // If there is a call tracer, create a call attempt tracer.
193   bool* is_transparent_retry_metadata =
194       unstarted_handler.UnprocessedClientInitialMetadata().get_pointer(
195           IsTransparentRetry());
196   bool is_transparent_retry = is_transparent_retry_metadata != nullptr
197                                   ? *is_transparent_retry_metadata
198                                   : false;
199   MaybeCreateCallAttemptTracer(is_transparent_retry);
200   // Spawn a promise to do the LB pick.
201   // This will eventually start the call.
202   unstarted_handler.SpawnGuardedUntilCallCompletes(
203       "lb_pick", [unstarted_handler, picker = picker_]() mutable {
204         return Map(
205             // Wait for the LB picker.
206             CheckDelayed(Loop(
207                 [last_picker =
208                      RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(),
209                  unstarted_handler, picker]() mutable {
210                   return Map(
211                       picker.Next(last_picker),
212                       [unstarted_handler, &last_picker](
213                           RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
214                               picker) mutable {
215                         CHECK_NE(picker.get(), nullptr);
216                         last_picker = std::move(picker);
217                         // Returns 3 possible things:
218                         // - Continue to queue the pick
219                         // - non-OK status to fail the pick
220                         // - a connected subchannel to complete the pick
221                         return PickSubchannel(*last_picker, unstarted_handler);
222                       });
223                 })),
224             // Create call stack on the connected subchannel.
225             [unstarted_handler](
226                 std::tuple<
227                     absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>,
228                     bool>
229                     pick_result) {
230               auto& call_destination = std::get<0>(pick_result);
231               const bool was_queued = std::get<1>(pick_result);
232               if (!call_destination.ok()) {
233                 return call_destination.status();
234               }
235               // LB pick is done, so indicate that we've committed.
236               auto* on_commit = MaybeGetContext<LbOnCommit>();
237               if (on_commit != nullptr && *on_commit != nullptr) {
238                 (*on_commit)();
239               }
240               // If it was queued, add a trace annotation.
241               if (was_queued) {
242                 auto* tracer =
243                     MaybeGetContext<ClientCallTracer::CallAttemptTracer>();
244                 if (tracer != nullptr) {
245                   tracer->RecordAnnotation("Delayed LB pick complete.");
246                 }
247               }
248               // Delegate to connected subchannel.
249               // TODO(ctiller): need to insert LbCallTracingFilter at the top of
250               // the stack
251               (*call_destination)->StartCall(unstarted_handler);
252               return absl::OkStatus();
253             });
254       });
255 }
256 
RegisterLoadBalancedCallDestination(CoreConfiguration::Builder * builder)257 void RegisterLoadBalancedCallDestination(CoreConfiguration::Builder* builder) {
258   class LoadBalancedCallDestinationFactory final
259       : public ClientChannel::CallDestinationFactory {
260    public:
261     RefCountedPtr<UnstartedCallDestination> CreateCallDestination(
262         ClientChannel::PickerObservable picker) override {
263       return MakeRefCounted<LoadBalancedCallDestination>(std::move(picker));
264     }
265   };
266 
267   builder->channel_args_preconditioning()->RegisterStage([](ChannelArgs args) {
268     return args.SetObject(
269         NoDestructSingleton<LoadBalancedCallDestinationFactory>::Get());
270   });
271 }
272 
273 }  // namespace grpc_core
274