/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <libsnapshot/cow_format.h>
#include <pthread.h>

#include <android-base/properties.h>

#include "merge_worker.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
                         const std::string& base_path_merge,
                         std::shared_ptr<SnapshotHandler> snapuserd, uint32_t cow_op_merge_size)
    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
      cow_op_merge_size_(cow_op_merge_size) {}

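// Scan the COW op iterator for a run of ops whose destination blocks are
// consecutive on the base device. Returns the number of consecutive blocks
// batched (0 if nothing can be merged at the current position) and stores the
// corresponding byte offset on the base device in *source_offset. When
// replace_zero_vec is null, only ordered (copy/XOR) ops are considered;
// otherwise replace/zero ops are collected into the vector for the caller.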
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                              std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    // 0 indicates ro.virtual_ab.cow_op_merge_size was not set in the build
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(cow_op_merge_size_, static_cast<uint32_t>(*pending_ops));
    }

    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);
    size_t num_blocks = 1;

    do {
        if (!cowop_iter_->AtEnd() && num_ops) {
            const CowOperation* cow_op = cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = static_cast<uint64_t>(cow_op->new_block) * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
                if (cow_op->type() == kCowReplaceOp) {
                    // Get the number of blocks this op has compressed
                    num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
                }
            }

            cowop_iter_->Next();
            num_ops -= num_blocks;
            nr_consecutive = num_blocks;

            while (!cowop_iter_->AtEnd() && num_ops) {
                const CowOperation* op = cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = static_cast<uint64_t>(op->new_block) * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    if (op->type() == kCowReplaceOp) {
                        num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
                        if (num_ops < num_blocks) {
                            break;
                        }
                    } else {
                        // zero op
                        num_blocks = 1;
                    }
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += num_blocks;
                num_ops -= num_blocks;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

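// Merge all replace and zero ops. Unlike ordered ops, these do not depend on
// the read-ahead buffer; each batch is read (or zero-filled) into bufsink_
// and written directly to the base device.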
bool MergeWorker::MergeReplaceZeroOps() {
    // Flush after merging 1MB. Since all ops are independent and there is no
    // dependency between COW ops, we will flush the data and the number
    // of ops merged in COW block device. If there is a crash, we will
    // end up replaying some of the COW ops which were already merged. That is
    // ok.
    //
    // Increasing this beyond 1MB may help improve merge times; however, on
    // devices with low memory, this can be problematic when there are
    // multiple merge threads in parallel.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ);
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->AtEnd()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->AtEnd());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type() == kCowReplaceOp) {
                size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
                void* buffer = bufsink_.AcquireBuffer(buffer_size);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                // Read the entire compressed buffer spanning multiple blocks
                if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
                    SNAP_LOG(ERROR) << "Failed to read COW in merge";
                    return false;
                }
            } else {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                CHECK(cow_op->type() == kCowZeroOp);
                memset(buffer, 0, BLOCK_SZ);
            }
        }

        size_t io_size = linear_blocks * BLOCK_SZ;

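        // bufsink_ now holds io_size bytes of payload (decompressed replace
        // data and zero-filled blocks) packed in op order, so the entire run
        // can be written back with a single pwrite.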
        // Merge - Write the contents back to base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || ret != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += replace_zero_vec.size();

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR) << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting "
                               "down merge";
            return false;
        }

        // Safe to check if there is a pause request.
        snapuserd_->PauseMergeIfRequired();
    }

    // Any leftover ops not flushed yet.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

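// Merge ordered (copy/XOR) ops using io_uring. The data for each merge window
// has already been staged in the shared read-ahead buffer by the RA thread;
// this path only writes it back to the base device asynchronously.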
bool MergeWorker::MergeOrderedOpsAsync() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            SNAP_LOG(ERROR) << "Failed waiting for merge to begin";
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important - We need to make sure that the
                // blocks are linked and are written in the same order as
                // populated. This is because of overlapping block writes.
                //
                // If there are no dependencies, we can optimize this further by
                // allowing parallel writes; but for now, just link all the SQ
                // entries.
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this batch, we
                // need to sync the merged data. We will try to grab an SQE entry
                // and set the FSYNC command; additionally, make sure that
                // the fsync is done after all the I/O operations queued
                // in the ring are completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush it later
                // by explicitly calling the fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed during merge: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors; we can retry them
                    // by re-populating the SQE entries and re-submitting the I/O
                    // request. However, we don't do that now; instead we
                    // will fall back to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << strerror(-ret);
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

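// Synchronous variant of the ordered-op merge: same flow as
// MergeOrderedOpsAsync, but the staged read-ahead data is written back with
// plain pwrite() calls instead of io_uring.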
bool MergeWorker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. Data is already in the RA buffer. Note
            // that XOR ops are already handled by the RA thread. We just write
            // the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                (char*)read_ahead_buffer + offset, io_size,
                                                source_offset));
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

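// Drive the io_uring based ordered-op merge. On failure, rewind the COW op
// iterator past the ops batched for the current merge window so the
// synchronous fallback can retry them.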
bool MergeWorker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Reset the iter so that we retry the merge
        while (blocks_merged_in_group_ && !cowop_iter_->AtBegin()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

bool MergeWorker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

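// Top-level merge sequencing: ordered (copy/XOR) ops are merged first, via
// io_uring when available with a synchronous fallback, followed by the
// independent replace and zero ops.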
bool MergeWorker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start Async Merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge
    //
    // If the device doesn't support async merge, we
    // will directly enter here (e.g. devices with 4.x kernels)
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. Device will continue to be mounted
            // off snapshots; merge will be retried during
            // next reboot
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

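// Set up io_uring for the async merge path. Leaves merge_async_ false (so the
// caller falls back to the synchronous path) if io_uring is unsupported or
// queue initialization fails.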
bool MergeWorker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void MergeWorker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

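// Thread entry point for the merge worker: waits for the merge to be
// initiated, drops the thread to background priority, then runs the merge
// and releases resources.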
bool MergeWorker::Run() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";

    pthread_setname_np(pthread_self(), "MergeWorker");

    if (!snapuserd_->WaitForMergeBegin()) {
        return true;
    }
    auto merge_thread_priority = android::base::GetUintProperty<uint32_t>(
            "ro.virtual_ab.merge_thread_priority", ANDROID_PRIORITY_BACKGROUND);

    if (!SetThreadPriority(merge_thread_priority)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to MergeWorker thread";
    }

    SNAP_LOG(INFO) << "Merge starting..";

    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android