• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LogReaderThread.h"
18 
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22 
23 #include <thread>
24 
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27 #include "SerializedFlushToState.h"
28 
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)29 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
30                                  std::unique_ptr<LogWriter> writer, bool non_block,
31                                  unsigned long tail, LogMask log_mask, pid_t pid,
32                                  log_time start_time, uint64_t start,
33                                  std::chrono::steady_clock::time_point deadline)
34     : log_buffer_(log_buffer),
35       reader_list_(reader_list),
36       writer_(std::move(writer)),
37       pid_(pid),
38       tail_(tail),
39       count_(0),
40       index_(0),
41       start_time_(start_time),
42       deadline_(deadline),
43       non_block_(non_block) {
44     CleanSkip();
45     flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
46 }
47 
Run()48 void LogReaderThread::Run() {
49     auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
50     thread.detach();
51 }
52 
ThreadFunction()53 void LogReaderThread::ThreadFunction() {
54     prctl(PR_SET_NAME, "logd.reader.per");
55 
56     auto lock = std::unique_lock{logd_lock};
57     auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};
58 
59     while (!release_) {
60         if (deadline_.time_since_epoch().count() != 0) {
61             if (thread_triggered_condition_.wait_until(lock, deadline_) ==
62                 std::cv_status::timeout) {
63                 deadline_ = {};
64             }
65             if (release_) {
66                 break;
67             }
68         }
69 
70         if (tail_) {
71             auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
72                                                                     flush_to_state_->log_mask());
73             log_buffer_->FlushTo(writer_.get(), *first_pass_state,
74                                  [this](log_id_t log_id, pid_t pid, uint64_t sequence,
75                                         log_time realtime) REQUIRES(logd_lock) {
76                                      return FilterFirstPass(log_id, pid, sequence, realtime);
77                                  });
78         }
79         bool flush_success = log_buffer_->FlushTo(
80                 writer_.get(), *flush_to_state_,
81                 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
82                         logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });
83 
84         // We only ignore entries before the original start time for the first flushTo(), if we
85         // get entries after this first flush before the original start time, then the client
86         // wouldn't have seen them.
87         // Note: this is still racy and may skip out of order events that came in since the last
88         // time the client disconnected and then reconnected with the new start time.  The long term
89         // solution here is that clients must request events since a specific sequence number.
90         start_time_.tv_sec = 0;
91         start_time_.tv_nsec = 0;
92 
93         if (!flush_success) {
94             break;
95         }
96 
97         if (non_block_ || release_) {
98             break;
99         }
100 
101         CleanSkip();
102 
103         if (deadline_.time_since_epoch().count() == 0) {
104             thread_triggered_condition_.wait(lock);
105         }
106     }
107 
108     writer_->Release();
109     reader_list_->RemoveRunningThread(this);
110 }
111 
112 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)113 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
114     if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
115         ++count_;
116     }
117 
118     return FilterResult::kSkip;
119 }
120 
121 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)122 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
123                                                log_time realtime) {
124     if (skip_ahead_[log_id]) {
125         skip_ahead_[log_id]--;
126         return FilterResult::kSkip;
127     }
128 
129     // Truncate to close race between first and second pass
130     if (non_block_ && tail_ && index_ >= count_) {
131         return FilterResult::kStop;
132     }
133 
134     if (pid_ && pid_ != pid) {
135         return FilterResult::kSkip;
136     }
137 
138     if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
139         return FilterResult::kSkip;
140     }
141 
142     if (release_) {
143         return FilterResult::kStop;
144     }
145 
146     if (!tail_) {
147         goto ok;
148     }
149 
150     ++index_;
151 
152     if (count_ > tail_ && index_ <= (count_ - tail_)) {
153         return FilterResult::kSkip;
154     }
155 
156     if (!non_block_) {
157         tail_ = 0;
158     }
159 
160 ok:
161     if (!skip_ahead_[log_id]) {
162         return FilterResult::kWrite;
163     }
164     return FilterResult::kSkip;
165 }
166