1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <cinttypes>
18
19 #include "perfetto/ext/base/file_utils.h"
20 #include "perfetto/ext/base/string_utils.h"
21 #include "perfetto/ext/base/temp_file.h"
22 #include "perfetto/ext/tracing/core/consumer.h"
23 #include "perfetto/ext/tracing/core/producer.h"
24 #include "perfetto/ext/tracing/core/trace_packet.h"
25 #include "perfetto/ext/tracing/core/trace_stats.h"
26 #include "perfetto/ext/tracing/core/trace_writer.h"
27 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
28 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
29 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
30 #include "perfetto/tracing/core/data_source_config.h"
31 #include "perfetto/tracing/core/data_source_descriptor.h"
32 #include "perfetto/tracing/core/trace_config.h"
33 #include "src/base/test/test_task_runner.h"
34 #include "src/ipc/test/test_socket.h"
35 #include "src/tracing/core/tracing_service_impl.h"
36 #include "test/gtest_and_gmock.h"
37
38 #include "protos/perfetto/config/trace_config.gen.h"
39 #include "protos/perfetto/trace/clock_snapshot.gen.h"
40 #include "protos/perfetto/trace/test_event.gen.h"
41 #include "protos/perfetto/trace/test_event.pbzero.h"
42 #include "protos/perfetto/trace/trace.gen.h"
43 #include "protos/perfetto/trace/trace_packet.gen.h"
44 #include "protos/perfetto/trace/trace_packet.pbzero.h"
45
46 namespace perfetto {
47 namespace {
48
49 using testing::_;
50 using testing::Invoke;
51 using testing::InvokeWithoutArgs;
52
53 ipc::TestSocket kProducerSock{"tracing_test-producer"};
54 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
55
56 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
57 class MockProducer : public Producer {
58 public:
~MockProducer()59 ~MockProducer() override {}
60
61 // Producer implementation.
62 MOCK_METHOD(void, OnConnect, (), (override));
63 MOCK_METHOD(void, OnDisconnect, (), (override));
64 MOCK_METHOD(void,
65 SetupDataSource,
66 (DataSourceInstanceID, const DataSourceConfig&),
67 (override));
68 MOCK_METHOD(void,
69 StartDataSource,
70 (DataSourceInstanceID, const DataSourceConfig&),
71 (override));
72 MOCK_METHOD(void, StopDataSource, (DataSourceInstanceID), (override));
73 MOCK_METHOD(void, OnTracingSetup, (), (override));
74 MOCK_METHOD(void,
75 Flush,
76 (FlushRequestID, const DataSourceInstanceID*, size_t),
77 (override));
78 MOCK_METHOD(void,
79 ClearIncrementalState,
80 (const DataSourceInstanceID*, size_t),
81 (override));
82 };
83
84 class MockConsumer : public Consumer {
85 public:
~MockConsumer()86 ~MockConsumer() override {}
87
88 // Producer implementation.
89 MOCK_METHOD(void, OnConnect, (), (override));
90 MOCK_METHOD(void, OnDisconnect, (), (override));
91 MOCK_METHOD(void,
92 OnTracingDisabled,
93 (const std::string& /*error*/),
94 (override));
95 MOCK_METHOD(void, OnTracePackets, (std::vector<TracePacket>*, bool));
96 MOCK_METHOD(void, OnDetach, (bool), (override));
97 MOCK_METHOD(void, OnAttach, (bool, const TraceConfig&), (override));
98 MOCK_METHOD(void, OnTraceStats, (bool, const TraceStats&), (override));
99 MOCK_METHOD(void, OnObservableEvents, (const ObservableEvents&), (override));
100 MOCK_METHOD(void, OnSessionCloned, (const OnSessionClonedArgs&), (override));
101
102 // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)103 void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
104 OnTracePackets(&packets, has_more);
105 }
106 };
107
CheckTraceStats(const protos::gen::TracePacket & packet)108 void CheckTraceStats(const protos::gen::TracePacket& packet) {
109 EXPECT_TRUE(packet.has_trace_stats());
110 EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
111 EXPECT_EQ(1u, packet.trace_stats().producers_connected());
112 EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
113 EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
114 EXPECT_EQ(1u, packet.trace_stats().total_buffers());
115 EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
116
117 const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
118 EXPECT_GT(buf_stats.bytes_written(), 0u);
119 EXPECT_GT(buf_stats.chunks_written(), 0u);
120 EXPECT_EQ(0u, buf_stats.chunks_overwritten());
121 EXPECT_EQ(0u, buf_stats.chunks_rewritten());
122 EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
123 EXPECT_EQ(0u, buf_stats.write_wrap_count());
124 EXPECT_EQ(0u, buf_stats.patches_failed());
125 EXPECT_EQ(0u, buf_stats.readaheads_failed());
126 EXPECT_EQ(0u, buf_stats.abi_violations());
127 }
128
129 static_assert(TracingServiceImpl::kMaxTracePacketSliceSize <=
130 ipc::kIPCBufferSize - 512,
131 "Tracing service max packet slice should be smaller than IPC "
132 "buffer size (with some headroom)");
133
134 } // namespace
135
136 class TracingIntegrationTest : public ::testing::Test {
137 public:
SetUp()138 void SetUp() override {
139 kProducerSock.Destroy();
140 kConsumerSock.Destroy();
141 task_runner_.reset(new base::TestTaskRunner());
142
143 // Create the service host.
144 svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
145 svc_->Start(kProducerSock.name(), kConsumerSock.name());
146
147 // Create and connect a Producer.
148 producer_endpoint_ = ProducerIPCClient::Connect(
149 kProducerSock.name(), &producer_, "perfetto.mock_producer",
150 task_runner_.get(), GetProducerSMBScrapingMode());
151 auto on_producer_connect =
152 task_runner_->CreateCheckpoint("on_producer_connect");
153 EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
154 task_runner_->RunUntilCheckpoint("on_producer_connect");
155
156 // Register a data source.
157 DataSourceDescriptor ds_desc;
158 ds_desc.set_name("perfetto.test");
159 producer_endpoint_->RegisterDataSource(ds_desc);
160
161 // Create and connect a Consumer.
162 consumer_endpoint_ = ConsumerIPCClient::Connect(
163 kConsumerSock.name(), &consumer_, task_runner_.get());
164 auto on_consumer_connect =
165 task_runner_->CreateCheckpoint("on_consumer_connect");
166 EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
167 task_runner_->RunUntilCheckpoint("on_consumer_connect");
168
169 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
170 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
171 }
172
TearDown()173 void TearDown() override {
174 // Destroy the service and check that both Producer and Consumer see an
175 // OnDisconnect() call.
176
177 auto on_producer_disconnect =
178 task_runner_->CreateCheckpoint("on_producer_disconnect");
179 EXPECT_CALL(producer_, OnDisconnect())
180 .WillOnce(Invoke(on_producer_disconnect));
181
182 auto on_consumer_disconnect =
183 task_runner_->CreateCheckpoint("on_consumer_disconnect");
184 EXPECT_CALL(consumer_, OnDisconnect())
185 .WillOnce(Invoke(on_consumer_disconnect));
186
187 svc_.reset();
188 task_runner_->RunUntilCheckpoint("on_producer_disconnect");
189 task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
190
191 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
192 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
193
194 task_runner_.reset();
195 kProducerSock.Destroy();
196 kConsumerSock.Destroy();
197 }
198
GetProducerSMBScrapingMode()199 virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
200 return TracingService::ProducerSMBScrapingMode::kDefault;
201 }
202
WaitForTraceWritersChanged(ProducerID producer_id)203 void WaitForTraceWritersChanged(ProducerID producer_id) {
204 static int i = 0;
205 auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
206 "_" + std::to_string(i++);
207 auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
208 auto writers = GetWriters(producer_id);
209 std::function<void()> task;
210 task = [&task, writers, writers_changed, producer_id, this]() {
211 if (writers != GetWriters(producer_id)) {
212 writers_changed();
213 return;
214 }
215 task_runner_->PostDelayedTask(task, 1);
216 };
217 task_runner_->PostDelayedTask(task, 1);
218 task_runner_->RunUntilCheckpoint(checkpoint_name);
219 }
220
GetWriters(ProducerID producer_id)221 const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
222 return reinterpret_cast<TracingServiceImpl*>(svc_->service())
223 ->GetProducer(producer_id)
224 ->writers_;
225 }
226
last_producer_id()227 ProducerID* last_producer_id() {
228 return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
229 ->last_producer_id_;
230 }
231
232 std::unique_ptr<base::TestTaskRunner> task_runner_;
233 std::unique_ptr<ServiceIPCHost> svc_;
234 std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
235 MockProducer producer_;
236 std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
237 MockConsumer consumer_;
238 };
239
TEST_F(TracingIntegrationTest,WithIPCTransport)240 TEST_F(TracingIntegrationTest, WithIPCTransport) {
241 // Start tracing.
242 TraceConfig trace_config;
243 trace_config.add_buffers()->set_size_kb(4096 * 10);
244 auto* ds_config = trace_config.add_data_sources()->mutable_config();
245 ds_config->set_name("perfetto.test");
246 ds_config->set_target_buffer(0);
247 consumer_endpoint_->EnableTracing(trace_config);
248
249 // At this point, the Producer should be asked to turn its data source on.
250 DataSourceInstanceID ds_iid = 0;
251
252 BufferID global_buf_id = 0;
253 auto on_create_ds_instance =
254 task_runner_->CreateCheckpoint("on_create_ds_instance");
255 EXPECT_CALL(producer_, OnTracingSetup());
256
257 // Store the arguments passed to SetupDataSource() and later check that they
258 // match the ones passed to StartDataSource().
259 DataSourceInstanceID setup_id;
260 DataSourceConfig setup_cfg_proto;
261 EXPECT_CALL(producer_, SetupDataSource(_, _))
262 .WillOnce(
263 Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
264 const DataSourceConfig& cfg) {
265 setup_id = id;
266 setup_cfg_proto = cfg;
267 }));
268 EXPECT_CALL(producer_, StartDataSource(_, _))
269 .WillOnce(
270 Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
271 &setup_cfg_proto](DataSourceInstanceID id,
272 const DataSourceConfig& cfg) {
273 // id and config should match the ones passed to SetupDataSource.
274 ASSERT_EQ(id, setup_id);
275 ASSERT_EQ(setup_cfg_proto, cfg);
276 ASSERT_NE(0u, id);
277 ds_iid = id;
278 ASSERT_EQ("perfetto.test", cfg.name());
279 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
280 ASSERT_NE(0u, global_buf_id);
281 ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
282 on_create_ds_instance();
283 }));
284 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
285
286 // Now let the data source fill some pages within the same task.
287 // Doing so should accumulate a bunch of chunks that will be notified by the
288 // a future task in one batch.
289 std::unique_ptr<TraceWriter> writer =
290 producer_endpoint_->CreateTraceWriter(global_buf_id);
291 ASSERT_TRUE(writer);
292
293 const size_t kNumPackets = 10;
294 for (size_t i = 0; i < kNumPackets; i++) {
295 char buf[16];
296 base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
297 writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
298 }
299
300 // Allow the service to see the CommitData() before reading back.
301 auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
302 writer->Flush(on_data_committed);
303 task_runner_->RunUntilCheckpoint("on_data_committed");
304
305 // Read the log buffer.
306 consumer_endpoint_->ReadBuffers();
307 size_t num_pack_rx = 0;
308 bool saw_clock_snapshot = false;
309 bool saw_trace_config = false;
310 bool saw_trace_stats = false;
311 auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
312 EXPECT_CALL(consumer_, OnTracePackets(_, _))
313 .WillRepeatedly(
314 Invoke([&num_pack_rx, all_packets_rx, &trace_config,
315 &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
316 std::vector<TracePacket>* packets, bool has_more) {
317 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
318 const int kExpectedMinNumberOfClocks = 1;
319 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
320 const int kExpectedMinNumberOfClocks = 2;
321 #else
322 const int kExpectedMinNumberOfClocks = 6;
323 #endif
324
325 for (auto& encoded_packet : *packets) {
326 protos::gen::TracePacket packet;
327 ASSERT_TRUE(packet.ParseFromString(
328 encoded_packet.GetRawBytesForTesting()));
329 if (packet.has_for_testing()) {
330 char buf[8];
331 base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", num_pack_rx++);
332 EXPECT_EQ(std::string(buf), packet.for_testing().str());
333 } else if (packet.has_clock_snapshot()) {
334 EXPECT_GE(packet.clock_snapshot().clocks_size(),
335 kExpectedMinNumberOfClocks);
336 saw_clock_snapshot = true;
337 } else if (packet.has_trace_config()) {
338 EXPECT_EQ(packet.trace_config(), trace_config);
339 saw_trace_config = true;
340 } else if (packet.has_trace_stats()) {
341 saw_trace_stats = true;
342 CheckTraceStats(packet);
343 }
344 }
345 if (!has_more)
346 all_packets_rx();
347 }));
348 task_runner_->RunUntilCheckpoint("all_packets_rx");
349 ASSERT_EQ(kNumPackets, num_pack_rx);
350 EXPECT_TRUE(saw_clock_snapshot);
351 EXPECT_TRUE(saw_trace_config);
352 EXPECT_TRUE(saw_trace_stats);
353
354 // Disable tracing.
355 consumer_endpoint_->DisableTracing();
356
357 auto on_tracing_disabled =
358 task_runner_->CreateCheckpoint("on_tracing_disabled");
359 EXPECT_CALL(producer_, StopDataSource(_));
360 EXPECT_CALL(consumer_, OnTracingDisabled(_))
361 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
362 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
363 }
364
365 // Regression test for b/172950370.
TEST_F(TracingIntegrationTest,ValidErrorOnDisconnection)366 TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
367 // Start tracing.
368 TraceConfig trace_config;
369 trace_config.add_buffers()->set_size_kb(4096 * 10);
370 auto* ds_config = trace_config.add_data_sources()->mutable_config();
371 ds_config->set_name("perfetto.test");
372 consumer_endpoint_->EnableTracing(trace_config);
373
374 auto on_create_ds_instance =
375 task_runner_->CreateCheckpoint("on_create_ds_instance");
376 EXPECT_CALL(producer_, OnTracingSetup());
377
378 // Store the arguments passed to SetupDataSource() and later check that they
379 // match the ones passed to StartDataSource().
380 EXPECT_CALL(producer_, SetupDataSource(_, _));
381 EXPECT_CALL(producer_, StartDataSource(_, _))
382 .WillOnce(InvokeWithoutArgs(on_create_ds_instance));
383 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
384
385 EXPECT_CALL(consumer_, OnTracingDisabled(_))
386 .WillOnce(Invoke([](const std::string& err) {
387 EXPECT_THAT(err,
388 testing::HasSubstr("EnableTracing IPC request rejected"));
389 }));
390
391 // TearDown() will destroy the service via svc_.reset(). That will drop the
392 // connection and trigger the EXPECT_CALL(OnTracingDisabled) above.
393 }
394
395 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// Enables tracing with write_into_file set, writes kNumPackets test packets,
// then frees the buffers and checks that the file handed to the service
// contains a parsable Trace holding all the test packets in order, plus trace
// stats, clock snapshot and system info packets.
TEST_F(TracingIntegrationTest, WriteIntoFile) {
  // Start tracing.
  TraceConfig trace_config;
  trace_config.add_buffers()->set_size_kb(4096 * 10);
  auto* ds_config = trace_config.add_data_sources()->mutable_config();
  ds_config->set_name("perfetto.test");
  ds_config->set_target_buffer(0);
  trace_config.set_write_into_file(true);
  base::TempFile tmp_file = base::TempFile::CreateUnlinked();
  // The service receives a duplicate of the descriptor; |tmp_file| keeps its
  // own one, which is used further down to read the trace back.
  consumer_endpoint_->EnableTracing(trace_config,
                                    base::ScopedFile(dup(tmp_file.fd())));

  // At this point, the producer_ should be asked to turn its data source on.
  BufferID global_buf_id = 0;
  auto on_create_ds_instance =
      task_runner_->CreateCheckpoint("on_create_ds_instance");
  EXPECT_CALL(producer_, OnTracingSetup());
  EXPECT_CALL(producer_, SetupDataSource(_, _));
  EXPECT_CALL(producer_, StartDataSource(_, _))
      .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
                           DataSourceInstanceID, const DataSourceConfig& cfg) {
        global_buf_id = static_cast<BufferID>(cfg.target_buffer());
        on_create_ds_instance();
      }));
  task_runner_->RunUntilCheckpoint("on_create_ds_instance");

  std::unique_ptr<TraceWriter> writer =
      producer_endpoint_->CreateTraceWriter(global_buf_id);
  ASSERT_TRUE(writer);

  const size_t kNumPackets = 10;
  for (size_t i = 0; i < kNumPackets; i++) {
    char buf[16];
    base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
    writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
  }
  auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
  writer->Flush(on_data_committed);
  task_runner_->RunUntilCheckpoint("on_data_committed");

  // Will disable tracing and will force the buffers to be written into the
  // file before destroying them.
  consumer_endpoint_->FreeBuffers();

  auto on_tracing_disabled =
      task_runner_->CreateCheckpoint("on_tracing_disabled");
  EXPECT_CALL(producer_, StopDataSource(_));
  EXPECT_CALL(consumer_, OnTracingDisabled(_))
      .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
  task_runner_->RunUntilCheckpoint("on_tracing_disabled");

  // Check that |tmp_file| contains a valid trace.proto message.
  ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
  std::string trace_contents;
  ASSERT_TRUE(base::ReadFileDescriptor(tmp_file.fd(), &trace_contents));
  protos::gen::Trace tmp_trace;
  ASSERT_TRUE(tmp_trace.ParseFromString(trace_contents));
  size_t num_test_packet = 0;
  size_t num_clock_snapshot_packet = 0;
  size_t num_system_info_packet = 0;
  bool saw_trace_stats = false;
  for (const auto& packet : tmp_trace.packet()) {
    if (packet.has_for_testing()) {
      // Test packets must appear in the order they were written.
      ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
                packet.for_testing().str());
    } else if (packet.has_trace_stats()) {
      saw_trace_stats = true;
      CheckTraceStats(packet);
    } else if (packet.has_clock_snapshot()) {
      num_clock_snapshot_packet++;
    } else if (packet.has_system_info()) {
      num_system_info_packet++;
    }
  }
  // Every packet written above must have reached the file; without this check
  // a trace containing no test packet at all would pass.
  ASSERT_EQ(kNumPackets, num_test_packet);
  ASSERT_TRUE(saw_trace_stats);
  ASSERT_GT(num_clock_snapshot_packet, 0u);
  ASSERT_GT(num_system_info_packet, 0u);
}
475 #endif
476
477 class TracingIntegrationTestWithSMBScrapingProducer
478 : public TracingIntegrationTest {
479 public:
GetProducerSMBScrapingMode()480 TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
481 override {
482 return TracingService::ProducerSMBScrapingMode::kEnabled;
483 }
484 };
485
// Writes three packets without ever flushing the trace writer, then asks the
// service to flush. The test expects exactly two of the three test packets to
// be readable afterwards, obtained by scraping the uncommitted SMB chunk.
TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
  // Start tracing.
  TraceConfig trace_config;
  trace_config.add_buffers()->set_size_kb(4096 * 10);
  auto* source_cfg = trace_config.add_data_sources()->mutable_config();
  source_cfg->set_name("perfetto.test");
  source_cfg->set_target_buffer(0);
  consumer_endpoint_->EnableTracing(trace_config);

  // At this point, the Producer should be asked to turn its data source on.
  // Remember which buffer the service assigned to the data source.
  BufferID target_buf = 0;
  auto ds_started = task_runner_->CreateCheckpoint("on_create_ds_instance");
  EXPECT_CALL(producer_, OnTracingSetup());
  EXPECT_CALL(producer_, SetupDataSource(_, _));
  EXPECT_CALL(producer_, StartDataSource(_, _))
      .WillOnce(Invoke([ds_started, &target_buf](DataSourceInstanceID,
                                                 const DataSourceConfig& cfg) {
        target_buf = static_cast<BufferID>(cfg.target_buffer());
        ds_started();
      }));
  task_runner_->RunUntilCheckpoint("on_create_ds_instance");

  // Create writer, which will post a task to register the writer with the
  // service.
  std::unique_ptr<TraceWriter> writer =
      producer_endpoint_->CreateTraceWriter(target_buf);
  ASSERT_TRUE(writer);

  // Wait for the writer to be registered.
  WaitForTraceWritersChanged(*last_producer_id());

  // Write a few trace packets.
  writer->NewTracePacket()->set_for_testing()->set_str("payload1");
  writer->NewTracePacket()->set_for_testing()->set_str("payload2");
  writer->NewTracePacket()->set_for_testing()->set_str("payload3");

  // Ask the service to flush, but don't flush our trace writer. This should
  // cause our uncommitted SMB chunk to be scraped.
  auto flush_done = task_runner_->CreateCheckpoint("on_flush_complete");
  consumer_endpoint_->Flush(5000, [flush_done](bool success) {
    EXPECT_TRUE(success);
    flush_done();
  });
  // The producer acknowledges the flush request right away, without touching
  // |writer|.
  EXPECT_CALL(producer_, Flush(_, _, _))
      .WillOnce(Invoke([this](FlushRequestID request_id,
                              const DataSourceInstanceID*, size_t) {
        producer_endpoint_->NotifyFlushComplete(request_id);
      }));
  task_runner_->RunUntilCheckpoint("on_flush_complete");

  // Read the log buffer. We should only see the first two written trace
  // packets, because the service can't be sure the last one was written
  // completely by the trace writer.
  consumer_endpoint_->ReadBuffers();

  size_t test_packets_seen = 0;
  auto read_done = task_runner_->CreateCheckpoint("all_packets_rx");
  EXPECT_CALL(consumer_, OnTracePackets(_, _))
      .WillRepeatedly(Invoke([&test_packets_seen, read_done](
                                 std::vector<TracePacket>* packets,
                                 bool has_more) {
        for (auto& encoded : *packets) {
          protos::gen::TracePacket decoded;
          ASSERT_TRUE(
              decoded.ParseFromString(encoded.GetRawBytesForTesting()));
          if (!decoded.has_for_testing())
            continue;
          test_packets_seen++;
        }
        if (has_more)
          return;
        read_done();
      }));
  task_runner_->RunUntilCheckpoint("all_packets_rx");
  ASSERT_EQ(2u, test_packets_seen);

  // Disable tracing.
  consumer_endpoint_->DisableTracing();

  auto tracing_disabled = task_runner_->CreateCheckpoint("on_tracing_disabled");
  auto ds_stopped = task_runner_->CreateCheckpoint("on_stop_ds");
  EXPECT_CALL(producer_, StopDataSource(_))
      .WillOnce(InvokeWithoutArgs(ds_stopped));
  EXPECT_CALL(consumer_, OnTracingDisabled(_))
      .WillOnce(InvokeWithoutArgs(tracing_disabled));
  task_runner_->RunUntilCheckpoint("on_stop_ds");
  task_runner_->RunUntilCheckpoint("on_tracing_disabled");
}
577
578 // TODO(primiano): add tests to cover:
579 // - unknown fields preserved end-to-end.
580 // - >1 data source.
581 // - >1 data consumer sharing the same data source, with different TraceBuffers.
582 // - >1 consumer with > 1 buffer each.
583 // - Consumer disconnecting in the middle of a ReadBuffers() call.
584 // - Multiple calls to DisableTracing.
585 // - Out of order Enable/Disable/FreeBuffers calls.
586 // - DisableTracing does actually freeze the buffers.
587
588 } // namespace perfetto
589