• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/commands/cvd/server.h"
18 
19 #include <signal.h>
20 
21 #include <atomic>
22 #include <future>
23 #include <map>
24 #include <mutex>
25 #include <optional>
26 #include <thread>
27 
28 #include <android-base/file.h>
29 #include <android-base/logging.h>
30 #include <fruit/fruit.h>
31 
32 #include "cvd_server.pb.h"
33 
34 #include "common/libs/fs/shared_buf.h"
35 #include "common/libs/fs/shared_fd.h"
36 #include "common/libs/fs/shared_select.h"
37 #include "common/libs/utils/files.h"
38 #include "common/libs/utils/flag_parser.h"
39 #include "common/libs/utils/result.h"
40 #include "common/libs/utils/shared_fd_flag.h"
41 #include "common/libs/utils/subprocess.h"
42 #include "host/commands/cvd/epoll_loop.h"
43 #include "host/commands/cvd/scope_guard.h"
44 #include "host/commands/cvd/server_constants.h"
45 #include "host/libs/config/cuttlefish_config.h"
46 #include "host/libs/config/known_paths.h"
47 
48 namespace cuttlefish {
49 
// Builds the fruit dependency-injection component used to service a single
// request: binds the externally owned server and instance manager instances
// and installs every request-handler component so HandleRequest() can query
// the full CvdServerHandler multibinding set.
static fruit::Component<> RequestComponent(CvdServer* server,
                                           InstanceManager* instance_manager) {
  return fruit::createComponent()
      .bindInstance(*server)
      .bindInstance(*instance_manager)
      .install(AcloudCommandComponent)
      .install(cvdCommandComponent)
      .install(cvdShutdownComponent)
      .install(cvdVersionComponent);
}
60 
// Number of worker threads concurrently servicing epoll events.
static constexpr int kNumThreads = 10;
62 
CvdServer(EpollPool & epoll_pool,InstanceManager & instance_manager)63 CvdServer::CvdServer(EpollPool& epoll_pool, InstanceManager& instance_manager)
64     : epoll_pool_(epoll_pool),
65       instance_manager_(instance_manager),
66       running_(true) {
67   std::scoped_lock lock(threads_mutex_);
68   for (auto i = 0; i < kNumThreads; i++) {
69     threads_.emplace_back([this]() {
70       while (running_) {
71         auto result = epoll_pool_.HandleEvent();
72         if (!result.ok()) {
73           LOG(ERROR) << "Epoll worker error:\n" << result.error();
74         }
75       }
76       auto wakeup = BestEffortWakeup();
77       CHECK(wakeup.ok()) << wakeup.error().message();
78     });
79   }
80 }
81 
~CvdServer()82 CvdServer::~CvdServer() {
83   running_ = false;
84   auto wakeup = BestEffortWakeup();
85   CHECK(wakeup.ok()) << wakeup.error().message();
86   Join();
87 }
88 
BestEffortWakeup()89 Result<void> CvdServer::BestEffortWakeup() {
90   // This attempts to cascade through the responder threads, forcing them
91   // to wake up and see that running_ is false, then exit and wake up
92   // further threads.
93   auto eventfd = SharedFD::Event();
94   CF_EXPECT(eventfd->IsOpen(), eventfd->StrError());
95   CF_EXPECT(eventfd->EventfdWrite(1) == 0, eventfd->StrError());
96 
97   auto cb = [](EpollEvent) -> Result<void> { return {}; };
98   CF_EXPECT(epoll_pool_.Register(eventfd, EPOLLIN, cb));
99   return {};
100 }
101 
// Shuts the server down: clears running_, then drains the set of in-flight
// requests, interrupting each one and joining the worker thread that was
// servicing it.
void CvdServer::Stop() {
  {
    std::lock_guard lock(ongoing_requests_mutex_);
    running_ = false;
  }
  // Drain one request per iteration. Note the set's lock is released before
  // interrupting or joining, so handlers that finish concurrently (and erase
  // themselves from the set) are not blocked on this lock.
  while (true) {
    std::shared_ptr<OngoingRequest> request;
    {
      std::lock_guard lock(ongoing_requests_mutex_);
      if (ongoing_requests_.empty()) {
        break;
      }
      auto it = ongoing_requests_.begin();
      request = *it;
      ongoing_requests_.erase(it);
    }
    {
      std::lock_guard lock(request->mutex);
      if (request->handler == nullptr) {
        // Handler already completed between removal from the set and here.
        continue;
      }
      request->handler->Interrupt();
    }
    // Join the specific worker thread that was running this request — unless
    // Stop() is itself being executed on that thread (joining self deadlocks).
    std::scoped_lock lock(threads_mutex_);
    for (auto& thread : threads_) {
      auto current_thread = thread.get_id() == std::this_thread::get_id();
      auto matching_thread = thread.get_id() == request->thread_id;
      if (!current_thread && matching_thread && thread.joinable()) {
        thread.join();
      }
    }
  }
}
135 
Join()136 void CvdServer::Join() {
137   for (auto& thread : threads_) {
138     if (thread.joinable()) {
139       thread.join();
140     }
141   }
142 }
143 
RequestHandler(const RequestWithStdio & request,const std::vector<CvdServerHandler * > & handlers)144 static Result<CvdServerHandler*> RequestHandler(
145     const RequestWithStdio& request,
146     const std::vector<CvdServerHandler*>& handlers) {
147   Result<cvd::Response> response;
148   std::vector<CvdServerHandler*> compatible_handlers;
149   for (auto& handler : handlers) {
150     if (CF_EXPECT(handler->CanHandle(request))) {
151       compatible_handlers.push_back(handler);
152     }
153   }
154   CF_EXPECT(compatible_handlers.size() == 1,
155             "Expected exactly one handler for message, found "
156                 << compatible_handlers.size());
157   return compatible_handlers[0];
158 }
159 
StartServer(SharedFD server_fd)160 Result<void> CvdServer::StartServer(SharedFD server_fd) {
161   auto cb = [this](EpollEvent ev) -> Result<void> {
162     CF_EXPECT(AcceptClient(ev));
163     return {};
164   };
165   CF_EXPECT(epoll_pool_.Register(server_fd, EPOLLIN, cb));
166   return {};
167 }
168 
AcceptClient(EpollEvent event)169 Result<void> CvdServer::AcceptClient(EpollEvent event) {
170   ScopeGuard stop_on_failure([this] { Stop(); });
171 
172   CF_EXPECT(event.events & EPOLLIN);
173   auto client_fd = SharedFD::Accept(*event.fd);
174   CF_EXPECT(client_fd->IsOpen(), client_fd->StrError());
175   auto client_cb = [this](EpollEvent ev) -> Result<void> {
176     CF_EXPECT(HandleMessage(ev));
177     return {};
178   };
179   CF_EXPECT(epoll_pool_.Register(client_fd, EPOLLIN, client_cb));
180 
181   auto self_cb = [this](EpollEvent ev) -> Result<void> {
182     CF_EXPECT(AcceptClient(ev));
183     return {};
184   };
185   CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));
186 
187   stop_on_failure.Cancel();
188   return {};
189 }
190 
// Epoll callback for a client socket: reads one request, dispatches it, and
// writes the response back. On every early-return or error path the client
// fd is dropped from the epoll pool (via the scope guard and/or an explicit
// Remove), closing out the connection.
Result<void> CvdServer::HandleMessage(EpollEvent event) {
  ScopeGuard abandon_client([this, event] { epoll_pool_.Remove(event.fd); });

  if (event.events & EPOLLHUP) {  // Client went away.
    epoll_pool_.Remove(event.fd);
    return {};
  }

  CF_EXPECT(event.events & EPOLLIN);
  auto request = CF_EXPECT(GetRequest(event.fd));
  if (!request) {  // End-of-file / client went away.
    epoll_pool_.Remove(event.fd);
    return {};
  }

  auto response = HandleRequest(*request, event.fd);
  if (!response.ok()) {
    // Report the failure to the client; the scope guard then drops the
    // connection on return.
    cvd::Response failure_message;
    failure_message.mutable_status()->set_code(cvd::Status::INTERNAL);
    failure_message.mutable_status()->set_message(response.error().message());
    CF_EXPECT(SendResponse(event.fd, failure_message));
    return {};  // Error already sent to the client, don't repeat on the server
  }
  CF_EXPECT(SendResponse(event.fd, *response));

  // Re-arm this callback so the next message on the same connection is
  // handled as well.
  auto self_cb = [this](EpollEvent ev) -> Result<void> {
    CF_EXPECT(HandleMessage(ev));
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));

  abandon_client.Cancel();
  return {};
}
225 
// Dispatches one parsed request to the unique matching handler. While the
// handler runs, an EPOLLHUP callback on the client fd is registered so a
// client disconnect interrupts a long-running handler.
Result<cvd::Response> CvdServer::HandleRequest(RequestWithStdio request,
                                               SharedFD client) {
  fruit::Injector<> injector(RequestComponent, this, &instance_manager_);
  auto possible_handlers = injector.getMultibindings<CvdServerHandler>();

  // Even if the interrupt callback outlives the request handler, it'll only
  // hold on to this struct which will be cleaned out when the request handler
  // exits.
  auto shared = std::make_shared<OngoingRequest>();
  shared->handler = CF_EXPECT(RequestHandler(request, possible_handlers));
  shared->thread_id = std::this_thread::get_id();

  {
    std::lock_guard lock(ongoing_requests_mutex_);
    if (running_) {
      ongoing_requests_.insert(shared);
    } else {
      // We're executing concurrently with a Stop() call.
      return {};
    }
  }
  // Ensure the request is removed from the in-flight set no matter how this
  // function exits.
  ScopeGuard remove_ongoing_request([this, shared] {
    std::lock_guard lock(ongoing_requests_mutex_);
    ongoing_requests_.erase(shared);
  });

  // Client hangup mid-request interrupts the handler; errors out (rather
  // than no-ops) if the handler pointer has already been cleared.
  auto interrupt_cb = [shared](EpollEvent) -> Result<void> {
    std::lock_guard lock(shared->mutex);
    CF_EXPECT(shared->handler != nullptr);
    CF_EXPECT(shared->handler->Interrupt());
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(client, EPOLLHUP, interrupt_cb));

  auto response = CF_EXPECT(shared->handler->Handle(request));
  {
    // Mark the request complete so a late interrupt cannot touch the handler.
    std::lock_guard lock(shared->mutex);
    shared->handler = nullptr;
  }
  CF_EXPECT(epoll_pool_.Remove(client));  // Delete interrupt handler

  return response;
}
269 
ServerComponent()270 static fruit::Component<CvdServer> ServerComponent() {
271   return fruit::createComponent()
272       .install(EpollLoopComponent);
273 }
274 
CvdServerMain(SharedFD server_fd)275 Result<int> CvdServerMain(SharedFD server_fd) {
276   LOG(INFO) << "Starting server";
277 
278   signal(SIGPIPE, SIG_IGN);
279 
280   CF_EXPECT(server_fd->IsOpen(), "Did not receive a valid cvd_server fd");
281 
282   fruit::Injector<CvdServer> injector(ServerComponent);
283   CvdServer& server = injector.get<CvdServer&>();
284   server.StartServer(server_fd);
285   server.Join();
286 
287   return 0;
288 }
289 
290 }  // namespace cuttlefish
291