• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/xds/xds_client.h"
22 #include "src/core/lib/surface/api_trace.h"
23 #include "src/core/lib/surface/server.h"
24 
25 namespace grpc_core {
26 namespace {
27 
28 class XdsServerConfigFetcher : public grpc_server_config_fetcher {
29  public:
XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)30   explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
31       : xds_client_(std::move(xds_client)) {
32     GPR_ASSERT(xds_client_ != nullptr);
33   }
34 
StartWatch(std::string listening_address,std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher)35   void StartWatch(std::string listening_address,
36                   std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
37                       watcher) override {
38     grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
39     auto listener_watcher =
40         absl::make_unique<ListenerWatcher>(std::move(watcher));
41     auto* listener_watcher_ptr = listener_watcher.get();
42     // TODO(yashykt): Get the resource name id from bootstrap
43     xds_client_->WatchListenerData(
44         absl::StrCat("grpc/server?xds.resource.listening_address=",
45                      listening_address),
46         std::move(listener_watcher));
47     MutexLock lock(&mu_);
48     auto& watcher_state = watchers_[watcher_ptr];
49     watcher_state.listening_address = listening_address;
50     watcher_state.listener_watcher = listener_watcher_ptr;
51   }
52 
CancelWatch(grpc_server_config_fetcher::WatcherInterface * watcher)53   void CancelWatch(
54       grpc_server_config_fetcher::WatcherInterface* watcher) override {
55     MutexLock lock(&mu_);
56     auto it = watchers_.find(watcher);
57     if (it != watchers_.end()) {
58       // Cancel the watch on the listener before erasing
59       xds_client_->CancelListenerDataWatch(it->second.listening_address,
60                                            it->second.listener_watcher,
61                                            false /* delay_unsubscription */);
62       watchers_.erase(it);
63     }
64   }
65 
66   // Return the interested parties from the xds client so that it can be polled.
interested_parties()67   grpc_pollset_set* interested_parties() override {
68     return xds_client_->interested_parties();
69   }
70 
71  private:
72   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
73    public:
ListenerWatcher(std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> server_config_watcher)74     explicit ListenerWatcher(
75         std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
76             server_config_watcher)
77         : server_config_watcher_(std::move(server_config_watcher)) {}
78 
OnListenerChanged(XdsApi::LdsUpdate listener)79     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
80       // TODO(yashykt): Construct channel args according to received update
81       server_config_watcher_->UpdateConfig(nullptr);
82     }
83 
OnError(grpc_error * error)84     void OnError(grpc_error* error) override {
85       gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
86               grpc_error_string(error));
87       GRPC_ERROR_UNREF(error);
88       // TODO(yashykt): We might want to bubble this error to the application.
89     }
90 
OnResourceDoesNotExist()91     void OnResourceDoesNotExist() override {
92       gpr_log(GPR_ERROR,
93               "ListenerWatcher:%p XdsClient reports requested listener does "
94               "not exist",
95               this);
96       // TODO(yashykt): We might want to bubble this error to the application.
97     }
98 
99    private:
100     std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
101         server_config_watcher_;
102   };
103 
104   struct WatcherState {
105     std::string listening_address;
106     ListenerWatcher* listener_watcher = nullptr;
107   };
108 
109   RefCountedPtr<XdsClient> xds_client_;
110   Mutex mu_;
111   std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
112       watchers_;
113 };
114 
115 }  // namespace
116 }  // namespace grpc_core
117 
grpc_server_config_fetcher_xds_create()118 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
119   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
120   grpc_core::ExecCtx exec_ctx;
121   GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
122   grpc_error* error = GRPC_ERROR_NONE;
123   grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
124       grpc_core::XdsClient::GetOrCreate(&error);
125   if (error != GRPC_ERROR_NONE) {
126     gpr_log(GPR_ERROR, "Failed to create xds client: %s",
127             grpc_error_string(error));
128     return nullptr;
129   }
130   return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
131 }
132