1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/trace_processor/prelude/functions/layout_functions.h"
16
#include <limits>
#include <queue>
#include <vector>

#include "perfetto/ext/base/status_or.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
#include "src/trace_processor/util/status_macros.h"
23
24 namespace perfetto::trace_processor {
25
26 namespace {
27
// Name under which the window function is registered with SQLite. It is also
// passed to the error-reporting helpers so that failures mention the function.
constexpr char kFunctionName[] = "INTERNAL_LAYOUT";
29
30 // A helper class for tracking which depths are available at a given time
31 // and which slices are occupying each depths.
32 class SlicePacker {
33 public:
34 SlicePacker() = default;
35
36 // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)37 base::Status AddSlice(int64_t ts, int64_t dur) {
38 if (last_call_ == LastCall::kAddSlice) {
39 return base::ErrStatus(R"(
40 Incorrect window clause (observed two consecutive calls to "step" function).
41 The window clause should be "rows between unbounded preceding and current row".
42 )");
43 }
44 last_call_ = LastCall::kAddSlice;
45 if (ts < last_seen_ts_) {
46 return base::ErrStatus(R"(
47 Passed slices are in incorrect order: %s requires timestamps to be sorted.
48 Please specify "ORDER BY ts" in the window clause.
49 )",
50 kFunctionName);
51 }
52 last_seen_ts_ = ts;
53 ProcessPrecedingEvents(ts);
54 // If the event is instant, do not mark this depth as occupied as it
55 // becomes immediately available again.
56 bool is_busy = dur != 0;
57 size_t depth = SelectAvailableDepth(is_busy);
58 // If the slice has an end and is not an instant, schedule this depth
59 // to be marked available again when it ends.
60 if (dur > 0) {
61 slice_ends_.push({ts + dur, depth});
62 }
63 last_depth_ = depth;
64 return base::OkStatus();
65 }
66
GetLastDepth()67 size_t GetLastDepth() {
68 last_call_ = LastCall::kQuery;
69 return last_depth_;
70 }
71
72 private:
73 struct SliceEnd {
74 int64_t ts;
75 size_t depth;
76 };
77
78 struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon1aeabe940111::SlicePacker::SliceEndGreater79 bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
80 return lhs.ts > rhs.ts;
81 }
82 };
83
ProcessPrecedingEvents(int64_t ts)84 void ProcessPrecedingEvents(int64_t ts) {
85 while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
86 is_depth_busy_[slice_ends_.top().depth] = false;
87 slice_ends_.pop();
88 }
89 }
90
SelectAvailableDepth(bool new_state)91 size_t SelectAvailableDepth(bool new_state) {
92 for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
93 if (!is_depth_busy_[i]) {
94 is_depth_busy_[i] = new_state;
95 return i;
96 }
97 }
98 size_t depth = is_depth_busy_.size();
99 is_depth_busy_.push_back(new_state);
100 return depth;
101 }
102
103 enum class LastCall {
104 kAddSlice,
105 kQuery,
106 };
107 // The first call will be "add slice" and the calls are expected to
108 // interleave, so set initial value to "query".
109 LastCall last_call_ = LastCall::kQuery;
110
111 int64_t last_seen_ts_ = 0;
112 std::vector<bool> is_depth_busy_;
113 // A list of currently open slices, ordered by end timestamp (ascending).
114 std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
115 slice_ends_;
116 size_t last_depth_ = 0;
117 };
118
GetOrCreateAggregationContext(sqlite3_context * ctx)119 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
120 sqlite3_context* ctx) {
121 SlicePacker** packer = static_cast<SlicePacker**>(
122 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
123 if (!packer) {
124 return base::ErrStatus("Failed to allocate aggregate context");
125 }
126
127 if (!*packer) {
128 *packer = new SlicePacker();
129 }
130 return *packer;
131 }
132
Step(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)133 base::Status Step(sqlite3_context* ctx, size_t argc, sqlite3_value** argv) {
134 base::StatusOr<SlicePacker*> slice_packer =
135 GetOrCreateAggregationContext(ctx);
136 RETURN_IF_ERROR(slice_packer.status());
137
138 base::StatusOr<SqlValue> ts =
139 sqlite_utils::ExtractArgument(argc, argv, "ts", 0, SqlValue::kLong);
140 RETURN_IF_ERROR(ts.status());
141
142 base::StatusOr<SqlValue> dur =
143 sqlite_utils::ExtractArgument(argc, argv, "dur", 1, SqlValue::kLong);
144 RETURN_IF_ERROR(dur.status());
145
146 return slice_packer.value()->AddSlice(ts->AsLong(), dur.value().AsLong());
147 }
148
StepWrapper(sqlite3_context * ctx,int argc,sqlite3_value ** argv)149 void StepWrapper(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
150 PERFETTO_CHECK(argc >= 0);
151
152 base::Status status = Step(ctx, static_cast<size_t>(argc), argv);
153 if (!status.ok()) {
154 sqlite_utils::SetSqliteError(ctx, kFunctionName, status);
155 return;
156 }
157 }
158
FinalWrapper(sqlite3_context * ctx)159 void FinalWrapper(sqlite3_context* ctx) {
160 SlicePacker** slice_packer = static_cast<SlicePacker**>(
161 sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
162 if (!slice_packer || !*slice_packer) {
163 return;
164 }
165 sqlite3_result_int64(ctx,
166 static_cast<int64_t>((*slice_packer)->GetLastDepth()));
167 delete *slice_packer;
168 }
169
ValueWrapper(sqlite3_context * ctx)170 void ValueWrapper(sqlite3_context* ctx) {
171 base::StatusOr<SlicePacker*> slice_packer =
172 GetOrCreateAggregationContext(ctx);
173 if (!slice_packer.ok()) {
174 sqlite_utils::SetSqliteError(ctx, kFunctionName, slice_packer.status());
175 return;
176 }
177 sqlite3_result_int64(
178 ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
179 }
180
InverseWrapper(sqlite3_context * ctx,int,sqlite3_value **)181 void InverseWrapper(sqlite3_context* ctx, int, sqlite3_value**) {
182 sqlite_utils::SetSqliteError(ctx, kFunctionName, base::ErrStatus(R"(
183 The inverse step is not supported: the window clause should be
184 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
185 )"));
186 }
187
188 } // namespace
189
Register(sqlite3 * db,TraceProcessorContext * context)190 base::Status LayoutFunctions::Register(sqlite3* db,
191 TraceProcessorContext* context) {
192 int flags = SQLITE_UTF8 | SQLITE_DETERMINISTIC;
193 int ret = sqlite3_create_window_function(
194 db, kFunctionName, 2, flags, context, StepWrapper, FinalWrapper,
195 ValueWrapper, InverseWrapper, nullptr);
196 if (ret != SQLITE_OK) {
197 return base::ErrStatus("Unable to register function with name %s",
198 kFunctionName);
199 }
200 return base::OkStatus();
201 }
202
203 } // namespace perfetto::trace_processor
204