/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/trace_processor/dynamic/thread_state_generator.h"

#include <memory>
#include <set>

#include "src/trace_processor/types/trace_processor_context.h"

namespace perfetto {
namespace trace_processor {

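// "Running" labels the sched slices emitted by this generator; "R" (runnable)
// labels the period between a waking/wakeup event and the next sched slice
// for a thread.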
ThreadStateGenerator::ThreadStateGenerator(TraceProcessorContext* context)
    : running_string_id_(context->storage->InternString("Running")),
      runnable_string_id_(context->storage->InternString("R")),
      context_(context) {}

ThreadStateGenerator::~ThreadStateGenerator() = default;

base::Status ThreadStateGenerator::ValidateConstraints(
    const QueryConstraints&) {
  return base::OkStatus();
}

base::Status ThreadStateGenerator::ComputeTable(
    const std::vector<Constraint>&,
    const std::vector<Order>&,
    const BitVector&,
    std::unique_ptr<Table>& table_return) {
  if (!unsorted_thread_state_table_) {
    int64_t trace_end_ts =
        context_->storage->GetTraceTimestampBoundsNs().second;

    unsorted_thread_state_table_ = ComputeThreadStateTable(trace_end_ts);

    // We explicitly sort by ts here as ComputeThreadStateTable does not insert
    // rows in sorted order, but we expect our clients to always want the
    // output sorted by ts. Writing ComputeThreadStateTable to insert in
    // sorted order is more trouble than it's worth.
    sorted_thread_state_table_ = unsorted_thread_state_table_->Sort(
        {unsorted_thread_state_table_->ts().ascending()});
  }
  // TODO(rsavitski): return base::ErrStatus instead?
  PERFETTO_CHECK(sorted_thread_state_table_);
  table_return =
      std::unique_ptr<Table>(new Table(sorted_thread_state_table_->Copy()));
  return base::OkStatus();
}

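// Builds the thread_state table by merging sched slices with sched_waking and
// sched_blocked_reason instants. Illustrative walkthrough (hypothetical
// timestamps): a thread switched in at ts=100 with dur=50 and end_state "S",
// woken at ts=200 and switched in again at ts=220 produces the rows:
//   ts=100, dur=50, state="Running"
//   ts=150, dur=50, state="S" (with io_wait/blocked_function if a
//                              sched_blocked_reason event was seen)
//   ts=200, dur=20, state="R"
//   ts=220, ...,    state="Running"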
std::unique_ptr<tables::ThreadStateTable>
ThreadStateGenerator::ComputeThreadStateTable(int64_t trace_end_ts) {
  std::unique_ptr<tables::ThreadStateTable> table(new tables::ThreadStateTable(
      context_->storage->mutable_string_pool(), nullptr));

  const auto& raw_sched = context_->storage->sched_slice_table();
  const auto& instants = context_->storage->instant_table();

  // In both tables, exclude utid == 0 which represents the idle thread.
  Table sched = raw_sched.Filter({raw_sched.utid().ne(0)},
                                 RowMap::OptimizeFor::kLookupSpeed);
  Table waking = instants.Filter(
      {instants.name().eq("sched_waking"), instants.ref().ne(0)},
      RowMap::OptimizeFor::kLookupSpeed);

  // We prefer to use waking if at all possible and fall back to wakeup if not
  // available.
  if (waking.row_count() == 0) {
    waking = instants.Filter(
        {instants.name().eq("sched_wakeup"), instants.ref().ne(0)},
        RowMap::OptimizeFor::kLookupSpeed);
  }

  Table sched_blocked_reason = instants.Filter(
      {instants.name().eq("sched_blocked_reason"), instants.ref().ne(0)},
      RowMap::OptimizeFor::kLookupSpeed);

  const auto& sched_ts_col = sched.GetTypedColumnByName<int64_t>("ts");
  const auto& waking_ts_col = waking.GetTypedColumnByName<int64_t>("ts");
  const auto& blocked_ts_col =
      sched_blocked_reason.GetTypedColumnByName<int64_t>("ts");

  uint32_t sched_idx = 0;
  uint32_t waking_idx = 0;
  uint32_t blocked_idx = 0;
  TidInfoMap state_map(/*initial_capacity=*/1024);
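  // Merge the three tables in timestamp order: each iteration processes the
  // earliest remaining event so that per-thread state transitions are handled
  // chronologically.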
  while (sched_idx < sched.row_count() || waking_idx < waking.row_count() ||
         blocked_idx < sched_blocked_reason.row_count()) {
    int64_t sched_ts = sched_idx < sched.row_count()
                           ? sched_ts_col[sched_idx]
                           : std::numeric_limits<int64_t>::max();
    int64_t waking_ts = waking_idx < waking.row_count()
                            ? waking_ts_col[waking_idx]
                            : std::numeric_limits<int64_t>::max();
    int64_t blocked_ts = blocked_idx < sched_blocked_reason.row_count()
                             ? blocked_ts_col[blocked_idx]
                             : std::numeric_limits<int64_t>::max();

    // We go through all tables, picking the earliest timestamp from any
    // to process that event.
    int64_t min_ts = std::min({sched_ts, waking_ts, blocked_ts});
    if (min_ts == sched_ts) {
      AddSchedEvent(sched, sched_idx++, state_map, trace_end_ts, table.get());
    } else if (min_ts == waking_ts) {
      AddWakingEvent(waking, waking_idx++, state_map);
    } else /* (min_ts == blocked_ts) */ {
      AddBlockedReasonEvent(sched_blocked_reason, blocked_idx++, state_map);
    }
  }

  // At the end, go through and flush any remaining pending events.
  for (auto it = state_map.GetIterator(); it; ++it) {
    UniqueTid utid = it.key();
    const ThreadSchedInfo& pending_info = it.value();
    FlushPendingEventsForThread(utid, pending_info, table.get(), base::nullopt);
  }

  return table;
}

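// Handles a single sched slice: flushes (or truncates, in the racy case
// described below) any pending state for the thread, inserts a "Running" row
// and records the slice's end state so the descheduled period can be emitted
// when the next sched slice for this thread (or the final flush) is processed.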
void ThreadStateGenerator::AddSchedEvent(const Table& sched,
                                         uint32_t sched_idx,
                                         TidInfoMap& state_map,
                                         int64_t trace_end_ts,
                                         tables::ThreadStateTable* table) {
  int64_t ts = sched.GetTypedColumnByName<int64_t>("ts")[sched_idx];
  UniqueTid utid = sched.GetTypedColumnByName<uint32_t>("utid")[sched_idx];
  ThreadSchedInfo* info = &state_map[utid];

  // Due to races in the kernel, it is possible for the same thread to be
  // scheduled on different CPUs at the same time. This will manifest itself
  // here by having |info->desched_ts| in the future of this scheduling slice
  // (i.e. there was a scheduling slice in the past which ended after the start
  // of the current scheduling slice).
  //
  // We work around this problem by truncating the previous slice to the start
  // of this slice and not adding the descheduled slice (i.e. we don't call
  // |FlushPendingEventsForThread| which adds this slice).
  //
  // See b/186509316 for details and an example on when this happens.
  if (info->desched_ts && info->desched_ts.value() > ts) {
    uint32_t prev_sched_row = info->scheduled_row.value();
    int64_t prev_sched_start = table->ts()[prev_sched_row];

    // Double-check that the descheduling slice would have started at the same
    // time the previous scheduling slice ended.
    PERFETTO_DCHECK(prev_sched_start + table->dur()[prev_sched_row] ==
                    info->desched_ts.value());

    // Truncate the duration of the old slice to end at the start of this
    // scheduling slice.
    table->mutable_dur()->Set(prev_sched_row, ts - prev_sched_start);
  } else {
    FlushPendingEventsForThread(utid, *info, table, ts);
  }

  // Reset so we don't have any leftover data on the next round.
  *info = {};

  // Undo the expansion of the final sched slice for each CPU to the end of the
  // trace by setting the duration back to -1. This counteracts the code in
  // SchedEventTracker::FlushPendingEvents.
  // TODO(lalitm): remove this hack when we stop expanding the last slice to
  // the end of the trace.
  int64_t dur = sched.GetTypedColumnByName<int64_t>("dur")[sched_idx];
  if (ts + dur == trace_end_ts) {
    dur = -1;
  }

  // Now add the sched slice itself as "Running" with the other fields
  // unchanged.
  tables::ThreadStateTable::Row sched_row;
  sched_row.ts = ts;
  sched_row.dur = dur;
  sched_row.cpu = sched.GetTypedColumnByName<uint32_t>("cpu")[sched_idx];
  sched_row.state = running_string_id_;
  sched_row.utid = utid;

  auto id_and_row = table->Insert(sched_row);

  // If the sched row had a negative duration, don't add any descheduled slice
  // because it would be meaningless.
  if (sched_row.dur == -1) {
    return;
  }

  // This will be flushed to the table on the next sched slice (or at the very
  // end of the big loop).
  info->desched_ts = ts + dur;
  info->desched_end_state =
      sched.GetTypedColumnByName<StringId>("end_state")[sched_idx];
  info->scheduled_row = id_and_row.row;
}

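// Handles a sched_waking (or sched_wakeup) instant: records the timestamp at
// which the thread became runnable, ignoring events which race with an
// already-runnable or still-running thread (see the comments below).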
void ThreadStateGenerator::AddWakingEvent(const Table& waking,
                                          uint32_t waking_idx,
                                          TidInfoMap& state_map) {
  int64_t ts = waking.GetTypedColumnByName<int64_t>("ts")[waking_idx];
  UniqueTid utid = static_cast<UniqueTid>(
      waking.GetTypedColumnByName<int64_t>("ref")[waking_idx]);
  ThreadSchedInfo* info = &state_map[utid];

  // Occasionally, it is possible to get a waking event for a thread
  // which is already in a runnable state. When this happens, we just
  // ignore the waking event.
  // See b/186509316 for details and an example on when this happens.
  if (info->desched_end_state &&
      *info->desched_end_state == runnable_string_id_) {
    return;
  }

  // As counter-intuitive as it seems, occasionally we can get a waking
  // event for a thread which is currently running.
  //
  // There are two cases when this can happen:
  // 1. The kernel legitimately sends a waking event for a "running" thread
  //    because the thread was woken up before the kernel switched away
  //    from it. In this case, the waking timestamp will be in the past
  //    because we added the descheduled slice when we processed the sched
  //    event.
  // 2. We're close to the end of the trace or had data loss and missed the
  //    switch-out event for a thread but still see a waking event for it.

  // Case 1 described above. In this situation, we should drop the waking
  // event entirely.
  if (info->desched_ts && *info->desched_ts > ts) {
    return;
  }

  // For case 2 and otherwise, we should just note the fact that the thread
  // became runnable at this time. Note that we cannot assume |runnable_ts| is
  // unset because data loss can lead to back-to-back waking events for a
  // single thread.
  info->runnable_ts = ts;
}

Table::Schema ThreadStateGenerator::CreateSchema() {
  auto schema = tables::ThreadStateTable::Schema();

  // Because we expect our users to generally want the rows ordered by ts, we
  // set the ordering for the schema to match our forced sort pass in
  // ComputeTable.
  auto ts_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "ts"; });
  ts_it->is_sorted = true;
  auto id_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "id"; });
  id_it->is_sorted = false;

  return schema;
}

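// Emits up to two rows for |utid|: the descheduled period (using the end
// state of the last sched slice) and, if a waking was seen, the runnable
// period. When |end_ts| is unset (the final flush at the end of the trace),
// durations are left as -1.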
void ThreadStateGenerator::FlushPendingEventsForThread(
    UniqueTid utid,
    const ThreadSchedInfo& info,
    tables::ThreadStateTable* table,
    base::Optional<int64_t> end_ts) {
  // First, let's flush the descheduled period (if any) to the table.
  if (info.desched_ts) {
    PERFETTO_DCHECK(info.desched_end_state);

    int64_t dur;
    if (end_ts) {
      int64_t desched_end_ts = info.runnable_ts ? *info.runnable_ts : *end_ts;
      dur = desched_end_ts - *info.desched_ts;
    } else {
      dur = -1;
    }

    tables::ThreadStateTable::Row row;
    row.ts = *info.desched_ts;
    row.dur = dur;
    row.state = *info.desched_end_state;
    row.utid = utid;
    row.io_wait = info.io_wait;
    row.blocked_function = info.blocked_function;
    table->Insert(row);
  }

  // Next, flush the runnable period (if any) to the table.
  if (info.runnable_ts) {
    tables::ThreadStateTable::Row row;
    row.ts = *info.runnable_ts;
    row.dur = end_ts ? *end_ts - row.ts : -1;
    row.state = runnable_string_id_;
    row.utid = utid;
    table->Insert(row);
  }
}

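// Attaches the io_wait flag and blocked function from a sched_blocked_reason
// event's args to the pending descheduled state for the thread.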
void ThreadStateGenerator::AddBlockedReasonEvent(const Table& blocked_reason,
                                                 uint32_t blocked_idx,
                                                 TidInfoMap& state_map) {
  const auto& utid_col = blocked_reason.GetTypedColumnByName<int64_t>("ref");
  const auto& arg_set_id_col =
      blocked_reason.GetTypedColumnByName<uint32_t>("arg_set_id");

  UniqueTid utid = static_cast<UniqueTid>(utid_col[blocked_idx]);
  uint32_t arg_set_id = arg_set_id_col[blocked_idx];
  ThreadSchedInfo& info = state_map[utid];

  base::Optional<Variadic> opt_value;
  base::Status status =
      context_->storage->ExtractArg(arg_set_id, "io_wait", &opt_value);

  // We can't do anything better than ignoring any errors here.
  // TODO(lalitm): see if there's a better way to handle this.
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kBool);
    info.io_wait = opt_value->bool_value;
  }

  status = context_->storage->ExtractArg(arg_set_id, "function", &opt_value);
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kString);
    info.blocked_function = opt_value->string_value;
  }
}

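// The table is exposed to SQL as "thread_state". An illustrative query using
// the columns populated above:
//   SELECT utid, ts, dur, cpu, state, io_wait, blocked_function
//   FROM thread_state
//   ORDER BY ts;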
std::string ThreadStateGenerator::TableName() {
  return "thread_state";
}

uint32_t ThreadStateGenerator::EstimateRowCount() {
  return context_->storage->sched_slice_table().row_count();
}

}  // namespace trace_processor
}  // namespace perfetto