1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "handler_manager.h"
16
17 #include <pthread.h>
18 #include <sys/eventfd.h>
19
20 #include <android-base/logging.h>
21
22 #include "android-base/properties.h"
23 #include "merge_worker.h"
24 #include "read_worker.h"
25 #include "snapuserd_core.h"
26
27 namespace android {
28 namespace snapshot {
29
// Default passed to GetUintProperty() for "ro.virtual_ab.num_merge_threads" in
// MonitorMerge(): the cap on how many partition merges may be active at once.
constexpr uint8_t kMaxMergeThreads{2};
31
HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)32 HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
33 : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
34
FreeResources()35 void HandlerThread::FreeResources() {
36 // Each worker thread holds a reference to snapuserd.
37 // Clear them so that all the resources
38 // held by snapuserd is released
39 if (snapuserd_) {
40 snapuserd_->FreeResources();
41 snapuserd_ = nullptr;
42 }
43 }
44
SnapshotHandlerManager()45 SnapshotHandlerManager::SnapshotHandlerManager() {
46 monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
47 if (monitor_merge_event_fd_ == -1) {
48 PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
49 }
50 }
51
AddHandler(const std::string & misc_name,const std::string & cow_device_path,const std::string & backing_device,const std::string & base_path_merge,std::shared_ptr<IBlockServerOpener> opener,HandlerOptions options)52 std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
53 const std::string& misc_name, const std::string& cow_device_path,
54 const std::string& backing_device, const std::string& base_path_merge,
55 std::shared_ptr<IBlockServerOpener> opener, HandlerOptions options) {
56 auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
57 base_path_merge, opener, options);
58 if (!snapuserd->InitCowDevice()) {
59 LOG(ERROR) << "Failed to initialize Snapuserd";
60 return nullptr;
61 }
62
63 if (!snapuserd->InitializeWorkers()) {
64 LOG(ERROR) << "Failed to initialize workers";
65 return nullptr;
66 }
67
68 auto handler = std::make_shared<HandlerThread>(snapuserd);
69 {
70 std::lock_guard<std::mutex> lock(lock_);
71 if (FindHandler(&lock, misc_name) != dm_users_.end()) {
72 LOG(ERROR) << "Handler already exists: " << misc_name;
73 return nullptr;
74 }
75 dm_users_.push_back(handler);
76 }
77 return handler;
78 }
79
StartHandler(const std::string & misc_name)80 bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
81 std::lock_guard<std::mutex> lock(lock_);
82 auto iter = FindHandler(&lock, misc_name);
83 if (iter == dm_users_.end()) {
84 LOG(ERROR) << "Could not find handler: " << misc_name;
85 return false;
86 }
87 if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
88 LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
89 return false;
90 }
91 if (!StartHandler(*iter)) {
92 return false;
93 }
94 return true;
95 }
96
StartHandler(const std::shared_ptr<HandlerThread> & handler)97 bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
98 if (handler->snapuserd()->IsAttached()) {
99 LOG(ERROR) << "Handler already attached";
100 return false;
101 }
102
103 handler->snapuserd()->AttachControlDevice();
104
105 handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
106 return true;
107 }
108
DeleteHandler(const std::string & misc_name)109 bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
110 {
111 std::lock_guard<std::mutex> lock(lock_);
112 auto iter = FindHandler(&lock, misc_name);
113 if (iter == dm_users_.end()) {
114 // After merge is completed, we swap dm-user table with
115 // the underlying dm-linear base device. Hence, worker
116 // threads would have terminted and was removed from
117 // the list.
118 LOG(DEBUG) << "Could not find handler: " << misc_name;
119 return true;
120 }
121
122 if (!(*iter)->ThreadTerminated()) {
123 (*iter)->snapuserd()->NotifyIOTerminated();
124 }
125 }
126 if (!RemoveAndJoinHandler(misc_name)) {
127 return false;
128 }
129 return true;
130 }
131
// Body of the per-partition handler thread spawned by StartHandler().
//
// Runs the SnapshotHandler via Start(), then tears down its I/O state. Under
// lock_ it then does three things:
//   - records merge completion;
//   - marks the handler terminated;
//   - removes it from dm_users_, unless RemoveAndJoinHandler() or
//     JoinAllThreads() already took it out of the list and will join this
//     thread.
//
// `handler` is taken by value so this thread keeps its own reference to the
// HandlerThread even after it is erased from dm_users_.
void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();

    pthread_setname_np(pthread_self(), "Handler");

    // Presumably blocks for the lifetime of the handler (confirm in
    // SnapshotHandler::Start()). A failure is only logged; the teardown below
    // runs either way.
    if (!handler->snapuserd()->Start()) {
        LOG(ERROR) << " Failed to launch all worker threads";
    }

    handler->snapuserd()->CloseFds();
    // Decides below whether this partition releases a merge slot and counts
    // toward num_partitions_merge_complete_.
    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
    handler->snapuserd()->UnmapBufferRegion();

    auto misc_name = handler->misc_name();
    LOG(INFO) << "Handler thread about to exit: " << misc_name;

    {
        std::lock_guard<std::mutex> lock(lock_);
        if (merge_completed) {
            // Release this partition's merge slot and wake MonitorMerge() so it
            // can start the next queued merge.
            // NOTE(review): assumes CheckMergeCompletionStatus() only reports
            // true for merges that MonitorMerge() started (and counted in
            // active_merge_threads_) — confirm, otherwise this can underflow.
            num_partitions_merge_complete_ += 1;
            active_merge_threads_ -= 1;
            WakeupMonitorMergeThread();
        }
        // Once set, DeleteHandler(), TerminateMergeThreads(), PauseMerge() and
        // ResumeMerge() stop calling into this handler's SnapshotHandler.
        handler->SetThreadTerminated();
        auto iter = FindHandler(&lock, handler->misc_name());
        if (iter == dm_users_.end()) {
            // RemoveAndJoinHandler() already removed us from the list, and is
            // now waiting on a join(), so just return. Additionally, release
            // all the resources held by snapuserd object which are shared
            // by worker threads. This should be done when the last reference
            // of "handler" is released; but we will explicitly release here
            // to make sure snapuserd object is freed as it is the biggest
            // consumer of memory in the daemon.
            handler->FreeResources();
            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
            return;
        }

        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;

        // The handler is erased from dm_users_ below, so nobody will join this
        // thread; detach it instead.
        if (handler->snapuserd()->IsAttached()) {
            handler->thread().detach();
        }

        // Important: free resources within the lock. This ensures that if
        // WaitForDelete() is called, the handler is either in the list, or
        // it's not and its resources are guaranteed to be freed.
        handler->FreeResources();
        dm_users_.erase(iter);
    }
}
183
InitiateMerge(const std::string & misc_name)184 bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
185 std::lock_guard<std::mutex> lock(lock_);
186 auto iter = FindHandler(&lock, misc_name);
187 if (iter == dm_users_.end()) {
188 LOG(ERROR) << "Could not find handler: " << misc_name;
189 return false;
190 }
191
192 return StartMerge(&lock, *iter);
193 }
194
StartMerge(std::lock_guard<std::mutex> * proof_of_lock,const std::shared_ptr<HandlerThread> & handler)195 bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
196 const std::shared_ptr<HandlerThread>& handler) {
197 CHECK(proof_of_lock);
198
199 if (!handler->snapuserd()->IsAttached()) {
200 LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
201 return false;
202 }
203
204 handler->snapuserd()->MonitorMerge();
205
206 if (!merge_monitor_.joinable()) {
207 merge_monitor_ = std::thread(&SnapshotHandlerManager::MonitorMerge, this);
208 }
209
210 merge_handlers_.push(handler);
211 WakeupMonitorMergeThread();
212 return true;
213 }
214
WakeupMonitorMergeThread()215 void SnapshotHandlerManager::WakeupMonitorMergeThread() {
216 uint64_t notify = 1;
217 ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify)));
218 if (rc < 0) {
219 PLOG(FATAL) << "failed to notify monitor merge thread";
220 }
221 }
222
MonitorMerge()223 void SnapshotHandlerManager::MonitorMerge() {
224 pthread_setname_np(pthread_self(), "Merge Monitor");
225 while (!stop_monitor_merge_thread_) {
226 uint64_t testVal;
227 ssize_t ret =
228 TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
229 if (ret == -1) {
230 PLOG(FATAL) << "Failed to read from eventfd";
231 } else if (ret == 0) {
232 LOG(FATAL) << "Hit EOF on eventfd";
233 }
234
235 LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
236 {
237 auto num_merge_threads = android::base::GetUintProperty<uint>(
238 "ro.virtual_ab.num_merge_threads", kMaxMergeThreads);
239 std::lock_guard<std::mutex> lock(lock_);
240 while (active_merge_threads_ < num_merge_threads && merge_handlers_.size() > 0) {
241 auto handler = merge_handlers_.front();
242 merge_handlers_.pop();
243
244 if (!handler->snapuserd()) {
245 LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
246 continue;
247 }
248
249 LOG(INFO) << "Starting merge for partition: "
250 << handler->snapuserd()->GetMiscName();
251 handler->snapuserd()->InitiateMerge();
252 active_merge_threads_ += 1;
253 }
254 }
255 }
256
257 LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
258 }
259
GetMergeStatus(const std::string & misc_name)260 std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
261 std::lock_guard<std::mutex> lock(lock_);
262 auto iter = FindHandler(&lock, misc_name);
263 if (iter == dm_users_.end()) {
264 LOG(ERROR) << "Could not find handler: " << misc_name;
265 return {};
266 }
267
268 return (*iter)->snapuserd()->GetMergeStatus();
269 }
270
GetMergePercentage()271 double SnapshotHandlerManager::GetMergePercentage() {
272 std::lock_guard<std::mutex> lock(lock_);
273
274 double percentage = 0.0;
275 int n = 0;
276
277 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
278 auto& th = (*iter)->thread();
279 if (th.joinable()) {
280 // Merge percentage by individual partitions wherein merge is still
281 // in-progress
282 percentage += (*iter)->snapuserd()->GetMergePercentage();
283 n += 1;
284 }
285 }
286
287 // Calculate final merge including those partitions where merge was already
288 // completed - num_partitions_merge_complete_ will track them when each
289 // thread exists in RunThread.
290 int total_partitions = n + num_partitions_merge_complete_;
291
292 if (total_partitions) {
293 percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
294 }
295
296 LOG(DEBUG) << "Merge %: " << percentage
297 << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
298 << " total_partitions: " << total_partitions << " n: " << n;
299 return percentage;
300 }
301
GetVerificationStatus()302 bool SnapshotHandlerManager::GetVerificationStatus() {
303 std::lock_guard<std::mutex> lock(lock_);
304
305 bool status = true;
306 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
307 auto& th = (*iter)->thread();
308 if (th.joinable() && status) {
309 status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
310 } else {
311 // return immediately if there is a failure
312 return false;
313 }
314 }
315
316 return status;
317 }
318
RemoveAndJoinHandler(const std::string & misc_name)319 bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
320 std::shared_ptr<HandlerThread> handler;
321 {
322 std::lock_guard<std::mutex> lock(lock_);
323
324 auto iter = FindHandler(&lock, misc_name);
325 if (iter == dm_users_.end()) {
326 // Client already deleted.
327 return true;
328 }
329 handler = std::move(*iter);
330 dm_users_.erase(iter);
331 }
332
333 auto& th = handler->thread();
334 if (th.joinable()) {
335 th.join();
336 }
337 return true;
338 }
339
TerminateMergeThreads()340 void SnapshotHandlerManager::TerminateMergeThreads() {
341 std::lock_guard<std::mutex> guard(lock_);
342
343 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
344 if (!(*iter)->ThreadTerminated()) {
345 (*iter)->snapuserd()->NotifyIOTerminated();
346 }
347 }
348 }
349
JoinAllThreads()350 void SnapshotHandlerManager::JoinAllThreads() {
351 // Acquire the thread list within the lock.
352 std::vector<std::shared_ptr<HandlerThread>> dm_users;
353 {
354 std::lock_guard<std::mutex> guard(lock_);
355 dm_users = std::move(dm_users_);
356 }
357
358 for (auto& client : dm_users) {
359 auto& th = client->thread();
360
361 if (th.joinable()) th.join();
362 }
363
364 if (merge_monitor_.joinable()) {
365 stop_monitor_merge_thread_ = true;
366 WakeupMonitorMergeThread();
367
368 merge_monitor_.join();
369 }
370 }
371
FindHandler(std::lock_guard<std::mutex> * proof_of_lock,const std::string & misc_name)372 auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
373 const std::string& misc_name) -> HandlerList::iterator {
374 CHECK(proof_of_lock);
375
376 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
377 if ((*iter)->misc_name() == misc_name) {
378 return iter;
379 }
380 }
381 return dm_users_.end();
382 }
383
PauseMerge()384 void SnapshotHandlerManager::PauseMerge() {
385 std::lock_guard<std::mutex> guard(lock_);
386
387 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
388 if (!(*iter)->ThreadTerminated()) {
389 (*iter)->snapuserd()->PauseMergeThreads();
390 }
391 }
392 }
393
ResumeMerge()394 void SnapshotHandlerManager::ResumeMerge() {
395 std::lock_guard<std::mutex> guard(lock_);
396
397 for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
398 if (!(*iter)->ThreadTerminated()) {
399 (*iter)->snapuserd()->ResumeMergeThreads();
400 }
401 }
402 }
403
404 } // namespace snapshot
405 } // namespace android
406