• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (C) 2019 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 #include <algorithm>
17 #include <iostream>
18 
19 #include "common/libs/glog/logging.h"
20 #include "common/libs/fs/shared_buf.h"
21 #include "common/libs/fs/tee.h"
22 
23 static const std::size_t READ_SIZE = 512;
24 
25 namespace cvd {
26 
AddSubscriber(TeeSubscriber subscriber)27 TeeSubscriber* Tee::AddSubscriber(TeeSubscriber subscriber) {
28   if (reader_.joinable()) {
29     return nullptr;
30   }
31   return &targets_.emplace_back(std::move(subscriber)).handler;
32 }
33 
Start(SharedFD source)34 void Tee::Start(SharedFD source) {
35   reader_ = std::thread([this, source]() {
36     while (true) {
37       // TODO(schfufelen): Use multiple buffers at once for readv
38       // TODO(schuffelen): Reuse buffers
39       TeeBufferPtr buffer = std::make_shared<std::vector<char>>(READ_SIZE);
40       ssize_t read = source->Read(buffer->data(), buffer->size());
41       if (read <= 0) {
42         for (auto& target : targets_) {
43           target.content_queue.Push(nullptr);
44         }
45         break;
46       }
47       buffer->resize(read);
48       for (auto& target : targets_) {
49         target.content_queue.Push(buffer);
50       }
51     }
52   });
53   for (auto& target : targets_) {
54     target.runner = std::thread([&target]() {
55       while (true) {
56         auto queue_chunk = target.content_queue.PopAll();
57         // TODO(schuffelen): Pass multiple buffers to support writev
58         for (auto& buffer : queue_chunk) {
59           if (!buffer) {
60             return;
61           }
62           target.handler(buffer);
63         }
64       }
65     });
66   }
67 }
68 
~Tee()69 Tee::~Tee() {
70   Join();
71 }
72 
Join()73 void Tee::Join() {
74   if (reader_.joinable()) {
75     reader_.join();
76   }
77   auto it = targets_.begin();
78   while (it != targets_.end()) {
79     if (it->runner.joinable()) {
80       it->runner.join();
81     }
82     it = targets_.erase(it);
83   }
84 }
85 
SharedFDWriter(SharedFD fd)86 TeeSubscriber SharedFDWriter(SharedFD fd) {
87   return [fd](const TeeBufferPtr buffer) { WriteAll(fd, *buffer); };
88 }
89 
90 // An alternative to this would have been to modify the logger, but that would
91 // not capture logs from subprocesses.
TeeStderrToFile()92 TeeStderrToFile::TeeStderrToFile() {
93   original_stderr_ = SharedFD::Dup(2);
94 
95   SharedFD stderr_read, stderr_write;
96   SharedFD::Pipe(&stderr_read, &stderr_write);
97   stderr_write->UNMANAGED_Dup2(2);
98   stderr_write->Close();
99 
100   tee_.AddSubscriber(SharedFDWriter(original_stderr_));
101   tee_.AddSubscriber(
102       [this](cvd::TeeBufferPtr data) {
103         std::unique_lock lock(mutex_);
104         while (!log_file_->IsOpen()) {
105           notifier_.wait(lock);
106         }
107         cvd::WriteAll(log_file_, *data);
108       });
109   tee_.Start(std::move(stderr_read));
110 }
111 
~TeeStderrToFile()112 TeeStderrToFile::~TeeStderrToFile() {
113   original_stderr_->UNMANAGED_Dup2(2);
114 }
115 
SetFile(SharedFD file)116 void TeeStderrToFile::SetFile(SharedFD file) {
117   std::lock_guard lock(mutex_);
118   log_file_ = file;
119   notifier_.notify_all();
120 }
121 
122 } // namespace
123