• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/endpoint_list.h"
20 
21 #include <stdlib.h>
22 
23 #include <memory>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/types/optional.h"
30 
31 #include <grpc/impl/connectivity_state.h>
32 #include <grpc/support/json.h>
33 #include <grpc/support/log.h>
34 
35 #include "src/core/load_balancing/pick_first/pick_first.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/core_configuration.h"
38 #include "src/core/lib/gprpp/debug_location.h"
39 #include "src/core/lib/gprpp/orphanable.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/pollset_set.h"
42 #include "src/core/lib/json/json.h"
43 #include "src/core/load_balancing/delegating_helper.h"
44 #include "src/core/load_balancing/lb_policy.h"
45 #include "src/core/load_balancing/lb_policy_registry.h"
46 #include "src/core/resolver/endpoint_addresses.h"
47 
48 namespace grpc_core {
49 
50 //
51 // EndpointList::Endpoint::Helper
52 //
53 
54 class EndpointList::Endpoint::Helper final
55     : public LoadBalancingPolicy::DelegatingChannelControlHelper {
56  public:
Helper(RefCountedPtr<Endpoint> endpoint)57   explicit Helper(RefCountedPtr<Endpoint> endpoint)
58       : endpoint_(std::move(endpoint)) {}
59 
~Helper()60   ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
61 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)62   RefCountedPtr<SubchannelInterface> CreateSubchannel(
63       const grpc_resolved_address& address, const ChannelArgs& per_address_args,
64       const ChannelArgs& args) override {
65     return endpoint_->CreateSubchannel(address, per_address_args, args);
66   }
67 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)68   void UpdateState(
69       grpc_connectivity_state state, const absl::Status& status,
70       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
71     auto old_state = std::exchange(endpoint_->connectivity_state_, state);
72     if (!old_state.has_value()) {
73       ++endpoint_->endpoint_list_->num_endpoints_seen_initial_state_;
74     }
75     endpoint_->picker_ = std::move(picker);
76     endpoint_->OnStateUpdate(old_state, state, status);
77   }
78 
79  private:
parent_helper() const80   LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override {
81     return endpoint_->endpoint_list_->channel_control_helper();
82   }
83 
84   RefCountedPtr<Endpoint> endpoint_;
85 };
86 
87 //
88 // EndpointList::Endpoint
89 //
90 
Init(const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer)91 void EndpointList::Endpoint::Init(
92     const EndpointAddresses& addresses, const ChannelArgs& args,
93     std::shared_ptr<WorkSerializer> work_serializer) {
94   ChannelArgs child_args =
95       args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
96           .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
97   LoadBalancingPolicy::Args lb_policy_args;
98   lb_policy_args.work_serializer = std::move(work_serializer);
99   lb_policy_args.args = child_args;
100   lb_policy_args.channel_control_helper =
101       std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
102   child_policy_ =
103       CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
104           "pick_first", std::move(lb_policy_args));
105   if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) {
106     gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p",
107             endpoint_list_->tracer_, endpoint_list_->policy_.get(), this,
108             child_policy_.get());
109   }
110   // Add our interested_parties pollset_set to that of the newly created
111   // child policy. This will make the child policy progress upon activity on
112   // this policy, which in turn is tied to the application's call.
113   grpc_pollset_set_add_pollset_set(
114       child_policy_->interested_parties(),
115       endpoint_list_->policy_->interested_parties());
116   // Construct pick_first config.
117   auto config =
118       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
119           Json::FromArray(
120               {Json::FromObject({{"pick_first", Json::FromObject({})}})}));
121   GPR_ASSERT(config.ok());
122   // Update child policy.
123   LoadBalancingPolicy::UpdateArgs update_args;
124   update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
125   update_args.args = child_args;
126   update_args.config = std::move(*config);
127   // TODO(roth): If the child reports a non-OK status with the update,
128   // we need to propagate that back to the resolver somehow.
129   (void)child_policy_->UpdateLocked(std::move(update_args));
130 }
131 
Orphan()132 void EndpointList::Endpoint::Orphan() {
133   // Remove pollset_set linkage.
134   grpc_pollset_set_del_pollset_set(
135       child_policy_->interested_parties(),
136       endpoint_list_->policy_->interested_parties());
137   child_policy_.reset();
138   picker_.reset();
139   Unref();
140 }
141 
ResetBackoffLocked()142 void EndpointList::Endpoint::ResetBackoffLocked() {
143   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
144 }
145 
ExitIdleLocked()146 void EndpointList::Endpoint::ExitIdleLocked() {
147   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
148 }
149 
Index() const150 size_t EndpointList::Endpoint::Index() const {
151   for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) {
152     if (endpoint_list_->endpoints_[i].get() == this) return i;
153   }
154   return -1;
155 }
156 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)157 RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
158     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
159     const ChannelArgs& args) {
160   return endpoint_list_->channel_control_helper()->CreateSubchannel(
161       address, per_address_args, args);
162 }
163 
164 //
165 // EndpointList
166 //
167 
Init(EndpointAddressesIterator * endpoints,const ChannelArgs & args,absl::FunctionRef<OrphanablePtr<Endpoint> (RefCountedPtr<EndpointList>,const EndpointAddresses &,const ChannelArgs &)> create_endpoint)168 void EndpointList::Init(
169     EndpointAddressesIterator* endpoints, const ChannelArgs& args,
170     absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
171                                               const EndpointAddresses&,
172                                               const ChannelArgs&)>
173         create_endpoint) {
174   if (endpoints == nullptr) return;
175   endpoints->ForEach([&](const EndpointAddresses& endpoint) {
176     endpoints_.push_back(
177         create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
178   });
179 }
180 
ResetBackoffLocked()181 void EndpointList::ResetBackoffLocked() {
182   for (const auto& endpoint : endpoints_) {
183     endpoint->ResetBackoffLocked();
184   }
185 }
186 
187 }  // namespace grpc_core
188