• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "handler_manager.h"
16 
17 #include <pthread.h>
18 #include <sys/eventfd.h>
19 
20 #include <android-base/logging.h>
21 
22 #include "android-base/properties.h"
23 #include "merge_worker.h"
24 #include "read_worker.h"
25 #include "snapuserd_core.h"
26 
27 namespace android {
28 namespace snapshot {
29 
30 static constexpr uint8_t kMaxMergeThreads = 2;
31 
HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)32 HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
33     : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
34 
FreeResources()35 void HandlerThread::FreeResources() {
36     // Each worker thread holds a reference to snapuserd.
37     // Clear them so that all the resources
38     // held by snapuserd is released
39     if (snapuserd_) {
40         snapuserd_->FreeResources();
41         snapuserd_ = nullptr;
42     }
43 }
44 
SnapshotHandlerManager()45 SnapshotHandlerManager::SnapshotHandlerManager() {
46     monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
47     if (monitor_merge_event_fd_ == -1) {
48         PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
49     }
50 }
51 
AddHandler(const std::string & misc_name,const std::string & cow_device_path,const std::string & backing_device,const std::string & base_path_merge,std::shared_ptr<IBlockServerOpener> opener,HandlerOptions options)52 std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
53         const std::string& misc_name, const std::string& cow_device_path,
54         const std::string& backing_device, const std::string& base_path_merge,
55         std::shared_ptr<IBlockServerOpener> opener, HandlerOptions options) {
56     auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
57                                                        base_path_merge, opener, options);
58     if (!snapuserd->InitCowDevice()) {
59         LOG(ERROR) << "Failed to initialize Snapuserd";
60         return nullptr;
61     }
62 
63     if (!snapuserd->InitializeWorkers()) {
64         LOG(ERROR) << "Failed to initialize workers";
65         return nullptr;
66     }
67 
68     auto handler = std::make_shared<HandlerThread>(snapuserd);
69     {
70         std::lock_guard<std::mutex> lock(lock_);
71         if (FindHandler(&lock, misc_name) != dm_users_.end()) {
72             LOG(ERROR) << "Handler already exists: " << misc_name;
73             return nullptr;
74         }
75         dm_users_.push_back(handler);
76     }
77     return handler;
78 }
79 
StartHandler(const std::string & misc_name)80 bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
81     std::lock_guard<std::mutex> lock(lock_);
82     auto iter = FindHandler(&lock, misc_name);
83     if (iter == dm_users_.end()) {
84         LOG(ERROR) << "Could not find handler: " << misc_name;
85         return false;
86     }
87     if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
88         LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
89         return false;
90     }
91     if (!StartHandler(*iter)) {
92         return false;
93     }
94     return true;
95 }
96 
StartHandler(const std::shared_ptr<HandlerThread> & handler)97 bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
98     if (handler->snapuserd()->IsAttached()) {
99         LOG(ERROR) << "Handler already attached";
100         return false;
101     }
102 
103     handler->snapuserd()->AttachControlDevice();
104 
105     handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
106     return true;
107 }
108 
DeleteHandler(const std::string & misc_name)109 bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
110     {
111         std::lock_guard<std::mutex> lock(lock_);
112         auto iter = FindHandler(&lock, misc_name);
113         if (iter == dm_users_.end()) {
114             // After merge is completed, we swap dm-user table with
115             // the underlying dm-linear base device. Hence, worker
116             // threads would have terminted and was removed from
117             // the list.
118             LOG(DEBUG) << "Could not find handler: " << misc_name;
119             return true;
120         }
121 
122         if (!(*iter)->ThreadTerminated()) {
123             (*iter)->snapuserd()->NotifyIOTerminated();
124         }
125     }
126     if (!RemoveAndJoinHandler(misc_name)) {
127         return false;
128     }
129     return true;
130 }
131 
// Thread body for one handler: services its I/O via Start() until told to
// stop, then records merge completion and deregisters the handler from
// dm_users_ — unless RemoveAndJoinHandler() already removed it, in which
// case that caller is waiting to join() us. Takes the shared_ptr by value
// to keep the handler alive for the whole thread lifetime.
void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();

    pthread_setname_np(pthread_self(), "Handler");

    // Blocks here servicing requests until the handler shuts down.
    if (!handler->snapuserd()->Start()) {
        LOG(ERROR) << " Failed to launch all worker threads";
    }

    handler->snapuserd()->CloseFds();
    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
    handler->snapuserd()->UnmapBufferRegion();

    // Copy the name out; the handler may be freed below before the last log.
    auto misc_name = handler->misc_name();
    LOG(INFO) << "Handler thread about to exit: " << misc_name;

    {
        std::lock_guard<std::mutex> lock(lock_);
        if (merge_completed) {
            num_partitions_merge_complete_ += 1;
            // Free a merge slot and poke MonitorMerge() so a queued merge
            // can start.
            active_merge_threads_ -= 1;
            WakeupMonitorMergeThread();
        }
        handler->SetThreadTerminated();
        auto iter = FindHandler(&lock, handler->misc_name());
        if (iter == dm_users_.end()) {
            // RemoveAndJoinHandler() already removed us from the list, and is
            // now waiting on a join(), so just return. Additionally, release
            // all the resources held by snapuserd object which are shared
            // by worker threads. This should be done when the last reference
            // of "handler" is released; but we will explicitly release here
            // to make sure snapuserd object is freed as it is the biggest
            // consumer of memory in the daemon.
            handler->FreeResources();
            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
            return;
        }

        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;

        // We are erasing ourselves from the list, so no one will ever join()
        // this thread; detach so the std::thread can be destroyed safely.
        if (handler->snapuserd()->IsAttached()) {
            handler->thread().detach();
        }

        // Important: free resources within the lock. This ensures that if
        // WaitForDelete() is called, the handler is either in the list, or
        // it's not and its resources are guaranteed to be freed.
        handler->FreeResources();
        dm_users_.erase(iter);
    }
}
183 
InitiateMerge(const std::string & misc_name)184 bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
185     std::lock_guard<std::mutex> lock(lock_);
186     auto iter = FindHandler(&lock, misc_name);
187     if (iter == dm_users_.end()) {
188         LOG(ERROR) << "Could not find handler: " << misc_name;
189         return false;
190     }
191 
192     return StartMerge(&lock, *iter);
193 }
194 
StartMerge(std::lock_guard<std::mutex> * proof_of_lock,const std::shared_ptr<HandlerThread> & handler)195 bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
196                                         const std::shared_ptr<HandlerThread>& handler) {
197     CHECK(proof_of_lock);
198 
199     if (!handler->snapuserd()->IsAttached()) {
200         LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
201         return false;
202     }
203 
204     handler->snapuserd()->MonitorMerge();
205 
206     if (!merge_monitor_.joinable()) {
207         merge_monitor_ = std::thread(&SnapshotHandlerManager::MonitorMerge, this);
208     }
209 
210     merge_handlers_.push(handler);
211     WakeupMonitorMergeThread();
212     return true;
213 }
214 
WakeupMonitorMergeThread()215 void SnapshotHandlerManager::WakeupMonitorMergeThread() {
216     uint64_t notify = 1;
217     ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
218     if (rc < 0) {
219         PLOG(FATAL) << "failed to notify monitor merge thread";
220     }
221 }
222 
MonitorMerge()223 void SnapshotHandlerManager::MonitorMerge() {
224     pthread_setname_np(pthread_self(), "Merge Monitor");
225     while (!stop_monitor_merge_thread_) {
226         uint64_t testVal;
227         ssize_t ret =
228                 TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
229         if (ret == -1) {
230             PLOG(FATAL) << "Failed to read from eventfd";
231         } else if (ret == 0) {
232             LOG(FATAL) << "Hit EOF on eventfd";
233         }
234 
235         LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
236         {
237             auto num_merge_threads = android::base::GetUintProperty<uint>(
238                     "ro.virtual_ab.num_merge_threads", kMaxMergeThreads);
239             std::lock_guard<std::mutex> lock(lock_);
240             while (active_merge_threads_ < num_merge_threads && merge_handlers_.size() > 0) {
241                 auto handler = merge_handlers_.front();
242                 merge_handlers_.pop();
243 
244                 if (!handler->snapuserd()) {
245                     LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
246                     continue;
247                 }
248 
249                 LOG(INFO) << "Starting merge for partition: "
250                           << handler->snapuserd()->GetMiscName();
251                 handler->snapuserd()->InitiateMerge();
252                 active_merge_threads_ += 1;
253             }
254         }
255     }
256 
257     LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
258 }
259 
GetMergeStatus(const std::string & misc_name)260 std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
261     std::lock_guard<std::mutex> lock(lock_);
262     auto iter = FindHandler(&lock, misc_name);
263     if (iter == dm_users_.end()) {
264         LOG(ERROR) << "Could not find handler: " << misc_name;
265         return {};
266     }
267 
268     return (*iter)->snapuserd()->GetMergeStatus();
269 }
270 
GetMergePercentage()271 double SnapshotHandlerManager::GetMergePercentage() {
272     std::lock_guard<std::mutex> lock(lock_);
273 
274     double percentage = 0.0;
275     int n = 0;
276 
277     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
278         auto& th = (*iter)->thread();
279         if (th.joinable()) {
280             // Merge percentage by individual partitions wherein merge is still
281             // in-progress
282             percentage += (*iter)->snapuserd()->GetMergePercentage();
283             n += 1;
284         }
285     }
286 
287     // Calculate final merge including those partitions where merge was already
288     // completed - num_partitions_merge_complete_ will track them when each
289     // thread exists in RunThread.
290     int total_partitions = n + num_partitions_merge_complete_;
291 
292     if (total_partitions) {
293         percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
294     }
295 
296     LOG(DEBUG) << "Merge %: " << percentage
297                << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
298                << " total_partitions: " << total_partitions << " n: " << n;
299     return percentage;
300 }
301 
// Returns true only if every registered handler has a joinable
// (still-running) thread AND its partition passes verification.
// NOTE(review): a handler whose thread has already terminated (non-joinable)
// — or any handler visited after a verification failure flips |status| —
// takes the else branch and returns false immediately. Presumably callers
// only invoke this while all handler threads are alive; confirm that this
// early-false on a terminated thread is intended.
bool SnapshotHandlerManager::GetVerificationStatus() {
    std::lock_guard<std::mutex> lock(lock_);

    bool status = true;
    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
        auto& th = (*iter)->thread();
        if (th.joinable() && status) {
            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
        } else {
            // return immediately if there is a failure
            return false;
        }
    }

    return status;
}
318 
RemoveAndJoinHandler(const std::string & misc_name)319 bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
320     std::shared_ptr<HandlerThread> handler;
321     {
322         std::lock_guard<std::mutex> lock(lock_);
323 
324         auto iter = FindHandler(&lock, misc_name);
325         if (iter == dm_users_.end()) {
326             // Client already deleted.
327             return true;
328         }
329         handler = std::move(*iter);
330         dm_users_.erase(iter);
331     }
332 
333     auto& th = handler->thread();
334     if (th.joinable()) {
335         th.join();
336     }
337     return true;
338 }
339 
TerminateMergeThreads()340 void SnapshotHandlerManager::TerminateMergeThreads() {
341     std::lock_guard<std::mutex> guard(lock_);
342 
343     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
344         if (!(*iter)->ThreadTerminated()) {
345             (*iter)->snapuserd()->NotifyIOTerminated();
346         }
347     }
348 }
349 
JoinAllThreads()350 void SnapshotHandlerManager::JoinAllThreads() {
351     // Acquire the thread list within the lock.
352     std::vector<std::shared_ptr<HandlerThread>> dm_users;
353     {
354         std::lock_guard<std::mutex> guard(lock_);
355         dm_users = std::move(dm_users_);
356     }
357 
358     for (auto& client : dm_users) {
359         auto& th = client->thread();
360 
361         if (th.joinable()) th.join();
362     }
363 
364     if (merge_monitor_.joinable()) {
365         stop_monitor_merge_thread_ = true;
366         WakeupMonitorMergeThread();
367 
368         merge_monitor_.join();
369     }
370 }
371 
FindHandler(std::lock_guard<std::mutex> * proof_of_lock,const std::string & misc_name)372 auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
373                                          const std::string& misc_name) -> HandlerList::iterator {
374     CHECK(proof_of_lock);
375 
376     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
377         if ((*iter)->misc_name() == misc_name) {
378             return iter;
379         }
380     }
381     return dm_users_.end();
382 }
383 
PauseMerge()384 void SnapshotHandlerManager::PauseMerge() {
385     std::lock_guard<std::mutex> guard(lock_);
386 
387     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
388         if (!(*iter)->ThreadTerminated()) {
389             (*iter)->snapuserd()->PauseMergeThreads();
390         }
391     }
392 }
393 
ResumeMerge()394 void SnapshotHandlerManager::ResumeMerge() {
395     std::lock_guard<std::mutex> guard(lock_);
396 
397     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
398         if (!(*iter)->ThreadTerminated()) {
399             (*iter)->snapuserd()->ResumeMergeThreads();
400         }
401     }
402 }
403 
404 }  // namespace snapshot
405 }  // namespace android
406