1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define TRACE_TAG INCREMENTAL
18
19 #include "incremental_server.h"
20
21 #include <android-base/endian.h>
22 #include <android-base/strings.h>
23 #include <inttypes.h>
24 #include <lz4.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include <array>
31 #include <deque>
32 #include <fstream>
33 #include <thread>
34 #include <type_traits>
35 #include <unordered_set>
36
37 #include "adb.h"
38 #include "adb_io.h"
39 #include "adb_trace.h"
40 #include "adb_unique_fd.h"
41 #include "adb_utils.h"
42 #include "incremental_utils.h"
43 #include "sysdeps.h"
44
45 namespace incremental {
46
47 static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
48 static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
49 static constexpr int8_t kTypeData = 0;
50 static constexpr int8_t kTypeHash = 1;
51 static constexpr int8_t kCompressionNone = 0;
52 static constexpr int8_t kCompressionLZ4 = 1;
53 static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
54 static constexpr auto kReadBufferSize = 128 * 1024;
55 static constexpr int kPollTimeoutMillis = 300000; // 5 minutes
56
57 using BlockSize = int16_t;
58 using FileId = int16_t;
59 using BlockIdx = int32_t;
60 using NumBlocks = int32_t;
61 using BlockType = int8_t;
62 using CompressionType = int8_t;
63 using RequestType = int16_t;
64 using ChunkHeader = int32_t;
65 using MagicType = uint32_t;
66
67 static constexpr MagicType INCR = 0x494e4352; // LE INCR
68
69 static constexpr RequestType SERVING_COMPLETE = 0;
70 static constexpr RequestType BLOCK_MISSING = 1;
71 static constexpr RequestType PREFETCH = 2;
72 static constexpr RequestType DESTROY = 3;
73
roundDownToBlockOffset(int64_t val)74 static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
75 return val & ~(kBlockSize - 1);
76 }
77
roundUpToBlockOffset(int64_t val)78 static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
79 return roundDownToBlockOffset(val + kBlockSize - 1);
80 }
81
numBytesToNumBlocks(int64_t bytes)82 static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
83 return roundUpToBlockOffset(bytes) / kBlockSize;
84 }
85
blockIndexToOffset(BlockIdx blockIdx)86 static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
87 return static_cast<off64_t>(blockIdx) * kBlockSize;
88 }
89
90 template <typename T>
toBigEndian(T t)91 static inline constexpr T toBigEndian(T t) {
92 using unsigned_type = std::make_unsigned_t<T>;
93 if constexpr (std::is_same_v<T, int16_t>) {
94 return htobe16(static_cast<unsigned_type>(t));
95 } else if constexpr (std::is_same_v<T, int32_t>) {
96 return htobe32(static_cast<unsigned_type>(t));
97 } else if constexpr (std::is_same_v<T, int64_t>) {
98 return htobe64(static_cast<unsigned_type>(t));
99 } else {
100 return t;
101 }
102 }
103
104 template <typename T>
readBigEndian(void * data)105 static inline constexpr T readBigEndian(void* data) {
106 using unsigned_type = std::make_unsigned_t<T>;
107 if constexpr (std::is_same_v<T, int16_t>) {
108 return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
109 } else if constexpr (std::is_same_v<T, int32_t>) {
110 return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
111 } else if constexpr (std::is_same_v<T, int64_t>) {
112 return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
113 } else {
114 return T();
115 }
116 }
117
118 // Received from device
119 // !Does not include magic!
120 struct RequestCommand {
121 RequestType request_type; // 2 bytes
122 FileId file_id; // 2 bytes
123 union {
124 BlockIdx block_idx;
125 NumBlocks num_blocks;
126 }; // 4 bytes
127 } __attribute__((packed));
128
129 // Placed before actual data bytes of each block
130 struct ResponseHeader {
131 FileId file_id; // 2 bytes
132 BlockType block_type; // 1 byte
133 CompressionType compression_type; // 1 byte
134 BlockIdx block_idx; // 4 bytes
135 BlockSize block_size; // 2 bytes
136
responseSizeForincremental::ResponseHeader137 static constexpr size_t responseSizeFor(size_t dataSize) {
138 return dataSize + sizeof(ResponseHeader);
139 }
140 } __attribute__((packed));
141
142 template <size_t Size = kBlockSize>
143 struct BlockBuffer {
144 ResponseHeader header;
145 char data[Size];
146 } __attribute__((packed));
147
148 // Holds streaming state for a file
149 class File {
150 public:
151 // Plain file
File(const char * filepath,FileId id,int64_t size,unique_fd fd,int64_t tree_offset,unique_fd tree_fd)152 File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
153 unique_fd tree_fd)
154 : File(filepath, id, size, tree_offset) {
155 this->fd_ = std::move(fd);
156 this->tree_fd_ = std::move(tree_fd);
157 priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
158 }
ReadDataBlock(BlockIdx block_idx,void * buf,bool * is_zip_compressed) const159 int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
160 int64_t bytes_read = -1;
161 const off64_t offsetStart = blockIndexToOffset(block_idx);
162 bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
163 return bytes_read;
164 }
ReadTreeBlock(BlockIdx block_idx,void * buf) const165 int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
166 int64_t bytes_read = -1;
167 const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
168 bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
169 return bytes_read;
170 }
171
PriorityBlocks() const172 const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }
173
174 std::vector<bool> sentBlocks;
175 NumBlocks sentBlocksCount = 0;
176
177 std::vector<bool> sentTreeBlocks;
178
179 const char* const filepath;
180 const FileId id;
181 const int64_t size;
182
183 private:
File(const char * filepath,FileId id,int64_t size,int64_t tree_offset)184 File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
185 : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
186 sentBlocks.resize(numBytesToNumBlocks(size));
187 sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
188 }
189 unique_fd fd_;
190 std::vector<BlockIdx> priority_blocks_;
191
192 unique_fd tree_fd_;
193 const int64_t tree_offset_;
194 };
195
196 class IncrementalServer {
197 public:
IncrementalServer(unique_fd adb_fd,unique_fd output_fd,std::vector<File> files)198 IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
199 : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
200 buffer_.reserve(kReadBufferSize);
201 pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
202 pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
203 }
204
205 bool Serve();
206
207 private:
208 struct PrefetchState {
209 const File* file;
210 BlockIdx overallIndex = 0;
211 BlockIdx overallEnd = 0;
212 BlockIdx priorityIndex = 0;
213
PrefetchStateincremental::IncrementalServer::PrefetchState214 explicit PrefetchState(const File& f, BlockIdx start, int count)
215 : file(&f),
216 overallIndex(start),
217 overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}
218
PrefetchStateincremental::IncrementalServer::PrefetchState219 explicit PrefetchState(const File& f)
220 : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}
221
doneincremental::IncrementalServer::PrefetchState222 bool done() const {
223 const bool overallSent = (overallIndex >= overallEnd);
224 if (file->PriorityBlocks().empty()) {
225 return overallSent;
226 }
227 return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
228 }
229 };
230
231 bool SkipToRequest(void* buffer, size_t* size, bool blocking);
232 std::optional<RequestCommand> ReadRequest(bool blocking);
233
erase_buffer_head(int count)234 void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }
235
236 enum class SendResult { Sent, Skipped, Error };
237 SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
238
239 bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
240 bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);
241
242 bool SendDone();
243 void RunPrefetching();
244
245 void Send(const void* data, size_t size, bool flush);
246 void Flush();
247 using TimePoint = decltype(std::chrono::high_resolution_clock::now());
248 bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);
249
250 unique_fd const adb_fd_;
251 unique_fd const output_fd_;
252 std::vector<File> files_;
253
254 // Incoming data buffer.
255 std::vector<char> buffer_;
256
257 std::deque<PrefetchState> prefetches_;
258 int compressed_ = 0, uncompressed_ = 0;
259 long long sentSize_ = 0;
260
261 static constexpr auto kChunkFlushSize = 31 * kBlockSize;
262
263 std::vector<char> pendingBlocksBuffer_;
264 char* pendingBlocks_ = nullptr;
265
266 // True when client notifies that all the data has been received
267 bool servingComplete_ = false;
268 };
269
SkipToRequest(void * buffer,size_t * size,bool blocking)270 bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
271 while (true) {
272 // Looking for INCR magic.
273 bool magic_found = false;
274 int bcur = 0;
275 int bsize = buffer_.size();
276 for (bcur = 0; bcur + 4 < bsize; ++bcur) {
277 uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
278 if (magic == INCR) {
279 magic_found = true;
280 break;
281 }
282 }
283
284 if (bcur > 0) {
285 // output the rest.
286 (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
287 erase_buffer_head(bcur);
288 }
289
290 if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
291 // fine, return
292 memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
293 erase_buffer_head(*size + sizeof(INCR));
294 return true;
295 }
296
297 adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
298 auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);
299
300 if (res != 1) {
301 auto err = errno;
302 (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
303 if (res < 0) {
304 D("Failed to poll: %s", strerror(err));
305 return false;
306 }
307 if (blocking) {
308 fprintf(stderr, "Timed out waiting for data from device.\n");
309 }
310 if (blocking && servingComplete_) {
311 // timeout waiting from client. Serving is complete, so quit.
312 return false;
313 }
314 *size = 0;
315 return true;
316 }
317
318 bsize = buffer_.size();
319 buffer_.resize(kReadBufferSize);
320 int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
321 if (r > 0) {
322 buffer_.resize(bsize + r);
323 continue;
324 }
325
326 D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
327 break;
328 }
329 // socket is closed. print remaining messages
330 WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
331 return false;
332 }
333
ReadRequest(bool blocking)334 std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
335 uint8_t commandBuf[sizeof(RequestCommand)];
336 auto size = sizeof(commandBuf);
337 if (!SkipToRequest(&commandBuf, &size, blocking)) {
338 return {{DESTROY}};
339 }
340 if (size < sizeof(RequestCommand)) {
341 return {};
342 }
343 RequestCommand request;
344 request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
345 request.file_id = readBigEndian<FileId>(&commandBuf[2]);
346 request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
347 return request;
348 }
349
SendTreeBlocksForDataBlock(const FileId fileId,const BlockIdx blockIdx)350 bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
351 auto& file = files_[fileId];
352 const int32_t data_block_count = numBytesToNumBlocks(file.size);
353
354 const int32_t total_nodes_count(file.sentTreeBlocks.size());
355 const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
356
357 const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;
358
359 // Leaf level, sending only 1 block.
360 const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
361 if (file.sentTreeBlocks[leaf_idx]) {
362 return true;
363 }
364 if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
365 return false;
366 }
367 file.sentTreeBlocks[leaf_idx] = true;
368
369 // Non-leaf, sending EVERYTHING. This should be done only once.
370 if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
371 return true;
372 }
373
374 for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
375 if (!SendTreeBlock(fileId, blockIdx, i)) {
376 return false;
377 }
378 file.sentTreeBlocks[i] = true;
379 }
380 return true;
381 }
382
SendTreeBlock(FileId fileId,int32_t fileBlockIdx,BlockIdx blockIdx)383 bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
384 const auto& file = files_[fileId];
385
386 BlockBuffer buffer;
387 const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
388 if (bytesRead <= 0) {
389 fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
390 blockIdx);
391 return false;
392 }
393
394 buffer.header.compression_type = kCompressionNone;
395 buffer.header.block_type = kTypeHash;
396 buffer.header.file_id = toBigEndian(fileId);
397 buffer.header.block_size = toBigEndian(int16_t(bytesRead));
398 buffer.header.block_idx = toBigEndian(blockIdx);
399
400 Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);
401
402 return true;
403 }
404
SendDataBlock(FileId fileId,BlockIdx blockIdx,bool flush)405 auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
406 auto& file = files_[fileId];
407 if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
408 // may happen as we schedule some extra blocks for reported page misses
409 D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
410 return SendResult::Skipped;
411 }
412 if (file.sentBlocks[blockIdx]) {
413 return SendResult::Skipped;
414 }
415
416 if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
417 return SendResult::Error;
418 }
419
420 BlockBuffer raw;
421 bool isZipCompressed = false;
422 const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
423 if (bytesRead < 0) {
424 fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
425 errno);
426 return SendResult::Error;
427 }
428
429 BlockBuffer<kCompressBound> compressed;
430 int16_t compressedSize = 0;
431 if (!isZipCompressed) {
432 compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
433 }
434 int16_t blockSize;
435 ResponseHeader* header;
436 if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
437 ++compressed_;
438 blockSize = compressedSize;
439 header = &compressed.header;
440 header->compression_type = kCompressionLZ4;
441 } else {
442 ++uncompressed_;
443 blockSize = bytesRead;
444 header = &raw.header;
445 header->compression_type = kCompressionNone;
446 }
447
448 header->block_type = kTypeData;
449 header->file_id = toBigEndian(fileId);
450 header->block_size = toBigEndian(blockSize);
451 header->block_idx = toBigEndian(blockIdx);
452
453 file.sentBlocks[blockIdx] = true;
454 file.sentBlocksCount += 1;
455 Send(header, ResponseHeader::responseSizeFor(blockSize), flush);
456
457 return SendResult::Sent;
458 }
459
SendDone()460 bool IncrementalServer::SendDone() {
461 ResponseHeader header;
462 header.file_id = -1;
463 header.block_type = 0;
464 header.compression_type = 0;
465 header.block_idx = 0;
466 header.block_size = 0;
467 Send(&header, sizeof(header), true);
468 return true;
469 }
470
RunPrefetching()471 void IncrementalServer::RunPrefetching() {
472 constexpr auto kPrefetchBlocksPerIteration = 128;
473
474 int blocksToSend = kPrefetchBlocksPerIteration;
475 while (!prefetches_.empty() && blocksToSend > 0) {
476 auto& prefetch = prefetches_.front();
477 const auto& file = *prefetch.file;
478 const auto& priority_blocks = file.PriorityBlocks();
479 if (!priority_blocks.empty()) {
480 for (auto& i = prefetch.priorityIndex;
481 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
482 if (auto res = SendDataBlock(file.id, priority_blocks[i]);
483 res == SendResult::Sent) {
484 --blocksToSend;
485 } else if (res == SendResult::Error) {
486 fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
487 }
488 }
489 }
490 for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
491 if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
492 --blocksToSend;
493 } else if (res == SendResult::Error) {
494 fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
495 }
496 }
497 if (prefetch.done()) {
498 prefetches_.pop_front();
499 }
500 }
501 }
502
Send(const void * data,size_t size,bool flush)503 void IncrementalServer::Send(const void* data, size_t size, bool flush) {
504 pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
505 if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
506 Flush();
507 }
508 }
509
Flush()510 void IncrementalServer::Flush() {
511 auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
512 if (dataBytes == 0) {
513 return;
514 }
515
516 *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
517 auto totalBytes = sizeof(ChunkHeader) + dataBytes;
518 if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
519 fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
520 }
521 sentSize_ += totalBytes;
522 pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
523 }
524
ServingComplete(std::optional<TimePoint> startTime,int missesCount,int missesSent)525 bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
526 int missesSent) {
527 servingComplete_ = true;
528 using namespace std::chrono;
529 auto endTime = high_resolution_clock::now();
530 D("Streaming completed.\n"
531 "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
532 "%d, mb: %.3f\n"
533 "Total time taken: %.3fms",
534 missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
535 duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
536 return true;
537 }
538
Serve()539 bool IncrementalServer::Serve() {
540 // Initial handshake to verify connection is still alive
541 if (!SendOkay(adb_fd_)) {
542 fprintf(stderr, "Connection is dead. Abort.\n");
543 return false;
544 }
545
546 std::unordered_set<FileId> prefetchedFiles;
547 bool doneSent = false;
548 int missesCount = 0;
549 int missesSent = 0;
550
551 using namespace std::chrono;
552 std::optional<TimePoint> startTime;
553
554 while (true) {
555 if (!doneSent && prefetches_.empty() &&
556 std::all_of(files_.begin(), files_.end(), [](const File& f) {
557 return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
558 })) {
559 fprintf(stderr, "All files should be loaded. Notifying the device.\n");
560 SendDone();
561 doneSent = true;
562 }
563
564 const bool blocking = prefetches_.empty();
565 if (blocking) {
566 // We've no idea how long the blocking call is, so let's flush whatever is still unsent.
567 Flush();
568 }
569 auto request = ReadRequest(blocking);
570
571 if (!startTime) {
572 startTime = high_resolution_clock::now();
573 }
574
575 if (request) {
576 FileId fileId = request->file_id;
577 BlockIdx blockIdx = request->block_idx;
578
579 switch (request->request_type) {
580 case DESTROY: {
581 // Stop everything.
582 return true;
583 }
584 case SERVING_COMPLETE: {
585 // Not stopping the server here.
586 ServingComplete(startTime, missesCount, missesSent);
587 break;
588 }
589 case BLOCK_MISSING: {
590 ++missesCount;
591 // Sends one single block ASAP.
592 if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
593 blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
594 fprintf(stderr,
595 "Received invalid data request for file_id %" PRId16
596 " block_idx %" PRId32 ".\n",
597 fileId, blockIdx);
598 break;
599 }
600
601 if (VLOG_IS_ON(INCREMENTAL)) {
602 auto& file = files_[fileId];
603 auto posP = std::find(file.PriorityBlocks().begin(),
604 file.PriorityBlocks().end(), blockIdx);
605 D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
606 (int)fileId, (int)blockIdx,
607 posP == file.PriorityBlocks().end()
608 ? -1
609 : int(posP - file.PriorityBlocks().begin()),
610 int(file.PriorityBlocks().size()));
611 }
612
613 if (auto res = SendDataBlock(fileId, blockIdx, true);
614 res == SendResult::Error) {
615 fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
616 } else if (res == SendResult::Sent) {
617 ++missesSent;
618 // Make sure we send more pages from this place onward, in case if the OS is
619 // reading a bigger block.
620 prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
621 }
622 break;
623 }
624 case PREFETCH: {
625 // Start prefetching for a file
626 if (fileId < 0) {
627 fprintf(stderr,
628 "Received invalid prefetch request for file_id %" PRId16 "\n",
629 fileId);
630 break;
631 }
632 if (!prefetchedFiles.insert(fileId).second) {
633 fprintf(stderr,
634 "Received duplicate prefetch request for file_id %" PRId16 "\n",
635 fileId);
636 break;
637 }
638 D("Received prefetch request for file_id %" PRId16 ".", fileId);
639 prefetches_.emplace_back(files_[fileId]);
640 break;
641 }
642 default:
643 fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
644 request->request_type, fileId, blockIdx);
645 break;
646 }
647 }
648
649 RunPrefetching();
650 }
651 }
652
open_fd(const char * filepath)653 static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
654 struct stat st;
655 if (stat(filepath, &st)) {
656 error_exit("inc-server: failed to stat input file '%s'.", filepath);
657 }
658
659 unique_fd fd(adb_open(filepath, O_RDONLY));
660 if (fd < 0) {
661 error_exit("inc-server: failed to open file '%s'.", filepath);
662 }
663
664 return {std::move(fd), st.st_size};
665 }
666
open_signature(int64_t file_size,const char * filepath)667 static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
668 std::string signature_file(filepath);
669 signature_file += IDSIG;
670
671 unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
672 if (fd < 0) {
673 error_exit("inc-server: failed to open file '%s'.", signature_file.c_str());
674 }
675
676 auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
677 if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
678 error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
679 signature_file.c_str(), (long long)tree_size, (long long)expected);
680 }
681
682 int32_t data_block_count = numBytesToNumBlocks(file_size);
683 int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
684 D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
685 int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));
686
687 return {std::move(fd), tree_offset};
688 }
689
serve(int connection_fd,int output_fd,int argc,const char ** argv)690 bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
691 auto connection_ufd = unique_fd(connection_fd);
692 auto output_ufd = unique_fd(output_fd);
693 if (argc <= 0) {
694 error_exit("inc-server: must specify at least one file.");
695 }
696
697 std::vector<File> files;
698 files.reserve(argc);
699 for (int i = 0; i < argc; ++i) {
700 auto filepath = argv[i];
701
702 auto [file_fd, file_size] = open_fd(filepath);
703 auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
704
705 files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
706 std::move(sign_fd));
707 }
708
709 IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
710 printf("Serving...\n");
711 fclose(stdin);
712 fclose(stdout);
713 return server.Serve();
714 }
715
716 } // namespace incremental
717