1 /*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "LogReaderThread.h"
18
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22
23 #include <thread>
24
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27 #include "SerializedFlushToState.h"
28
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)29 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
30 std::unique_ptr<LogWriter> writer, bool non_block,
31 unsigned long tail, LogMask log_mask, pid_t pid,
32 log_time start_time, uint64_t start,
33 std::chrono::steady_clock::time_point deadline)
34 : log_buffer_(log_buffer),
35 reader_list_(reader_list),
36 writer_(std::move(writer)),
37 pid_(pid),
38 tail_(tail),
39 count_(0),
40 index_(0),
41 start_time_(start_time),
42 deadline_(deadline),
43 non_block_(non_block) {
44 CleanSkip();
45 flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
46 }
47
Run()48 void LogReaderThread::Run() {
49 auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
50 thread.detach();
51 }
52
ThreadFunction()53 void LogReaderThread::ThreadFunction() {
54 prctl(PR_SET_NAME, "logd.reader.per");
55
56 auto lock = std::unique_lock{logd_lock};
57 auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};
58
59 while (!release_) {
60 if (deadline_.time_since_epoch().count() != 0) {
61 if (thread_triggered_condition_.wait_until(lock, deadline_) ==
62 std::cv_status::timeout) {
63 deadline_ = {};
64 }
65 if (release_) {
66 break;
67 }
68 }
69
70 if (tail_) {
71 auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
72 flush_to_state_->log_mask());
73 log_buffer_->FlushTo(writer_.get(), *first_pass_state,
74 [this](log_id_t log_id, pid_t pid, uint64_t sequence,
75 log_time realtime) REQUIRES(logd_lock) {
76 return FilterFirstPass(log_id, pid, sequence, realtime);
77 });
78 }
79 bool flush_success = log_buffer_->FlushTo(
80 writer_.get(), *flush_to_state_,
81 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
82 logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });
83
84 // We only ignore entries before the original start time for the first flushTo(), if we
85 // get entries after this first flush before the original start time, then the client
86 // wouldn't have seen them.
87 // Note: this is still racy and may skip out of order events that came in since the last
88 // time the client disconnected and then reconnected with the new start time. The long term
89 // solution here is that clients must request events since a specific sequence number.
90 start_time_.tv_sec = 0;
91 start_time_.tv_nsec = 0;
92
93 if (!flush_success) {
94 break;
95 }
96
97 if (non_block_ || release_) {
98 break;
99 }
100
101 CleanSkip();
102
103 if (deadline_.time_since_epoch().count() == 0) {
104 thread_triggered_condition_.wait(lock);
105 }
106 }
107
108 writer_->Release();
109 reader_list_->RemoveRunningThread(this);
110 }
111
112 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)113 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
114 if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
115 ++count_;
116 }
117
118 return FilterResult::kSkip;
119 }
120
121 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)122 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
123 log_time realtime) {
124 if (skip_ahead_[log_id]) {
125 skip_ahead_[log_id]--;
126 return FilterResult::kSkip;
127 }
128
129 // Truncate to close race between first and second pass
130 if (non_block_ && tail_ && index_ >= count_) {
131 return FilterResult::kStop;
132 }
133
134 if (pid_ && pid_ != pid) {
135 return FilterResult::kSkip;
136 }
137
138 if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
139 return FilterResult::kSkip;
140 }
141
142 if (release_) {
143 return FilterResult::kStop;
144 }
145
146 if (!tail_) {
147 goto ok;
148 }
149
150 ++index_;
151
152 if (count_ > tail_ && index_ <= (count_ - tail_)) {
153 return FilterResult::kSkip;
154 }
155
156 if (!non_block_) {
157 tail_ = 0;
158 }
159
160 ok:
161 if (!skip_ahead_[log_id]) {
162 return FilterResult::kWrite;
163 }
164 return FilterResult::kSkip;
165 }
166