• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "test/fake_producer.h"
18 
19 #include <mutex>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/time.h"
23 #include "perfetto/ext/base/utils.h"
24 #include "perfetto/ext/traced/traced.h"
25 #include "perfetto/ext/tracing/core/commit_data_request.h"
26 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
27 #include "perfetto/ext/tracing/core/trace_packet.h"
28 #include "perfetto/ext/tracing/core/trace_writer.h"
29 #include "perfetto/tracing/core/data_source_config.h"
30 
31 #include "protos/perfetto/config/test_config.gen.h"
32 #include "protos/perfetto/trace/test_event.pbzero.h"
33 #include "protos/perfetto/trace/trace_packet.pbzero.h"
34 
35 namespace perfetto {
36 
37 namespace {
38 const MaybeUnboundBufferID kStartupTargetBufferReservationId = 1;
39 }  // namespace
40 
FakeProducer(const std::string & name,base::TaskRunner * task_runner)41 FakeProducer::FakeProducer(const std::string& name,
42                            base::TaskRunner* task_runner)
43     : name_(name), task_runner_(task_runner) {}
44 FakeProducer::~FakeProducer() = default;
45 
Connect(const char * socket_name,std::function<void ()> on_connect,std::function<void ()> on_setup_data_source_instance,std::function<void ()> on_create_data_source_instance,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)46 void FakeProducer::Connect(const char* socket_name,
47                            std::function<void()> on_connect,
48                            std::function<void()> on_setup_data_source_instance,
49                            std::function<void()> on_create_data_source_instance,
50                            std::unique_ptr<SharedMemory> shm,
51                            std::unique_ptr<SharedMemoryArbiter> shm_arbiter) {
52   PERFETTO_DCHECK_THREAD(thread_checker_);
53   endpoint_ = ProducerIPCClient::Connect(
54       socket_name, this, "android.perfetto.FakeProducer", task_runner_,
55       TracingService::ProducerSMBScrapingMode::kDefault,
56       /*shared_memory_size_hint_bytes=*/0,
57       /*shared_memory_page_size_hint_bytes=*/base::kPageSize, std::move(shm),
58       std::move(shm_arbiter));
59   on_connect_ = std::move(on_connect);
60   on_setup_data_source_instance_ = std::move(on_setup_data_source_instance);
61   on_create_data_source_instance_ = std::move(on_create_data_source_instance);
62 }
63 
OnConnect()64 void FakeProducer::OnConnect() {
65   PERFETTO_DCHECK_THREAD(thread_checker_);
66   DataSourceDescriptor descriptor;
67   descriptor.set_name(name_);
68   endpoint_->RegisterDataSource(descriptor);
69   auto on_connect_callback = std::move(on_connect_);
70   auto task_runner = task_runner_;
71   endpoint_->Sync([task_runner, on_connect_callback] {
72     task_runner->PostTask(on_connect_callback);
73   });
74 }
75 
OnDisconnect()76 void FakeProducer::OnDisconnect() {
77   PERFETTO_DCHECK_THREAD(thread_checker_);
78   PERFETTO_FATAL("Producer unexpectedly disconnected from the service");
79 }
80 
SetupDataSource(DataSourceInstanceID,const DataSourceConfig &)81 void FakeProducer::SetupDataSource(DataSourceInstanceID,
82                                    const DataSourceConfig&) {
83   task_runner_->PostTask(on_setup_data_source_instance_);
84 }
85 
StartDataSource(DataSourceInstanceID,const DataSourceConfig & source_config)86 void FakeProducer::StartDataSource(DataSourceInstanceID,
87                                    const DataSourceConfig& source_config) {
88   PERFETTO_DCHECK_THREAD(thread_checker_);
89   if (trace_writer_) {
90     // Startup tracing was already active, just bind the target buffer.
91     endpoint_->MaybeSharedMemoryArbiter()->BindStartupTargetBuffer(
92         kStartupTargetBufferReservationId,
93         static_cast<BufferID>(source_config.target_buffer()));
94   } else {
95     // Common case: Start tracing now.
96     trace_writer_ = endpoint_->CreateTraceWriter(
97         static_cast<BufferID>(source_config.target_buffer()));
98     SetupFromConfig(source_config.for_testing());
99   }
100   if (source_config.for_testing().send_batch_on_register()) {
101     ProduceEventBatch(on_create_data_source_instance_);
102   } else {
103     task_runner_->PostTask(on_create_data_source_instance_);
104   }
105 }
106 
StopDataSource(DataSourceInstanceID)107 void FakeProducer::StopDataSource(DataSourceInstanceID) {
108   PERFETTO_DCHECK_THREAD(thread_checker_);
109   trace_writer_.reset();
110 }
111 
112 // Note: this can be called on a different thread.
ProduceStartupEventBatch(const protos::gen::TestConfig & config,SharedMemoryArbiter * arbiter,std::function<void ()> callback)113 void FakeProducer::ProduceStartupEventBatch(
114     const protos::gen::TestConfig& config,
115     SharedMemoryArbiter* arbiter,
116     std::function<void()> callback) {
117   task_runner_->PostTask([this, config, arbiter, callback] {
118     SetupFromConfig(config);
119 
120     PERFETTO_CHECK(!trace_writer_);
121     trace_writer_ =
122         arbiter->CreateStartupTraceWriter(kStartupTargetBufferReservationId);
123 
124     EmitEventBatchOnTaskRunner({});
125 
126     // Issue callback right after writing - cannot wait for flush yet because
127     // we're not connected yet.
128     callback();
129   });
130 }
131 
132 // Note: this can be called on a different thread.
ProduceEventBatch(std::function<void ()> callback)133 void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
134   task_runner_->PostTask(
135       [this, callback] { EmitEventBatchOnTaskRunner(callback); });
136 }
137 
RegisterDataSource(const DataSourceDescriptor & desc)138 void FakeProducer::RegisterDataSource(const DataSourceDescriptor& desc) {
139   task_runner_->PostTask([this, desc] { endpoint_->RegisterDataSource(desc); });
140 }
141 
CommitData(const CommitDataRequest & req,std::function<void ()> callback)142 void FakeProducer::CommitData(const CommitDataRequest& req,
143                               std::function<void()> callback) {
144   task_runner_->PostTask(
145       [this, req, callback] { endpoint_->CommitData(req, callback); });
146 }
147 
Sync(std::function<void ()> callback)148 void FakeProducer::Sync(std::function<void()> callback) {
149   task_runner_->PostTask([this, callback] { endpoint_->Sync(callback); });
150 }
151 
OnTracingSetup()152 void FakeProducer::OnTracingSetup() {}
153 
Flush(FlushRequestID flush_request_id,const DataSourceInstanceID *,size_t num_data_sources)154 void FakeProducer::Flush(FlushRequestID flush_request_id,
155                          const DataSourceInstanceID*,
156                          size_t num_data_sources) {
157   PERFETTO_DCHECK(num_data_sources > 0);
158   if (trace_writer_)
159     trace_writer_->Flush();
160   endpoint_->NotifyFlushComplete(flush_request_id);
161 }
162 
SetupFromConfig(const protos::gen::TestConfig & config)163 void FakeProducer::SetupFromConfig(const protos::gen::TestConfig& config) {
164   rnd_engine_ = std::minstd_rand0(config.seed());
165   message_count_ = config.message_count();
166   message_size_ = config.message_size();
167   max_messages_per_second_ = config.max_messages_per_second();
168 }
169 
EmitEventBatchOnTaskRunner(std::function<void ()> callback)170 void FakeProducer::EmitEventBatchOnTaskRunner(std::function<void()> callback) {
171   PERFETTO_CHECK(trace_writer_);
172   PERFETTO_CHECK(message_size_ > 1);
173   std::unique_ptr<char, base::FreeDeleter> payload(
174       static_cast<char*>(malloc(message_size_)));
175   memset(payload.get(), '.', message_size_);
176   payload.get()[message_size_ - 1] = 0;
177 
178   base::TimeMillis start = base::GetWallTimeMs();
179   int64_t iterations = 0;
180   uint32_t messages_to_emit = message_count_;
181   while (messages_to_emit > 0) {
182     uint32_t messages_in_minibatch =
183         max_messages_per_second_ == 0
184             ? messages_to_emit
185             : std::min(max_messages_per_second_, messages_to_emit);
186     PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
187 
188     for (uint32_t i = 0; i < messages_in_minibatch; i++) {
189       auto handle = trace_writer_->NewTracePacket();
190       handle->set_for_testing()->set_seq_value(
191           static_cast<uint32_t>(rnd_engine_()));
192       handle->set_for_testing()->set_str(payload.get(), message_size_);
193     }
194     messages_to_emit -= messages_in_minibatch;
195     iterations++;
196 
197     // Pause until the second boundary to make sure that we are adhering to
198     // the speed limitation.
199     if (max_messages_per_second_ > 0) {
200       int64_t expected_time_taken = iterations * 1000;
201       base::TimeMillis time_taken = base::GetWallTimeMs() - start;
202       while (time_taken.count() < expected_time_taken) {
203         usleep(static_cast<useconds_t>(
204             (expected_time_taken - time_taken.count()) * 1000));
205         time_taken = base::GetWallTimeMs() - start;
206       }
207     }
208     trace_writer_->Flush(messages_to_emit > 0 ? [] {} : callback);
209   }
210 }
211 
212 }  // namespace perfetto
213