• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "snapuserd_core.h"
18 
19 /*
20  * Readahead is used to optimize the merge of COPY and XOR Ops.
21  *
22  * We create a scratch space of 2MB to store the read-ahead data in the COW
23  * device.
24  *
25  *      +-----------------------+
26  *      |     Header (fixed)    |
27  *      +-----------------------+
28  *      |    Scratch space      |  <-- 2MB
29  *      +-----------------------+
30  *
31  *      Scratch space is as follows:
32  *
33  *      +-----------------------+
34  *      |       Metadata        | <- 4k page
35  *      +-----------------------+
36  *      |       Metadata        | <- 4k page
37  *      +-----------------------+
38  *      |                       |
39  *      |    Read-ahead data    |
40  *      |                       |
41  *      +-----------------------+
42  *
43  *
44  * * ===================================================================
45  *
46  * Example:
47  *
48  * We have 6 copy operations to be executed in OTA. Update-engine
49  * will write to COW file as follows:
50  *
51  * Op-1: 20 -> 23
52  * Op-2: 19 -> 22
53  * Op-3: 18 -> 21
54  * Op-4: 17 -> 20
55  * Op-5: 16 -> 19
56  * Op-6: 15 -> 18
57  *
58  * Read-ahead thread will read all the 6 source blocks and store the data in the
59  * scratch space. Metadata will contain the destination block numbers. Thus,
60  * scratch space will look something like this:
61  *
62  * +--------------+
63  * | Block   23   |
64  * | offset - 1   |
65  * +--------------+
66  * | Block   22   |
67  * | offset - 2   |
68  * +--------------+
69  * | Block   21   |
70  * | offset - 3   |
71  * +--------------+
72  *    ...
73  *    ...
74  * +--------------+
75  * | Data-Block 20| <-- offset - 1
76  * +--------------+
77  * | Data-Block 19| <-- offset - 2
78  * +--------------+
79  * | Data-Block 18| <-- offset - 3
80  * +--------------+
81  *     ...
82  *     ...
83  *
84  * ====================================================================
85  *
86  *
87  *  Read-ahead thread will process the COW Ops in fixed set. Consider
88  *  the following example:
89  *
90  *  +--------------------------+
91  *  |op-1|op-2|op-3|....|op-510|
92  *  +--------------------------+
93  *
94  *  <------ One RA Block ------>
95  *
96  *  RA thread will read 510 ordered COW ops at a time and will store
97  *  the data in the scratch space.
98  *
99  *  RA thread and Merge thread will go lock-step wherein the RA thread
100  *  will make sure that the data for 510 COW operations is read upfront
101  *  and is in memory. Thus, the merge thread will pick up the data
102  *  directly from memory and write it back to the base device.
103  *
104  *
105  *  +--------------------------+------------------------------------+
106  *  |op-1|op-2|op-3|....|op-510|op-511|op-512|op-513........|op-1020|
107  *  +--------------------------+------------------------------------+
108  *
109  *  <------Merge 510 Blocks----><-Prepare 510 blocks for merge by RA->
110  *           ^                                  ^
111  *           |                                  |
112  *      Merge thread                        RA thread
113  *
114  * Both Merge and RA thread will strive to work in parallel.
115  *
116  * ===========================================================================
117  *
118  * State transitions and communication between RA thread and Merge thread:
119  *
120  *  Merge Thread                                      RA Thread
121  *  ----------------------------------------------------------------------------
122  *
123  *          |                                         |
124  *    WAIT for RA Block N                     READ one RA Block (N)
125  *        for merge                                   |
126  *          |                                         |
127  *          |                                         |
128  *          <--------------MERGE BEGIN--------READ Block N done(copy to scratch)
129  *          |                                         |
130  *          |                                         |
131  *    Merge Begin Block N                     READ one RA BLock (N+1)
132  *          |                                         |
133  *          |                                         |
134  *          |                                  READ done. Wait for merge complete
135  *          |                                         |
136  *          |                                        WAIT
137  *          |                                         |
138  *    Merge done Block N                              |
139  *          ----------------MERGE READY-------------->|
140  *    WAIT for RA Block N+1                     Copy RA Block (N+1)
141  *        for merge                              to scratch space
142  *          |                                         |
143  *          <---------------MERGE BEGIN---------BLOCK N+1 Done
144  *          |                                         |
145  *          |                                         |
146  *    Merge Begin Block N+1                   READ one RA BLock (N+2)
147  *          |                                         |
148  *          |                                         |
149  *          |                                  READ done. Wait for merge complete
150  *          |                                         |
151  *          |                                        WAIT
152  *          |                                         |
153  *    Merge done Block N+1                            |
154  *          ----------------MERGE READY-------------->|
155  *    WAIT for RA Block N+2                     Copy RA Block (N+2)
156  *        for merge                              to scratch space
157  *          |                                         |
158  *          <---------------MERGE BEGIN---------BLOCK N+2 Done
159  */
160 
161 namespace android {
162 namespace snapshot {
163 
164 using namespace android;
165 using namespace android::dm;
166 using android::base::unique_fd;
167 
MonitorMerge()168 void SnapshotHandler::MonitorMerge() {
169     {
170         std::lock_guard<std::mutex> lock(lock_);
171         merge_monitored_ = true;
172     }
173 }
174 
175 // This is invoked once primarily by update-engine to initiate
176 // the merge
InitiateMerge()177 void SnapshotHandler::InitiateMerge() {
178     {
179         std::lock_guard<std::mutex> lock(lock_);
180         merge_initiated_ = true;
181 
182         // If there are only REPLACE ops to be merged, then we need
183         // to explicitly set the state to MERGE_BEGIN as there
184         // is no read-ahead thread
185         if (!ra_thread_) {
186             io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
187         }
188     }
189     cv.notify_all();
190 }
191 
IsMergeBeginError(MERGE_IO_TRANSITION io_state)192 static inline bool IsMergeBeginError(MERGE_IO_TRANSITION io_state) {
193     return io_state == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE ||
194            io_state == MERGE_IO_TRANSITION::IO_TERMINATED;
195 }
196 
197 // Invoked by Merge thread - Waits on RA thread to resume merging. Will
198 // be waken up RA thread.
WaitForMergeBegin()199 bool SnapshotHandler::WaitForMergeBegin() {
200     std::unique_lock<std::mutex> lock(lock_);
201 
202     cv.wait(lock, [this]() -> bool { return MergeInitiated() || IsMergeBeginError(io_state_); });
203 
204     if (IsMergeBeginError(io_state_)) {
205         SNAP_LOG(VERBOSE) << "WaitForMergeBegin failed with state: " << io_state_;
206         return false;
207     }
208 
209     cv.wait(lock, [this]() -> bool {
210         return io_state_ == MERGE_IO_TRANSITION::MERGE_BEGIN || IsMergeBeginError(io_state_);
211     });
212 
213     if (IsMergeBeginError(io_state_)) {
214         SNAP_LOG(ERROR) << "WaitForMergeBegin failed with state: " << io_state_;
215         return false;
216     }
217     return true;
218 }
219 
220 // Invoked by RA thread - Flushes the RA block to scratch space if necessary
221 // and then notifies the merge thread to resume merging
ReadAheadIOCompleted(bool sync)222 bool SnapshotHandler::ReadAheadIOCompleted(bool sync) {
223     if (sync) {
224         // Flush the entire buffer region
225         int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
226         if (ret < 0) {
227             PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
228             return false;
229         }
230 
231         // Metadata and data are synced. Now, update the state.
232         // We need to update the state after flushing data; if there is a crash
233         // when read-ahead IO is in progress, the state of data in the COW file
234         // is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
235         // in the scratch space is good and during next reboot, read-ahead thread
236         // can safely re-construct the data.
237         struct BufferState* ra_state = GetBufferState();
238         ra_state->read_ahead_state = kCowReadAheadDone;
239 
240         ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
241         if (ret < 0) {
242             PLOG(ERROR) << "msync failed to flush Readahead completion state...";
243             return false;
244         }
245     }
246 
247     // Notify the merge thread to resume merging
248     {
249         std::lock_guard<std::mutex> lock(lock_);
250         if (io_state_ != MERGE_IO_TRANSITION::IO_TERMINATED &&
251             io_state_ != MERGE_IO_TRANSITION::MERGE_FAILED) {
252             io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
253         }
254     }
255 
256     cv.notify_all();
257     return true;
258 }
259 
PauseMergeIfRequired()260 void SnapshotHandler::PauseMergeIfRequired() {
261     {
262         std::unique_lock<std::mutex> lock(pause_merge_lock_);
263         while (pause_merge_) {
264             SNAP_LOG(INFO) << "Merge thread paused";
265             pause_merge_cv_.wait(lock);
266             if (!pause_merge_) {
267                 SNAP_LOG(INFO) << "Merge thread resumed";
268             }
269         }
270     }
271 }
272 
273 // Invoked by RA thread - Waits for merge thread to finish merging
274 // RA Block N - RA thread would be ready will with Block N+1 but
275 // will wait to merge thread to finish Block N. Once Block N
276 // is merged, RA thread will be woken up by Merge thread and will
277 // flush the data of Block N+1 to scratch space
WaitForMergeReady()278 bool SnapshotHandler::WaitForMergeReady() {
279     {
280         std::unique_lock<std::mutex> lock(lock_);
281         while (!(io_state_ == MERGE_IO_TRANSITION::MERGE_READY ||
282                  io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
283                  io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
284                  io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED)) {
285             cv.wait(lock);
286         }
287 
288         // Check if merge failed
289         if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
290             io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
291             io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED) {
292             if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
293                 SNAP_LOG(ERROR) << "Wait for merge ready failed: " << io_state_;
294             }
295             return false;
296         }
297     }
298 
299     // This is a safe place to check if the RA thread should be
300     // paused. Since the scratch space isn't flushed yet, it is safe
301     // to wait here until resume is invoked.
302     PauseMergeIfRequired();
303     return true;
304 }
305 
306 // Invoked by Merge thread - Notify RA thread about Merge completion
307 // for Block N and wake up
NotifyRAForMergeReady()308 void SnapshotHandler::NotifyRAForMergeReady() {
309     {
310         std::lock_guard<std::mutex> lock(lock_);
311         if (io_state_ != MERGE_IO_TRANSITION::IO_TERMINATED &&
312             io_state_ != MERGE_IO_TRANSITION::READ_AHEAD_FAILURE) {
313             io_state_ = MERGE_IO_TRANSITION::MERGE_READY;
314         }
315     }
316 
317     cv.notify_all();
318 
319     // This is a safe place to check if the merge thread should be
320     // paused. The data from the scratch space is merged to disk and is safe
321     // to wait.
322     PauseMergeIfRequired();
323 }
324 
325 // The following transitions are mostly in the failure paths
MergeFailed()326 void SnapshotHandler::MergeFailed() {
327     {
328         std::lock_guard<std::mutex> lock(lock_);
329         io_state_ = MERGE_IO_TRANSITION::MERGE_FAILED;
330     }
331 
332     cv.notify_all();
333 }
334 
MergeCompleted()335 void SnapshotHandler::MergeCompleted() {
336     {
337         std::lock_guard<std::mutex> lock(lock_);
338         io_state_ = MERGE_IO_TRANSITION::MERGE_COMPLETE;
339     }
340 
341     cv.notify_all();
342 }
343 
344 // This is invoked by worker threads.
345 //
346 // Worker threads are terminated either by two scenarios:
347 //
348 // 1: If dm-user device is destroyed
349 // 2: We had an I/O failure when reading root partitions
350 //
351 // In case (1), this would be a graceful shutdown. In this case, merge
352 // thread and RA thread should have _already_ terminated by this point. We will be
353 // destroying the dm-user device only _after_ merge is completed.
354 //
355 // In case (2), if merge thread had started, then it will be
356 // continuing to merge; however, since we had an I/O failure and the
357 // I/O on root partitions are no longer served, we will terminate the
358 // merge.
359 //
360 // This functions is about handling case (2)
NotifyIOTerminated()361 void SnapshotHandler::NotifyIOTerminated() {
362     {
363         std::lock_guard<std::mutex> lock(lock_);
364         io_state_ = MERGE_IO_TRANSITION::IO_TERMINATED;
365     }
366 
367     cv.notify_all();
368 }
369 
IsIOTerminated()370 bool SnapshotHandler::IsIOTerminated() {
371     std::lock_guard<std::mutex> lock(lock_);
372     return (io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED);
373 }
374 
375 // Invoked by RA thread
ReadAheadIOFailed()376 void SnapshotHandler::ReadAheadIOFailed() {
377     {
378         std::lock_guard<std::mutex> lock(lock_);
379         io_state_ = MERGE_IO_TRANSITION::READ_AHEAD_FAILURE;
380     }
381 
382     cv.notify_all();
383 }
384 
WaitForMergeComplete()385 void SnapshotHandler::WaitForMergeComplete() {
386     std::unique_lock<std::mutex> lock(lock_);
387     while (!(io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
388              io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
389              io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED)) {
390         cv.wait(lock);
391     }
392 }
393 
RaThreadStarted()394 void SnapshotHandler::RaThreadStarted() {
395     std::unique_lock<std::mutex> lock(lock_);
396     ra_thread_started_ = true;
397 }
398 
WaitForRaThreadToStart()399 void SnapshotHandler::WaitForRaThreadToStart() {
400     auto now = std::chrono::system_clock::now();
401     auto deadline = now + 3s;
402     {
403         std::unique_lock<std::mutex> lock(lock_);
404         while (!ra_thread_started_) {
405             auto status = cv.wait_until(lock, deadline);
406             if (status == std::cv_status::timeout) {
407                 SNAP_LOG(ERROR) << "Read-ahead thread did not start";
408                 return;
409             }
410         }
411     }
412 }
413 
MarkMergeComplete()414 void SnapshotHandler::MarkMergeComplete() {
415     std::lock_guard<std::mutex> lock(lock_);
416     merge_complete_ = true;
417 }
418 
PauseMergeThreads()419 void SnapshotHandler::PauseMergeThreads() {
420     {
421         std::lock_guard<std::mutex> lock(pause_merge_lock_);
422         pause_merge_ = true;
423     }
424 }
425 
ResumeMergeThreads()426 void SnapshotHandler::ResumeMergeThreads() {
427     {
428         std::lock_guard<std::mutex> lock(pause_merge_lock_);
429         pause_merge_ = false;
430     }
431 }
432 
GetMergeStatus()433 std::string SnapshotHandler::GetMergeStatus() {
434     bool merge_not_initiated = false;
435     bool merge_monitored = false;
436     bool merge_failed = false;
437     bool merge_complete = false;
438 
439     {
440         std::lock_guard<std::mutex> lock(lock_);
441 
442         if (MergeMonitored()) {
443             merge_monitored = true;
444         }
445 
446         if (!MergeInitiated()) {
447             merge_not_initiated = true;
448         }
449 
450         if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
451             merge_failed = true;
452         }
453 
454         merge_complete = merge_complete_;
455     }
456 
457     if (merge_not_initiated) {
458         // Merge was not initiated yet; however, we have merge completion
459         // recorded in the COW Header. This can happen if the device was
460         // rebooted during merge. During next reboot, libsnapshot will
461         // query the status and if the merge is completed, then snapshot-status
462         // file will be deleted
463         if (merge_complete) {
464             return "snapshot-merge-complete";
465         }
466 
467         // Merge monitor thread is tracking the merge but the merge thread
468         // is not started yet.
469         if (merge_monitored) {
470             return "snapshot-merge";
471         }
472 
473         // Return the state as "snapshot". If the device was rebooted during
474         // merge, we will return the status as "snapshot". This is ok, as
475         // libsnapshot will explicitly resume the merge. This is slightly
476         // different from kernel snapshot wherein once the snapshot was switched
477         // to merge target, during next boot, we immediately switch to merge
478         // target. We don't do that here because, during first stage init, we
479         // don't want to initiate the merge. The problem is that we have daemon
480         // transition between first and second stage init. If the merge was
481         // started, then we will have to quiesce the merge before switching
482         // the dm tables. Instead, we just wait until second stage daemon is up
483         // before resuming the merge.
484         return "snapshot";
485     }
486 
487     if (merge_failed) {
488         return "snapshot-merge-failed";
489     }
490 
491     if (merge_complete) {
492         return "snapshot-merge-complete";
493     }
494 
495     // Merge is in-progress
496     return "snapshot-merge";
497 }
498 
499 //========== End of Read-ahead state transition functions ====================
500 
501 /*
502  * Root partitions are mounted off dm-user and the I/O's are served
503  * by snapuserd worker threads.
504  *
505  * When there is an I/O request to be served by worker threads, we check
506  * if the corresponding sector is "changed" due to OTA by doing a lookup.
507  * If the lookup succeeds then the sector has been changed, and it falls
508  * into one of 4 COW operation types, viz: COPY, XOR, REPLACE and ZERO.
509  *
510  * For the case of REPLACE and ZERO ops, there is not much of a concern
511  * as there is no dependency between blocks. Hence all the I/O request
512  * mapped to these two COW operations will be served by reading the COW device.
513  *
514  * However, COPY and XOR ops are tricky. Since the merge operations are
515  * in-progress, we cannot just go and read from the source device. We need
516  * to be in sync with the state of the merge thread before serving the I/O.
517  *
518  * We know that the merge thread processes sets of COW ops called RA
519  * Blocks. These sets are fixed size: each Block comprises
520  * 510 COW ops.
521  *
522  *  +--------------------------+
523  *  |op-1|op-2|op-3|....|op-510|
524  *  +--------------------------+
525  *
526  *  <------ Merge Group Block N ------>
527  *
528  * Thus, a Merge Group Block N, will fall into one of these states and will
529  * transition the states in the following order:
530  *
531  * 1: GROUP_MERGE_PENDING
532  * 2: GROUP_MERGE_RA_READY
533  * 3: GROUP_MERGE_IN_PROGRESS
534  * 4: GROUP_MERGE_COMPLETED
535  * 5: GROUP_MERGE_FAILED
536  *
537  * Let's say that we have the I/O request from dm-user whose sector gets mapped
538  * to a COPY operation with op-10 in the above "Merge Group Block N".
539  *
540  * 1: If the Group is in "GROUP_MERGE_PENDING" state:
541  *
542  *    Just read the data from the source block based on the COW op->source field.
543  *    Note that we will take a ref count on "Group N". This ref count will prevent
544  *    the merge thread from beginning to merge if there are any pending I/Os. Once
545  *    the I/O is completed, the ref count on "Group N" is decremented. The merge
546  *    thread will resume merging "Group N" if there are no pending I/Os.
547  *
548  * 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state:
549  *
550  *    When the merge thread is ready to process a "Group", it will first move
551  *    the state to GROUP_MERGE_PENDING -> GROUP_MERGE_RA_READY. From this point
552  *    onwards, I/O will be served from Read-ahead buffer. However, merge thread
553  *    cannot start merging this "Group" immediately. If there were any in-flight
554  *    I/O requests, merge thread should wait and allow those I/O's to drain.
555  *    Once all the in-flight I/O's are completed, merge thread will move the
556  *    state from "GROUP_MERGE_RA_READY" -> "GROUP_MERGE_IN_PROGRESS". I/O will
557  *    be continued to serve from Read-ahead buffer during the entire duration
558  *    of the merge.
559  *
560  *    See SetMergeInProgress().
561  *
562  * 3: If the Group is in "GROUP_MERGE_COMPLETED" state:
563  *
564  *    This is straightforward. We just read the data directly from "Base"
565  *    device. We should not be reading the COW op->source field.
566  *
567  * 4: If the Group is in "GROUP_MERGE_FAILED" state:
568  *
569  *    Terminate the I/O with an I/O error as we don't know which "op" in the
570  *    "Group" failed.
571  *
572  *    Transition ensures that the I/O from root partitions are never made to
573  *    wait and are processed immediately. Thus the state transition for any
574  *    "Group" is:
575  *
576  *    GROUP_MERGE_PENDING
577  *          |
578  *          |
579  *          v
580  *    GROUP_MERGE_RA_READY
581  *          |
582  *          |
583  *          v
584  *    GROUP_MERGE_IN_PROGRESS
585  *          |
586  *          |----------------------------(on failure)
587  *          |                           |
588  *          v                           v
589  *    GROUP_MERGE_COMPLETED           GROUP_MERGE_FAILED
590  *
591  */
592 
593 // Invoked by Merge thread
SetMergeCompleted(size_t ra_index)594 void SnapshotHandler::SetMergeCompleted(size_t ra_index) {
595     MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
596     {
597         std::lock_guard<std::mutex> lock(blk_state->m_lock);
598 
599         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
600         CHECK(blk_state->num_ios_in_progress == 0);
601 
602         // Merge is complete - All I/O henceforth should be read directly
603         // from base device
604         blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED;
605     }
606 }
607 
608 // Invoked by Merge thread. This is called just before the beginning
609 // of merging a given Block of 510 ops. If there are any in-flight I/O's
610 // from dm-user then wait for them to complete.
SetMergeInProgress(size_t ra_index)611 void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
612     MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
613     {
614         std::unique_lock<std::mutex> lock(blk_state->m_lock);
615 
616         // We may have fallback from Async-merge to synchronous merging
617         // on the existing block. There is no need to reset as the
618         // merge is already in progress.
619         if (blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS) {
620             return;
621         }
622 
623         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
624 
625         // First set the state to RA_READY so that in-flight I/O will drain
626         // and any new I/O will start reading from RA buffer
627         blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY;
628 
629         // Wait if there are any in-flight I/O's - we cannot merge at this point
630         while (!(blk_state->num_ios_in_progress == 0)) {
631             blk_state->m_cv.wait(lock);
632         }
633 
634         blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS;
635     }
636 }
637 
638 // Invoked by Merge thread on failure
SetMergeFailed(size_t ra_index)639 void SnapshotHandler::SetMergeFailed(size_t ra_index) {
640     MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
641     {
642         std::unique_lock<std::mutex> lock(blk_state->m_lock);
643 
644         blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED;
645     }
646 }
647 
648 // Invoked by worker threads when I/O is complete on a "MERGE_PENDING"
649 // Block. If there are no more in-flight I/Os, wake up merge thread
650 // to resume merging.
NotifyIOCompletion(uint64_t new_block)651 void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) {
652     auto it = block_to_ra_index_.find(new_block);
653     CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block;
654 
655     bool pending_ios = true;
656 
657     int ra_index = it->second;
658     MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
659     {
660         std::unique_lock<std::mutex> lock(blk_state->m_lock);
661 
662         blk_state->num_ios_in_progress -= 1;
663         if (blk_state->num_ios_in_progress == 0) {
664             pending_ios = false;
665         }
666     }
667 
668     // Give a chance to merge-thread to resume merge
669     // as there are no pending I/O.
670     if (!pending_ios) {
671         blk_state->m_cv.notify_all();
672     }
673 }
674 
GetRABuffer(std::unique_lock<std::mutex> * lock,uint64_t block,void * buffer)675 bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block,
676                                   void* buffer) {
677     if (!lock->owns_lock()) {
678         SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
679         return false;
680     }
681     std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
682 
683     if (it == read_ahead_buffer_map_.end()) {
684         return false;
685     }
686 
687     memcpy(buffer, it->second, BLOCK_SZ);
688     return true;
689 }
690 
691 // Invoked by worker threads in the I/O path. This is called when a sector
692 // is mapped to a COPY/XOR COW op.
ProcessMergingBlock(uint64_t new_block,void * buffer)693 MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) {
694     auto it = block_to_ra_index_.find(new_block);
695     if (it == block_to_ra_index_.end()) {
696         return MERGE_GROUP_STATE::GROUP_INVALID;
697     }
698 
699     int ra_index = it->second;
700     MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
701     {
702         std::unique_lock<std::mutex> lock(blk_state->m_lock);
703 
704         MERGE_GROUP_STATE state = blk_state->merge_state_;
705         switch (state) {
706             case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
707                 // If this is a merge-resume path, check if the data is
708                 // available from scratch space. Data from scratch space takes
709                 // higher precedence than from source device for overlapping
710                 // blocks.
711                 if (resume_merge_ && GetRABuffer(&lock, new_block, buffer)) {
712                     return (MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
713                 }
714                 blk_state->num_ios_in_progress += 1;  // ref count
715                 [[fallthrough]];
716             }
717             case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
718                 [[fallthrough]];
719             }
720             case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: {
721                 return state;
722             }
723             // Fetch the data from RA buffer.
724             case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
725                 [[fallthrough]];
726             }
727             case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
728                 if (!GetRABuffer(&lock, new_block, buffer)) {
729                     return MERGE_GROUP_STATE::GROUP_INVALID;
730                 }
731                 return state;
732             }
733             default: {
734                 return MERGE_GROUP_STATE::GROUP_INVALID;
735             }
736         }
737     }
738 }
739 
// Streams the enumerator name of a MERGE_IO_TRANSITION value, or "unknown"
// for values not listed below.
std::ostream& operator<<(std::ostream& os, MERGE_IO_TRANSITION value) {
    const char* label = "unknown";
    switch (value) {
        case MERGE_IO_TRANSITION::INVALID:
            label = "INVALID";
            break;
        case MERGE_IO_TRANSITION::MERGE_READY:
            label = "MERGE_READY";
            break;
        case MERGE_IO_TRANSITION::MERGE_BEGIN:
            label = "MERGE_BEGIN";
            break;
        case MERGE_IO_TRANSITION::MERGE_FAILED:
            label = "MERGE_FAILED";
            break;
        case MERGE_IO_TRANSITION::MERGE_COMPLETE:
            label = "MERGE_COMPLETE";
            break;
        case MERGE_IO_TRANSITION::IO_TERMINATED:
            label = "IO_TERMINATED";
            break;
        case MERGE_IO_TRANSITION::READ_AHEAD_FAILURE:
            label = "READ_AHEAD_FAILURE";
            break;
        default:
            break;
    }
    return os << label;
}
760 
761 }  // namespace snapshot
762 }  // namespace android
763