/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/trace_processor/dynamic/thread_state_generator.h"

#include <algorithm>
#include <limits>
#include <memory>
#include <set>
#include <unordered_map>

#include "src/trace_processor/types/trace_processor_context.h"

namespace perfetto {
namespace trace_processor {

ThreadStateGenerator::ThreadStateGenerator(TraceProcessorContext* context)
    : running_string_id_(context->storage->InternString("Running")),
      runnable_string_id_(context->storage->InternString("R")),
      context_(context) {}

ThreadStateGenerator::~ThreadStateGenerator() = default;

util::Status ThreadStateGenerator::ValidateConstraints(
    const QueryConstraints&) {
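  // No restrictions: any combination of constraints can be used to query this
  // table.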
  return util::OkStatus();
}

std::unique_ptr<Table> ThreadStateGenerator::ComputeTable(
    const std::vector<Constraint>&,
    const std::vector<Order>&) {
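  // Lazily compute the underlying table on the first query and cache the
  // sorted result; subsequent queries return a copy of the cached table.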
  if (!unsorted_thread_state_table_) {
    int64_t trace_end_ts =
        context_->storage->GetTraceTimestampBoundsNs().second;

    unsorted_thread_state_table_ = ComputeThreadStateTable(trace_end_ts);

    // We explicitly sort by ts here as ComputeThreadStateTable does not insert
    // rows in sorted order but we expect our clients to always want to sort
    // on ts. Writing ComputeThreadStateTable to insert in sorted order is
    // more trouble than it's worth.
    sorted_thread_state_table_ = unsorted_thread_state_table_->Sort(
        {unsorted_thread_state_table_->ts().ascending()});
  }
  PERFETTO_CHECK(sorted_thread_state_table_);
  return std::unique_ptr<Table>(new Table(sorted_thread_state_table_->Copy()));
}

std::unique_ptr<tables::ThreadStateTable>
ThreadStateGenerator::ComputeThreadStateTable(int64_t trace_end_ts) {
  std::unique_ptr<tables::ThreadStateTable> table(new tables::ThreadStateTable(
      context_->storage->mutable_string_pool(), nullptr));

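  // The thread state table is assembled from three sources: sched slices
  // (which become "Running" intervals), sched_waking/sched_wakeup instants
  // (which mark when a thread becomes runnable) and sched_blocked_reason
  // instants (which add detail to blocked intervals).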
  const auto& raw_sched = context_->storage->sched_slice_table();
  const auto& instants = context_->storage->instant_table();

  // In both tables, exclude utid == 0 which represents the idle thread.
  Table sched = raw_sched.Filter({raw_sched.utid().ne(0)});
  Table waking = instants.Filter(
      {instants.name().eq("sched_waking"), instants.ref().ne(0)});

  // We prefer to use waking if at all possible and fall back to wakeup if not
  // available.
  if (waking.row_count() == 0) {
    waking = instants.Filter(
        {instants.name().eq("sched_wakeup"), instants.ref().ne(0)});
  }

  Table sched_blocked_reason = instants.Filter(
      {instants.name().eq("sched_blocked_reason"), instants.ref().ne(0)});

  const auto& sched_ts_col = sched.GetTypedColumnByName<int64_t>("ts");
  const auto& waking_ts_col = waking.GetTypedColumnByName<int64_t>("ts");
  const auto& blocked_ts_col =
      sched_blocked_reason.GetTypedColumnByName<int64_t>("ts");

  uint32_t sched_idx = 0;
  uint32_t waking_idx = 0;
  uint32_t blocked_idx = 0;
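  // Per-thread pending state (descheduled and runnable information)
  // accumulated while walking through the events below.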
  std::unordered_map<UniqueTid, ThreadSchedInfo> state_map;
  while (sched_idx < sched.row_count() || waking_idx < waking.row_count() ||
         blocked_idx < sched_blocked_reason.row_count()) {
    int64_t sched_ts = sched_idx < sched.row_count()
                           ? sched_ts_col[sched_idx]
                           : std::numeric_limits<int64_t>::max();
    int64_t waking_ts = waking_idx < waking.row_count()
                            ? waking_ts_col[waking_idx]
                            : std::numeric_limits<int64_t>::max();
    int64_t blocked_ts = blocked_idx < sched_blocked_reason.row_count()
                             ? blocked_ts_col[blocked_idx]
                             : std::numeric_limits<int64_t>::max();

    // We go through all the tables, picking the event with the earliest
    // timestamp from any of them to process next.
    int64_t min_ts = std::min({sched_ts, waking_ts, blocked_ts});
    if (min_ts == sched_ts) {
      AddSchedEvent(sched, sched_idx++, state_map, trace_end_ts, table.get());
    } else if (min_ts == waking_ts) {
      AddWakingEvent(waking, waking_idx++, state_map);
    } else /* (min_ts == blocked_ts) */ {
      AddBlockedReasonEvent(sched_blocked_reason, blocked_idx++, state_map);
    }
  }

  // At the end, go through and flush any remaining pending events.
  for (const auto& utid_to_pending_info : state_map) {
    UniqueTid utid = utid_to_pending_info.first;
    const ThreadSchedInfo& pending_info = utid_to_pending_info.second;
    FlushPendingEventsForThread(utid, pending_info, table.get(), base::nullopt);
  }

  return table;
}

void ThreadStateGenerator::AddSchedEvent(
    const Table& sched,
    uint32_t sched_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map,
    int64_t trace_end_ts,
    tables::ThreadStateTable* table) {
  int64_t ts = sched.GetTypedColumnByName<int64_t>("ts")[sched_idx];
  UniqueTid utid = sched.GetTypedColumnByName<uint32_t>("utid")[sched_idx];
  ThreadSchedInfo* info = &state_map[utid];

  // Due to races in the kernel, it is possible for the same thread to be
  // scheduled on different CPUs at the same time. This will manifest itself
  // here by having |info->desched_ts| in the future of this scheduling slice
  // (i.e. there was a scheduling slice in the past which ended after the start
  // of the current scheduling slice).
  //
  // We work around this problem by truncating the previous slice to the start
  // of this slice and not adding the descheduled slice (i.e. we don't call
  // |FlushPendingEventsForThread|, which would add that slice).
  //
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_ts && info->desched_ts.value() > ts) {
    uint32_t prev_sched_row = info->scheduled_row.value();
    int64_t prev_sched_start = table->ts()[prev_sched_row];

    // Double-check that the descheduling slice would have started at the same
    // time the previous scheduling slice ended.
    PERFETTO_DCHECK(prev_sched_start + table->dur()[prev_sched_row] ==
                    info->desched_ts.value());

    // Truncate the duration of the old slice to end at the start of this
    // scheduling slice.
    table->mutable_dur()->Set(prev_sched_row, ts - prev_sched_start);
  } else {
    FlushPendingEventsForThread(utid, *info, table, ts);
  }

  // Reset so we don't have any leftover data on the next round.
  *info = {};

  // Undo the expansion of the final sched slice for each CPU to the end of the
  // trace by setting the duration back to -1. This counteracts the code in
  // SchedEventTracker::FlushPendingEvents.
  // TODO(lalitm): remove this hack when we stop expanding the last slice to
  // the end of the trace.
  int64_t dur = sched.GetTypedColumnByName<int64_t>("dur")[sched_idx];
  if (ts + dur == trace_end_ts) {
    dur = -1;
  }

  // Now add the sched slice itself as "Running" with the other fields
  // unchanged.
  tables::ThreadStateTable::Row sched_row;
  sched_row.ts = ts;
  sched_row.dur = dur;
  sched_row.cpu = sched.GetTypedColumnByName<uint32_t>("cpu")[sched_idx];
  sched_row.state = running_string_id_;
  sched_row.utid = utid;

  auto id_and_row = table->Insert(sched_row);

  // If the sched row had a negative duration, don't add any descheduled slice
  // because it would be meaningless.
  if (sched_row.dur == -1) {
    return;
  }

  // This will be flushed to the table on the next sched slice (or the very end
  // of the big loop).
  info->desched_ts = ts + dur;
  info->desched_end_state =
      sched.GetTypedColumnByName<StringId>("end_state")[sched_idx];
  info->scheduled_row = id_and_row.row;
}

void ThreadStateGenerator::AddWakingEvent(
    const Table& waking,
    uint32_t waking_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map) {
  int64_t ts = waking.GetTypedColumnByName<int64_t>("ts")[waking_idx];
  UniqueTid utid = static_cast<UniqueTid>(
      waking.GetTypedColumnByName<int64_t>("ref")[waking_idx]);
  ThreadSchedInfo* info = &state_map[utid];

  // Occasionally, it is possible to get a waking event for a thread
  // which is already in a runnable state. When this happens, we just
  // ignore the waking event.
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_end_state &&
      *info->desched_end_state == runnable_string_id_) {
    return;
  }

  // As counter-intuitive as it seems, we can occasionally get a waking
  // event for a thread which is currently running.
  //
  // There are two cases when this can happen:
  // 1. The kernel legitimately sends a waking event for a "running" thread
  //    because the thread was woken up before the kernel switched away
  //    from it. In this case, the waking timestamp will be in the past
  //    because we added the descheduled slice when we processed the sched
  //    event.
  // 2. We're close to the end of the trace or suffered data loss and missed
  //    the switch-out event for a thread but still see a later waking event.

  // Case 1 described above. In this situation, we should drop the waking
  // event entirely.
  if (info->desched_ts && *info->desched_ts > ts) {
    return;
  }

  // For case 2 and otherwise, we should just note the fact that the thread
  // became runnable at this time. Note that we cannot check that the runnable
  // timestamp is not already set because data loss can lead to back-to-back
  // waking events for a single thread.
  info->runnable_ts = ts;
}

Table::Schema ThreadStateGenerator::CreateSchema() {
  auto schema = tables::ThreadStateTable::Schema();

  // Because we expect our users to generally want the results ordered by ts,
  // we set the ordering on the schema to match the forced sort pass in
  // ComputeTable.
  auto ts_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "ts"; });
  ts_it->is_sorted = true;
  auto id_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "id"; });
  id_it->is_sorted = false;

  return schema;
}

void ThreadStateGenerator::FlushPendingEventsForThread(
    UniqueTid utid,
    const ThreadSchedInfo& info,
    tables::ThreadStateTable* table,
    base::Optional<int64_t> end_ts) {
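  // A nullopt |end_ts| means no further event was seen for this thread before
  // the end of the trace, so any pending slices get a duration of -1.
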
  // First, let's flush the descheduled period (if any) to the table.
  if (info.desched_ts) {
    PERFETTO_DCHECK(info.desched_end_state);

    int64_t dur;
    if (end_ts) {
      int64_t desched_end_ts = info.runnable_ts ? *info.runnable_ts : *end_ts;
      dur = desched_end_ts - *info.desched_ts;
    } else {
      dur = -1;
    }

    tables::ThreadStateTable::Row row;
    row.ts = *info.desched_ts;
    row.dur = dur;
    row.state = *info.desched_end_state;
    row.utid = utid;
    row.io_wait = info.io_wait;
    row.blocked_function = info.blocked_function;
    table->Insert(row);
  }

  // Next, flush the runnable period (if any) to the table.
  if (info.runnable_ts) {
    tables::ThreadStateTable::Row row;
    row.ts = *info.runnable_ts;
    row.dur = end_ts ? *end_ts - row.ts : -1;
    row.state = runnable_string_id_;
    row.utid = utid;
    table->Insert(row);
  }
}

void ThreadStateGenerator::AddBlockedReasonEvent(
    const Table& blocked_reason,
    uint32_t blocked_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map) {
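  // sched_blocked_reason events carry their payload in the arg set: an
  // "io_wait" bool and a "function" string identifying where the thread
  // blocked. Both are extracted from the args below.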
  const auto& utid_col = blocked_reason.GetTypedColumnByName<int64_t>("ref");
  const auto& arg_set_id_col =
      blocked_reason.GetTypedColumnByName<uint32_t>("arg_set_id");

  UniqueTid utid = static_cast<UniqueTid>(utid_col[blocked_idx]);
  uint32_t arg_set_id = arg_set_id_col[blocked_idx];
  ThreadSchedInfo& info = state_map[utid];

  base::Optional<Variadic> opt_value;
  util::Status status =
      context_->storage->ExtractArg(arg_set_id, "io_wait", &opt_value);

  // We can't do anything better than ignoring any errors here.
  // TODO(lalitm): see if there's a better way to handle this.
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kBool);
    info.io_wait = opt_value->bool_value;
  }

  status = context_->storage->ExtractArg(arg_set_id, "function", &opt_value);
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kString);
    info.blocked_function = opt_value->string_value;
  }
}

std::string ThreadStateGenerator::TableName() {
  return "thread_state";
}

uint32_t ThreadStateGenerator::EstimateRowCount() {
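  // Every sched slice produces at least one "Running" row, so the sched table
  // size is a reasonable lower-bound estimate for this table.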
  return context_->storage->sched_slice_table().row_count();
}

}  // namespace trace_processor
}  // namespace perfetto