1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "test/fake_producer.h"
18
19 #include <mutex>
20
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/time.h"
23 #include "perfetto/ext/base/utils.h"
24 #include "perfetto/ext/traced/traced.h"
25 #include "perfetto/ext/tracing/core/commit_data_request.h"
26 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
27 #include "perfetto/ext/tracing/core/trace_packet.h"
28 #include "perfetto/ext/tracing/core/trace_writer.h"
29 #include "perfetto/tracing/core/data_source_config.h"
30
31 #include "protos/perfetto/config/test_config.gen.h"
32 #include "protos/perfetto/trace/test_event.pbzero.h"
33 #include "protos/perfetto/trace/trace_packet.pbzero.h"
34
35 namespace perfetto {
36
37 namespace {
38 const MaybeUnboundBufferID kStartupTargetBufferReservationId = 1;
39 } // namespace
40
FakeProducer(const std::string & name,base::TaskRunner * task_runner)41 FakeProducer::FakeProducer(const std::string& name,
42 base::TaskRunner* task_runner)
43 : name_(name), task_runner_(task_runner) {}
44 FakeProducer::~FakeProducer() = default;
45
Connect(const char * socket_name,std::function<void ()> on_connect,std::function<void ()> on_setup_data_source_instance,std::function<void ()> on_create_data_source_instance,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)46 void FakeProducer::Connect(const char* socket_name,
47 std::function<void()> on_connect,
48 std::function<void()> on_setup_data_source_instance,
49 std::function<void()> on_create_data_source_instance,
50 std::unique_ptr<SharedMemory> shm,
51 std::unique_ptr<SharedMemoryArbiter> shm_arbiter) {
52 PERFETTO_DCHECK_THREAD(thread_checker_);
53 endpoint_ = ProducerIPCClient::Connect(
54 socket_name, this, "android.perfetto.FakeProducer", task_runner_,
55 TracingService::ProducerSMBScrapingMode::kDefault,
56 /*shared_memory_size_hint_bytes=*/0,
57 /*shared_memory_page_size_hint_bytes=*/base::kPageSize, std::move(shm),
58 std::move(shm_arbiter));
59 on_connect_ = std::move(on_connect);
60 on_setup_data_source_instance_ = std::move(on_setup_data_source_instance);
61 on_create_data_source_instance_ = std::move(on_create_data_source_instance);
62 }
63
OnConnect()64 void FakeProducer::OnConnect() {
65 PERFETTO_DCHECK_THREAD(thread_checker_);
66 DataSourceDescriptor descriptor;
67 descriptor.set_name(name_);
68 endpoint_->RegisterDataSource(descriptor);
69 auto on_connect_callback = std::move(on_connect_);
70 auto task_runner = task_runner_;
71 endpoint_->Sync([task_runner, on_connect_callback] {
72 task_runner->PostTask(on_connect_callback);
73 });
74 }
75
OnDisconnect()76 void FakeProducer::OnDisconnect() {
77 PERFETTO_DCHECK_THREAD(thread_checker_);
78 PERFETTO_FATAL("Producer unexpectedly disconnected from the service");
79 }
80
SetupDataSource(DataSourceInstanceID,const DataSourceConfig &)81 void FakeProducer::SetupDataSource(DataSourceInstanceID,
82 const DataSourceConfig&) {
83 task_runner_->PostTask(on_setup_data_source_instance_);
84 }
85
StartDataSource(DataSourceInstanceID,const DataSourceConfig & source_config)86 void FakeProducer::StartDataSource(DataSourceInstanceID,
87 const DataSourceConfig& source_config) {
88 PERFETTO_DCHECK_THREAD(thread_checker_);
89 if (trace_writer_) {
90 // Startup tracing was already active, just bind the target buffer.
91 endpoint_->MaybeSharedMemoryArbiter()->BindStartupTargetBuffer(
92 kStartupTargetBufferReservationId,
93 static_cast<BufferID>(source_config.target_buffer()));
94 } else {
95 // Common case: Start tracing now.
96 trace_writer_ = endpoint_->CreateTraceWriter(
97 static_cast<BufferID>(source_config.target_buffer()));
98 SetupFromConfig(source_config.for_testing());
99 }
100 if (source_config.for_testing().send_batch_on_register()) {
101 ProduceEventBatch(on_create_data_source_instance_);
102 } else {
103 task_runner_->PostTask(on_create_data_source_instance_);
104 }
105 }
106
StopDataSource(DataSourceInstanceID)107 void FakeProducer::StopDataSource(DataSourceInstanceID) {
108 PERFETTO_DCHECK_THREAD(thread_checker_);
109 trace_writer_.reset();
110 }
111
112 // Note: this can be called on a different thread.
ProduceStartupEventBatch(const protos::gen::TestConfig & config,SharedMemoryArbiter * arbiter,std::function<void ()> callback)113 void FakeProducer::ProduceStartupEventBatch(
114 const protos::gen::TestConfig& config,
115 SharedMemoryArbiter* arbiter,
116 std::function<void()> callback) {
117 task_runner_->PostTask([this, config, arbiter, callback] {
118 SetupFromConfig(config);
119
120 PERFETTO_CHECK(!trace_writer_);
121 trace_writer_ =
122 arbiter->CreateStartupTraceWriter(kStartupTargetBufferReservationId);
123
124 EmitEventBatchOnTaskRunner({});
125
126 // Issue callback right after writing - cannot wait for flush yet because
127 // we're not connected yet.
128 callback();
129 });
130 }
131
132 // Note: this can be called on a different thread.
ProduceEventBatch(std::function<void ()> callback)133 void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
134 task_runner_->PostTask(
135 [this, callback] { EmitEventBatchOnTaskRunner(callback); });
136 }
137
RegisterDataSource(const DataSourceDescriptor & desc)138 void FakeProducer::RegisterDataSource(const DataSourceDescriptor& desc) {
139 task_runner_->PostTask([this, desc] { endpoint_->RegisterDataSource(desc); });
140 }
141
CommitData(const CommitDataRequest & req,std::function<void ()> callback)142 void FakeProducer::CommitData(const CommitDataRequest& req,
143 std::function<void()> callback) {
144 task_runner_->PostTask(
145 [this, req, callback] { endpoint_->CommitData(req, callback); });
146 }
147
Sync(std::function<void ()> callback)148 void FakeProducer::Sync(std::function<void()> callback) {
149 task_runner_->PostTask([this, callback] { endpoint_->Sync(callback); });
150 }
151
OnTracingSetup()152 void FakeProducer::OnTracingSetup() {}
153
Flush(FlushRequestID flush_request_id,const DataSourceInstanceID *,size_t num_data_sources)154 void FakeProducer::Flush(FlushRequestID flush_request_id,
155 const DataSourceInstanceID*,
156 size_t num_data_sources) {
157 PERFETTO_DCHECK(num_data_sources > 0);
158 if (trace_writer_)
159 trace_writer_->Flush();
160 endpoint_->NotifyFlushComplete(flush_request_id);
161 }
162
SetupFromConfig(const protos::gen::TestConfig & config)163 void FakeProducer::SetupFromConfig(const protos::gen::TestConfig& config) {
164 rnd_engine_ = std::minstd_rand0(config.seed());
165 message_count_ = config.message_count();
166 message_size_ = config.message_size();
167 max_messages_per_second_ = config.max_messages_per_second();
168 }
169
EmitEventBatchOnTaskRunner(std::function<void ()> callback)170 void FakeProducer::EmitEventBatchOnTaskRunner(std::function<void()> callback) {
171 PERFETTO_CHECK(trace_writer_);
172 PERFETTO_CHECK(message_size_ > 1);
173 std::unique_ptr<char, base::FreeDeleter> payload(
174 static_cast<char*>(malloc(message_size_)));
175 memset(payload.get(), '.', message_size_);
176 payload.get()[message_size_ - 1] = 0;
177
178 base::TimeMillis start = base::GetWallTimeMs();
179 int64_t iterations = 0;
180 uint32_t messages_to_emit = message_count_;
181 while (messages_to_emit > 0) {
182 uint32_t messages_in_minibatch =
183 max_messages_per_second_ == 0
184 ? messages_to_emit
185 : std::min(max_messages_per_second_, messages_to_emit);
186 PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
187
188 for (uint32_t i = 0; i < messages_in_minibatch; i++) {
189 auto handle = trace_writer_->NewTracePacket();
190 handle->set_for_testing()->set_seq_value(
191 static_cast<uint32_t>(rnd_engine_()));
192 handle->set_for_testing()->set_str(payload.get(), message_size_);
193 }
194 messages_to_emit -= messages_in_minibatch;
195 iterations++;
196
197 // Pause until the second boundary to make sure that we are adhering to
198 // the speed limitation.
199 if (max_messages_per_second_ > 0) {
200 int64_t expected_time_taken = iterations * 1000;
201 base::TimeMillis time_taken = base::GetWallTimeMs() - start;
202 while (time_taken.count() < expected_time_taken) {
203 usleep(static_cast<useconds_t>(
204 (expected_time_taken - time_taken.count()) * 1000));
205 time_taken = base::GetWallTimeMs() - start;
206 }
207 }
208 trace_writer_->Flush(messages_to_emit > 0 ? [] {} : callback);
209 }
210 }
211
212 } // namespace perfetto
213