• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.h"
16 
17 #include <cstddef>
18 #include <cstdint>
19 #include <limits>
20 #include <queue>
21 #include <vector>
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/trace_processor/basic_types.h"
26 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
27 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
28 #include "src/trace_processor/sqlite/bindings/sqlite_window_function.h"
29 #include "src/trace_processor/sqlite/sqlite_utils.h"
30 #include "src/trace_processor/util/status_macros.h"
31 
32 namespace perfetto::trace_processor {
33 
34 namespace {
35 
// Name of the SQL window function implemented in this file. It is used both
// when registering the function with the engine and when reporting errors.
36 constexpr char kFunctionName[] = "INTERNAL_LAYOUT";
37 
38 // A helper class for tracking which depths are available at a given time
39 // and which slices are occupying each depths.
40 class SlicePacker {
41  public:
42   SlicePacker() = default;
43 
44   // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)45   base::Status AddSlice(int64_t ts, int64_t dur) {
46     if (last_call_ == LastCall::kAddSlice) {
47       return base::ErrStatus(R"(
48 Incorrect window clause (observed two consecutive calls to "step" function).
49 The window clause should be "rows between unbounded preceding and current row".
50 )");
51     }
52     last_call_ = LastCall::kAddSlice;
53     if (ts < last_seen_ts_) {
54       return base::ErrStatus(R"(
55 Passed slices are in incorrect order: %s requires timestamps to be sorted.
56 Please specify "ORDER BY ts" in the window clause.
57 )",
58                              kFunctionName);
59     }
60     last_seen_ts_ = ts;
61     ProcessPrecedingEvents(ts);
62     // If the event is instant, do not mark this depth as occupied as it
63     // becomes immediately available again.
64     bool is_busy = dur != 0;
65     size_t depth = SelectAvailableDepth(is_busy);
66     // If the slice has an end and is not an instant, schedule this depth
67     // to be marked available again when it ends.
68     if (dur != 0) {
69       int64_t ts_end =
70           dur == -1 ? std::numeric_limits<int64_t>::max() : ts + dur;
71       slice_ends_.push({ts_end, depth});
72     }
73     last_depth_ = depth;
74     return base::OkStatus();
75   }
76 
GetLastDepth()77   size_t GetLastDepth() {
78     last_call_ = LastCall::kQuery;
79     return last_depth_;
80   }
81 
82  private:
83   struct SliceEnd {
84     int64_t ts;
85     size_t depth;
86   };
87 
88   struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon930ff87f0111::SlicePacker::SliceEndGreater89     bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
90       return lhs.ts > rhs.ts;
91     }
92   };
93 
ProcessPrecedingEvents(int64_t ts)94   void ProcessPrecedingEvents(int64_t ts) {
95     while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
96       is_depth_busy_[slice_ends_.top().depth] = false;
97       slice_ends_.pop();
98     }
99   }
100 
SelectAvailableDepth(bool new_state)101   size_t SelectAvailableDepth(bool new_state) {
102     for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
103       if (!is_depth_busy_[i]) {
104         is_depth_busy_[i] = new_state;
105         return i;
106       }
107     }
108     size_t depth = is_depth_busy_.size();
109     is_depth_busy_.push_back(new_state);
110     return depth;
111   }
112 
113   enum class LastCall {
114     kAddSlice,
115     kQuery,
116   };
117   // The first call will be "add slice" and the calls are expected to
118   // interleave, so set initial value to "query".
119   LastCall last_call_ = LastCall::kQuery;
120 
121   int64_t last_seen_ts_ = 0;
122   std::vector<bool> is_depth_busy_;
123   // A list of currently open slices, ordered by end timestamp (ascending).
124   std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
125       slice_ends_;
126   size_t last_depth_ = 0;
127 };
128 
GetOrCreateAggregationContext(sqlite3_context * ctx)129 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
130     sqlite3_context* ctx) {
131   auto** packer = static_cast<SlicePacker**>(
132       sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
133   if (!packer) {
134     return base::ErrStatus("Failed to allocate aggregate context");
135   }
136 
137   if (!*packer) {
138     *packer = new SlicePacker();
139   }
140   return *packer;
141 }
142 
StepStatus(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)143 base::Status StepStatus(sqlite3_context* ctx,
144                         size_t argc,
145                         sqlite3_value** argv) {
146   base::StatusOr<SlicePacker*> slice_packer =
147       GetOrCreateAggregationContext(ctx);
148   RETURN_IF_ERROR(slice_packer.status());
149 
150   base::StatusOr<SqlValue> ts =
151       sqlite::utils::ExtractArgument(argc, argv, "ts", 0, SqlValue::kLong);
152   RETURN_IF_ERROR(ts.status());
153 
154   base::StatusOr<SqlValue> dur =
155       sqlite::utils::ExtractArgument(argc, argv, "dur", 1, SqlValue::kLong);
156   RETURN_IF_ERROR(dur.status());
157 
158   return slice_packer.value()->AddSlice(ts->AsLong(), dur.value().AsLong());
159 }
160 
161 struct InternalLayout : public SqliteWindowFunction {
Stepperfetto::trace_processor::__anon930ff87f0111::InternalLayout162   static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
163     PERFETTO_CHECK(argc >= 0);
164 
165     base::Status status = StepStatus(ctx, static_cast<size_t>(argc), argv);
166     if (!status.ok()) {
167       return sqlite::utils::SetError(ctx, kFunctionName, status);
168     }
169   }
170 
Inverseperfetto::trace_processor::__anon930ff87f0111::InternalLayout171   static void Inverse(sqlite3_context* ctx, int, sqlite3_value**) {
172     sqlite::utils::SetError(ctx, kFunctionName, base::ErrStatus(R"(
173 The inverse step is not supported: the window clause should be
174 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
175 )"));
176   }
177 
Valueperfetto::trace_processor::__anon930ff87f0111::InternalLayout178   static void Value(sqlite3_context* ctx) {
179     base::StatusOr<SlicePacker*> slice_packer =
180         GetOrCreateAggregationContext(ctx);
181     if (!slice_packer.ok()) {
182       return sqlite::utils::SetError(ctx, kFunctionName, slice_packer.status());
183     }
184     return sqlite::result::Long(
185         ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
186   }
187 
Finalperfetto::trace_processor::__anon930ff87f0111::InternalLayout188   static void Final(sqlite3_context* ctx) {
189     auto** slice_packer = static_cast<SlicePacker**>(
190         sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
191     if (!slice_packer || !*slice_packer) {
192       return;
193     }
194     sqlite::result::Long(ctx,
195                          static_cast<int64_t>((*slice_packer)->GetLastDepth()));
196     delete *slice_packer;
197   }
198 };
199 
200 }  // namespace
201 
RegisterLayoutFunctions(PerfettoSqlEngine & engine)202 base::Status RegisterLayoutFunctions(PerfettoSqlEngine& engine) {
203   return engine.RegisterSqliteWindowFunction<InternalLayout>(kFunctionName, 2,
204                                                              nullptr);
205 }
206 
207 }  // namespace perfetto::trace_processor
208