1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/cloud_trace_processor/trace_processor_wrapper.h"
18
19 #include <atomic>
20 #include <memory>
21 #include <mutex>
22 #include <optional>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "perfetto/base/status.h"
28 #include "perfetto/ext/base/file_utils.h"
29 #include "perfetto/ext/base/status_or.h"
30 #include "perfetto/ext/base/threading/future.h"
31 #include "perfetto/ext/base/threading/poll.h"
32 #include "perfetto/ext/base/threading/stream.h"
33 #include "perfetto/ext/base/threading/thread_pool.h"
34 #include "perfetto/ext/base/threading/util.h"
35 #include "perfetto/protozero/proto_utils.h"
36 #include "perfetto/protozero/scattered_heap_buffer.h"
37 #include "perfetto/trace_processor/trace_blob.h"
38 #include "perfetto/trace_processor/trace_blob_view.h"
39 #include "perfetto/trace_processor/trace_processor.h"
40 #include "protos/perfetto/cloud_trace_processor/worker.pb.h"
41 #include "src/protozero/proto_ring_buffer.h"
42 #include "src/trace_processor/rpc/query_result_serializer.h"
43 #include "src/trace_processor/util/status_macros.h"
44
45 namespace perfetto {
46 namespace cloud_trace_processor {
47 namespace {
48
49 using trace_processor::QueryResultSerializer;
50 using trace_processor::TraceBlob;
51 using trace_processor::TraceBlobView;
52 using trace_processor::TraceProcessor;
53 using Statefulness = TraceProcessorWrapper::Statefulness;
54
55 struct QueryRunner {
QueryRunnerperfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner56 QueryRunner(std::shared_ptr<TraceProcessor> _tp,
57 std::string _query,
58 std::string _trace_path,
59 Statefulness _statefulness)
60 : tp(std::move(_tp)),
61 query(std::move(_query)),
62 trace_path(std::move(_trace_path)),
63 statefulness(_statefulness) {}
64
operator ()perfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner65 std::optional<protos::TracePoolShardQueryResponse> operator()() {
66 if (!has_more) {
67 if (statefulness == Statefulness::kStateless) {
68 tp->RestoreInitialTables();
69 }
70 return std::nullopt;
71 }
72 // If the serializer does not exist yet, that means we have not yet run
73 // the query so make sure to do that first.
74 EnsureSerializerExists();
75 has_more = serializer->Serialize(&result);
76
77 protos::TracePoolShardQueryResponse resp;
78 *resp.mutable_trace() = trace_path;
79 resp.mutable_result()->ParseFromArray(result.data(),
80 static_cast<int>(result.size()));
81 result.clear();
82 return std::make_optional(std::move(resp));
83 }
84
EnsureSerializerExistsperfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner85 void EnsureSerializerExists() {
86 if (serializer) {
87 return;
88 }
89 auto it = tp->ExecuteQuery(query);
90 serializer.reset(new QueryResultSerializer(std::move(it)));
91 }
92
93 std::shared_ptr<TraceProcessor> tp;
94 std::string query;
95 std::string trace_path;
96 TraceProcessorWrapper::Statefulness statefulness;
97
98 // shared_ptr to allow copying when this type is coerced to std::function.
99 std::shared_ptr<QueryResultSerializer> serializer;
100 std::vector<uint8_t> result;
101 bool has_more = true;
102 };
103
104 } // namespace
105
TraceProcessorWrapper(std::string trace_path,base::ThreadPool * thread_pool,Statefulness statefulness)106 TraceProcessorWrapper::TraceProcessorWrapper(std::string trace_path,
107 base::ThreadPool* thread_pool,
108 Statefulness statefulness)
109 : trace_path_(std::move(trace_path)),
110 thread_pool_(thread_pool),
111 statefulness_(statefulness) {
112 trace_processor::Config config;
113 config.ingest_ftrace_in_raw_table = false;
114 trace_processor_ = TraceProcessor::CreateInstance(config);
115 }
116
LoadTrace(base::StatusOrStream<std::vector<uint8_t>> file_stream)117 base::StatusFuture TraceProcessorWrapper::LoadTrace(
118 base::StatusOrStream<std::vector<uint8_t>> file_stream) {
119 if (trace_processor_.use_count() != 1) {
120 return base::ErrStatus("Request is already in flight");
121 }
122 return std::move(file_stream)
123 .MapFuture(
124 [this](base::StatusOr<std::vector<uint8_t>> d) -> base::StatusFuture {
125 RETURN_IF_ERROR(d.status());
126 return base::RunOnceOnThreadPool<base::Status>(
127 thread_pool_, [res = std::move(*d), tp = trace_processor_] {
128 return tp->Parse(TraceBlobView(
129 TraceBlob::CopyFrom(res.data(), res.size())));
130 });
131 })
132 .Collect(base::AllOkCollector())
133 .ContinueWith([this](base::Status status) -> base::StatusFuture {
134 RETURN_IF_ERROR(status);
135 return base::RunOnceOnThreadPool<base::Status>(
136 thread_pool_, [tp = trace_processor_] {
137 tp->NotifyEndOfFile();
138 return base::OkStatus();
139 });
140 });
141 }
142
143 base::StatusOrStream<protos::TracePoolShardQueryResponse>
Query(const std::string & query)144 TraceProcessorWrapper::Query(const std::string& query) {
145 using StatusOrResponse = base::StatusOr<protos::TracePoolShardQueryResponse>;
146 if (trace_processor_.use_count() != 1) {
147 return base::StreamOf<StatusOrResponse>(
148 base::ErrStatus("Request is already in flight"));
149 }
150 return base::RunOnThreadPool<StatusOrResponse>(
151 thread_pool_,
152 QueryRunner(trace_processor_, query, trace_path_, statefulness_));
153 }
154
155 } // namespace cloud_trace_processor
156 } // namespace perfetto
157