• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/cloud_trace_processor/trace_processor_wrapper.h"
18 
19 #include <atomic>
20 #include <memory>
21 #include <mutex>
22 #include <optional>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "perfetto/base/status.h"
28 #include "perfetto/ext/base/file_utils.h"
29 #include "perfetto/ext/base/status_or.h"
30 #include "perfetto/ext/base/threading/future.h"
31 #include "perfetto/ext/base/threading/poll.h"
32 #include "perfetto/ext/base/threading/stream.h"
33 #include "perfetto/ext/base/threading/thread_pool.h"
34 #include "perfetto/ext/base/threading/util.h"
35 #include "perfetto/protozero/proto_utils.h"
36 #include "perfetto/protozero/scattered_heap_buffer.h"
37 #include "perfetto/trace_processor/trace_blob.h"
38 #include "perfetto/trace_processor/trace_blob_view.h"
39 #include "perfetto/trace_processor/trace_processor.h"
40 #include "protos/perfetto/cloud_trace_processor/worker.pb.h"
41 #include "src/protozero/proto_ring_buffer.h"
42 #include "src/trace_processor/rpc/query_result_serializer.h"
43 #include "src/trace_processor/util/status_macros.h"
44 
45 namespace perfetto {
46 namespace cloud_trace_processor {
47 namespace {
48 
49 using trace_processor::QueryResultSerializer;
50 using trace_processor::TraceBlob;
51 using trace_processor::TraceBlobView;
52 using trace_processor::TraceProcessor;
53 using Statefulness = TraceProcessorWrapper::Statefulness;
54 
55 struct QueryRunner {
QueryRunnerperfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner56   QueryRunner(std::shared_ptr<TraceProcessor> _tp,
57               std::string _query,
58               std::string _trace_path,
59               Statefulness _statefulness)
60       : tp(std::move(_tp)),
61         query(std::move(_query)),
62         trace_path(std::move(_trace_path)),
63         statefulness(_statefulness) {}
64 
operator ()perfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner65   std::optional<protos::TracePoolShardQueryResponse> operator()() {
66     if (!has_more) {
67       if (statefulness == Statefulness::kStateless) {
68         tp->RestoreInitialTables();
69       }
70       return std::nullopt;
71     }
72     // If the serializer does not exist yet, that means we have not yet run
73     // the query so make sure to do that first.
74     EnsureSerializerExists();
75     has_more = serializer->Serialize(&result);
76 
77     protos::TracePoolShardQueryResponse resp;
78     *resp.mutable_trace() = trace_path;
79     resp.mutable_result()->ParseFromArray(result.data(),
80                                           static_cast<int>(result.size()));
81     result.clear();
82     return std::make_optional(std::move(resp));
83   }
84 
EnsureSerializerExistsperfetto::cloud_trace_processor::__anonfeed12ba0111::QueryRunner85   void EnsureSerializerExists() {
86     if (serializer) {
87       return;
88     }
89     auto it = tp->ExecuteQuery(query);
90     serializer.reset(new QueryResultSerializer(std::move(it)));
91   }
92 
93   std::shared_ptr<TraceProcessor> tp;
94   std::string query;
95   std::string trace_path;
96   TraceProcessorWrapper::Statefulness statefulness;
97 
98   // shared_ptr to allow copying when this type is coerced to std::function.
99   std::shared_ptr<QueryResultSerializer> serializer;
100   std::vector<uint8_t> result;
101   bool has_more = true;
102 };
103 
104 }  // namespace
105 
TraceProcessorWrapper(std::string trace_path,base::ThreadPool * thread_pool,Statefulness statefulness)106 TraceProcessorWrapper::TraceProcessorWrapper(std::string trace_path,
107                                              base::ThreadPool* thread_pool,
108                                              Statefulness statefulness)
109     : trace_path_(std::move(trace_path)),
110       thread_pool_(thread_pool),
111       statefulness_(statefulness) {
112   trace_processor::Config config;
113   config.ingest_ftrace_in_raw_table = false;
114   trace_processor_ = TraceProcessor::CreateInstance(config);
115 }
116 
LoadTrace(base::StatusOrStream<std::vector<uint8_t>> file_stream)117 base::StatusFuture TraceProcessorWrapper::LoadTrace(
118     base::StatusOrStream<std::vector<uint8_t>> file_stream) {
119   if (trace_processor_.use_count() != 1) {
120     return base::ErrStatus("Request is already in flight");
121   }
122   return std::move(file_stream)
123       .MapFuture(
124           [this](base::StatusOr<std::vector<uint8_t>> d) -> base::StatusFuture {
125             RETURN_IF_ERROR(d.status());
126             return base::RunOnceOnThreadPool<base::Status>(
127                 thread_pool_, [res = std::move(*d), tp = trace_processor_] {
128                   return tp->Parse(TraceBlobView(
129                       TraceBlob::CopyFrom(res.data(), res.size())));
130                 });
131           })
132       .Collect(base::AllOkCollector())
133       .ContinueWith([this](base::Status status) -> base::StatusFuture {
134         RETURN_IF_ERROR(status);
135         return base::RunOnceOnThreadPool<base::Status>(
136             thread_pool_, [tp = trace_processor_] {
137               tp->NotifyEndOfFile();
138               return base::OkStatus();
139             });
140       });
141 }
142 
143 base::StatusOrStream<protos::TracePoolShardQueryResponse>
Query(const std::string & query)144 TraceProcessorWrapper::Query(const std::string& query) {
145   using StatusOrResponse = base::StatusOr<protos::TracePoolShardQueryResponse>;
146   if (trace_processor_.use_count() != 1) {
147     return base::StreamOf<StatusOrResponse>(
148         base::ErrStatus("Request is already in flight"));
149   }
150   return base::RunOnThreadPool<StatusOrResponse>(
151       thread_pool_,
152       QueryRunner(trace_processor_, query, trace_path_, statefulness_));
153 }
154 
155 }  // namespace cloud_trace_processor
156 }  // namespace perfetto
157