/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/trace_processor/dynamic/thread_state_generator.h"

#include <algorithm>
#include <limits>
#include <memory>
#include <set>

#include "src/trace_processor/types/trace_processor_context.h"

namespace perfetto {
namespace trace_processor {

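// Interns the "Running" and runnable ("R") state strings once so that every
// row generated below can reuse the same StringIds.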
ThreadStateGenerator::ThreadStateGenerator(TraceProcessorContext* context)
    : running_string_id_(context->storage->InternString("Running")),
      runnable_string_id_(context->storage->InternString("R")),
      context_(context) {}

ThreadStateGenerator::~ThreadStateGenerator() = default;

base::Status ThreadStateGenerator::ValidateConstraints(
    const QueryConstraints&) {
  return base::OkStatus();
}

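// The thread state table is computed lazily on the first query and the sorted
// result is cached; subsequent calls only pay for a copy of the cached table.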
base::Status ThreadStateGenerator::ComputeTable(
    const std::vector<Constraint>&,
    const std::vector<Order>&,
    const BitVector&,
    std::unique_ptr<Table>& table_return) {
  if (!unsorted_thread_state_table_) {
    int64_t trace_end_ts =
        context_->storage->GetTraceTimestampBoundsNs().second;

    unsorted_thread_state_table_ = ComputeThreadStateTable(trace_end_ts);

    // We explicitly sort by ts here as ComputeThreadStateTable does not insert
    // rows in sorted order but we expect our clients to always want to sort
    // on ts. Writing ComputeThreadStateTable to insert in sorted order is
    // more trouble than it's worth.
    sorted_thread_state_table_ = unsorted_thread_state_table_->Sort(
        {unsorted_thread_state_table_->ts().ascending()});
  }
  // TODO(rsavitski): return base::ErrStatus instead?
  PERFETTO_CHECK(sorted_thread_state_table_);
  table_return =
      std::unique_ptr<Table>(new Table(sorted_thread_state_table_->Copy()));
  return base::OkStatus();
}

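// Builds the thread state table by merging three event streams in timestamp
// order: sched slices (emitted as "Running"), sched_waking/sched_wakeup
// instants (runnable) and sched_blocked_reason instants (io_wait and
// blocked_function annotations).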
std::unique_ptr<tables::ThreadStateTable>
ThreadStateGenerator::ComputeThreadStateTable(int64_t trace_end_ts) {
  std::unique_ptr<tables::ThreadStateTable> table(new tables::ThreadStateTable(
      context_->storage->mutable_string_pool(), nullptr));

  const auto& raw_sched = context_->storage->sched_slice_table();
  const auto& instants = context_->storage->instant_table();

  // In both tables, exclude utid == 0 which represents the idle thread.
  Table sched = raw_sched.Filter({raw_sched.utid().ne(0)},
                                 RowMap::OptimizeFor::kLookupSpeed);
  Table waking = instants.Filter(
      {instants.name().eq("sched_waking"), instants.ref().ne(0)},
      RowMap::OptimizeFor::kLookupSpeed);

  // We prefer to use waking if at all possible and fall back to wakeup if it
  // is not available.
  if (waking.row_count() == 0) {
    waking = instants.Filter(
        {instants.name().eq("sched_wakeup"), instants.ref().ne(0)},
        RowMap::OptimizeFor::kLookupSpeed);
  }

  Table sched_blocked_reason = instants.Filter(
      {instants.name().eq("sched_blocked_reason"), instants.ref().ne(0)},
      RowMap::OptimizeFor::kLookupSpeed);

  const auto& sched_ts_col = sched.GetTypedColumnByName<int64_t>("ts");
  const auto& waking_ts_col = waking.GetTypedColumnByName<int64_t>("ts");
  const auto& blocked_ts_col =
      sched_blocked_reason.GetTypedColumnByName<int64_t>("ts");

  uint32_t sched_idx = 0;
  uint32_t waking_idx = 0;
  uint32_t blocked_idx = 0;
  TidInfoMap state_map(/*initial_capacity=*/1024);
  while (sched_idx < sched.row_count() || waking_idx < waking.row_count() ||
         blocked_idx < sched_blocked_reason.row_count()) {
    int64_t sched_ts = sched_idx < sched.row_count()
                           ? sched_ts_col[sched_idx]
                           : std::numeric_limits<int64_t>::max();
    int64_t waking_ts = waking_idx < waking.row_count()
                            ? waking_ts_col[waking_idx]
                            : std::numeric_limits<int64_t>::max();
    int64_t blocked_ts = blocked_idx < sched_blocked_reason.row_count()
                             ? blocked_ts_col[blocked_idx]
                             : std::numeric_limits<int64_t>::max();

    // We go through all tables, picking the event with the earliest timestamp
    // to process next.
    int64_t min_ts = std::min({sched_ts, waking_ts, blocked_ts});
    if (min_ts == sched_ts) {
      AddSchedEvent(sched, sched_idx++, state_map, trace_end_ts, table.get());
    } else if (min_ts == waking_ts) {
      AddWakingEvent(waking, waking_idx++, state_map);
    } else /* (min_ts == blocked_ts) */ {
      AddBlockedReasonEvent(sched_blocked_reason, blocked_idx++, state_map);
    }
  }

  // At the end, go through and flush any remaining pending events.
  for (auto it = state_map.GetIterator(); it; ++it) {
    UniqueTid utid = it.key();
    const ThreadSchedInfo& pending_info = it.value();
    FlushPendingEventsForThread(utid, pending_info, table.get(), base::nullopt);
  }

  return table;
}

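// Emits a "Running" slice for the given sched slice and records the pending
// descheduled state, which is flushed when the thread is next woken up or
// scheduled again.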
void ThreadStateGenerator::AddSchedEvent(const Table& sched,
                                         uint32_t sched_idx,
                                         TidInfoMap& state_map,
                                         int64_t trace_end_ts,
                                         tables::ThreadStateTable* table) {
  int64_t ts = sched.GetTypedColumnByName<int64_t>("ts")[sched_idx];
  UniqueTid utid = sched.GetTypedColumnByName<uint32_t>("utid")[sched_idx];
  ThreadSchedInfo* info = &state_map[utid];

  // Due to races in the kernel, it is possible for the same thread to be
  // scheduled on different CPUs at the same time. This will manifest itself
  // here by having |info->desched_ts| in the future of this scheduling slice
  // (i.e. there was a scheduling slice in the past which ended after the start
  // of the current scheduling slice).
  //
  // We work around this problem by truncating the previous slice to the start
  // of this slice and not adding the descheduled slice (i.e. we don't call
  // |FlushPendingEventsForThread| which adds this slice).
  //
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_ts && info->desched_ts.value() > ts) {
    uint32_t prev_sched_row = info->scheduled_row.value();
    int64_t prev_sched_start = table->ts()[prev_sched_row];

    // Just a double check that the descheduling slice would have started at
    // the same time the previous scheduling slice ended.
    PERFETTO_DCHECK(prev_sched_start + table->dur()[prev_sched_row] ==
                    info->desched_ts.value());

    // Truncate the duration of the old slice to end at the start of this
    // scheduling slice.
    table->mutable_dur()->Set(prev_sched_row, ts - prev_sched_start);
  } else {
    FlushPendingEventsForThread(utid, *info, table, ts);
  }

  // Reset so we don't have any leftover data on the next round.
  *info = {};

  // Undo the expansion of the final sched slice for each CPU to the end of the
  // trace by setting the duration back to -1. This counteracts the code in
  // SchedEventTracker::FlushPendingEvents.
  // TODO(lalitm): remove this hack when we stop expanding the last slice to
  // the end of the trace.
  int64_t dur = sched.GetTypedColumnByName<int64_t>("dur")[sched_idx];
  if (ts + dur == trace_end_ts) {
    dur = -1;
  }

  // Now add the sched slice itself as "Running" with the other fields
  // unchanged.
  tables::ThreadStateTable::Row sched_row;
  sched_row.ts = ts;
  sched_row.dur = dur;
  sched_row.cpu = sched.GetTypedColumnByName<uint32_t>("cpu")[sched_idx];
  sched_row.state = running_string_id_;
  sched_row.utid = utid;

  auto id_and_row = table->Insert(sched_row);

  // If the sched row had a negative duration, don't add any descheduled slice
  // because it would be meaningless.
  if (sched_row.dur == -1) {
    return;
  }

  // This will be flushed to the table on the next sched slice (or at the very
  // end of the big loop).
  info->desched_ts = ts + dur;
  info->desched_end_state =
      sched.GetTypedColumnByName<StringId>("end_state")[sched_idx];
  info->scheduled_row = id_and_row.row;
}

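// Records the timestamp at which a thread became runnable; the runnable slice
// itself is only written out later by FlushPendingEventsForThread.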
void ThreadStateGenerator::AddWakingEvent(const Table& waking,
                                          uint32_t waking_idx,
                                          TidInfoMap& state_map) {
  int64_t ts = waking.GetTypedColumnByName<int64_t>("ts")[waking_idx];
  UniqueTid utid = static_cast<UniqueTid>(
      waking.GetTypedColumnByName<int64_t>("ref")[waking_idx]);
  ThreadSchedInfo* info = &state_map[utid];

  // Occasionally, it is possible to get a waking event for a thread
  // which is already in a runnable state. When this happens, we just
  // ignore the waking event.
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_end_state &&
      *info->desched_end_state == runnable_string_id_) {
    return;
  }

  // As counter-intuitive as it seems, occasionally we can get a waking
  // event for a thread which is currently running.
  //
  // There are two cases when this can happen:
  // 1. The kernel legitimately sends a waking event for a "running" thread
  //    because the thread was woken up before the kernel switched away
  //    from it. In this case, the waking timestamp will be in the past
  //    because we added the descheduled slice when we processed the sched
  //    event.
  // 2. We're close to the end of the trace or had data-loss and we missed
  //    the switch-out event for a thread but we see a waking after.

  // Case 1 described above. In this situation, we should drop the waking
  // entirely.
  if (info->desched_ts && *info->desched_ts > ts) {
    return;
  }

  // For case 2 and otherwise, we should just note the fact that the thread
  // became runnable at this time. Note that we cannot check that runnable is
  // not already set because we could have data-loss which leads to us getting
  // back-to-back wakings for a single thread.
  info->runnable_ts = ts;
}

Table::Schema ThreadStateGenerator::CreateSchema() {
  auto schema = tables::ThreadStateTable::Schema();

  // Because we expect our users to generally want the table ordered by ts, we
  // set the ordering of the schema to match our forced sort pass in
  // ComputeTable.
  auto ts_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "ts"; });
  ts_it->is_sorted = true;
  auto id_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "id"; });
  id_it->is_sorted = false;

  return schema;
}

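// Writes out any pending descheduled and runnable slices for |utid|. A
// descheduled slice ends at the waking timestamp if one was seen, otherwise at
// |end_ts|; without an |end_ts| (i.e. at the end of the trace) the duration is
// left as -1.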
void ThreadStateGenerator::FlushPendingEventsForThread(
    UniqueTid utid,
    const ThreadSchedInfo& info,
    tables::ThreadStateTable* table,
    base::Optional<int64_t> end_ts) {
  // First, let's flush the descheduled period (if any) to the table.
  if (info.desched_ts) {
    PERFETTO_DCHECK(info.desched_end_state);

    int64_t dur;
    if (end_ts) {
      int64_t desched_end_ts = info.runnable_ts ? *info.runnable_ts : *end_ts;
      dur = desched_end_ts - *info.desched_ts;
    } else {
      dur = -1;
    }

    tables::ThreadStateTable::Row row;
    row.ts = *info.desched_ts;
    row.dur = dur;
    row.state = *info.desched_end_state;
    row.utid = utid;
    row.io_wait = info.io_wait;
    row.blocked_function = info.blocked_function;
    table->Insert(row);
  }

  // Next, flush the runnable period (if any) to the table.
  if (info.runnable_ts) {
    tables::ThreadStateTable::Row row;
    row.ts = *info.runnable_ts;
    row.dur = end_ts ? *end_ts - row.ts : -1;
    row.state = runnable_string_id_;
    row.utid = utid;
    table->Insert(row);
  }
}

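// Extracts the "io_wait" and "function" args of a sched_blocked_reason instant
// and stashes them on the thread's pending descheduled state.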
void ThreadStateGenerator::AddBlockedReasonEvent(const Table& blocked_reason,
                                                 uint32_t blocked_idx,
                                                 TidInfoMap& state_map) {
  const auto& utid_col = blocked_reason.GetTypedColumnByName<int64_t>("ref");
  const auto& arg_set_id_col =
      blocked_reason.GetTypedColumnByName<uint32_t>("arg_set_id");

  UniqueTid utid = static_cast<UniqueTid>(utid_col[blocked_idx]);
  uint32_t arg_set_id = arg_set_id_col[blocked_idx];
  ThreadSchedInfo& info = state_map[utid];

  base::Optional<Variadic> opt_value;
  base::Status status =
      context_->storage->ExtractArg(arg_set_id, "io_wait", &opt_value);

  // We can't do anything better than ignoring any errors here.
  // TODO(lalitm): see if there's a better way to handle this.
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kBool);
    info.io_wait = opt_value->bool_value;
  }

  status = context_->storage->ExtractArg(arg_set_id, "function", &opt_value);
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kString);
    info.blocked_function = opt_value->string_value;
  }
}

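// This table is exposed to trace processor SQL as "thread_state". An
// illustrative (not exhaustive) query against it might look like:
//   SELECT ts, dur, state, io_wait, blocked_function
//   FROM thread_state
//   WHERE utid = 10
//   ORDER BY ts;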
std::string ThreadStateGenerator::TableName() {
  return "thread_state";
}

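// The sched slice count is a cheap lower-bound estimate: every sched slice
// yields a "Running" row, and most also yield descheduled/runnable rows.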
uint32_t ThreadStateGenerator::EstimateRowCount() {
  return context_->storage->sched_slice_table().row_count();
}

}  // namespace trace_processor
}  // namespace perfetto