• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG INCREMENTAL
18 
19 #include "incremental_server.h"
20 
21 #include <android-base/endian.h>
22 #include <android-base/strings.h>
23 #include <inttypes.h>
24 #include <lz4.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include <array>
31 #include <deque>
32 #include <fstream>
33 #include <thread>
34 #include <type_traits>
35 #include <unordered_set>
36 
37 #include "adb.h"
38 #include "adb_io.h"
39 #include "adb_trace.h"
40 #include "adb_unique_fd.h"
41 #include "adb_utils.h"
42 #include "incremental_utils.h"
43 #include "sysdeps.h"
44 
45 namespace incremental {
46 
47 static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
48 static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
49 static constexpr int8_t kTypeData = 0;
50 static constexpr int8_t kTypeHash = 1;
51 static constexpr int8_t kCompressionNone = 0;
52 static constexpr int8_t kCompressionLZ4 = 1;
53 static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
54 static constexpr auto kReadBufferSize = 128 * 1024;
55 static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes
56 
57 using BlockSize = int16_t;
58 using FileId = int16_t;
59 using BlockIdx = int32_t;
60 using NumBlocks = int32_t;
61 using BlockType = int8_t;
62 using CompressionType = int8_t;
63 using RequestType = int16_t;
64 using ChunkHeader = int32_t;
65 using MagicType = uint32_t;
66 
67 static constexpr MagicType INCR = 0x494e4352;  // LE INCR
68 
69 static constexpr RequestType SERVING_COMPLETE = 0;
70 static constexpr RequestType BLOCK_MISSING = 1;
71 static constexpr RequestType PREFETCH = 2;
72 static constexpr RequestType DESTROY = 3;
73 
roundDownToBlockOffset(int64_t val)74 static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
75     return val & ~(kBlockSize - 1);
76 }
77 
roundUpToBlockOffset(int64_t val)78 static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
79     return roundDownToBlockOffset(val + kBlockSize - 1);
80 }
81 
numBytesToNumBlocks(int64_t bytes)82 static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
83     return roundUpToBlockOffset(bytes) / kBlockSize;
84 }
85 
blockIndexToOffset(BlockIdx blockIdx)86 static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
87     return static_cast<off64_t>(blockIdx) * kBlockSize;
88 }
89 
90 template <typename T>
toBigEndian(T t)91 static inline constexpr T toBigEndian(T t) {
92     using unsigned_type = std::make_unsigned_t<T>;
93     if constexpr (std::is_same_v<T, int16_t>) {
94         return htobe16(static_cast<unsigned_type>(t));
95     } else if constexpr (std::is_same_v<T, int32_t>) {
96         return htobe32(static_cast<unsigned_type>(t));
97     } else if constexpr (std::is_same_v<T, int64_t>) {
98         return htobe64(static_cast<unsigned_type>(t));
99     } else {
100         return t;
101     }
102 }
103 
104 template <typename T>
readBigEndian(void * data)105 static inline constexpr T readBigEndian(void* data) {
106     using unsigned_type = std::make_unsigned_t<T>;
107     if constexpr (std::is_same_v<T, int16_t>) {
108         return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
109     } else if constexpr (std::is_same_v<T, int32_t>) {
110         return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
111     } else if constexpr (std::is_same_v<T, int64_t>) {
112         return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
113     } else {
114         return T();
115     }
116 }
117 
118 // Received from device
119 // !Does not include magic!
120 struct RequestCommand {
121     RequestType request_type;  // 2 bytes
122     FileId file_id;            // 2 bytes
123     union {
124         BlockIdx block_idx;
125         NumBlocks num_blocks;
126     };  // 4 bytes
127 } __attribute__((packed));
128 
129 // Placed before actual data bytes of each block
130 struct ResponseHeader {
131     FileId file_id;                    // 2 bytes
132     BlockType block_type;              // 1 byte
133     CompressionType compression_type;  // 1 byte
134     BlockIdx block_idx;                // 4 bytes
135     BlockSize block_size;              // 2 bytes
136 
responseSizeForincremental::ResponseHeader137     static constexpr size_t responseSizeFor(size_t dataSize) {
138         return dataSize + sizeof(ResponseHeader);
139     }
140 } __attribute__((packed));
141 
142 template <size_t Size = kBlockSize>
143 struct BlockBuffer {
144     ResponseHeader header;
145     char data[Size];
146 } __attribute__((packed));
147 
148 // Holds streaming state for a file
149 class File {
150   public:
151     // Plain file
File(const char * filepath,FileId id,int64_t size,unique_fd fd,int64_t tree_offset,unique_fd tree_fd)152     File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
153          unique_fd tree_fd)
154         : File(filepath, id, size, tree_offset) {
155         this->fd_ = std::move(fd);
156         this->tree_fd_ = std::move(tree_fd);
157         priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
158     }
ReadDataBlock(BlockIdx block_idx,void * buf,bool * is_zip_compressed) const159     int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
160         int64_t bytes_read = -1;
161         const off64_t offsetStart = blockIndexToOffset(block_idx);
162         bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
163         return bytes_read;
164     }
ReadTreeBlock(BlockIdx block_idx,void * buf) const165     int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
166         int64_t bytes_read = -1;
167         const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
168         bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
169         return bytes_read;
170     }
171 
PriorityBlocks() const172     const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }
173 
174     std::vector<bool> sentBlocks;
175     NumBlocks sentBlocksCount = 0;
176 
177     std::vector<bool> sentTreeBlocks;
178 
179     const char* const filepath;
180     const FileId id;
181     const int64_t size;
182 
183   private:
File(const char * filepath,FileId id,int64_t size,int64_t tree_offset)184     File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
185         : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
186         sentBlocks.resize(numBytesToNumBlocks(size));
187         sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
188     }
189     unique_fd fd_;
190     std::vector<BlockIdx> priority_blocks_;
191 
192     unique_fd tree_fd_;
193     const int64_t tree_offset_;
194 };
195 
196 class IncrementalServer {
197   public:
IncrementalServer(unique_fd adb_fd,unique_fd output_fd,std::vector<File> files)198     IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
199         : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
200         buffer_.reserve(kReadBufferSize);
201         pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
202         pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
203     }
204 
205     bool Serve();
206 
207   private:
208     struct PrefetchState {
209         const File* file;
210         BlockIdx overallIndex = 0;
211         BlockIdx overallEnd = 0;
212         BlockIdx priorityIndex = 0;
213 
PrefetchStateincremental::IncrementalServer::PrefetchState214         explicit PrefetchState(const File& f, BlockIdx start, int count)
215             : file(&f),
216               overallIndex(start),
217               overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}
218 
PrefetchStateincremental::IncrementalServer::PrefetchState219         explicit PrefetchState(const File& f)
220             : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}
221 
doneincremental::IncrementalServer::PrefetchState222         bool done() const {
223             const bool overallSent = (overallIndex >= overallEnd);
224             if (file->PriorityBlocks().empty()) {
225                 return overallSent;
226             }
227             return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
228         }
229     };
230 
231     bool SkipToRequest(void* buffer, size_t* size, bool blocking);
232     std::optional<RequestCommand> ReadRequest(bool blocking);
233 
erase_buffer_head(int count)234     void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }
235 
236     enum class SendResult { Sent, Skipped, Error };
237     SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
238 
239     bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
240     bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);
241 
242     bool SendDone();
243     void RunPrefetching();
244 
245     void Send(const void* data, size_t size, bool flush);
246     void Flush();
247     using TimePoint = decltype(std::chrono::high_resolution_clock::now());
248     bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);
249 
250     unique_fd const adb_fd_;
251     unique_fd const output_fd_;
252     std::vector<File> files_;
253 
254     // Incoming data buffer.
255     std::vector<char> buffer_;
256 
257     std::deque<PrefetchState> prefetches_;
258     int compressed_ = 0, uncompressed_ = 0;
259     long long sentSize_ = 0;
260 
261     static constexpr auto kChunkFlushSize = 31 * kBlockSize;
262 
263     std::vector<char> pendingBlocksBuffer_;
264     char* pendingBlocks_ = nullptr;
265 
266     // True when client notifies that all the data has been received
267     bool servingComplete_ = false;
268 };
269 
SkipToRequest(void * buffer,size_t * size,bool blocking)270 bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
271     while (true) {
272         // Looking for INCR magic.
273         bool magic_found = false;
274         int bcur = 0;
275         int bsize = buffer_.size();
276         for (bcur = 0; bcur + 4 < bsize; ++bcur) {
277             uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
278             if (magic == INCR) {
279                 magic_found = true;
280                 break;
281             }
282         }
283 
284         if (bcur > 0) {
285             // output the rest.
286             (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
287             erase_buffer_head(bcur);
288         }
289 
290         if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
291             // fine, return
292             memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
293             erase_buffer_head(*size + sizeof(INCR));
294             return true;
295         }
296 
297         adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
298         auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);
299 
300         if (res != 1) {
301             auto err = errno;
302             (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
303             if (res < 0) {
304                 D("Failed to poll: %s", strerror(err));
305                 return false;
306             }
307             if (blocking) {
308                 fprintf(stderr, "Timed out waiting for data from device.\n");
309             }
310             if (blocking && servingComplete_) {
311                 // timeout waiting from client. Serving is complete, so quit.
312                 return false;
313             }
314             *size = 0;
315             return true;
316         }
317 
318         bsize = buffer_.size();
319         buffer_.resize(kReadBufferSize);
320         int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
321         if (r > 0) {
322             buffer_.resize(bsize + r);
323             continue;
324         }
325 
326         D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
327         break;
328     }
329     // socket is closed. print remaining messages
330     WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
331     return false;
332 }
333 
ReadRequest(bool blocking)334 std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
335     uint8_t commandBuf[sizeof(RequestCommand)];
336     auto size = sizeof(commandBuf);
337     if (!SkipToRequest(&commandBuf, &size, blocking)) {
338         return {{DESTROY}};
339     }
340     if (size < sizeof(RequestCommand)) {
341         return {};
342     }
343     RequestCommand request;
344     request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
345     request.file_id = readBigEndian<FileId>(&commandBuf[2]);
346     request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
347     return request;
348 }
349 
SendTreeBlocksForDataBlock(const FileId fileId,const BlockIdx blockIdx)350 bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
351     auto& file = files_[fileId];
352     const int32_t data_block_count = numBytesToNumBlocks(file.size);
353 
354     const int32_t total_nodes_count(file.sentTreeBlocks.size());
355     const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
356 
357     const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;
358 
359     // Leaf level, sending only 1 block.
360     const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
361     if (file.sentTreeBlocks[leaf_idx]) {
362         return true;
363     }
364     if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
365         return false;
366     }
367     file.sentTreeBlocks[leaf_idx] = true;
368 
369     // Non-leaf, sending EVERYTHING. This should be done only once.
370     if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
371         return true;
372     }
373 
374     for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
375         if (!SendTreeBlock(fileId, blockIdx, i)) {
376             return false;
377         }
378         file.sentTreeBlocks[i] = true;
379     }
380     return true;
381 }
382 
SendTreeBlock(FileId fileId,int32_t fileBlockIdx,BlockIdx blockIdx)383 bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
384     const auto& file = files_[fileId];
385 
386     BlockBuffer buffer;
387     const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
388     if (bytesRead <= 0) {
389         fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
390                 blockIdx);
391         return false;
392     }
393 
394     buffer.header.compression_type = kCompressionNone;
395     buffer.header.block_type = kTypeHash;
396     buffer.header.file_id = toBigEndian(fileId);
397     buffer.header.block_size = toBigEndian(int16_t(bytesRead));
398     buffer.header.block_idx = toBigEndian(blockIdx);
399 
400     Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);
401 
402     return true;
403 }
404 
SendDataBlock(FileId fileId,BlockIdx blockIdx,bool flush)405 auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
406     auto& file = files_[fileId];
407     if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
408         // may happen as we schedule some extra blocks for reported page misses
409         D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
410         return SendResult::Skipped;
411     }
412     if (file.sentBlocks[blockIdx]) {
413         return SendResult::Skipped;
414     }
415 
416     if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
417         return SendResult::Error;
418     }
419 
420     BlockBuffer raw;
421     bool isZipCompressed = false;
422     const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
423     if (bytesRead < 0) {
424         fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
425                 errno);
426         return SendResult::Error;
427     }
428 
429     BlockBuffer<kCompressBound> compressed;
430     int16_t compressedSize = 0;
431     if (!isZipCompressed) {
432         compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
433     }
434     int16_t blockSize;
435     ResponseHeader* header;
436     if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
437         ++compressed_;
438         blockSize = compressedSize;
439         header = &compressed.header;
440         header->compression_type = kCompressionLZ4;
441     } else {
442         ++uncompressed_;
443         blockSize = bytesRead;
444         header = &raw.header;
445         header->compression_type = kCompressionNone;
446     }
447 
448     header->block_type = kTypeData;
449     header->file_id = toBigEndian(fileId);
450     header->block_size = toBigEndian(blockSize);
451     header->block_idx = toBigEndian(blockIdx);
452 
453     file.sentBlocks[blockIdx] = true;
454     file.sentBlocksCount += 1;
455     Send(header, ResponseHeader::responseSizeFor(blockSize), flush);
456 
457     return SendResult::Sent;
458 }
459 
SendDone()460 bool IncrementalServer::SendDone() {
461     ResponseHeader header;
462     header.file_id = -1;
463     header.block_type = 0;
464     header.compression_type = 0;
465     header.block_idx = 0;
466     header.block_size = 0;
467     Send(&header, sizeof(header), true);
468     return true;
469 }
470 
RunPrefetching()471 void IncrementalServer::RunPrefetching() {
472     constexpr auto kPrefetchBlocksPerIteration = 128;
473 
474     int blocksToSend = kPrefetchBlocksPerIteration;
475     while (!prefetches_.empty() && blocksToSend > 0) {
476         auto& prefetch = prefetches_.front();
477         const auto& file = *prefetch.file;
478         const auto& priority_blocks = file.PriorityBlocks();
479         if (!priority_blocks.empty()) {
480             for (auto& i = prefetch.priorityIndex;
481                  blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
482                 if (auto res = SendDataBlock(file.id, priority_blocks[i]);
483                     res == SendResult::Sent) {
484                     --blocksToSend;
485                 } else if (res == SendResult::Error) {
486                     fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
487                 }
488             }
489         }
490         for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
491             if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
492                 --blocksToSend;
493             } else if (res == SendResult::Error) {
494                 fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
495             }
496         }
497         if (prefetch.done()) {
498             prefetches_.pop_front();
499         }
500     }
501 }
502 
Send(const void * data,size_t size,bool flush)503 void IncrementalServer::Send(const void* data, size_t size, bool flush) {
504     pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
505     if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
506         Flush();
507     }
508 }
509 
Flush()510 void IncrementalServer::Flush() {
511     auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
512     if (dataBytes == 0) {
513         return;
514     }
515 
516     *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
517     auto totalBytes = sizeof(ChunkHeader) + dataBytes;
518     if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
519         fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
520     }
521     sentSize_ += totalBytes;
522     pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
523 }
524 
ServingComplete(std::optional<TimePoint> startTime,int missesCount,int missesSent)525 bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
526                                         int missesSent) {
527     servingComplete_ = true;
528     using namespace std::chrono;
529     auto endTime = high_resolution_clock::now();
530     D("Streaming completed.\n"
531       "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
532       "%d, mb: %.3f\n"
533       "Total time taken: %.3fms",
534       missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
535       duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
536     return true;
537 }
538 
Serve()539 bool IncrementalServer::Serve() {
540     // Initial handshake to verify connection is still alive
541     if (!SendOkay(adb_fd_)) {
542         fprintf(stderr, "Connection is dead. Abort.\n");
543         return false;
544     }
545 
546     std::unordered_set<FileId> prefetchedFiles;
547     bool doneSent = false;
548     int missesCount = 0;
549     int missesSent = 0;
550 
551     using namespace std::chrono;
552     std::optional<TimePoint> startTime;
553 
554     while (true) {
555         if (!doneSent && prefetches_.empty() &&
556             std::all_of(files_.begin(), files_.end(), [](const File& f) {
557                 return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
558             })) {
559             fprintf(stderr, "All files should be loaded. Notifying the device.\n");
560             SendDone();
561             doneSent = true;
562         }
563 
564         const bool blocking = prefetches_.empty();
565         if (blocking) {
566             // We've no idea how long the blocking call is, so let's flush whatever is still unsent.
567             Flush();
568         }
569         auto request = ReadRequest(blocking);
570 
571         if (!startTime) {
572             startTime = high_resolution_clock::now();
573         }
574 
575         if (request) {
576             FileId fileId = request->file_id;
577             BlockIdx blockIdx = request->block_idx;
578 
579             switch (request->request_type) {
580                 case DESTROY: {
581                     // Stop everything.
582                     return true;
583                 }
584                 case SERVING_COMPLETE: {
585                     // Not stopping the server here.
586                     ServingComplete(startTime, missesCount, missesSent);
587                     break;
588                 }
589                 case BLOCK_MISSING: {
590                     ++missesCount;
591                     // Sends one single block ASAP.
592                     if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
593                         blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
594                         fprintf(stderr,
595                                 "Received invalid data request for file_id %" PRId16
596                                 " block_idx %" PRId32 ".\n",
597                                 fileId, blockIdx);
598                         break;
599                     }
600 
601                     if (VLOG_IS_ON(INCREMENTAL)) {
602                         auto& file = files_[fileId];
603                         auto posP = std::find(file.PriorityBlocks().begin(),
604                                               file.PriorityBlocks().end(), blockIdx);
605                         D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
606                           (int)fileId, (int)blockIdx,
607                           posP == file.PriorityBlocks().end()
608                                   ? -1
609                                   : int(posP - file.PriorityBlocks().begin()),
610                           int(file.PriorityBlocks().size()));
611                     }
612 
613                     if (auto res = SendDataBlock(fileId, blockIdx, true);
614                         res == SendResult::Error) {
615                         fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
616                     } else if (res == SendResult::Sent) {
617                         ++missesSent;
618                         // Make sure we send more pages from this place onward, in case if the OS is
619                         // reading a bigger block.
620                         prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
621                     }
622                     break;
623                 }
624                 case PREFETCH: {
625                     // Start prefetching for a file
626                     if (fileId < 0) {
627                         fprintf(stderr,
628                                 "Received invalid prefetch request for file_id %" PRId16 "\n",
629                                 fileId);
630                         break;
631                     }
632                     if (!prefetchedFiles.insert(fileId).second) {
633                         fprintf(stderr,
634                                 "Received duplicate prefetch request for file_id %" PRId16 "\n",
635                                 fileId);
636                         break;
637                     }
638                     D("Received prefetch request for file_id %" PRId16 ".", fileId);
639                     prefetches_.emplace_back(files_[fileId]);
640                     break;
641                 }
642                 default:
643                     fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
644                             request->request_type, fileId, blockIdx);
645                     break;
646             }
647         }
648 
649         RunPrefetching();
650     }
651 }
652 
open_fd(const char * filepath)653 static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
654     struct stat st;
655     if (stat(filepath, &st)) {
656         error_exit("inc-server: failed to stat input file '%s'.", filepath);
657     }
658 
659     unique_fd fd(adb_open(filepath, O_RDONLY));
660     if (fd < 0) {
661         error_exit("inc-server: failed to open file '%s'.", filepath);
662     }
663 
664     return {std::move(fd), st.st_size};
665 }
666 
open_signature(int64_t file_size,const char * filepath)667 static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
668     std::string signature_file(filepath);
669     signature_file += IDSIG;
670 
671     unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
672     if (fd < 0) {
673         error_exit("inc-server: failed to open file '%s'.", signature_file.c_str());
674     }
675 
676     auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
677     if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
678         error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
679                    signature_file.c_str(), (long long)tree_size, (long long)expected);
680     }
681 
682     int32_t data_block_count = numBytesToNumBlocks(file_size);
683     int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
684     D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
685       int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));
686 
687     return {std::move(fd), tree_offset};
688 }
689 
serve(int connection_fd,int output_fd,int argc,const char ** argv)690 bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
691     auto connection_ufd = unique_fd(connection_fd);
692     auto output_ufd = unique_fd(output_fd);
693     if (argc <= 0) {
694         error_exit("inc-server: must specify at least one file.");
695     }
696 
697     std::vector<File> files;
698     files.reserve(argc);
699     for (int i = 0; i < argc; ++i) {
700         auto filepath = argv[i];
701 
702         auto [file_fd, file_size] = open_fd(filepath);
703         auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
704 
705         files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
706                            std::move(sign_fd));
707     }
708 
709     IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
710     printf("Serving...\n");
711     fclose(stdin);
712     fclose(stdout);
713     return server.Serve();
714 }
715 
716 }  // namespace incremental
717