/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <libsnapshot/cow_format.h>
#include <pthread.h>

#include <android-base/properties.h>

#include "merge_worker.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
                         const std::string& base_path_merge,
                         std::shared_ptr<SnapshotHandler> snapuserd, uint32_t cow_op_merge_size)
    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
      cow_op_merge_size_(cow_op_merge_size) {}

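// Scan the COW op iterator and batch up ops whose destination blocks are
// contiguous on the base device. Returns the number of contiguous blocks
// found and sets *source_offset to the byte offset of the first block. When
// replace_zero_vec is null, only ordered (copy/xor) ops are considered;
// otherwise, replace and zero ops are collected into the vector, and a
// replace op may span multiple blocks of compressed data.
//
// Example (assuming single-block replace ops): replace ops for new_block
// 10, 11 and 12 followed by a zero op for new_block 14 yield a return value
// of 3 with *source_offset = 10 * BLOCK_SZ; the op for block 14 is left on
// the iterator for the next call.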
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                              std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    // 0 indicates ro.virtual_ab.cow_op_merge_size was not set in the build
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(cow_op_merge_size_, static_cast<uint32_t>(*pending_ops));
    }

    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);
    size_t num_blocks = 1;

    do {
        if (!cowop_iter_->AtEnd() && num_ops) {
            const CowOperation* cow_op = cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = static_cast<uint64_t>(cow_op->new_block) * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
                if (cow_op->type() == kCowReplaceOp) {
                    // Get the number of blocks this op has compressed
                    num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
                }
            }

            cowop_iter_->Next();
            num_ops -= num_blocks;
            nr_consecutive = num_blocks;

            while (!cowop_iter_->AtEnd() && num_ops) {
                const CowOperation* op = cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = static_cast<uint64_t>(op->new_block) * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    if (op->type() == kCowReplaceOp) {
                        num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
                        if (num_ops < num_blocks) {
                            break;
                        }
                    } else {
                        // zero op
                        num_blocks = 1;
                    }
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += num_blocks;
                num_ops -= num_blocks;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

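// Merge all replace and zero ops: stage the data in bufsink_ (replace data is
// read from the COW, zero ops are zero-filled), write each contiguous run back
// to the base device, and fsync + commit the merge progress periodically.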
bool MergeWorker::MergeReplaceZeroOps() {
    // Flush after merging 1MB of data. Since replace and zero ops are all
    // independent, with no dependencies between COW ops, we flush the data and
    // commit the number of ops merged to the COW block device. If there is a
    // crash, we may end up replaying some COW ops that were already merged;
    // that is ok.
    //
    // Increasing this beyond 1MB may improve merge times; however, on devices
    // with low memory it can be problematic when multiple merge threads run in
    // parallel.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ);
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->AtEnd()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->AtEnd());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type() == kCowReplaceOp) {
                size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
                void* buffer = bufsink_.AcquireBuffer(buffer_size);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                // Read the entire compressed buffer spanning multiple blocks
                if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
                    SNAP_LOG(ERROR) << "Failed to read COW in merge";
                    return false;
                }
            } else {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                CHECK(cow_op->type() == kCowZeroOp);
                memset(buffer, 0, BLOCK_SZ);
            }
        }

        size_t io_size = linear_blocks * BLOCK_SZ;

        // Merge - Write the contents back to base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || ret != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += replace_zero_vec.size();

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR) << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting "
                               "down merge";
            return false;
        }

        // Safe to check if there is a pause request.
        snapuserd_->PauseMergeIfRequired();
    }

    // Any left over ops not flushed yet.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

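// Merge ordered (copy/xor) ops using io_uring. The payload was already staged
// in the shared read-ahead buffer by the RA thread; this thread queues linked
// write SQEs against the base device, one merge window at a time, and commits
// the COW header once the window is on disk.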
bool MergeWorker::MergeOrderedOpsAsync() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            SNAP_LOG(ERROR) << "Failed waiting for merge to begin";
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important - we need to make sure that the
                // blocks are linked and are written in the same order as
                // populated, because of overlapping block writes.
                //
                // If there were no dependencies, we could optimize this further
                // by allowing parallel writes; but for now, just link all the
                // SQ entries.
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this batch, we
                // need to sync the merged data. We will try to grab an SQE entry
                // and set the FSYNC command; additionally, make sure that
                // the fsync is done after all the I/O operations queued
                // in the ring are completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush it later
                // by explicitly calling the fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // Very unlikely, but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the I/O for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed during merge-ordered ops: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors; we can retry them
                    // by re-populating the SQE entries and submitting the I/O
                    // request back. However, we don't do that now; instead we
                    // will fallback to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << strerror(-ret);
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

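// Synchronous variant of MergeOrderedOpsAsync(): the same merge-window
// handshake with the RA thread, but the writes are issued with pwrite()
// instead of io_uring.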
bool MergeWorker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. Data is already in the RA buffer. Note
            // that XOR ops are already handled by the RA thread. We just write
            // the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                (char*)read_ahead_buffer + offset, io_size,
                                                source_offset));
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

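// Drive the io_uring based merge of ordered ops. On failure, rewind the op
// iterator past the ops already merged in the current window so that the
// synchronous fallback can retry that window.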
bool MergeWorker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Reset the iter so that we retry the merge
        while (blocks_merged_in_group_ && !cowop_iter_->AtBegin()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

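// Merge ordered ops synchronously; used on devices without io_uring support
// and as the fallback when the async path fails.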
bool MergeWorker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

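// Top-level merge: ordered (copy/xor) ops are merged first - asynchronously
// when io_uring is available, otherwise synchronously - followed by replace
// and zero ops. On failure, the device stays mounted off snapshots and the
// merge is retried on the next reboot.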
bool MergeWorker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start Async Merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fallback and retry the merge
    //
    // If the device doesn't support async merge, we
    // will directly enter here (aka devices with 4.x kernels)
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. Device will continue to be mounted
            // off snapshots; merge will be retried during
            // next reboot
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

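// Set up the io_uring used by the async merge path. If io_uring is not
// supported, merge_async_ stays false and Merge() uses the synchronous path.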
bool MergeWorker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

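// Tear down the io_uring if the async merge path was initialized.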
void MergeWorker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

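// Merge thread entry point: wait for the merge to be kicked off, drop to
// background priority, initialize buffers and (optionally) io_uring, and run
// the merge to completion.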
bool MergeWorker::Run() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";

    pthread_setname_np(pthread_self(), "MergeWorker");

    if (!snapuserd_->WaitForMergeBegin()) {
        return true;
    }
    auto merge_thread_priority = android::base::GetUintProperty<uint32_t>(
            "ro.virtual_ab.merge_thread_priority", ANDROID_PRIORITY_BACKGROUND);

    if (!SetThreadPriority(merge_thread_priority)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to Mergeworker thread";
    }

    SNAP_LOG(INFO) << "Merge starting..";

    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android