1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "snapuserd.h"
18
19 #include <csignal>
20 #include <optional>
21 #include <set>
22
23 #include <snapuserd/snapuserd_client.h>
24
25 namespace android {
26 namespace snapshot {
27
28 using namespace android;
29 using namespace android::dm;
30 using android::base::unique_fd;
31
32 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
33 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
34
WorkerThread(const std::string & cow_device,const std::string & backing_device,const std::string & control_device,const std::string & misc_name,std::shared_ptr<Snapuserd> snapuserd)35 WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
36 const std::string& control_device, const std::string& misc_name,
37 std::shared_ptr<Snapuserd> snapuserd) {
38 cow_device_ = cow_device;
39 backing_store_device_ = backing_device;
40 control_device_ = control_device;
41 misc_name_ = misc_name;
42 snapuserd_ = snapuserd;
43 exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
44 }
45
InitializeFds()46 bool WorkerThread::InitializeFds() {
47 backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
48 if (backing_store_fd_ < 0) {
49 SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
50 return false;
51 }
52
53 cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
54 if (cow_fd_ < 0) {
55 SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
56 return false;
57 }
58
59 ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
60 if (ctrl_fd_ < 0) {
61 SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
62 return false;
63 }
64
65 return true;
66 }
67
InitReader()68 bool WorkerThread::InitReader() {
69 reader_ = snapuserd_->CloneReaderForWorker();
70
71 if (!reader_->InitForMerge(std::move(cow_fd_))) {
72 return false;
73 }
74 return true;
75 }
76
77 // Construct kernel COW header in memory
78 // This header will be in sector 0. The IO
79 // request will always be 4k. After constructing
80 // the header, zero out the remaining block.
ConstructKernelCowHeader()81 void WorkerThread::ConstructKernelCowHeader() {
82 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
83
84 memset(buffer, 0, BLOCK_SZ);
85
86 struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);
87
88 dh->magic = SNAP_MAGIC;
89 dh->valid = SNAPSHOT_VALID;
90 dh->version = SNAPSHOT_DISK_VERSION;
91 dh->chunk_size = CHUNK_SIZE;
92 }
93
94 // Start the replace operation. This will read the
95 // internal COW format and if the block is compressed,
96 // it will be de-compressed.
ProcessReplaceOp(const CowOperation * cow_op)97 bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
98 if (!reader_->ReadData(*cow_op, &bufsink_)) {
99 SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
100 return false;
101 }
102
103 return true;
104 }
105
ReadFromBaseDevice(const CowOperation * cow_op)106 bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
107 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
108 if (buffer == nullptr) {
109 SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
110 return false;
111 }
112 SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
113 << " Source: " << cow_op->source;
114 uint64_t offset = cow_op->source;
115 if (cow_op->type == kCowCopyOp) {
116 offset *= BLOCK_SZ;
117 }
118 if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
119 std::string op;
120 if (cow_op->type == kCowCopyOp)
121 op = "Copy-op";
122 else {
123 op = "Xor-op";
124 }
125 SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
126 << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
127 return false;
128 }
129
130 return true;
131 }
132
GetReadAheadPopulatedBuffer(const CowOperation * cow_op)133 bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
134 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
135 if (buffer == nullptr) {
136 SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
137 return false;
138 }
139
140 if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
141 return false;
142 }
143
144 return true;
145 }
146
147 // Start the copy operation. This will read the backing
148 // block device which is represented by cow_op->source.
ProcessCopyOp(const CowOperation * cow_op)149 bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
150 if (!GetReadAheadPopulatedBuffer(cow_op)) {
151 SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
152 << " new_block: " << cow_op->new_block;
153 if (!ReadFromBaseDevice(cow_op)) {
154 return false;
155 }
156 }
157
158 return true;
159 }
160
ProcessXorOp(const CowOperation * cow_op)161 bool WorkerThread::ProcessXorOp(const CowOperation* cow_op) {
162 if (!GetReadAheadPopulatedBuffer(cow_op)) {
163 SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
164 << " new_block: " << cow_op->new_block;
165 if (!ReadFromBaseDevice(cow_op)) {
166 return false;
167 }
168 }
169 xorsink_.Reset();
170 if (!reader_->ReadData(*cow_op, &xorsink_)) {
171 SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
172 return false;
173 }
174
175 return true;
176 }
177
ProcessZeroOp()178 bool WorkerThread::ProcessZeroOp() {
179 // Zero out the entire block
180 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
181 if (buffer == nullptr) {
182 SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
183 return false;
184 }
185
186 memset(buffer, 0, BLOCK_SZ);
187 return true;
188 }
189
ProcessCowOp(const CowOperation * cow_op)190 bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
191 if (cow_op == nullptr) {
192 SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
193 return false;
194 }
195
196 switch (cow_op->type) {
197 case kCowReplaceOp: {
198 return ProcessReplaceOp(cow_op);
199 }
200
201 case kCowZeroOp: {
202 return ProcessZeroOp();
203 }
204
205 case kCowCopyOp: {
206 return ProcessCopyOp(cow_op);
207 }
208
209 case kCowXorOp: {
210 return ProcessXorOp(cow_op);
211 }
212
213 default: {
214 SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
215 }
216 }
217 return false;
218 }
219
// Service a read whose starting |sector| is not 4k aligned. |it| points at
// the chunk_vec entry whose 4k block contains |sector|. The whole block is
// materialized into bufsink_, then the payload is shifted left so it begins
// at the requested sector.
//
// Returns the number of payload bytes produced (at most the remainder of
// the block past the skipped sectors), or -1 on failure.
int WorkerThread::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;

    // Materialize the full 4k block for this COW op into bufsink_.
    if (!ProcessCowOp(it->second)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
                        << " Aligned sector: " << it->first;
        return -1;
    }

    // Sectors between the block's base sector and the requested sector.
    int num_sectors_skip = sector - it->first;

    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        // Skipping an entire block would mean the request doesn't overlap
        // this COW op at all — that is an invalid request.
        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
            return -1;
        }

        // Shift the payload left so it starts at the requested sector.
        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }

    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}
