/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd.h"

#include <csignal>
#include <optional>
#include <set>

#include <snapuserd/snapuserd_client.h>

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "

WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
                           const std::string& control_device, const std::string& misc_name,
                           std::shared_ptr<Snapuserd> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    control_device_ = control_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
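    // A minimal worked example, assuming the values used elsewhere in this
    // file (4k blocks spanning 8 sectors, SECTOR_SHIFT = 9, and 16-byte
    // disk exceptions): (CHUNK_SIZE << SECTOR_SHIFT) = 8 << 9 = 4096, so
    // exceptions_per_area_ = 4096 / 16 = 256 entries per metadata area.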
    exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
}

bool WorkerThread::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
    if (ctrl_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
    }

    return true;
}

bool WorkerThread::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    return true;
}

// Construct kernel COW header in memory.
// This header will be in sector 0. The IO
// request will always be 4k. After constructing
// the header, zero out the remaining block.
void WorkerThread::ConstructKernelCowHeader() {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);

    memset(buffer, 0, BLOCK_SZ);

    struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);

    dh->magic = SNAP_MAGIC;
    dh->valid = SNAPSHOT_VALID;
    dh->version = SNAPSHOT_DISK_VERSION;
    dh->chunk_size = CHUNK_SIZE;
}

// Start the replace operation. This will read the
// internal COW format and, if the block is compressed,
// it will be decompressed.
bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
    if (!reader_->ReadData(*cow_op, &bufsink_)) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
        return false;
    }

    return true;
}

bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    SNAP_LOG(DEBUG) << "ReadFromBaseDevice: new-block: " << cow_op->new_block
                    << " Source: " << cow_op->source;
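    // Note the unit difference: for copy ops, cow_op->source is a block
    // number and is scaled to a byte offset below; for xor ops the source is
    // taken as a byte offset into the backing device as-is.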
    uint64_t offset = cow_op->source;
    if (cow_op->type == kCowCopyOp) {
        offset *= BLOCK_SZ;
    }
    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
        std::string op;
        if (cow_op->type == kCowCopyOp) {
            op = "Copy-op";
        } else {
            op = "Xor-op";
        }
        SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
                         << " at block: " << offset / BLOCK_SZ << " offset: " << offset % BLOCK_SZ;
        return false;
    }

    return true;
}

bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
        return false;
    }

    if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
        return false;
    }

    return true;
}

// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
    if (!GetReadAheadPopulatedBuffer(cow_op)) {
        SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
                        << " new_block: " << cow_op->new_block;
        if (!ReadFromBaseDevice(cow_op)) {
            return false;
        }
    }

    return true;
}

bool WorkerThread::ProcessXorOp(const CowOperation* cow_op) {
    if (!GetReadAheadPopulatedBuffer(cow_op)) {
        SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
                        << " new_block: " << cow_op->new_block;
        if (!ReadFromBaseDevice(cow_op)) {
            return false;
        }
    }
    xorsink_.Reset();
    if (!reader_->ReadData(*cow_op, &xorsink_)) {
        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
        return false;
    }

    return true;
}

bool WorkerThread::ProcessZeroOp() {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, BLOCK_SZ);
    return true;
}

bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
    }

    switch (cow_op->type) {
        case kCowReplaceOp: {
            return ProcessReplaceOp(cow_op);
        }

        case kCowZeroOp: {
            return ProcessZeroOp();
        }

        case kCowCopyOp: {
            return ProcessCopyOp(cow_op);
        }

        case kCowXorOp: {
            return ProcessXorOp(cow_op);
        }

        default: {
            SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
        }
    }
    return false;
}

int WorkerThread::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;

    if (!ProcessCowOp(it->second)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed for size: " << size
                        << " Aligned sector: " << it->first;
        return -1;
    }

    int num_sectors_skip = sector - it->first;

    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
            return -1;
        }

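        // A sketch of the shift, assuming 512-byte sectors (SECTOR_SHIFT = 9):
        // if the request lands 3 sectors into a 4k COW block, then
        // num_sectors_skip = 3 and skip_sector_size = 3 << 9 = 1536, so the
        // payload is moved down and only the trailing 2560 bytes remain valid.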
        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }

    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}

/*
 * Read the data for a given COW Operation.
 *
 * The kernel can issue IO at sector granularity.
 * Hence, an IO may end up reading partial data
 * from a COW operation, or a request may span
 * two COW operations.
 */
int WorkerThread::ReadData(sector_t sector, size_t size) {
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
    /*
     * chunk_vec stores COW operations at 4k granularity.
     * If the requested IO sector falls on a 4k boundary,
     * then we can read the COW op directly without
     * any issue.
     *
     * However, if the requested sector is not 4k aligned,
     * then we will have to find the nearest COW operation
     * and chop the 4k block to fetch the requested sector.
     */
    it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
                          Snapuserd::compare);

    bool read_end_of_device = false;
    if (it == chunk_vec.end()) {
        // |-------|-------|-------|
        // 0       1       2       3
        //
        // Block 0 - op 1
        // Block 1 - op 2
        // Block 2 - op 3
        //
        // chunk_vec will have blocks 0, 1, 2, which map to the relevant COW ops.
        //
        // Each block is 4k bytes. Thus, the last block will span 8 sectors
        // ranging till block 3. (However, block 3 won't be in chunk_vec as
        // it doesn't have any mapping to COW ops.) Now, if we get an I/O request for a sector
        // spanning between block 2 and block 3, we need to step back
        // and get hold of the last element.
        //
        // Additionally, dm-snapshot makes sure that I/O requests beyond block 3
        // will not be routed to the daemon. Hence, it is safe to assume that
        // if a sector is not available in the chunk_vec, the I/O falls at the
        // end of the region.
        it = std::prev(chunk_vec.end());
        read_end_of_device = true;
    }

    // We didn't find the required sector; hence find the previous sector,
    // since lower_bound gives us the first value greater than
    // the requested sector.
    if (it->first != sector) {
        if (it != chunk_vec.begin() && !read_end_of_device) {
            --it;
        }

        /*
         * If the IO spans two COW operations,
         * split the IO into two parts:
         *
         * 1: Read the first part from the first COW op.
         * 2: Read the second part from the next COW op.
         *
         * Ex: Let's say we have a 1024-byte IO request.
         *
         * 0       COW OP-1  4096     COW OP-2  8192
         * |******************|*******************|
         *              |*****|*****|
         *           3584           4608
         *              <- 1024B - >
         *
         * We have two COW operations which are 4k blocks.
         * The IO is requested for 1024 bytes, which spans
         * the two COW operations. We will split this IO
         * into two parts:
         *
         * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
         * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
         */
        return ReadUnalignedSector(sector, size, it);
    }

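    // Aligned path. As a quick worked example (values assumed from this
    // file: BLOCK_SZ = 4096, SECTOR_SHIFT = 9): a 16k read becomes
    // num_ops = DIV_ROUND_UP(16384, 4096) = 4 sequential COW ops, and
    // read_sector advances by BLOCK_SZ >> SECTOR_SHIFT = 8 sectors per op.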
    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
    sector_t read_sector = sector;
    while (num_ops) {
        // We have to make sure that the reads are
        // sequential; there shouldn't be a data
        // request merged with a metadata IO.
        if (it->first != read_sector) {
            SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
                            << " cow-op sector: " << it->first;
            return -1;
        } else if (!ProcessCowOp(it->second)) {
            return -1;
        }
        num_ops -= 1;
        read_sector += (BLOCK_SZ >> SECTOR_SHIFT);

        it++;

        if (it == chunk_vec.end() && num_ops) {
            SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
                            << " COW ops completed; pending read-request: " << num_ops;
            return -1;
        }
        // Update the buffer offset
        bufsink_.UpdateBufferOffset(BLOCK_SZ);
    }

    // Reset the buffer offset
    bufsink_.ResetBufferOffset();
    return size;
}

/*
 * dm-snap does prefetch reads while reading disk-exceptions.
 * By default, the prefetch value is set to 12; this means that
 * dm-snap will issue reads for 12 areas, wherein each area is a 4k page
 * of disk-exceptions.
 *
 * If, during prefetch, the chunk-id seen is beyond the
 * actual number of metadata pages, fill the buffer with zeroes.
 * When dm-snap starts parsing the buffer, it will stop
 * reading metadata pages once the buffer content is zero.
 */
bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
    size_t size = exceptions_per_area_ * sizeof(struct disk_exception);
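    // With the values assumed above (256 exceptions of 16 bytes each),
    // size works out to 4096 bytes, i.e. one full metadata area.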

    if (read_size > size) {
        return false;
    }

    void* buffer = bufsink_.GetPayloadBuffer(size);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, size);
    return true;
}

/*
 * A disk exception is a simple mapping of old_chunk to new_chunk.
 * When a dm-snapshot device is created, the kernel requests these mappings.
 *
 * Each disk exception is 16 bytes. Thus a single 4k page can
 * hold:
 *
 * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
 * is considered a metadata page, and it is represented by a chunk ID.
 *
 * Convert the chunk ID to an index into the vector, which gives us
 * the metadata page.
 */
bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
    uint32_t stride = exceptions_per_area_ + 1;
    size_t size;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

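    // For illustration, assuming the values noted in the comment above
    // (exceptions_per_area_ = 256, stride = 257): the metadata page with
    // chunk ID 1 maps to lldiv(1, 257).quot = 0 -> vec[0], and chunk ID 258
    // maps to lldiv(258, 257).quot = 1 -> vec[1].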
    if (divresult.quot < vec.size()) {
        size = exceptions_per_area_ * sizeof(struct disk_exception);

        if (read_size != size) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
                            << " does not match size: " << size;
            return false;
        }

        void* buffer = bufsink_.GetPayloadBuffer(size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
            return false;
        }

        memcpy(buffer, vec[divresult.quot].get(), size);
    } else {
        return ZerofillDiskExceptions(read_size);
    }

    return true;
}

loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                         int* unmerged_exceptions) {
    loff_t offset = 0;
    *unmerged_exceptions = 0;

    while (*unmerged_exceptions <= exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Unmerged op by the kernel
        if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
            if (merged_de->old_chunk != cow_de->old_chunk) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
                                << merged_de->old_chunk
                                << " cow_de->old_chunk: " << cow_de->old_chunk;
                return -1;
            }

            if (merged_de->new_chunk != cow_de->new_chunk) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
                                << merged_de->new_chunk
                                << " cow_de->new_chunk: " << cow_de->new_chunk;
                return -1;
            }

            offset += sizeof(struct disk_exception);
            *unmerged_exceptions += 1;
            continue;
        }

        break;
    }

    SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
    return offset;
}

int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* ordered_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *ordered_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        if (merged_de->new_chunk != 0) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (merged_de->old_chunk != 0) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

        if (cow_de->new_chunk != 0) {
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (it == chunk_vec.end()) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (it->first != ChunkToSector(cow_de->new_chunk)) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && IsOrderedOp(*cow_op)) {
                *ordered_op = true;
                // Every single ordered operation has to come from the read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is the final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress.
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // Zero out to indicate that the operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already merged op in a previous iteration, or
            // this could also represent a partially filled area.
            //
            // If the op was merged in a previous cycle, we don't have
            // to count it.
            break;
        } else {
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}

bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
    uint32_t stride = exceptions_per_area_ + 1;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
    bool ordered_op = false;
    bool commit = false;

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (!(divresult.quot < vec.size())) {
        SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
                        << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
        return false;
    }

    SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
                    << " Metadata-Index: " << divresult.quot;

    int unmerged_exceptions = 0;
    loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);

    if (offset < 0) {
        SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
                        << unmerged_exceptions;
        return false;
    }

    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
                                                   unmerged_exceptions, &ordered_op, &commit);

    // There should be at least one operation merged in this cycle.
    if (!(merged_ops_cur_iter > 0)) {
        SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
        return false;
    }

    if (ordered_op) {
        if (commit) {
            // Push the flushing logic to the read-ahead thread so that the merge
            // thread can make forward progress. Sync will happen in the background.
            snapuserd_->StartReadAhead();
        }
    } else {
        // Non-copy ops and all ops in the older COW format
        if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
            SNAP_LOG(ERROR) << "CommitMerge failed...";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << " chunk: " << chunk;
    return true;
}

// Read the header from the dm-user misc device. This gives us
// the sector number for which IO is issued by the dm-snapshot device.
bool WorkerThread::ReadDmUserHeader() {
    if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
        if (errno != ENOTBLK) {
            SNAP_PLOG(ERROR) << "Control-read failed";
        }

        return false;
    }

    return true;
}

// Send the payload/data back to the dm-user misc device.
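// When header_response is true (the first response for a given request),
// the dm_user_header at the front of bufsink_ is written out along with the
// payload; subsequent chunks of a split read carry payload only.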
bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
    size_t payload_size = size;
    void* buf = bufsink_.GetPayloadBufPtr();
    if (header_response) {
        payload_size += sizeof(struct dm_user_header);
        buf = bufsink_.GetBufPtr();
    }

    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
        return false;
    }

    return true;
}

bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
    if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
        SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
        return false;
    }

    return true;
}

bool WorkerThread::DmuserWriteRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    // Device-mapper has the capability to allow
    // targets to flush the cache when writes are completed. This
    // is controlled by each target by a flag "flush_supported".
    // This flag is set by dm-user. When flush is supported,
    // a number of zero-length bios will be submitted to
    // the target for the purpose of flushing the cache. It is the
    // responsibility of the target driver (dm-user in this
    // case) to remap these bios to the underlying device. Since
    // there is no underlying device for dm-user, these zero-length
    // bios get routed to the daemon.
    //
    // Flush operations are generated post-merge by dm-snap by having
    // the REQ_PREFLUSH flag set. The snapuserd daemon doesn't have anything
    // to flush per se; hence, just respond back with a success message.
    if (header->sector == 0) {
        if (header->len != 0) {
            SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;
        }

        if (!WriteDmUserPayload(0, true)) {
            return false;
        }
        return true;
    }

    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    size_t remaining_size = header->len;
    size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

    chunk_t chunk = SectorToChunk(header->sector);
    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                               std::make_pair(header->sector, nullptr), Snapuserd::compare);

    bool not_found = (it == chunk_vec.end() || it->first != header->sector);

    if (not_found) {
        void* buffer = bufsink_.GetPayloadBuffer(read_size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
                            << read_size;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;

            if (!ReadDmUserPayload(buffer, read_size)) {
                SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }

            if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
                SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }
        }
    } else {
        SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: " << header->sector;
        header->type = DM_USER_RESP_ERROR;
    }

    if (!WriteDmUserPayload(0, true)) {
        return false;
    }

    return true;
}

bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool header_response = true;
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // A request to sector 0 is always for the kernel
        // representation of the COW header. This IO should happen only
        // once, during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally, this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            } else {
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // The daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}

void WorkerThread::InitializeBufsink() {
    // Allocate the buffer which is used to communicate between
    // the daemon and dm-user. The buffer comprises a header and a fixed payload.
    // If dm-user requests a big IO, the IO will be broken into chunks
    // of PAYLOAD_SIZE.
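    //
    // Assumed layout of the buffer:
    //   [ struct dm_user_header | payload (PAYLOAD_SIZE bytes) ]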
    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
    bufsink_.Initialize(buf_size);
}

bool WorkerThread::RunThread() {
    InitializeBufsink();
    xorsink_.Initialize(&bufsink_, BLOCK_SZ);

    if (!InitializeFds()) {
        return false;
    }

    if (!InitReader()) {
        return false;
    }

    // Start serving IO
    while (true) {
        if (!ProcessIORequest()) {
            break;
        }
    }

    CloseFds();
    reader_->CloseCowFd();

    return true;
}

bool WorkerThread::ProcessIORequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    if (!ReadDmUserHeader()) {
        return false;
    }

    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;

    switch (header->type) {
        case DM_USER_REQ_MAP_READ: {
            if (!DmuserReadRequest()) {
                return false;
            }
            break;
        }

        case DM_USER_REQ_MAP_WRITE: {
            if (!DmuserWriteRequest()) {
                return false;
            }
            break;
        }
    }

    return true;
}

}  // namespace snapshot
}  // namespace android