1 /*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "LogReaderThread.h"
18
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22
23 #include <thread>
24
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27 #include "SerializedFlushToState.h"
28
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)29 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
30 std::unique_ptr<LogWriter> writer, bool non_block,
31 unsigned long tail, LogMask log_mask, pid_t pid,
32 log_time start_time, uint64_t start,
33 std::chrono::steady_clock::time_point deadline)
34 : log_buffer_(log_buffer),
35 reader_list_(reader_list),
36 writer_(std::move(writer)),
37 pid_(pid),
38 tail_(tail),
39 count_(0),
40 index_(0),
41 start_time_(start_time),
42 deadline_(deadline),
43 non_block_(non_block) {
44 CleanSkip();
45 flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
46 auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
47 thread.detach();
48 }
49
ThreadFunction()50 void LogReaderThread::ThreadFunction() {
51 prctl(PR_SET_NAME, "logd.reader.per");
52
53 auto lock = std::unique_lock{logd_lock};
54 auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};
55
56 while (!release_) {
57 if (deadline_.time_since_epoch().count() != 0) {
58 if (thread_triggered_condition_.wait_until(lock, deadline_) ==
59 std::cv_status::timeout) {
60 deadline_ = {};
61 }
62 if (release_) {
63 break;
64 }
65 }
66
67 if (tail_) {
68 auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
69 flush_to_state_->log_mask());
70 log_buffer_->FlushTo(writer_.get(), *first_pass_state,
71 [this](log_id_t log_id, pid_t pid, uint64_t sequence,
72 log_time realtime) REQUIRES(logd_lock) {
73 return FilterFirstPass(log_id, pid, sequence, realtime);
74 });
75 }
76 bool flush_success = log_buffer_->FlushTo(
77 writer_.get(), *flush_to_state_,
78 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
79 logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });
80
81 // We only ignore entries before the original start time for the first flushTo(), if we
82 // get entries after this first flush before the original start time, then the client
83 // wouldn't have seen them.
84 // Note: this is still racy and may skip out of order events that came in since the last
85 // time the client disconnected and then reconnected with the new start time. The long term
86 // solution here is that clients must request events since a specific sequence number.
87 start_time_.tv_sec = 0;
88 start_time_.tv_nsec = 0;
89
90 if (!flush_success) {
91 break;
92 }
93
94 if (non_block_ || release_) {
95 break;
96 }
97
98 CleanSkip();
99
100 if (deadline_.time_since_epoch().count() == 0) {
101 thread_triggered_condition_.wait(lock);
102 }
103 }
104
105 writer_->Release();
106
107 auto& log_reader_threads = reader_list_->reader_threads();
108 auto it = std::find_if(log_reader_threads.begin(), log_reader_threads.end(),
109 [this](const auto& other) { return other.get() == this; });
110
111 if (it != log_reader_threads.end()) {
112 log_reader_threads.erase(it);
113 }
114 }
115
116 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)117 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
118 if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
119 ++count_;
120 }
121
122 return FilterResult::kSkip;
123 }
124
125 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)126 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
127 log_time realtime) {
128 if (skip_ahead_[log_id]) {
129 skip_ahead_[log_id]--;
130 return FilterResult::kSkip;
131 }
132
133 // Truncate to close race between first and second pass
134 if (non_block_ && tail_ && index_ >= count_) {
135 return FilterResult::kStop;
136 }
137
138 if (pid_ && pid_ != pid) {
139 return FilterResult::kSkip;
140 }
141
142 if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
143 return FilterResult::kSkip;
144 }
145
146 if (release_) {
147 return FilterResult::kStop;
148 }
149
150 if (!tail_) {
151 goto ok;
152 }
153
154 ++index_;
155
156 if (count_ > tail_ && index_ <= (count_ - tail_)) {
157 return FilterResult::kSkip;
158 }
159
160 if (!non_block_) {
161 tail_ = 0;
162 }
163
164 ok:
165 if (!skip_ahead_[log_id]) {
166 return FilterResult::kWrite;
167 }
168 return FilterResult::kSkip;
169 }
170