1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/xds/xds_client.h"
22 #include "src/core/lib/surface/api_trace.h"
23 #include "src/core/lib/surface/server.h"
24
25 namespace grpc_core {
26 namespace {
27
28 class XdsServerConfigFetcher : public grpc_server_config_fetcher {
29 public:
XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)30 explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
31 : xds_client_(std::move(xds_client)) {
32 GPR_ASSERT(xds_client_ != nullptr);
33 }
34
StartWatch(std::string listening_address,std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher)35 void StartWatch(std::string listening_address,
36 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
37 watcher) override {
38 grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
39 auto listener_watcher =
40 absl::make_unique<ListenerWatcher>(std::move(watcher));
41 auto* listener_watcher_ptr = listener_watcher.get();
42 // TODO(yashykt): Get the resource name id from bootstrap
43 xds_client_->WatchListenerData(
44 absl::StrCat("grpc/server?xds.resource.listening_address=",
45 listening_address),
46 std::move(listener_watcher));
47 MutexLock lock(&mu_);
48 auto& watcher_state = watchers_[watcher_ptr];
49 watcher_state.listening_address = listening_address;
50 watcher_state.listener_watcher = listener_watcher_ptr;
51 }
52
CancelWatch(grpc_server_config_fetcher::WatcherInterface * watcher)53 void CancelWatch(
54 grpc_server_config_fetcher::WatcherInterface* watcher) override {
55 MutexLock lock(&mu_);
56 auto it = watchers_.find(watcher);
57 if (it != watchers_.end()) {
58 // Cancel the watch on the listener before erasing
59 xds_client_->CancelListenerDataWatch(it->second.listening_address,
60 it->second.listener_watcher,
61 false /* delay_unsubscription */);
62 watchers_.erase(it);
63 }
64 }
65
66 // Return the interested parties from the xds client so that it can be polled.
interested_parties()67 grpc_pollset_set* interested_parties() override {
68 return xds_client_->interested_parties();
69 }
70
71 private:
72 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
73 public:
ListenerWatcher(std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> server_config_watcher)74 explicit ListenerWatcher(
75 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
76 server_config_watcher)
77 : server_config_watcher_(std::move(server_config_watcher)) {}
78
OnListenerChanged(XdsApi::LdsUpdate listener)79 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
80 // TODO(yashykt): Construct channel args according to received update
81 server_config_watcher_->UpdateConfig(nullptr);
82 }
83
OnError(grpc_error * error)84 void OnError(grpc_error* error) override {
85 gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
86 grpc_error_string(error));
87 GRPC_ERROR_UNREF(error);
88 // TODO(yashykt): We might want to bubble this error to the application.
89 }
90
OnResourceDoesNotExist()91 void OnResourceDoesNotExist() override {
92 gpr_log(GPR_ERROR,
93 "ListenerWatcher:%p XdsClient reports requested listener does "
94 "not exist",
95 this);
96 // TODO(yashykt): We might want to bubble this error to the application.
97 }
98
99 private:
100 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
101 server_config_watcher_;
102 };
103
104 struct WatcherState {
105 std::string listening_address;
106 ListenerWatcher* listener_watcher = nullptr;
107 };
108
109 RefCountedPtr<XdsClient> xds_client_;
110 Mutex mu_;
111 std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
112 watchers_;
113 };
114
115 } // namespace
116 } // namespace grpc_core
117
grpc_server_config_fetcher_xds_create()118 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
119 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
120 grpc_core::ExecCtx exec_ctx;
121 GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
122 grpc_error* error = GRPC_ERROR_NONE;
123 grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
124 grpc_core::XdsClient::GetOrCreate(&error);
125 if (error != GRPC_ERROR_NONE) {
126 gpr_log(GPR_ERROR, "Failed to create xds client: %s",
127 grpc_error_string(error));
128 return nullptr;
129 }
130 return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
131 }
132