1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <cinttypes>
18
19 #include "perfetto/ext/base/temp_file.h"
20 #include "perfetto/ext/tracing/core/consumer.h"
21 #include "perfetto/ext/tracing/core/producer.h"
22 #include "perfetto/ext/tracing/core/trace_packet.h"
23 #include "perfetto/ext/tracing/core/trace_stats.h"
24 #include "perfetto/ext/tracing/core/trace_writer.h"
25 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
26 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
27 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 #include "perfetto/tracing/core/trace_config.h"
31 #include "src/base/test/test_task_runner.h"
32 #include "src/ipc/test/test_socket.h"
33 #include "src/tracing/core/tracing_service_impl.h"
34 #include "test/gtest_and_gmock.h"
35
36 #include "protos/perfetto/config/trace_config.gen.h"
37 #include "protos/perfetto/trace/clock_snapshot.gen.h"
38 #include "protos/perfetto/trace/test_event.gen.h"
39 #include "protos/perfetto/trace/test_event.pbzero.h"
40 #include "protos/perfetto/trace/trace.gen.h"
41 #include "protos/perfetto/trace/trace_packet.gen.h"
42 #include "protos/perfetto/trace/trace_packet.pbzero.h"
43
44 namespace perfetto {
45 namespace {
46
47 using testing::_;
48 using testing::Invoke;
49 using testing::InvokeWithoutArgs;
50
51 ipc::TestSocket kProducerSock{"tracing_test-producer"};
52 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
53
54 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
55 class MockProducer : public Producer {
56 public:
~MockProducer()57 ~MockProducer() override {}
58
59 // Producer implementation.
60 MOCK_METHOD0(OnConnect, void());
61 MOCK_METHOD0(OnDisconnect, void());
62 MOCK_METHOD2(SetupDataSource,
63 void(DataSourceInstanceID, const DataSourceConfig&));
64 MOCK_METHOD2(StartDataSource,
65 void(DataSourceInstanceID, const DataSourceConfig&));
66 MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
67 MOCK_METHOD0(uid, uid_t());
68 MOCK_METHOD0(OnTracingSetup, void());
69 MOCK_METHOD3(Flush,
70 void(FlushRequestID, const DataSourceInstanceID*, size_t));
71 MOCK_METHOD2(ClearIncrementalState,
72 void(const DataSourceInstanceID*, size_t));
73 };
74
75 class MockConsumer : public Consumer {
76 public:
~MockConsumer()77 ~MockConsumer() override {}
78
79 // Producer implementation.
80 MOCK_METHOD0(OnConnect, void());
81 MOCK_METHOD0(OnDisconnect, void());
82 MOCK_METHOD1(OnTracingDisabled, void(const std::string& /*error*/));
83 MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
84 MOCK_METHOD1(OnDetach, void(bool));
85 MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
86 MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
87 MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
88
89 // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)90 void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
91 OnTracePackets(&packets, has_more);
92 }
93 };
94
CheckTraceStats(const protos::gen::TracePacket & packet)95 void CheckTraceStats(const protos::gen::TracePacket& packet) {
96 EXPECT_TRUE(packet.has_trace_stats());
97 EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
98 EXPECT_EQ(1u, packet.trace_stats().producers_connected());
99 EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
100 EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
101 EXPECT_EQ(1u, packet.trace_stats().total_buffers());
102 EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
103
104 const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
105 EXPECT_GT(buf_stats.bytes_written(), 0u);
106 EXPECT_GT(buf_stats.chunks_written(), 0u);
107 EXPECT_EQ(0u, buf_stats.chunks_overwritten());
108 EXPECT_EQ(0u, buf_stats.chunks_rewritten());
109 EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
110 EXPECT_EQ(0u, buf_stats.write_wrap_count());
111 EXPECT_EQ(0u, buf_stats.patches_failed());
112 EXPECT_EQ(0u, buf_stats.readaheads_failed());
113 EXPECT_EQ(0u, buf_stats.abi_violations());
114 }
115
116 static_assert(TracingServiceImpl::kMaxTracePacketSliceSize <=
117 ipc::kIPCBufferSize - 512,
118 "Tracing service max packet slice should be smaller than IPC "
119 "buffer size (with some headroom)");
120
121 } // namespace
122
123 class TracingIntegrationTest : public ::testing::Test {
124 public:
SetUp()125 void SetUp() override {
126 kProducerSock.Destroy();
127 kConsumerSock.Destroy();
128 task_runner_.reset(new base::TestTaskRunner());
129
130 // Create the service host.
131 svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
132 svc_->Start(kProducerSock.name(), kConsumerSock.name());
133
134 // Create and connect a Producer.
135 producer_endpoint_ = ProducerIPCClient::Connect(
136 kProducerSock.name(), &producer_, "perfetto.mock_producer",
137 task_runner_.get(), GetProducerSMBScrapingMode());
138 auto on_producer_connect =
139 task_runner_->CreateCheckpoint("on_producer_connect");
140 EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
141 task_runner_->RunUntilCheckpoint("on_producer_connect");
142
143 // Register a data source.
144 DataSourceDescriptor ds_desc;
145 ds_desc.set_name("perfetto.test");
146 producer_endpoint_->RegisterDataSource(ds_desc);
147
148 // Create and connect a Consumer.
149 consumer_endpoint_ = ConsumerIPCClient::Connect(
150 kConsumerSock.name(), &consumer_, task_runner_.get());
151 auto on_consumer_connect =
152 task_runner_->CreateCheckpoint("on_consumer_connect");
153 EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
154 task_runner_->RunUntilCheckpoint("on_consumer_connect");
155
156 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
157 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
158 }
159
TearDown()160 void TearDown() override {
161 // Destroy the service and check that both Producer and Consumer see an
162 // OnDisconnect() call.
163
164 auto on_producer_disconnect =
165 task_runner_->CreateCheckpoint("on_producer_disconnect");
166 EXPECT_CALL(producer_, OnDisconnect())
167 .WillOnce(Invoke(on_producer_disconnect));
168
169 auto on_consumer_disconnect =
170 task_runner_->CreateCheckpoint("on_consumer_disconnect");
171 EXPECT_CALL(consumer_, OnDisconnect())
172 .WillOnce(Invoke(on_consumer_disconnect));
173
174 svc_.reset();
175 task_runner_->RunUntilCheckpoint("on_producer_disconnect");
176 task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
177
178 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
179 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
180
181 task_runner_.reset();
182 kProducerSock.Destroy();
183 kConsumerSock.Destroy();
184 }
185
GetProducerSMBScrapingMode()186 virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
187 return TracingService::ProducerSMBScrapingMode::kDefault;
188 }
189
WaitForTraceWritersChanged(ProducerID producer_id)190 void WaitForTraceWritersChanged(ProducerID producer_id) {
191 static int i = 0;
192 auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
193 "_" + std::to_string(i++);
194 auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
195 auto writers = GetWriters(producer_id);
196 std::function<void()> task;
197 task = [&task, writers, writers_changed, producer_id, this]() {
198 if (writers != GetWriters(producer_id)) {
199 writers_changed();
200 return;
201 }
202 task_runner_->PostDelayedTask(task, 1);
203 };
204 task_runner_->PostDelayedTask(task, 1);
205 task_runner_->RunUntilCheckpoint(checkpoint_name);
206 }
207
GetWriters(ProducerID producer_id)208 const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
209 return reinterpret_cast<TracingServiceImpl*>(svc_->service())
210 ->GetProducer(producer_id)
211 ->writers_;
212 }
213
last_producer_id()214 ProducerID* last_producer_id() {
215 return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
216 ->last_producer_id_;
217 }
218
219 std::unique_ptr<base::TestTaskRunner> task_runner_;
220 std::unique_ptr<ServiceIPCHost> svc_;
221 std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
222 MockProducer producer_;
223 std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
224 MockConsumer consumer_;
225 };
226
TEST_F(TracingIntegrationTest,WithIPCTransport)227 TEST_F(TracingIntegrationTest, WithIPCTransport) {
228 // Start tracing.
229 TraceConfig trace_config;
230 trace_config.add_buffers()->set_size_kb(4096 * 10);
231 auto* ds_config = trace_config.add_data_sources()->mutable_config();
232 ds_config->set_name("perfetto.test");
233 ds_config->set_target_buffer(0);
234 consumer_endpoint_->EnableTracing(trace_config);
235
236 // At this point, the Producer should be asked to turn its data source on.
237 DataSourceInstanceID ds_iid = 0;
238
239 BufferID global_buf_id = 0;
240 auto on_create_ds_instance =
241 task_runner_->CreateCheckpoint("on_create_ds_instance");
242 EXPECT_CALL(producer_, OnTracingSetup());
243
244 // Store the arguments passed to SetupDataSource() and later check that they
245 // match the ones passed to StartDataSource().
246 DataSourceInstanceID setup_id;
247 DataSourceConfig setup_cfg_proto;
248 EXPECT_CALL(producer_, SetupDataSource(_, _))
249 .WillOnce(
250 Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
251 const DataSourceConfig& cfg) {
252 setup_id = id;
253 setup_cfg_proto = cfg;
254 }));
255 EXPECT_CALL(producer_, StartDataSource(_, _))
256 .WillOnce(
257 Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
258 &setup_cfg_proto](DataSourceInstanceID id,
259 const DataSourceConfig& cfg) {
260 // id and config should match the ones passed to SetupDataSource.
261 ASSERT_EQ(id, setup_id);
262 ASSERT_EQ(setup_cfg_proto, cfg);
263 ASSERT_NE(0u, id);
264 ds_iid = id;
265 ASSERT_EQ("perfetto.test", cfg.name());
266 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
267 ASSERT_NE(0u, global_buf_id);
268 ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
269 on_create_ds_instance();
270 }));
271 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
272
273 // Now let the data source fill some pages within the same task.
274 // Doing so should accumulate a bunch of chunks that will be notified by the
275 // a future task in one batch.
276 std::unique_ptr<TraceWriter> writer =
277 producer_endpoint_->CreateTraceWriter(global_buf_id);
278 ASSERT_TRUE(writer);
279
280 const size_t kNumPackets = 10;
281 for (size_t i = 0; i < kNumPackets; i++) {
282 char buf[16];
283 sprintf(buf, "evt_%zu", i);
284 writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
285 }
286
287 // Allow the service to see the CommitData() before reading back.
288 auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
289 writer->Flush(on_data_committed);
290 task_runner_->RunUntilCheckpoint("on_data_committed");
291
292 // Read the log buffer.
293 consumer_endpoint_->ReadBuffers();
294 size_t num_pack_rx = 0;
295 bool saw_clock_snapshot = false;
296 bool saw_trace_config = false;
297 bool saw_trace_stats = false;
298 auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
299 EXPECT_CALL(consumer_, OnTracePackets(_, _))
300 .WillRepeatedly(
301 Invoke([&num_pack_rx, all_packets_rx, &trace_config,
302 &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
303 std::vector<TracePacket>* packets, bool has_more) {
304 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
305 const int kExpectedMinNumberOfClocks = 1;
306 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
307 const int kExpectedMinNumberOfClocks = 2;
308 #else
309 const int kExpectedMinNumberOfClocks = 6;
310 #endif
311
312 for (auto& encoded_packet : *packets) {
313 protos::gen::TracePacket packet;
314 ASSERT_TRUE(packet.ParseFromString(
315 encoded_packet.GetRawBytesForTesting()));
316 if (packet.has_for_testing()) {
317 char buf[8];
318 sprintf(buf, "evt_%zu", num_pack_rx++);
319 EXPECT_EQ(std::string(buf), packet.for_testing().str());
320 } else if (packet.has_clock_snapshot()) {
321 EXPECT_GE(packet.clock_snapshot().clocks_size(),
322 kExpectedMinNumberOfClocks);
323 saw_clock_snapshot = true;
324 } else if (packet.has_trace_config()) {
325 EXPECT_EQ(packet.trace_config(), trace_config);
326 saw_trace_config = true;
327 } else if (packet.has_trace_stats()) {
328 saw_trace_stats = true;
329 CheckTraceStats(packet);
330 }
331 }
332 if (!has_more)
333 all_packets_rx();
334 }));
335 task_runner_->RunUntilCheckpoint("all_packets_rx");
336 ASSERT_EQ(kNumPackets, num_pack_rx);
337 EXPECT_TRUE(saw_clock_snapshot);
338 EXPECT_TRUE(saw_trace_config);
339 EXPECT_TRUE(saw_trace_stats);
340
341 // Disable tracing.
342 consumer_endpoint_->DisableTracing();
343
344 auto on_tracing_disabled =
345 task_runner_->CreateCheckpoint("on_tracing_disabled");
346 EXPECT_CALL(producer_, StopDataSource(_));
347 EXPECT_CALL(consumer_, OnTracingDisabled(_))
348 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
349 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
350 }
351
352 // Regression test for b/172950370.
TEST_F(TracingIntegrationTest,ValidErrorOnDisconnection)353 TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
354 // Start tracing.
355 TraceConfig trace_config;
356 trace_config.add_buffers()->set_size_kb(4096 * 10);
357 auto* ds_config = trace_config.add_data_sources()->mutable_config();
358 ds_config->set_name("perfetto.test");
359 consumer_endpoint_->EnableTracing(trace_config);
360
361 auto on_create_ds_instance =
362 task_runner_->CreateCheckpoint("on_create_ds_instance");
363 EXPECT_CALL(producer_, OnTracingSetup());
364
365 // Store the arguments passed to SetupDataSource() and later check that they
366 // match the ones passed to StartDataSource().
367 EXPECT_CALL(producer_, SetupDataSource(_, _));
368 EXPECT_CALL(producer_, StartDataSource(_, _))
369 .WillOnce(InvokeWithoutArgs(on_create_ds_instance));
370 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
371
372 EXPECT_CALL(consumer_, OnTracingDisabled(_))
373 .WillOnce(Invoke([](const std::string& err) {
374 EXPECT_THAT(err,
375 testing::HasSubstr("EnableTracing IPC request rejected"));
376 }));
377
378 // TearDown() will destroy the service via svc_.reset(). That will drop the
379 // connection and trigger the EXPECT_CALL(OnTracingDisabled) above.
380 }
381
382 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
TEST_F(TracingIntegrationTest,WriteIntoFile)383 TEST_F(TracingIntegrationTest, WriteIntoFile) {
384 // Start tracing.
385 TraceConfig trace_config;
386 trace_config.add_buffers()->set_size_kb(4096 * 10);
387 auto* ds_config = trace_config.add_data_sources()->mutable_config();
388 ds_config->set_name("perfetto.test");
389 ds_config->set_target_buffer(0);
390 trace_config.set_write_into_file(true);
391 base::TempFile tmp_file = base::TempFile::CreateUnlinked();
392 consumer_endpoint_->EnableTracing(trace_config,
393 base::ScopedFile(dup(tmp_file.fd())));
394
395 // At this point, the producer_ should be asked to turn its data source on.
396 BufferID global_buf_id = 0;
397 auto on_create_ds_instance =
398 task_runner_->CreateCheckpoint("on_create_ds_instance");
399 EXPECT_CALL(producer_, OnTracingSetup());
400 EXPECT_CALL(producer_, SetupDataSource(_, _));
401 EXPECT_CALL(producer_, StartDataSource(_, _))
402 .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
403 DataSourceInstanceID, const DataSourceConfig& cfg) {
404 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
405 on_create_ds_instance();
406 }));
407 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
408
409 std::unique_ptr<TraceWriter> writer =
410 producer_endpoint_->CreateTraceWriter(global_buf_id);
411 ASSERT_TRUE(writer);
412
413 const size_t kNumPackets = 10;
414 for (size_t i = 0; i < kNumPackets; i++) {
415 char buf[16];
416 sprintf(buf, "evt_%zu", i);
417 writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
418 }
419 auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
420 writer->Flush(on_data_committed);
421 task_runner_->RunUntilCheckpoint("on_data_committed");
422
423 // Will disable tracing and will force the buffers to be written into the
424 // file before destroying them.
425 consumer_endpoint_->FreeBuffers();
426
427 auto on_tracing_disabled =
428 task_runner_->CreateCheckpoint("on_tracing_disabled");
429 EXPECT_CALL(producer_, StopDataSource(_));
430 EXPECT_CALL(consumer_, OnTracingDisabled(_))
431 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
432 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
433
434 // Check that |tmp_file| contains a valid trace.proto message.
435 ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
436 char tmp_buf[1024];
437 ssize_t rsize = read(tmp_file.fd(), tmp_buf, sizeof(tmp_buf));
438 ASSERT_GT(rsize, 0);
439 protos::gen::Trace tmp_trace;
440 ASSERT_TRUE(tmp_trace.ParseFromArray(tmp_buf, static_cast<size_t>(rsize)));
441 size_t num_test_packet = 0;
442 size_t num_clock_snapshot_packet = 0;
443 size_t num_system_info_packet = 0;
444 bool saw_trace_stats = false;
445 for (int i = 0; i < tmp_trace.packet_size(); i++) {
446 const auto& packet = tmp_trace.packet()[static_cast<size_t>(i)];
447 if (packet.has_for_testing()) {
448 ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
449 packet.for_testing().str());
450 } else if (packet.has_trace_stats()) {
451 saw_trace_stats = true;
452 CheckTraceStats(packet);
453 } else if (packet.has_clock_snapshot()) {
454 num_clock_snapshot_packet++;
455 } else if (packet.has_system_info()) {
456 num_system_info_packet++;
457 }
458 }
459 ASSERT_TRUE(saw_trace_stats);
460 ASSERT_GT(num_clock_snapshot_packet, 0u);
461 ASSERT_GT(num_system_info_packet, 0u);
462 }
463 #endif
464
465 class TracingIntegrationTestWithSMBScrapingProducer
466 : public TracingIntegrationTest {
467 public:
GetProducerSMBScrapingMode()468 TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
469 override {
470 return TracingService::ProducerSMBScrapingMode::kEnabled;
471 }
472 };
473
TEST_F(TracingIntegrationTestWithSMBScrapingProducer,ScrapeOnFlush)474 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
475 // Start tracing.
476 TraceConfig trace_config;
477 trace_config.add_buffers()->set_size_kb(4096 * 10);
478 auto* ds_config = trace_config.add_data_sources()->mutable_config();
479 ds_config->set_name("perfetto.test");
480 ds_config->set_target_buffer(0);
481 consumer_endpoint_->EnableTracing(trace_config);
482
483 // At this point, the Producer should be asked to turn its data source on.
484
485 BufferID global_buf_id = 0;
486 auto on_create_ds_instance =
487 task_runner_->CreateCheckpoint("on_create_ds_instance");
488 EXPECT_CALL(producer_, OnTracingSetup());
489
490 EXPECT_CALL(producer_, SetupDataSource(_, _));
491 EXPECT_CALL(producer_, StartDataSource(_, _))
492 .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
493 DataSourceInstanceID, const DataSourceConfig& cfg) {
494 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
495 on_create_ds_instance();
496 }));
497 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
498
499 // Create writer, which will post a task to register the writer with the
500 // service.
501 std::unique_ptr<TraceWriter> writer =
502 producer_endpoint_->CreateTraceWriter(global_buf_id);
503 ASSERT_TRUE(writer);
504
505 // Wait for the writer to be registered.
506 WaitForTraceWritersChanged(*last_producer_id());
507
508 // Write a few trace packets.
509 writer->NewTracePacket()->set_for_testing()->set_str("payload1");
510 writer->NewTracePacket()->set_for_testing()->set_str("payload2");
511 writer->NewTracePacket()->set_for_testing()->set_str("payload3");
512
513 // Ask the service to flush, but don't flush our trace writer. This should
514 // cause our uncommitted SMB chunk to be scraped.
515 auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
516 consumer_endpoint_->Flush(5000, [on_flush_complete](bool success) {
517 EXPECT_TRUE(success);
518 on_flush_complete();
519 });
520 EXPECT_CALL(producer_, Flush(_, _, _))
521 .WillOnce(Invoke([this](FlushRequestID flush_req_id,
522 const DataSourceInstanceID*, size_t) {
523 producer_endpoint_->NotifyFlushComplete(flush_req_id);
524 }));
525 task_runner_->RunUntilCheckpoint("on_flush_complete");
526
527 // Read the log buffer. We should only see the first two written trace
528 // packets, because the service can't be sure the last one was written
529 // completely by the trace writer.
530 consumer_endpoint_->ReadBuffers();
531
532 size_t num_test_pack_rx = 0;
533 auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
534 EXPECT_CALL(consumer_, OnTracePackets(_, _))
535 .WillRepeatedly(
536 Invoke([&num_test_pack_rx, all_packets_rx](
537 std::vector<TracePacket>* packets, bool has_more) {
538 for (auto& encoded_packet : *packets) {
539 protos::gen::TracePacket packet;
540 ASSERT_TRUE(packet.ParseFromString(
541 encoded_packet.GetRawBytesForTesting()));
542 if (packet.has_for_testing()) {
543 num_test_pack_rx++;
544 }
545 }
546 if (!has_more)
547 all_packets_rx();
548 }));
549 task_runner_->RunUntilCheckpoint("all_packets_rx");
550 ASSERT_EQ(2u, num_test_pack_rx);
551
552 // Disable tracing.
553 consumer_endpoint_->DisableTracing();
554
555 auto on_tracing_disabled =
556 task_runner_->CreateCheckpoint("on_tracing_disabled");
557 auto on_stop_ds = task_runner_->CreateCheckpoint("on_stop_ds");
558 EXPECT_CALL(producer_, StopDataSource(_))
559 .WillOnce(InvokeWithoutArgs(on_stop_ds));
560 EXPECT_CALL(consumer_, OnTracingDisabled(_))
561 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
562 task_runner_->RunUntilCheckpoint("on_stop_ds");
563 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
564 }
565
566 // TODO(primiano): add tests to cover:
567 // - unknown fields preserved end-to-end.
568 // - >1 data source.
569 // - >1 data consumer sharing the same data source, with different TraceBuffers.
570 // - >1 consumer with > 1 buffer each.
571 // - Consumer disconnecting in the middle of a ReadBuffers() call.
572 // - Multiple calls to DisableTracing.
573 // - Out of order Enable/Disable/FreeBuffers calls.
574 // - DisableTracing does actually freeze the buffers.
575
576 } // namespace perfetto
577