1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/endpoint_list.h"
18
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/support/json.h>
21 #include <grpc/support/port_platform.h>
22 #include <stdlib.h>
23
24 #include <memory>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/types/optional.h"
33 #include "src/core/config/core_configuration.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/iomgr/pollset_set.h"
36 #include "src/core/load_balancing/delegating_helper.h"
37 #include "src/core/load_balancing/lb_policy.h"
38 #include "src/core/load_balancing/lb_policy_registry.h"
39 #include "src/core/load_balancing/pick_first/pick_first.h"
40 #include "src/core/resolver/endpoint_addresses.h"
41 #include "src/core/util/debug_location.h"
42 #include "src/core/util/json/json.h"
43 #include "src/core/util/orphanable.h"
44 #include "src/core/util/ref_counted_ptr.h"
45
46 namespace grpc_core {
47
48 //
49 // EndpointList::Endpoint::Helper
50 //
51
52 class EndpointList::Endpoint::Helper final
53 : public LoadBalancingPolicy::DelegatingChannelControlHelper {
54 public:
Helper(RefCountedPtr<Endpoint> endpoint)55 explicit Helper(RefCountedPtr<Endpoint> endpoint)
56 : endpoint_(std::move(endpoint)) {}
57
~Helper()58 ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
59
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)60 RefCountedPtr<SubchannelInterface> CreateSubchannel(
61 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
62 const ChannelArgs& args) override {
63 return endpoint_->CreateSubchannel(address, per_address_args, args);
64 }
65
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)66 void UpdateState(
67 grpc_connectivity_state state, const absl::Status& status,
68 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
69 auto old_state = std::exchange(endpoint_->connectivity_state_, state);
70 if (!old_state.has_value()) {
71 ++endpoint_->endpoint_list_->num_endpoints_seen_initial_state_;
72 }
73 endpoint_->picker_ = std::move(picker);
74 endpoint_->OnStateUpdate(old_state, state, status);
75 }
76
77 private:
parent_helper() const78 LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override {
79 return endpoint_->endpoint_list_->channel_control_helper();
80 }
81
82 RefCountedPtr<Endpoint> endpoint_;
83 };
84
85 //
86 // EndpointList::Endpoint
87 //
88
Init(const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer)89 absl::Status EndpointList::Endpoint::Init(
90 const EndpointAddresses& addresses, const ChannelArgs& args,
91 std::shared_ptr<WorkSerializer> work_serializer) {
92 ChannelArgs child_args =
93 args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
94 .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
95 LoadBalancingPolicy::Args lb_policy_args;
96 lb_policy_args.work_serializer = std::move(work_serializer);
97 lb_policy_args.args = child_args;
98 lb_policy_args.channel_control_helper =
99 std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
100 child_policy_ =
101 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
102 "pick_first", std::move(lb_policy_args));
103 if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) {
104 LOG(INFO) << "[" << endpoint_list_->tracer_ << " "
105 << endpoint_list_->policy_.get() << "] endpoint " << this
106 << ": created child policy " << child_policy_.get();
107 }
108 // Add our interested_parties pollset_set to that of the newly created
109 // child policy. This will make the child policy progress upon activity on
110 // this policy, which in turn is tied to the application's call.
111 grpc_pollset_set_add_pollset_set(
112 child_policy_->interested_parties(),
113 endpoint_list_->policy_->interested_parties());
114 // Construct pick_first config.
115 auto config =
116 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
117 Json::FromArray(
118 {Json::FromObject({{"pick_first", Json::FromObject({})}})}));
119 CHECK(config.ok());
120 // Update child policy.
121 LoadBalancingPolicy::UpdateArgs update_args;
122 update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
123 update_args.args = child_args;
124 update_args.config = std::move(*config);
125 return child_policy_->UpdateLocked(std::move(update_args));
126 }
127
Orphan()128 void EndpointList::Endpoint::Orphan() {
129 // Remove pollset_set linkage.
130 grpc_pollset_set_del_pollset_set(
131 child_policy_->interested_parties(),
132 endpoint_list_->policy_->interested_parties());
133 child_policy_.reset();
134 picker_.reset();
135 Unref();
136 }
137
ResetBackoffLocked()138 void EndpointList::Endpoint::ResetBackoffLocked() {
139 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
140 }
141
ExitIdleLocked()142 void EndpointList::Endpoint::ExitIdleLocked() {
143 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
144 }
145
Index() const146 size_t EndpointList::Endpoint::Index() const {
147 for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) {
148 if (endpoint_list_->endpoints_[i].get() == this) return i;
149 }
150 return -1;
151 }
152
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)153 RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
154 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
155 const ChannelArgs& args) {
156 return endpoint_list_->channel_control_helper()->CreateSubchannel(
157 address, per_address_args, args);
158 }
159
160 //
161 // EndpointList
162 //
163
Init(EndpointAddressesIterator * endpoints,const ChannelArgs & args,absl::FunctionRef<OrphanablePtr<Endpoint> (RefCountedPtr<EndpointList>,const EndpointAddresses &,const ChannelArgs &)> create_endpoint)164 void EndpointList::Init(
165 EndpointAddressesIterator* endpoints, const ChannelArgs& args,
166 absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
167 const EndpointAddresses&,
168 const ChannelArgs&)>
169 create_endpoint) {
170 if (endpoints == nullptr) return;
171 endpoints->ForEach([&](const EndpointAddresses& endpoint) {
172 endpoints_.push_back(
173 create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
174 });
175 }
176
ResetBackoffLocked()177 void EndpointList::ResetBackoffLocked() {
178 for (const auto& endpoint : endpoints_) {
179 endpoint->ResetBackoffLocked();
180 }
181 }
182
ReportTransientFailure(absl::Status status)183 void EndpointList::ReportTransientFailure(absl::Status status) {
184 if (!resolution_note_.empty()) {
185 status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
186 resolution_note_, ")"));
187 }
188 channel_control_helper()->UpdateState(
189 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
190 MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
191 }
192
193 } // namespace grpc_core
194