1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "src/trace_processor/prelude/functions/layout_functions.h"

#include <cstdint>
#include <limits>
#include <queue>
#include <vector>

#include "perfetto/ext/base/status_or.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/sqlite/sqlite_utils.h"
#include "src/trace_processor/util/status_macros.h"
23 
24 namespace perfetto::trace_processor {
25 
26 namespace {
27 
// SQL-visible name of the window function. It is used both when registering
// the function with SQLite and when reporting errors raised by it.
constexpr char kFunctionName[]{"INTERNAL_LAYOUT"};
29 
30 // A helper class for tracking which depths are available at a given time
31 // and which slices are occupying each depths.
32 class SlicePacker {
33  public:
34   SlicePacker() = default;
35 
36   // |dur| can be 0 for instant events and -1 for slices which do not end.
AddSlice(int64_t ts,int64_t dur)37   base::Status AddSlice(int64_t ts, int64_t dur) {
38     if (last_call_ == LastCall::kAddSlice) {
39       return base::ErrStatus(R"(
40 Incorrect window clause (observed two consecutive calls to "step" function).
41 The window clause should be "rows between unbounded preceding and current row".
42 )");
43     }
44     last_call_ = LastCall::kAddSlice;
45     if (ts < last_seen_ts_) {
46       return base::ErrStatus(R"(
47 Passed slices are in incorrect order: %s requires timestamps to be sorted.
48 Please specify "ORDER BY ts" in the window clause.
49 )",
50                              kFunctionName);
51     }
52     last_seen_ts_ = ts;
53     ProcessPrecedingEvents(ts);
54     // If the event is instant, do not mark this depth as occupied as it
55     // becomes immediately available again.
56     bool is_busy = dur != 0;
57     size_t depth = SelectAvailableDepth(is_busy);
58     // If the slice has an end and is not an instant, schedule this depth
59     // to be marked available again when it ends.
60     if (dur > 0) {
61       slice_ends_.push({ts + dur, depth});
62     }
63     last_depth_ = depth;
64     return base::OkStatus();
65   }
66 
GetLastDepth()67   size_t GetLastDepth() {
68     last_call_ = LastCall::kQuery;
69     return last_depth_;
70   }
71 
72  private:
73   struct SliceEnd {
74     int64_t ts;
75     size_t depth;
76   };
77 
78   struct SliceEndGreater {
operator ()perfetto::trace_processor::__anon1aeabe940111::SlicePacker::SliceEndGreater79     bool operator()(const SliceEnd& lhs, const SliceEnd& rhs) {
80       return lhs.ts > rhs.ts;
81     }
82   };
83 
ProcessPrecedingEvents(int64_t ts)84   void ProcessPrecedingEvents(int64_t ts) {
85     while (!slice_ends_.empty() && slice_ends_.top().ts <= ts) {
86       is_depth_busy_[slice_ends_.top().depth] = false;
87       slice_ends_.pop();
88     }
89   }
90 
SelectAvailableDepth(bool new_state)91   size_t SelectAvailableDepth(bool new_state) {
92     for (size_t i = 0; i < is_depth_busy_.size(); ++i) {
93       if (!is_depth_busy_[i]) {
94         is_depth_busy_[i] = new_state;
95         return i;
96       }
97     }
98     size_t depth = is_depth_busy_.size();
99     is_depth_busy_.push_back(new_state);
100     return depth;
101   }
102 
103   enum class LastCall {
104     kAddSlice,
105     kQuery,
106   };
107   // The first call will be "add slice" and the calls are expected to
108   // interleave, so set initial value to "query".
109   LastCall last_call_ = LastCall::kQuery;
110 
111   int64_t last_seen_ts_ = 0;
112   std::vector<bool> is_depth_busy_;
113   // A list of currently open slices, ordered by end timestamp (ascending).
114   std::priority_queue<SliceEnd, std::vector<SliceEnd>, SliceEndGreater>
115       slice_ends_;
116   size_t last_depth_ = 0;
117 };
118 
GetOrCreateAggregationContext(sqlite3_context * ctx)119 base::StatusOr<SlicePacker*> GetOrCreateAggregationContext(
120     sqlite3_context* ctx) {
121   SlicePacker** packer = static_cast<SlicePacker**>(
122       sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
123   if (!packer) {
124     return base::ErrStatus("Failed to allocate aggregate context");
125   }
126 
127   if (!*packer) {
128     *packer = new SlicePacker();
129   }
130   return *packer;
131 }
132 
Step(sqlite3_context * ctx,size_t argc,sqlite3_value ** argv)133 base::Status Step(sqlite3_context* ctx, size_t argc, sqlite3_value** argv) {
134   base::StatusOr<SlicePacker*> slice_packer =
135       GetOrCreateAggregationContext(ctx);
136   RETURN_IF_ERROR(slice_packer.status());
137 
138   base::StatusOr<SqlValue> ts =
139       sqlite_utils::ExtractArgument(argc, argv, "ts", 0, SqlValue::kLong);
140   RETURN_IF_ERROR(ts.status());
141 
142   base::StatusOr<SqlValue> dur =
143       sqlite_utils::ExtractArgument(argc, argv, "dur", 1, SqlValue::kLong);
144   RETURN_IF_ERROR(dur.status());
145 
146   return slice_packer.value()->AddSlice(ts->AsLong(), dur.value().AsLong());
147 }
148 
StepWrapper(sqlite3_context * ctx,int argc,sqlite3_value ** argv)149 void StepWrapper(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
150   PERFETTO_CHECK(argc >= 0);
151 
152   base::Status status = Step(ctx, static_cast<size_t>(argc), argv);
153   if (!status.ok()) {
154     sqlite_utils::SetSqliteError(ctx, kFunctionName, status);
155     return;
156   }
157 }
158 
FinalWrapper(sqlite3_context * ctx)159 void FinalWrapper(sqlite3_context* ctx) {
160   SlicePacker** slice_packer = static_cast<SlicePacker**>(
161       sqlite3_aggregate_context(ctx, sizeof(SlicePacker*)));
162   if (!slice_packer || !*slice_packer) {
163     return;
164   }
165   sqlite3_result_int64(ctx,
166                        static_cast<int64_t>((*slice_packer)->GetLastDepth()));
167   delete *slice_packer;
168 }
169 
ValueWrapper(sqlite3_context * ctx)170 void ValueWrapper(sqlite3_context* ctx) {
171   base::StatusOr<SlicePacker*> slice_packer =
172       GetOrCreateAggregationContext(ctx);
173   if (!slice_packer.ok()) {
174     sqlite_utils::SetSqliteError(ctx, kFunctionName, slice_packer.status());
175     return;
176   }
177   sqlite3_result_int64(
178       ctx, static_cast<int64_t>(slice_packer.value()->GetLastDepth()));
179 }
180 
InverseWrapper(sqlite3_context * ctx,int,sqlite3_value **)181 void InverseWrapper(sqlite3_context* ctx, int, sqlite3_value**) {
182   sqlite_utils::SetSqliteError(ctx, kFunctionName, base::ErrStatus(R"(
183 The inverse step is not supported: the window clause should be
184 "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
185 )"));
186 }
187 
188 }  // namespace
189 
Register(sqlite3 * db,TraceProcessorContext * context)190 base::Status LayoutFunctions::Register(sqlite3* db,
191                                        TraceProcessorContext* context) {
192   int flags = SQLITE_UTF8 | SQLITE_DETERMINISTIC;
193   int ret = sqlite3_create_window_function(
194       db, kFunctionName, 2, flags, context, StepWrapper, FinalWrapper,
195       ValueWrapper, InverseWrapper, nullptr);
196   if (ret != SQLITE_OK) {
197     return base::ErrStatus("Unable to register function with name %s",
198                            kFunctionName);
199   }
200   return base::OkStatus();
201 }
202 
203 }  // namespace perfetto::trace_processor
204