• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/endpoint_list.h"
18 
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/support/json.h>
21 #include <grpc/support/port_platform.h>
22 #include <stdlib.h>
23 
24 #include <memory>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/types/optional.h"
33 #include "src/core/config/core_configuration.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/iomgr/pollset_set.h"
36 #include "src/core/load_balancing/delegating_helper.h"
37 #include "src/core/load_balancing/lb_policy.h"
38 #include "src/core/load_balancing/lb_policy_registry.h"
39 #include "src/core/load_balancing/pick_first/pick_first.h"
40 #include "src/core/resolver/endpoint_addresses.h"
41 #include "src/core/util/debug_location.h"
42 #include "src/core/util/json/json.h"
43 #include "src/core/util/orphanable.h"
44 #include "src/core/util/ref_counted_ptr.h"
45 
46 namespace grpc_core {
47 
48 //
49 // EndpointList::Endpoint::Helper
50 //
51 
52 class EndpointList::Endpoint::Helper final
53     : public LoadBalancingPolicy::DelegatingChannelControlHelper {
54  public:
Helper(RefCountedPtr<Endpoint> endpoint)55   explicit Helper(RefCountedPtr<Endpoint> endpoint)
56       : endpoint_(std::move(endpoint)) {}
57 
~Helper()58   ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
59 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)60   RefCountedPtr<SubchannelInterface> CreateSubchannel(
61       const grpc_resolved_address& address, const ChannelArgs& per_address_args,
62       const ChannelArgs& args) override {
63     return endpoint_->CreateSubchannel(address, per_address_args, args);
64   }
65 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)66   void UpdateState(
67       grpc_connectivity_state state, const absl::Status& status,
68       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
69     auto old_state = std::exchange(endpoint_->connectivity_state_, state);
70     if (!old_state.has_value()) {
71       ++endpoint_->endpoint_list_->num_endpoints_seen_initial_state_;
72     }
73     endpoint_->picker_ = std::move(picker);
74     endpoint_->OnStateUpdate(old_state, state, status);
75   }
76 
77  private:
parent_helper() const78   LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override {
79     return endpoint_->endpoint_list_->channel_control_helper();
80   }
81 
82   RefCountedPtr<Endpoint> endpoint_;
83 };
84 
85 //
86 // EndpointList::Endpoint
87 //
88 
Init(const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer)89 absl::Status EndpointList::Endpoint::Init(
90     const EndpointAddresses& addresses, const ChannelArgs& args,
91     std::shared_ptr<WorkSerializer> work_serializer) {
92   ChannelArgs child_args =
93       args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
94           .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
95   LoadBalancingPolicy::Args lb_policy_args;
96   lb_policy_args.work_serializer = std::move(work_serializer);
97   lb_policy_args.args = child_args;
98   lb_policy_args.channel_control_helper =
99       std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
100   child_policy_ =
101       CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
102           "pick_first", std::move(lb_policy_args));
103   if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) {
104     LOG(INFO) << "[" << endpoint_list_->tracer_ << " "
105               << endpoint_list_->policy_.get() << "] endpoint " << this
106               << ": created child policy " << child_policy_.get();
107   }
108   // Add our interested_parties pollset_set to that of the newly created
109   // child policy. This will make the child policy progress upon activity on
110   // this policy, which in turn is tied to the application's call.
111   grpc_pollset_set_add_pollset_set(
112       child_policy_->interested_parties(),
113       endpoint_list_->policy_->interested_parties());
114   // Construct pick_first config.
115   auto config =
116       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
117           Json::FromArray(
118               {Json::FromObject({{"pick_first", Json::FromObject({})}})}));
119   CHECK(config.ok());
120   // Update child policy.
121   LoadBalancingPolicy::UpdateArgs update_args;
122   update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
123   update_args.args = child_args;
124   update_args.config = std::move(*config);
125   return child_policy_->UpdateLocked(std::move(update_args));
126 }
127 
Orphan()128 void EndpointList::Endpoint::Orphan() {
129   // Remove pollset_set linkage.
130   grpc_pollset_set_del_pollset_set(
131       child_policy_->interested_parties(),
132       endpoint_list_->policy_->interested_parties());
133   child_policy_.reset();
134   picker_.reset();
135   Unref();
136 }
137 
ResetBackoffLocked()138 void EndpointList::Endpoint::ResetBackoffLocked() {
139   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
140 }
141 
ExitIdleLocked()142 void EndpointList::Endpoint::ExitIdleLocked() {
143   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
144 }
145 
Index() const146 size_t EndpointList::Endpoint::Index() const {
147   for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) {
148     if (endpoint_list_->endpoints_[i].get() == this) return i;
149   }
150   return -1;
151 }
152 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)153 RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
154     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
155     const ChannelArgs& args) {
156   return endpoint_list_->channel_control_helper()->CreateSubchannel(
157       address, per_address_args, args);
158 }
159 
160 //
161 // EndpointList
162 //
163 
Init(EndpointAddressesIterator * endpoints,const ChannelArgs & args,absl::FunctionRef<OrphanablePtr<Endpoint> (RefCountedPtr<EndpointList>,const EndpointAddresses &,const ChannelArgs &)> create_endpoint)164 void EndpointList::Init(
165     EndpointAddressesIterator* endpoints, const ChannelArgs& args,
166     absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
167                                               const EndpointAddresses&,
168                                               const ChannelArgs&)>
169         create_endpoint) {
170   if (endpoints == nullptr) return;
171   endpoints->ForEach([&](const EndpointAddresses& endpoint) {
172     endpoints_.push_back(
173         create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
174   });
175 }
176 
ResetBackoffLocked()177 void EndpointList::ResetBackoffLocked() {
178   for (const auto& endpoint : endpoints_) {
179     endpoint->ResetBackoffLocked();
180   }
181 }
182 
ReportTransientFailure(absl::Status status)183 void EndpointList::ReportTransientFailure(absl::Status status) {
184   if (!resolution_note_.empty()) {
185     status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
186                                                       resolution_note_, ")"));
187   }
188   channel_control_helper()->UpdateState(
189       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
190       MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
191 }
192 
193 }  // namespace grpc_core
194