1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H 18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <set> 23 24 #include "absl/strings/string_view.h" 25 #include "absl/types/optional.h" 26 27 #include "src/core/ext/filters/client_channel/service_config.h" 28 #include "src/core/ext/filters/client_channel/xds/xds_api.h" 29 #include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h" 30 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" 31 #include "src/core/lib/gprpp/map.h" 32 #include "src/core/lib/gprpp/memory.h" 33 #include "src/core/lib/gprpp/orphanable.h" 34 #include "src/core/lib/gprpp/ref_counted.h" 35 #include "src/core/lib/gprpp/ref_counted_ptr.h" 36 #include "src/core/lib/iomgr/work_serializer.h" 37 38 namespace grpc_core { 39 40 extern TraceFlag xds_client_trace; 41 42 class XdsClient : public InternallyRefCounted<XdsClient> { 43 public: 44 // Service config watcher interface. Implemented by callers. 45 class ServiceConfigWatcherInterface { 46 public: 47 virtual ~ServiceConfigWatcherInterface() = default; 48 49 virtual void OnServiceConfigChanged( 50 RefCountedPtr<ServiceConfig> service_config) = 0; 51 52 virtual void OnError(grpc_error* error) = 0; 53 54 virtual void OnResourceDoesNotExist() = 0; 55 }; 56 57 // Cluster data watcher interface. Implemented by callers. 58 class ClusterWatcherInterface { 59 public: 60 virtual ~ClusterWatcherInterface() = default; 61 62 virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; 63 64 virtual void OnError(grpc_error* error) = 0; 65 66 virtual void OnResourceDoesNotExist() = 0; 67 }; 68 69 // Endpoint data watcher interface. Implemented by callers. 70 class EndpointWatcherInterface { 71 public: 72 virtual ~EndpointWatcherInterface() = default; 73 74 virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; 75 76 virtual void OnError(grpc_error* error) = 0; 77 78 virtual void OnResourceDoesNotExist() = 0; 79 }; 80 81 // If *error is not GRPC_ERROR_NONE after construction, then there was 82 // an error initializing the client. 83 XdsClient(std::shared_ptr<WorkSerializer> work_serializer, 84 grpc_pollset_set* interested_parties, absl::string_view server_name, 85 std::unique_ptr<ServiceConfigWatcherInterface> watcher, 86 const grpc_channel_args& channel_args, grpc_error** error); 87 ~XdsClient(); 88 89 void Orphan() override; 90 91 // Start and cancel cluster data watch for a cluster. 92 // The XdsClient takes ownership of the watcher, but the caller may 93 // keep a raw pointer to the watcher, which may be used only for 94 // cancellation. (Because the caller does not own the watcher, the 95 // pointer must not be used for any other purpose.) 96 // If the caller is going to start a new watch after cancelling the 97 // old one, it should set delay_unsubscription to true. 98 void WatchClusterData(absl::string_view cluster_name, 99 std::unique_ptr<ClusterWatcherInterface> watcher); 100 void CancelClusterDataWatch(absl::string_view cluster_name, 101 ClusterWatcherInterface* watcher, 102 bool delay_unsubscription = false); 103 104 // Start and cancel endpoint data watch for a cluster. 105 // The XdsClient takes ownership of the watcher, but the caller may 106 // keep a raw pointer to the watcher, which may be used only for 107 // cancellation. (Because the caller does not own the watcher, the 108 // pointer must not be used for any other purpose.) 109 // If the caller is going to start a new watch after cancelling the 110 // old one, it should set delay_unsubscription to true. 111 void WatchEndpointData(absl::string_view eds_service_name, 112 std::unique_ptr<EndpointWatcherInterface> watcher); 113 void CancelEndpointDataWatch(absl::string_view eds_service_name, 114 EndpointWatcherInterface* watcher, 115 bool delay_unsubscription = false); 116 117 // Adds and removes drop stats for cluster_name and eds_service_name. 118 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( 119 absl::string_view lrs_server, absl::string_view cluster_name, 120 absl::string_view eds_service_name); 121 void RemoveClusterDropStats(absl::string_view /*lrs_server*/, 122 absl::string_view cluster_name, 123 absl::string_view eds_service_name, 124 XdsClusterDropStats* cluster_drop_stats); 125 126 // Adds and removes locality stats for cluster_name and eds_service_name 127 // for the specified locality. 128 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( 129 absl::string_view lrs_server, absl::string_view cluster_name, 130 absl::string_view eds_service_name, 131 RefCountedPtr<XdsLocalityName> locality); 132 void RemoveClusterLocalityStats( 133 absl::string_view /*lrs_server*/, absl::string_view cluster_name, 134 absl::string_view eds_service_name, 135 const RefCountedPtr<XdsLocalityName>& locality, 136 XdsClusterLocalityStats* cluster_locality_stats); 137 138 // Resets connection backoff state. 139 void ResetBackoff(); 140 141 // Helpers for encoding the XdsClient object in channel args. 142 grpc_arg MakeChannelArg() const; 143 static RefCountedPtr<XdsClient> GetFromChannelArgs( 144 const grpc_channel_args& args); 145 static grpc_channel_args* RemoveFromChannelArgs( 146 const grpc_channel_args& args); 147 148 private: 149 // Contains a channel to the xds server and all the data related to the 150 // channel. Holds a ref to the xds client object. 151 // TODO(roth): This is separate from the XdsClient object because it was 152 // originally designed to be able to swap itself out in case the 153 // balancer name changed. Now that the balancer name is going to be 154 // coming from the bootstrap file, we don't really need this level of 155 // indirection unless we decide to support watching the bootstrap file 156 // for changes. At some point, if we decide that we're never going to 157 // need to do that, then we can eliminate this class and move its 158 // contents directly into the XdsClient class. 159 class ChannelState : public InternallyRefCounted<ChannelState> { 160 public: 161 template <typename T> 162 class RetryableCall; 163 164 class AdsCallState; 165 class LrsCallState; 166 167 ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel); 168 ~ChannelState(); 169 170 void Orphan() override; 171 channel()172 grpc_channel* channel() const { return channel_; } xds_client()173 XdsClient* xds_client() const { return xds_client_.get(); } 174 AdsCallState* ads_calld() const; 175 LrsCallState* lrs_calld() const; 176 177 void MaybeStartLrsCall(); 178 void StopLrsCall(); 179 180 bool HasActiveAdsCall() const; 181 182 void StartConnectivityWatchLocked(); 183 void CancelConnectivityWatchLocked(); 184 185 void Subscribe(const std::string& type_url, const std::string& name); 186 void Unsubscribe(const std::string& type_url, const std::string& name, 187 bool delay_unsubscription); 188 189 private: 190 class StateWatcher; 191 192 // The owning xds client. 193 RefCountedPtr<XdsClient> xds_client_; 194 195 // The channel and its status. 196 grpc_channel* channel_; 197 bool shutting_down_ = false; 198 StateWatcher* watcher_ = nullptr; 199 200 // The retryable XDS calls. 201 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; 202 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; 203 }; 204 205 struct ClusterState { 206 std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>> 207 watchers; 208 // The latest data seen from CDS. 209 absl::optional<XdsApi::CdsUpdate> update; 210 }; 211 212 struct EndpointState { 213 std::map<EndpointWatcherInterface*, 214 std::unique_ptr<EndpointWatcherInterface>> 215 watchers; 216 // The latest data seen from EDS. 217 absl::optional<XdsApi::EdsUpdate> update; 218 }; 219 220 struct LoadReportState { 221 struct LocalityState { 222 std::set<XdsClusterLocalityStats*> locality_stats; 223 std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats; 224 }; 225 226 std::set<XdsClusterDropStats*> drop_stats; 227 XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats; 228 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 229 XdsLocalityName::Less> 230 locality_stats; 231 grpc_millis last_report_time = ExecCtx::Get()->Now(); 232 }; 233 234 // Sends an error notification to all watchers. 235 void NotifyOnError(grpc_error* error); 236 237 // Returns the weighted_clusters action name to use from 238 // weighted_cluster_index_map_ for a WeightedClusters route action. 239 std::string WeightedClustersActionName( 240 const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& 241 weighted_clusters); 242 243 // Updates weighted_cluster_index_map_ that will 244 // determine the names of the WeightedCluster actions for the current update. 245 void UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate& rds_update); 246 247 // Create the service config generated by the RdsUpdate. 248 grpc_error* CreateServiceConfig(const XdsApi::RdsUpdate& rds_update, 249 RefCountedPtr<ServiceConfig>* service_config); 250 251 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot( 252 bool send_all_clusters, const std::set<std::string>& clusters); 253 254 // Channel arg vtable functions. 255 static void* ChannelArgCopy(void* p); 256 static void ChannelArgDestroy(void* p); 257 static int ChannelArgCmp(void* p, void* q); 258 259 static const grpc_arg_pointer_vtable kXdsClientVtable; 260 261 const grpc_millis request_timeout_; 262 263 std::shared_ptr<WorkSerializer> work_serializer_; 264 grpc_pollset_set* interested_parties_; 265 266 std::unique_ptr<XdsBootstrap> bootstrap_; 267 XdsApi api_; 268 269 const std::string server_name_; 270 271 std::unique_ptr<ServiceConfigWatcherInterface> service_config_watcher_; 272 273 // The channel for communicating with the xds server. 274 OrphanablePtr<ChannelState> chand_; 275 276 absl::optional<XdsApi::LdsUpdate> lds_result_; 277 absl::optional<XdsApi::RdsUpdate> rds_result_; 278 279 // One entry for each watched CDS resource. 280 std::map<std::string /*cluster_name*/, ClusterState> cluster_map_; 281 // One entry for each watched EDS resource. 282 std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_; 283 std::map< 284 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 285 LoadReportState> 286 load_report_map_; 287 288 // 2-level map to store WeightedCluster action names. 289 // Top level map is keyed by cluster names without weight like a_b_c; bottom 290 // level map is keyed by cluster names + weights like a10_b50_c40. 291 struct ClusterNamesInfo { 292 uint64_t next_index = 0; 293 std::map<std::string /*cluster names + weights*/, 294 uint64_t /*policy index number*/> 295 cluster_weights_map; 296 }; 297 using WeightedClusterIndexMap = 298 std::map<std::string /*cluster names*/, ClusterNamesInfo>; 299 300 // Cache of action names for WeightedCluster targets in the current 301 // service config. 302 WeightedClusterIndexMap weighted_cluster_index_map_; 303 304 bool shutting_down_ = false; 305 }; 306 307 } // namespace grpc_core 308 309 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H */ 310