1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/trace_processor/perfetto_sql/intrinsics/functions/layout_functions.h"
16
17 #include <cstddef>
18 #include <cstdint>
19 #include <limits>
20 #include <queue>
21 #include <vector>
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/status.h"
24 #include "perfetto/ext/base/status_or.h"
25 #include "perfetto/trace_processor/basic_types.h"
26 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
27 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
28 #include "src/trace_processor/sqlite/bindings/sqlite_window_function.h"
29 #include "src/trace_processor/sqlite/sqlite_utils.h"
30 #include "src/trace_processor/util/status_macros.h"
31
32 namespace perfetto::trace_processor {
33
34 namespace {
35
// SQL-visible name of the window function implemented in this file. It is
// used both when registering the function and as the prefix/argument of the
// error messages reported back to SQLite.
constexpr char kFunctionName[] = "INTERNAL_LAYOUT";
37
38 // A helper class for tracking which depths are available at a given time
39 // and which slices are occupying each depths.
40 class SlicePacker {
41 public:
42 SlicePacker() = default;
43
44 // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)45 base::Status AddSlice(int64_t ts, int64_t dur) {
46 if (last_call_ == LastCall::kAddSlice) {
47 return base::ErrStatus(R"(
48 Incorrect window clause (observed two consecutive calls to "step" function).
49 The window clause should be "rows between unbounded preceding and current row".
50 )");
51 }
52 last_call_ = LastCall::kAddSlice;
53 if (ts < last_seen_ts_) {
54 return base::ErrStatus(R"(
55 Passed slices are in incorrect order: %s requires timestamps to be sorted.
56 Please specify "ORDER BY ts" in the window clause.
57 )",
58 kFunctionName);
59 }
60 last_seen_ts_ = ts;
61 ProcessPrecedingEvents(ts);
62 // If the event is instant, do not mark this depth as occupied as it
63 // becomes immediately available again.
64 bool is_busy = dur != 0;
65 size_t depth = SelectAvailableDepth(is_busy);
66 // If the slice has an end and is not an instant, schedule this depth
67 // to be marked available again when it ends.
68 if (dur != 0) {
69 int64_t ts_end =
70 dur == -1 ? std::numeric_limits<int64_t>::max() : ts + dur;
71 slice_ends_.push({ts_end, depth});
72 }
73 last_depth_ = depth;
74 return base::OkStatus();
75 }
76
GetLastDepth()77 size_t GetLastDepth() {
78 last_call_ = LastCall::kQuery;
79 return last_depth_;
80 }
81
82 private:
83 struct SliceEnd {
84 int64_t ts;
85 size_t depth;
86 };
87
88 struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon930ff87f0111::SlicePacker::SliceEndGreater89 bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
90 return lhs.ts > rhs.ts;
91 }
92 };
93
ProcessPrecedingEvents(int64_t ts)94 void ProcessPrecedingEvents(int64_t ts) {
95 while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
96 is_depth_busy_[slice_ends_.top().depth] = false;
97 slice_ends_.pop();
98 }
99 }
100
SelectAvailableDepth(bool new_state)101 size_t SelectAvailableDepth(bool new_state) {
102 for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
103 if (!is_depth_busy_[i]) {
104 is_depth_busy_[i] = new_state;
105 return i;
106 }
107 }
108 size_t depth = is_depth_busy_.size();
109 is_depth_busy_.push_back(new_state);
110 return depth;
111 }
112
113 enum class LastCall {
114 kAddSlice,
115 kQuery,
116 };
117 // The first call will be "add slice" and the calls are expected to
118 // interleave, so set initial value to "query".
119 LastCall last_call_ = LastCall::kQuery;
120
121 int64_t last_seen_ts_ = 0;
122 std::vector<bool> is_depth_busy_;
123 // A list of currently open slices, ordered by end timestamp (ascending).
124 std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
125 slice_ends_;
126 size_t last_depth_ = 0;
127 };
128
GetOrCreateAggregationContext(sqlite3_context * ctx)129 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
130 sqlite3_context* ctx) {
131 auto** packer = static_cast<SlicePacker**>(
132 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
133 if (!packer) {
134 return base::ErrStatus("Failed to allocate aggregate context");
135 }
136
137 if (!*packer) {
138 *packer = new SlicePacker();
139 }
140 return *packer;
141 }
142
StepStatus(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)143 base::Status StepStatus(sqlite3_context* ctx,
144 size_t argc,
145 sqlite3_value** argv) {
146 base::StatusOr<SlicePacker*> slice_packer =
147 GetOrCreateAggregationContext(ctx);
148 RETURN_IF_ERROR(slice_packer.status());
149
150 base::StatusOr<SqlValue> ts =
151 sqlite::utils::ExtractArgument(argc, argv, "ts", 0, SqlValue::kLong);
152 RETURN_IF_ERROR(ts.status());
153
154 base::StatusOr<SqlValue> dur =
155 sqlite::utils::ExtractArgument(argc, argv, "dur", 1, SqlValue::kLong);
156 RETURN_IF_ERROR(dur.status());
157
158 return slice_packer.value()->AddSlice(ts->AsLong(), dur.value().AsLong());
159 }
160
161 struct InternalLayout : public SqliteWindowFunction {
Stepperfetto::trace_processor::__anon930ff87f0111::InternalLayout162 static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
163 PERFETTO_CHECK(argc >= 0);
164
165 base::Status status = StepStatus(ctx, static_cast<size_t>(argc), argv);
166 if (!status.ok()) {
167 return sqlite::utils::SetError(ctx, kFunctionName, status);
168 }
169 }
170
Inverseperfetto::trace_processor::__anon930ff87f0111::InternalLayout171 static void Inverse(sqlite3_context* ctx, int, sqlite3_value**) {
172 sqlite::utils::SetError(ctx, kFunctionName, base::ErrStatus(R"(
173 The inverse step is not supported: the window clause should be
174 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
175 )"));
176 }
177
Valueperfetto::trace_processor::__anon930ff87f0111::InternalLayout178 static void Value(sqlite3_context* ctx) {
179 base::StatusOr<SlicePacker*> slice_packer =
180 GetOrCreateAggregationContext(ctx);
181 if (!slice_packer.ok()) {
182 return sqlite::utils::SetError(ctx, kFunctionName, slice_packer.status());
183 }
184 return sqlite::result::Long(
185 ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
186 }
187
Finalperfetto::trace_processor::__anon930ff87f0111::InternalLayout188 static void Final(sqlite3_context* ctx) {
189 auto** slice_packer = static_cast<SlicePacker**>(
190 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
191 if (!slice_packer || !*slice_packer) {
192 return;
193 }
194 sqlite::result::Long(ctx,
195 static_cast<int64_t>((*slice_packer)->GetLastDepth()));
196 delete *slice_packer;
197 }
198 };
199
200 } // namespace
201
RegisterLayoutFunctions(PerfettoSqlEngine & engine)202 base::Status RegisterLayoutFunctions(PerfettoSqlEngine& engine) {
203 return engine.RegisterSqliteWindowFunction<InternalLayout>(kFunctionName, 2,
204 nullptr);
205 }
206
207 } // namespace perfetto::trace_processor
208