1 //
2 // Copyright (C) 2019 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 #include <algorithm>
17 #include <iostream>
18
19 #include "common/libs/glog/logging.h"
20 #include "common/libs/fs/shared_buf.h"
21 #include "common/libs/fs/tee.h"
22
// Number of bytes requested from the source on each read in Tee::Start; it is
// also the initial size of every buffer allocated there.
static constexpr std::size_t READ_SIZE = 512;
24
25 namespace cvd {
26
AddSubscriber(TeeSubscriber subscriber)27 TeeSubscriber* Tee::AddSubscriber(TeeSubscriber subscriber) {
28 if (reader_.joinable()) {
29 return nullptr;
30 }
31 return &targets_.emplace_back(std::move(subscriber)).handler;
32 }
33
Start(SharedFD source)34 void Tee::Start(SharedFD source) {
35 reader_ = std::thread([this, source]() {
36 while (true) {
37 // TODO(schfufelen): Use multiple buffers at once for readv
38 // TODO(schuffelen): Reuse buffers
39 TeeBufferPtr buffer = std::make_shared<std::vector<char>>(READ_SIZE);
40 ssize_t read = source->Read(buffer->data(), buffer->size());
41 if (read <= 0) {
42 for (auto& target : targets_) {
43 target.content_queue.Push(nullptr);
44 }
45 break;
46 }
47 buffer->resize(read);
48 for (auto& target : targets_) {
49 target.content_queue.Push(buffer);
50 }
51 }
52 });
53 for (auto& target : targets_) {
54 target.runner = std::thread([&target]() {
55 while (true) {
56 auto queue_chunk = target.content_queue.PopAll();
57 // TODO(schuffelen): Pass multiple buffers to support writev
58 for (auto& buffer : queue_chunk) {
59 if (!buffer) {
60 return;
61 }
62 target.handler(buffer);
63 }
64 }
65 });
66 }
67 }
68
~Tee()69 Tee::~Tee() {
70 Join();
71 }
72
Join()73 void Tee::Join() {
74 if (reader_.joinable()) {
75 reader_.join();
76 }
77 auto it = targets_.begin();
78 while (it != targets_.end()) {
79 if (it->runner.joinable()) {
80 it->runner.join();
81 }
82 it = targets_.erase(it);
83 }
84 }
85
SharedFDWriter(SharedFD fd)86 TeeSubscriber SharedFDWriter(SharedFD fd) {
87 return [fd](const TeeBufferPtr buffer) { WriteAll(fd, *buffer); };
88 }
89
90 // An alternative to this would have been to modify the logger, but that would
91 // not capture logs from subprocesses.
TeeStderrToFile()92 TeeStderrToFile::TeeStderrToFile() {
93 original_stderr_ = SharedFD::Dup(2);
94
95 SharedFD stderr_read, stderr_write;
96 SharedFD::Pipe(&stderr_read, &stderr_write);
97 stderr_write->UNMANAGED_Dup2(2);
98 stderr_write->Close();
99
100 tee_.AddSubscriber(SharedFDWriter(original_stderr_));
101 tee_.AddSubscriber(
102 [this](cvd::TeeBufferPtr data) {
103 std::unique_lock lock(mutex_);
104 while (!log_file_->IsOpen()) {
105 notifier_.wait(lock);
106 }
107 cvd::WriteAll(log_file_, *data);
108 });
109 tee_.Start(std::move(stderr_read));
110 }
111
~TeeStderrToFile()112 TeeStderrToFile::~TeeStderrToFile() {
113 original_stderr_->UNMANAGED_Dup2(2);
114 }
115
SetFile(SharedFD file)116 void TeeStderrToFile::SetFile(SharedFD file) {
117 std::lock_guard lock(mutex_);
118 log_file_ = file;
119 notifier_.notify_all();
120 }
121
122 } // namespace
123