• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/child_policy_handler.h"
18 
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include <memory>
23 #include <string>
24 
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/status/status.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/string_view.h"
30 #include "src/core/config/core_configuration.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/iomgr/pollset_set.h"
33 #include "src/core/lib/iomgr/resolved_address.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/load_balancing/delegating_helper.h"
36 #include "src/core/load_balancing/lb_policy_registry.h"
37 #include "src/core/load_balancing/subchannel_interface.h"
38 #include "src/core/util/debug_location.h"
39 
40 namespace grpc_core {
41 
42 //
43 // ChildPolicyHandler::Helper
44 //
45 
46 class ChildPolicyHandler::Helper final
47     : public LoadBalancingPolicy::ParentOwningDelegatingChannelControlHelper<
48           ChildPolicyHandler> {
49  public:
Helper(RefCountedPtr<ChildPolicyHandler> parent)50   explicit Helper(RefCountedPtr<ChildPolicyHandler> parent)
51       : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
52 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)53   RefCountedPtr<SubchannelInterface> CreateSubchannel(
54       const grpc_resolved_address& address, const ChannelArgs& per_address_args,
55       const ChannelArgs& args) override {
56     if (parent()->shutting_down_) return nullptr;
57     if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
58     return parent()->channel_control_helper()->CreateSubchannel(
59         address, per_address_args, args);
60   }
61 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)62   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
63                    RefCountedPtr<SubchannelPicker> picker) override {
64     if (parent()->shutting_down_) return;
65     // If this request is from the pending child policy, ignore it until
66     // it reports something other than CONNECTING, at which point we swap it
67     // into place.
68     if (CalledByPendingChild()) {
69       if (GRPC_TRACE_FLAG_ENABLED_OBJ(*(parent()->tracer_))) {
70         LOG(INFO) << "[child_policy_handler " << parent() << "] helper " << this
71                   << ": pending child policy " << child_
72                   << " reports state=" << ConnectivityStateName(state) << " ("
73                   << status << ")";
74       }
75       if (state == GRPC_CHANNEL_CONNECTING) return;
76       grpc_pollset_set_del_pollset_set(
77           parent()->child_policy_->interested_parties(),
78           parent()->interested_parties());
79       parent()->child_policy_ = std::move(parent()->pending_child_policy_);
80     } else if (!CalledByCurrentChild()) {
81       // This request is from an outdated child, so ignore it.
82       return;
83     }
84     parent()->channel_control_helper()->UpdateState(state, status,
85                                                     std::move(picker));
86   }
87 
RequestReresolution()88   void RequestReresolution() override {
89     if (parent()->shutting_down_) return;
90     // Only forward re-resolution requests from the most recent child,
91     // since that's the one that will be receiving any update we receive
92     // from the resolver.
93     const LoadBalancingPolicy* latest_child_policy =
94         parent()->pending_child_policy_ != nullptr
95             ? parent()->pending_child_policy_.get()
96             : parent()->child_policy_.get();
97     if (child_ != latest_child_policy) return;
98     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*(parent()->tracer_))) {
99       LOG(INFO) << "[child_policy_handler " << parent()
100                 << "] requesting re-resolution";
101     }
102     parent()->channel_control_helper()->RequestReresolution();
103   }
104 
AddTraceEvent(TraceSeverity severity,absl::string_view message)105   void AddTraceEvent(TraceSeverity severity,
106                      absl::string_view message) override {
107     if (parent()->shutting_down_) return;
108     if (!CalledByPendingChild() && !CalledByCurrentChild()) return;
109     parent()->channel_control_helper()->AddTraceEvent(severity, message);
110   }
111 
set_child(LoadBalancingPolicy * child)112   void set_child(LoadBalancingPolicy* child) { child_ = child; }
113 
114  private:
CalledByPendingChild() const115   bool CalledByPendingChild() const {
116     CHECK_NE(child_, nullptr);
117     return child_ == parent()->pending_child_policy_.get();
118   }
119 
CalledByCurrentChild() const120   bool CalledByCurrentChild() const {
121     CHECK_NE(child_, nullptr);
122     return child_ == parent()->child_policy_.get();
123   };
124 
125   LoadBalancingPolicy* child_ = nullptr;
126 };
127 
128 //
129 // ChildPolicyHandler
130 //
131 
ShutdownLocked()132 void ChildPolicyHandler::ShutdownLocked() {
133   if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
134     LOG(INFO) << "[child_policy_handler " << this << "] shutting down";
135   }
136   shutting_down_ = true;
137   if (child_policy_ != nullptr) {
138     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
139       LOG(INFO) << "[child_policy_handler " << this
140                 << "] shutting down lb_policy " << child_policy_.get();
141     }
142     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
143                                      interested_parties());
144     child_policy_.reset();
145   }
146   if (pending_child_policy_ != nullptr) {
147     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
148       LOG(INFO) << "[child_policy_handler " << this
149                 << "] shutting down pending lb_policy "
150                 << pending_child_policy_.get();
151     }
152     grpc_pollset_set_del_pollset_set(
153         pending_child_policy_->interested_parties(), interested_parties());
154     pending_child_policy_.reset();
155   }
156 }
157 
UpdateLocked(UpdateArgs args)158 absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
159   // If the child policy name changes, we need to create a new child
160   // policy.  When this happens, we leave child_policy_ as-is and store
161   // the new child policy in pending_child_policy_.  Once the new child
162   // policy transitions into state READY, we swap it into child_policy_,
163   // replacing the original child policy.  So pending_child_policy_ is
164   // non-null only between when we apply an update that changes the child
165   // policy name and when the new child reports state READY.
166   //
167   // Updates can arrive at any point during this transition.  We always
168   // apply updates relative to the most recently created child policy,
169   // even if the most recent one is still in pending_child_policy_.  This
170   // is true both when applying the updates to an existing child policy
171   // and when determining whether we need to create a new policy.
172   //
173   // As a result of this, there are several cases to consider here:
174   //
175   // 1. We have no existing child policy (i.e., this is the first update
176   //    we receive after being created; in this case, both child_policy_
177   //    and pending_child_policy_ are null).  In this case, we create a
178   //    new child policy and store it in child_policy_.
179   //
180   // 2. We have an existing child policy and have no pending child policy
181   //    from a previous update (i.e., either there has not been a
182   //    previous update that changed the policy name, or we have already
183   //    finished swapping in the new policy; in this case, child_policy_
184   //    is non-null but pending_child_policy_ is null).  In this case:
185   //    a. If going from the current config to the new config does not
186   //       require a new policy, then we update the existing child policy.
187   //    b. If going from the current config to the new config does require a
188   //       new policy, we create a new policy.  The policy will be stored in
189   //       pending_child_policy_ and will later be swapped into
190   //       child_policy_ by the helper when the new child transitions
191   //       into state READY.
192   //
193   // 3. We have an existing child policy and have a pending child policy
194   //    from a previous update (i.e., a previous update set
195   //    pending_child_policy_ as per case 2b above and that policy has
196   //    not yet transitioned into state READY and been swapped into
197   //    child_policy_; in this case, both child_policy_ and
198   //    pending_child_policy_ are non-null).  In this case:
199   //    a. If going from the current config to the new config does not
200   //       require a new policy, then we update the existing pending
201   //       child policy.
202   //    b. If going from the current config to the new config does require a
203   //       new child policy, then we create a new policy.  The new
204   //       policy is stored in pending_child_policy_ (replacing the one
205   //       that was there before, which will be immediately shut down)
206   //       and will later be swapped into child_policy_ by the helper
207   //       when the new child transitions into state READY.
208   const bool create_policy =
209       // case 1
210       child_policy_ == nullptr ||
211       // cases 2b and 3b
212       ConfigChangeRequiresNewPolicyInstance(current_config_.get(),
213                                             args.config.get());
214   current_config_ = args.config;
215   LoadBalancingPolicy* policy_to_update = nullptr;
216   if (create_policy) {
217     // Cases 1, 2b, and 3b: create a new child policy.
218     // If child_policy_ is null, we set it (case 1), else we set
219     // pending_child_policy_ (cases 2b and 3b).
220     // TODO(roth): In cases 2b and 3b, we should start a timer here, so
221     // that there's an upper bound on the amount of time it takes us to
222     // switch to the new policy, even if the new policy stays in
223     // CONNECTING for a very long period of time.
224     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
225       LOG(INFO) << "[child_policy_handler " << this << "] creating new "
226                 << (child_policy_ == nullptr ? "" : "pending ")
227                 << "child policy " << args.config->name();
228     }
229     auto& lb_policy =
230         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
231     lb_policy = CreateChildPolicy(args.config->name(), args.args);
232     policy_to_update = lb_policy.get();
233   } else {
234     // Cases 2a and 3a: update an existing policy.
235     // If we have a pending child policy, send the update to the pending
236     // policy (case 3a), else send it to the current policy (case 2a).
237     policy_to_update = pending_child_policy_ != nullptr
238                            ? pending_child_policy_.get()
239                            : child_policy_.get();
240   }
241   CHECK_NE(policy_to_update, nullptr);
242   // Update the policy.
243   if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
244     LOG(INFO) << "[child_policy_handler " << this << "] updating "
245               << (policy_to_update == pending_child_policy_.get() ? "pending "
246                                                                   : "")
247               << "child policy " << policy_to_update;
248   }
249   return policy_to_update->UpdateLocked(std::move(args));
250 }
251 
ExitIdleLocked()252 void ChildPolicyHandler::ExitIdleLocked() {
253   if (child_policy_ != nullptr) {
254     child_policy_->ExitIdleLocked();
255     if (pending_child_policy_ != nullptr) {
256       pending_child_policy_->ExitIdleLocked();
257     }
258   }
259 }
260 
ResetBackoffLocked()261 void ChildPolicyHandler::ResetBackoffLocked() {
262   if (child_policy_ != nullptr) {
263     child_policy_->ResetBackoffLocked();
264     if (pending_child_policy_ != nullptr) {
265       pending_child_policy_->ResetBackoffLocked();
266     }
267   }
268 }
269 
CreateChildPolicy(absl::string_view child_policy_name,const ChannelArgs & args)270 OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
271     absl::string_view child_policy_name, const ChannelArgs& args) {
272   Helper* helper =
273       new Helper(RefAsSubclass<ChildPolicyHandler>(DEBUG_LOCATION, "Helper"));
274   LoadBalancingPolicy::Args lb_policy_args;
275   lb_policy_args.work_serializer = work_serializer();
276   lb_policy_args.channel_control_helper =
277       std::unique_ptr<ChannelControlHelper>(helper);
278   lb_policy_args.args = args;
279   OrphanablePtr<LoadBalancingPolicy> lb_policy =
280       CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args));
281   if (GPR_UNLIKELY(lb_policy == nullptr)) {
282     LOG(ERROR) << "could not create LB policy \"" << child_policy_name << "\"";
283     return nullptr;
284   }
285   helper->set_child(lb_policy.get());
286   if (GRPC_TRACE_FLAG_ENABLED_OBJ(*tracer_)) {
287     LOG(INFO) << "[child_policy_handler " << this
288               << "] created new LB policy \"" << child_policy_name << "\" ("
289               << lb_policy.get() << ")";
290   }
291   channel_control_helper()->AddTraceEvent(
292       ChannelControlHelper::TRACE_INFO,
293       absl::StrCat("Created new LB policy \"", child_policy_name, "\""));
294   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
295                                    interested_parties());
296   return lb_policy;
297 }
298 
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const299 bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance(
300     LoadBalancingPolicy::Config* old_config,
301     LoadBalancingPolicy::Config* new_config) const {
302   return old_config->name() != new_config->name();
303 }
304 
305 OrphanablePtr<LoadBalancingPolicy>
CreateLoadBalancingPolicy(absl::string_view name,LoadBalancingPolicy::Args args) const306 ChildPolicyHandler::CreateLoadBalancingPolicy(
307     absl::string_view name, LoadBalancingPolicy::Args args) const {
308   return CoreConfiguration::Get()
309       .lb_policy_registry()
310       .CreateLoadBalancingPolicy(name, std::move(args));
311 }
312 
313 }  // namespace grpc_core
314