254
/*
 * Read the data for a given COW Operation.
 *
 * Kernel can issue IO at a sector granularity.
 * Hence, an IO may end up with reading partial
 * data from a COW operation or we may also
 * end up with interspersed request between
 * two COW operations.
 *
 * Returns the number of bytes read on success, -1 on failure.
 */
int WorkerThread::ReadData(sector_t sector, size_t size) {
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
    /*
     * chunk_map stores COW operation at 4k granularity.
     * If the requested IO with the sector falls on the 4k
     * boundary, then we can read the COW op directly without
     * any issue.
     *
     * However, if the requested sector is not 4K aligned,
     * then we will have the find the nearest COW operation
     * and chop the 4K block to fetch the requested sector.
     */
    it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
                          Snapuserd::compare);

    bool read_end_of_device = false;
    if (it == chunk_vec.end()) {
        // |-------|-------|-------|
        // 0 1 2 3
        //
        // Block 0 - op 1
        // Block 1 - op 2
        // Block 2 - op 3
        //
        // chunk_vec will have block 0, 1, 2 which maps to relavant COW ops.
        //
        // Each block is 4k bytes. Thus, the last block will span 8 sectors
        // ranging till block 3 (However, block 3 won't be in chunk_vec as
        // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector
        // spanning between block 2 and block 3, we need to step back
        // and get hold of the last element.
        //
        // Additionally, dm-snapshot makes sure that I/O request beyond block 3
        // will not be routed to the daemon. Hence, it is safe to assume that
        // if a sector is not available in the chunk_vec, the I/O falls in the
        // end of region.
        it = std::prev(chunk_vec.end());
        read_end_of_device = true;
    }

    // We didn't find the required sector; hence find the previous sector
    // as lower_bound will gives us the value greater than
    // the requested sector
    if (it->first != sector) {
        if (it != chunk_vec.begin() && !read_end_of_device) {
            --it;
        }

        /*
         * If the IO is spanned between two COW operations,
         * split the IO into two parts:
         *
         * 1: Read the first part from the single COW op
         * 2: Read the second part from the next COW op.
         *
         * Ex: Let's say we have a 1024 Bytes IO request.
         *
         * 0 COW OP-1 4096 COW OP-2 8192
         * |******************|*******************|
         * |*****|*****|
         * 3584 4608
         * <- 1024B - >
         *
         * We have two COW operations which are 4k blocks.
         * The IO is requested for 1024 Bytes which are spanned
         * between two COW operations. We will split this IO
         * into two parts:
         *
         * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
         * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
         */
        return ReadUnalignedSector(sector, size, it);
    }

    // Aligned fast path: service the request one 4k block (one COW op)
    // at a time, advancing the payload offset per block.
    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
    sector_t read_sector = sector;
    while (num_ops) {
        // We have to make sure that the reads are
        // sequential; there shouldn't be a data
        // request merged with a metadata IO.
        if (it->first != read_sector) {
            SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
                            << " cow-op sector: " << it->first;
            return -1;
        } else if (!ProcessCowOp(it->second)) {
            return -1;
        }
        num_ops -= 1;
        read_sector += (BLOCK_SZ >> SECTOR_SHIFT);

        it++;

        // Running past the last mapped block with reads still pending means
        // the request straddles unmapped territory — reject it.
        if (it == chunk_vec.end() && num_ops) {
            SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
                            << " COW ops completed; pending read-request: " << num_ops;
            return -1;
        }
        // Update the buffer offset
        bufsink_.UpdateBufferOffset(BLOCK_SZ);
    }

    // Reset the buffer offset
    bufsink_.ResetBufferOffset();
    return size;
}
371
372 /*
373 * dm-snap does prefetch reads while reading disk-exceptions.
374 * By default, prefetch value is set to 12; this means that
375 * dm-snap will issue 12 areas wherein each area is a 4k page
376 * of disk-exceptions.
377 *
378 * If during prefetch, if the chunk-id seen is beyond the
379 * actual number of metadata page, fill the buffer with zero.
380 * When dm-snap starts parsing the buffer, it will stop
381 * reading metadata page once the buffer content is zero.
382 */
ZerofillDiskExceptions(size_t read_size)383 bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
384 size_t size = exceptions_per_area_ * sizeof(struct disk_exception);
385
386 if (read_size > size) {
387 return false;
388 }
389
390 void* buffer = bufsink_.GetPayloadBuffer(size);
391 if (buffer == nullptr) {
392 SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
393 return false;
394 }
395
396 memset(buffer, 0, size);
397 return true;
398 }
399
400 /*
401 * A disk exception is a simple mapping of old_chunk to new_chunk.
402 * When dm-snapshot device is created, kernel requests these mapping.
403 *
404 * Each disk exception is of size 16 bytes. Thus a single 4k page can
405 * have:
406 *
407 * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
408 * is considered a metadata page and it is represented by chunk ID.
409 *
410 * Convert the chunk ID to index into the vector which gives us
411 * the metadata page.
412 */
ReadDiskExceptions(chunk_t chunk,size_t read_size)413 bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
414 uint32_t stride = exceptions_per_area_ + 1;
415 size_t size;
416 const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
417
418 // ChunkID to vector index
419 lldiv_t divresult = lldiv(chunk, stride);
420
421 if (divresult.quot < vec.size()) {
422 size = exceptions_per_area_ * sizeof(struct disk_exception);
423
424 if (read_size != size) {
425 SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
426 << " does not match with size: " << size;
427 return false;
428 }
429
430 void* buffer = bufsink_.GetPayloadBuffer(size);
431 if (buffer == nullptr) {
432 SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
433 return false;
434 }
435
436 memcpy(buffer, vec[divresult.quot].get(), size);
437 } else {
438 return ZerofillDiskExceptions(read_size);
439 }
440
441 return true;
442 }
443
// Scan a metadata area from the start and skip past the exceptions the
// kernel has NOT merged yet (entries still non-zero in |merged_buffer|).
// |merged_buffer| is the area written back by the kernel; |unmerged_buffer|
// is our pristine copy of the same area. On success returns the byte
// offset of the first merged entry and sets |*unmerged_exceptions| to the
// count of entries skipped; returns -1 if the two copies disagree.
loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                         int* unmerged_exceptions) {
    loff_t offset = 0;
    *unmerged_exceptions = 0;

    // NOTE(review): the "<=" bound lets the loop inspect one entry beyond
    // exceptions_per_area_ when every entry in the area is unmerged —
    // confirm the underlying buffers are sized to tolerate that read.
    while (*unmerged_exceptions <= exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Unmerged op by the kernel
        if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
            // An unmerged entry must match our pristine copy exactly.
            if (!(merged_de->old_chunk == cow_de->old_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
                                << merged_de->old_chunk
                                << "cow_de->old_chunk: " << cow_de->old_chunk;
                return -1;
            }

            if (!(merged_de->new_chunk == cow_de->new_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
                                << merged_de->new_chunk
                                << "cow_de->new_chunk: " << cow_de->new_chunk;
                return -1;
            }

            offset += sizeof(struct disk_exception);
            *unmerged_exceptions += 1;
            continue;
        }

        // First zeroed (merged) entry found — stop here.
        break;
    }

    SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
    return offset;
}
482
// Count how many exceptions the kernel merged in this cycle, starting at
// |offset| (just past the unmerged prefix found by GetMergeStartOffset()).
// For each merged entry, zero out our pristine copy so it is not counted
// again next cycle. Sets |*ordered_op| if any merged op is an ordered
// (copy/xor) op, and |*commit| when the final block of the read-ahead
// region has been merged. Returns the merged-op count, or -1 on
// inconsistent metadata.
int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* ordered_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *ordered_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Past the unmerged prefix, every kernel-side entry must be zeroed
        // (zero marks a merged exception); anything else is inconsistent.
        if (!(merged_de->new_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (!(merged_de->old_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

        if (cow_de->new_chunk != 0) {
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            // Map the merged chunk back to its COW operation.
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (!(it != chunk_vec.end())) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && IsOrderedOp(*cow_op)) {
                *ordered_op = true;
                // Every single ordered operation has to come from read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is a final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // zero out to indicate that operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already merged op in previous iteration or
            // This could also represent a partially filled area.
            //
            // If the op was merged in previous cycle, we don't have
            // to count them.
            break;
        } else {
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}
567
// Reconcile one metadata area (identified by |chunk|) after the kernel has
// written it back post-merge. |buffer| holds the kernel's copy of the area.
// Counts the ops merged this cycle and either commits them directly or, for
// ordered ops at the end of the read-ahead region, hands off to the
// read-ahead thread for background flushing.
bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
    uint32_t stride = exceptions_per_area_ + 1;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
    bool ordered_op = false;
    bool commit = false;

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (!(divresult.quot < vec.size())) {
        SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
                        << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
        return false;
    }

    SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
                    << " Metadata-Index: " << divresult.quot;

    int unmerged_exceptions = 0;
    // Skip over the entries the kernel has not merged yet.
    loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);

    if (offset < 0) {
        SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
                        << unmerged_exceptions;
        return false;
    }

    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
                                                   unmerged_exceptions, &ordered_op, &commit);

    // There should be at least one operation merged in this cycle
    if (!(merged_ops_cur_iter > 0)) {
        SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
        return false;
    }

    if (ordered_op) {
        if (commit) {
            // Push the flushing logic to read-ahead thread so that merge thread
            // can make forward progress. Sync will happen in the background
            snapuserd_->StartReadAhead();
        }
    } else {
        // Non-copy ops and all ops in older COW format
        if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
            SNAP_LOG(ERROR) << "CommitMerge failed...";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << "chunk: " << chunk;
    return true;
}
621
622 // Read Header from dm-user misc device. This gives
623 // us the sector number for which IO is issued by dm-snapshot device
ReadDmUserHeader()624 bool WorkerThread::ReadDmUserHeader() {
625 if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
626 if (errno != ENOTBLK) {
627 SNAP_PLOG(ERROR) << "Control-read failed";
628 }
629
630 return false;
631 }
632
633 return true;
634 }
635
636 // Send the payload/data back to dm-user misc device.
WriteDmUserPayload(size_t size,bool header_response)637 bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
638 size_t payload_size = size;
639 void* buf = bufsink_.GetPayloadBufPtr();
640 if (header_response) {
641 payload_size += sizeof(struct dm_user_header);
642 buf = bufsink_.GetBufPtr();
643 }
644
645 if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
646 SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
647 return false;
648 }
649
650 return true;
651 }
652
ReadDmUserPayload(void * buffer,size_t size)653 bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
654 if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
655 SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
656 return false;
657 }
658
659 return true;
660 }
661
DmuserWriteRequest()662 bool WorkerThread::DmuserWriteRequest() {
663 struct dm_user_header* header = bufsink_.GetHeaderPtr();
664
665 // device mapper has the capability to allow
666 // targets to flush the cache when writes are completed. This
667 // is controlled by each target by a flag "flush_supported".
668 // This flag is set by dm-user. When flush is supported,
669 // a number of zero-length bio's will be submitted to
670 // the target for the purpose of flushing cache. It is the
671 // responsibility of the target driver - which is dm-user in this
672 // case, to remap these bio's to the underlying device. Since,
673 // there is no underlying device for dm-user, this zero length
674 // bio's gets routed to daemon.
675 //
676 // Flush operations are generated post merge by dm-snap by having
677 // REQ_PREFLUSH flag set. Snapuser daemon doesn't have anything
678 // to flush per se; hence, just respond back with a success message.
679 if (header->sector == 0) {
680 if (!(header->len == 0)) {
681 SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
682 header->type = DM_USER_RESP_ERROR;
683 } else {
684 header->type = DM_USER_RESP_SUCCESS;
685 }
686
687 if (!WriteDmUserPayload(0, true)) {
688 return false;
689 }
690 return true;
691 }
692
693 std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
694 size_t remaining_size = header->len;
695 size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
696
697 chunk_t chunk = SectorToChunk(header->sector);
698 auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
699 std::make_pair(header->sector, nullptr), Snapuserd::compare);
700
701 bool not_found = (it == chunk_vec.end() || it->first != header->sector);
702
703 if (not_found) {
704 void* buffer = bufsink_.GetPayloadBuffer(read_size);
705 if (buffer == nullptr) {
706 SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
707 << read_size;
708 header->type = DM_USER_RESP_ERROR;
709 } else {
710 header->type = DM_USER_RESP_SUCCESS;
711
712 if (!ReadDmUserPayload(buffer, read_size)) {
713 SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
714 << "Sector: " << header->sector;
715 header->type = DM_USER_RESP_ERROR;
716 }
717
718 if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
719 SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
720 << "Sector: " << header->sector;
721 header->type = DM_USER_RESP_ERROR;
722 }
723 }
724 } else {
725 SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: header->sector";
726 header->type = DM_USER_RESP_ERROR;
727 }
728
729 if (!WriteDmUserPayload(0, true)) {
730 return false;
731 }
732
733 return true;
734 }
735
// Handle a read request from dm-user. Large IOs are serviced in chunks of
// at most PAYLOAD_SIZE; only the first chunk carries the response header.
bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool header_response = true;
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // Request to sector 0 is always for kernel
        // representation of COW header. This IO should be only
        // once during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            // A block-aligned first read of a sector with no COW mapping is
            // a metadata (disk-exception) read; everything else is data.
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            } else {
                // Advance the starting sector by the bytes already served.
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        // Errors can only be reported on the first chunk — later chunks are
        // header-less, so a failure mid-stream is unrecoverable here.
        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // Daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}
821
InitializeBufsink()822 void WorkerThread::InitializeBufsink() {
823 // Allocate the buffer which is used to communicate between
824 // daemon and dm-user. The buffer comprises of header and a fixed payload.
825 // If the dm-user requests a big IO, the IO will be broken into chunks
826 // of PAYLOAD_SIZE.
827 size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
828 bufsink_.Initialize(buf_size);
829 }
830
RunThread()831 bool WorkerThread::RunThread() {
832 InitializeBufsink();
833 xorsink_.Initialize(&bufsink_, BLOCK_SZ);
834
835 if (!InitializeFds()) {
836 return false;
837 }
838
839 if (!InitReader()) {
840 return false;
841 }
842
843 // Start serving IO
844 while (true) {
845 if (!ProcessIORequest()) {
846 break;
847 }
848 }
849
850 CloseFds();
851 reader_->CloseCowFd();
852
853 return true;
854 }
855
ProcessIORequest()856 bool WorkerThread::ProcessIORequest() {
857 struct dm_user_header* header = bufsink_.GetHeaderPtr();
858
859 if (!ReadDmUserHeader()) {
860 return false;
861 }
862
863 SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
864 SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
865 SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
866 SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
867 SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
868
869 switch (header->type) {
870 case DM_USER_REQ_MAP_READ: {
871 if (!DmuserReadRequest()) {
872 return false;
873 }
874 break;
875 }
876
877 case DM_USER_REQ_MAP_WRITE: {
878 if (!DmuserWriteRequest()) {
879 return false;
880 }
881 break;
882 }
883 }
884
885 return true;
886 }
887
888 } // namespace snapshot
889 } // namespace android
890