1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "host/commands/cvd/server.h"
18
19 #include <signal.h>
20
21 #include <atomic>
22 #include <future>
23 #include <map>
24 #include <mutex>
25 #include <optional>
26 #include <thread>
27
28 #include <android-base/file.h>
29 #include <android-base/logging.h>
30 #include <fruit/fruit.h>
31
32 #include "cvd_server.pb.h"
33
34 #include "common/libs/fs/shared_buf.h"
35 #include "common/libs/fs/shared_fd.h"
36 #include "common/libs/fs/shared_select.h"
37 #include "common/libs/utils/files.h"
38 #include "common/libs/utils/flag_parser.h"
39 #include "common/libs/utils/result.h"
40 #include "common/libs/utils/shared_fd_flag.h"
41 #include "common/libs/utils/subprocess.h"
42 #include "host/commands/cvd/epoll_loop.h"
43 #include "host/commands/cvd/scope_guard.h"
44 #include "host/commands/cvd/server_constants.h"
45 #include "host/libs/config/cuttlefish_config.h"
46 #include "host/libs/config/known_paths.h"
47
48 namespace cuttlefish {
49
RequestComponent(CvdServer * server,InstanceManager * instance_manager)50 static fruit::Component<> RequestComponent(CvdServer* server,
51 InstanceManager* instance_manager) {
52 return fruit::createComponent()
53 .bindInstance(*server)
54 .bindInstance(*instance_manager)
55 .install(AcloudCommandComponent)
56 .install(cvdCommandComponent)
57 .install(cvdShutdownComponent)
58 .install(cvdVersionComponent);
59 }
60
61 static constexpr int kNumThreads = 10;
62
CvdServer(EpollPool & epoll_pool,InstanceManager & instance_manager)63 CvdServer::CvdServer(EpollPool& epoll_pool, InstanceManager& instance_manager)
64 : epoll_pool_(epoll_pool),
65 instance_manager_(instance_manager),
66 running_(true) {
67 std::scoped_lock lock(threads_mutex_);
68 for (auto i = 0; i < kNumThreads; i++) {
69 threads_.emplace_back([this]() {
70 while (running_) {
71 auto result = epoll_pool_.HandleEvent();
72 if (!result.ok()) {
73 LOG(ERROR) << "Epoll worker error:\n" << result.error();
74 }
75 }
76 auto wakeup = BestEffortWakeup();
77 CHECK(wakeup.ok()) << wakeup.error().message();
78 });
79 }
80 }
81
~CvdServer()82 CvdServer::~CvdServer() {
83 running_ = false;
84 auto wakeup = BestEffortWakeup();
85 CHECK(wakeup.ok()) << wakeup.error().message();
86 Join();
87 }
88
BestEffortWakeup()89 Result<void> CvdServer::BestEffortWakeup() {
90 // This attempts to cascade through the responder threads, forcing them
91 // to wake up and see that running_ is false, then exit and wake up
92 // further threads.
93 auto eventfd = SharedFD::Event();
94 CF_EXPECT(eventfd->IsOpen(), eventfd->StrError());
95 CF_EXPECT(eventfd->EventfdWrite(1) == 0, eventfd->StrError());
96
97 auto cb = [](EpollEvent) -> Result<void> { return {}; };
98 CF_EXPECT(epoll_pool_.Register(eventfd, EPOLLIN, cb));
99 return {};
100 }
101
Stop()102 void CvdServer::Stop() {
103 {
104 std::lock_guard lock(ongoing_requests_mutex_);
105 running_ = false;
106 }
107 while (true) {
108 std::shared_ptr<OngoingRequest> request;
109 {
110 std::lock_guard lock(ongoing_requests_mutex_);
111 if (ongoing_requests_.empty()) {
112 break;
113 }
114 auto it = ongoing_requests_.begin();
115 request = *it;
116 ongoing_requests_.erase(it);
117 }
118 {
119 std::lock_guard lock(request->mutex);
120 if (request->handler == nullptr) {
121 continue;
122 }
123 request->handler->Interrupt();
124 }
125 std::scoped_lock lock(threads_mutex_);
126 for (auto& thread : threads_) {
127 auto current_thread = thread.get_id() == std::this_thread::get_id();
128 auto matching_thread = thread.get_id() == request->thread_id;
129 if (!current_thread && matching_thread && thread.joinable()) {
130 thread.join();
131 }
132 }
133 }
134 }
135
Join()136 void CvdServer::Join() {
137 for (auto& thread : threads_) {
138 if (thread.joinable()) {
139 thread.join();
140 }
141 }
142 }
143
RequestHandler(const RequestWithStdio & request,const std::vector<CvdServerHandler * > & handlers)144 static Result<CvdServerHandler*> RequestHandler(
145 const RequestWithStdio& request,
146 const std::vector<CvdServerHandler*>& handlers) {
147 Result<cvd::Response> response;
148 std::vector<CvdServerHandler*> compatible_handlers;
149 for (auto& handler : handlers) {
150 if (CF_EXPECT(handler->CanHandle(request))) {
151 compatible_handlers.push_back(handler);
152 }
153 }
154 CF_EXPECT(compatible_handlers.size() == 1,
155 "Expected exactly one handler for message, found "
156 << compatible_handlers.size());
157 return compatible_handlers[0];
158 }
159
StartServer(SharedFD server_fd)160 Result<void> CvdServer::StartServer(SharedFD server_fd) {
161 auto cb = [this](EpollEvent ev) -> Result<void> {
162 CF_EXPECT(AcceptClient(ev));
163 return {};
164 };
165 CF_EXPECT(epoll_pool_.Register(server_fd, EPOLLIN, cb));
166 return {};
167 }
168
AcceptClient(EpollEvent event)169 Result<void> CvdServer::AcceptClient(EpollEvent event) {
170 ScopeGuard stop_on_failure([this] { Stop(); });
171
172 CF_EXPECT(event.events & EPOLLIN);
173 auto client_fd = SharedFD::Accept(*event.fd);
174 CF_EXPECT(client_fd->IsOpen(), client_fd->StrError());
175 auto client_cb = [this](EpollEvent ev) -> Result<void> {
176 CF_EXPECT(HandleMessage(ev));
177 return {};
178 };
179 CF_EXPECT(epoll_pool_.Register(client_fd, EPOLLIN, client_cb));
180
181 auto self_cb = [this](EpollEvent ev) -> Result<void> {
182 CF_EXPECT(AcceptClient(ev));
183 return {};
184 };
185 CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));
186
187 stop_on_failure.Cancel();
188 return {};
189 }
190
HandleMessage(EpollEvent event)191 Result<void> CvdServer::HandleMessage(EpollEvent event) {
192 ScopeGuard abandon_client([this, event] { epoll_pool_.Remove(event.fd); });
193
194 if (event.events & EPOLLHUP) { // Client went away.
195 epoll_pool_.Remove(event.fd);
196 return {};
197 }
198
199 CF_EXPECT(event.events & EPOLLIN);
200 auto request = CF_EXPECT(GetRequest(event.fd));
201 if (!request) { // End-of-file / client went away.
202 epoll_pool_.Remove(event.fd);
203 return {};
204 }
205
206 auto response = HandleRequest(*request, event.fd);
207 if (!response.ok()) {
208 cvd::Response failure_message;
209 failure_message.mutable_status()->set_code(cvd::Status::INTERNAL);
210 failure_message.mutable_status()->set_message(response.error().message());
211 CF_EXPECT(SendResponse(event.fd, failure_message));
212 return {}; // Error already sent to the client, don't repeat on the server
213 }
214 CF_EXPECT(SendResponse(event.fd, *response));
215
216 auto self_cb = [this](EpollEvent ev) -> Result<void> {
217 CF_EXPECT(HandleMessage(ev));
218 return {};
219 };
220 CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));
221
222 abandon_client.Cancel();
223 return {};
224 }
225
HandleRequest(RequestWithStdio request,SharedFD client)226 Result<cvd::Response> CvdServer::HandleRequest(RequestWithStdio request,
227 SharedFD client) {
228 fruit::Injector<> injector(RequestComponent, this, &instance_manager_);
229 auto possible_handlers = injector.getMultibindings<CvdServerHandler>();
230
231 // Even if the interrupt callback outlives the request handler, it'll only
232 // hold on to this struct which will be cleaned out when the request handler
233 // exits.
234 auto shared = std::make_shared<OngoingRequest>();
235 shared->handler = CF_EXPECT(RequestHandler(request, possible_handlers));
236 shared->thread_id = std::this_thread::get_id();
237
238 {
239 std::lock_guard lock(ongoing_requests_mutex_);
240 if (running_) {
241 ongoing_requests_.insert(shared);
242 } else {
243 // We're executing concurrently with a Stop() call.
244 return {};
245 }
246 }
247 ScopeGuard remove_ongoing_request([this, shared] {
248 std::lock_guard lock(ongoing_requests_mutex_);
249 ongoing_requests_.erase(shared);
250 });
251
252 auto interrupt_cb = [shared](EpollEvent) -> Result<void> {
253 std::lock_guard lock(shared->mutex);
254 CF_EXPECT(shared->handler != nullptr);
255 CF_EXPECT(shared->handler->Interrupt());
256 return {};
257 };
258 CF_EXPECT(epoll_pool_.Register(client, EPOLLHUP, interrupt_cb));
259
260 auto response = CF_EXPECT(shared->handler->Handle(request));
261 {
262 std::lock_guard lock(shared->mutex);
263 shared->handler = nullptr;
264 }
265 CF_EXPECT(epoll_pool_.Remove(client)); // Delete interrupt handler
266
267 return response;
268 }
269
ServerComponent()270 static fruit::Component<CvdServer> ServerComponent() {
271 return fruit::createComponent()
272 .install(EpollLoopComponent);
273 }
274
CvdServerMain(SharedFD server_fd)275 Result<int> CvdServerMain(SharedFD server_fd) {
276 LOG(INFO) << "Starting server";
277
278 signal(SIGPIPE, SIG_IGN);
279
280 CF_EXPECT(server_fd->IsOpen(), "Did not receive a valid cvd_server fd");
281
282 fruit::Injector<CvdServer> injector(ServerComponent);
283 CvdServer& server = injector.get<CvdServer&>();
284 server.StartServer(server_fd);
285 server.Join();
286
287 return 0;
288 }
289
290 } // namespace cuttlefish
291