• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cinttypes>
18 
19 #include "perfetto/ext/base/file_utils.h"
20 #include "perfetto/ext/base/string_utils.h"
21 #include "perfetto/ext/base/temp_file.h"
22 #include "perfetto/ext/tracing/core/consumer.h"
23 #include "perfetto/ext/tracing/core/producer.h"
24 #include "perfetto/ext/tracing/core/trace_packet.h"
25 #include "perfetto/ext/tracing/core/trace_stats.h"
26 #include "perfetto/ext/tracing/core/trace_writer.h"
27 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
28 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
29 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
30 #include "perfetto/tracing/core/data_source_config.h"
31 #include "perfetto/tracing/core/data_source_descriptor.h"
32 #include "perfetto/tracing/core/trace_config.h"
33 #include "src/base/test/test_task_runner.h"
34 #include "src/ipc/test/test_socket.h"
35 #include "src/tracing/core/tracing_service_impl.h"
36 #include "test/gtest_and_gmock.h"
37 
38 #include "protos/perfetto/config/trace_config.gen.h"
39 #include "protos/perfetto/trace/clock_snapshot.gen.h"
40 #include "protos/perfetto/trace/test_event.gen.h"
41 #include "protos/perfetto/trace/test_event.pbzero.h"
42 #include "protos/perfetto/trace/trace.gen.h"
43 #include "protos/perfetto/trace/trace_packet.gen.h"
44 #include "protos/perfetto/trace/trace_packet.pbzero.h"
45 
46 namespace perfetto {
47 namespace {
48 
49 using testing::_;
50 using testing::Invoke;
51 using testing::InvokeWithoutArgs;
52 
53 ipc::TestSocket kProducerSock{"tracing_test-producer"};
54 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
55 
56 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
57 class MockProducer : public Producer {
58  public:
~MockProducer()59   ~MockProducer() override {}
60 
61   // Producer implementation.
62   MOCK_METHOD(void, OnConnect, (), (override));
63   MOCK_METHOD(void, OnDisconnect, (), (override));
64   MOCK_METHOD(void,
65               SetupDataSource,
66               (DataSourceInstanceID, const DataSourceConfig&),
67               (override));
68   MOCK_METHOD(void,
69               StartDataSource,
70               (DataSourceInstanceID, const DataSourceConfig&),
71               (override));
72   MOCK_METHOD(void, StopDataSource, (DataSourceInstanceID), (override));
73   MOCK_METHOD(void, OnTracingSetup, (), (override));
74   MOCK_METHOD(void,
75               Flush,
76               (FlushRequestID, const DataSourceInstanceID*, size_t),
77               (override));
78   MOCK_METHOD(void,
79               ClearIncrementalState,
80               (const DataSourceInstanceID*, size_t),
81               (override));
82 };
83 
84 class MockConsumer : public Consumer {
85  public:
~MockConsumer()86   ~MockConsumer() override {}
87 
88   // Producer implementation.
89   MOCK_METHOD(void, OnConnect, (), (override));
90   MOCK_METHOD(void, OnDisconnect, (), (override));
91   MOCK_METHOD(void,
92               OnTracingDisabled,
93               (const std::string& /*error*/),
94               (override));
95   MOCK_METHOD(void, OnTracePackets, (std::vector<TracePacket>*, bool));
96   MOCK_METHOD(void, OnDetach, (bool), (override));
97   MOCK_METHOD(void, OnAttach, (bool, const TraceConfig&), (override));
98   MOCK_METHOD(void, OnTraceStats, (bool, const TraceStats&), (override));
99   MOCK_METHOD(void, OnObservableEvents, (const ObservableEvents&), (override));
100   MOCK_METHOD(void, OnSessionCloned, (const OnSessionClonedArgs&), (override));
101 
102   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)103   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
104     OnTracePackets(&packets, has_more);
105   }
106 };
107 
CheckTraceStats(const protos::gen::TracePacket & packet)108 void CheckTraceStats(const protos::gen::TracePacket& packet) {
109   EXPECT_TRUE(packet.has_trace_stats());
110   EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
111   EXPECT_EQ(1u, packet.trace_stats().producers_connected());
112   EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
113   EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
114   EXPECT_EQ(1u, packet.trace_stats().total_buffers());
115   EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
116 
117   const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
118   EXPECT_GT(buf_stats.bytes_written(), 0u);
119   EXPECT_GT(buf_stats.chunks_written(), 0u);
120   EXPECT_EQ(0u, buf_stats.chunks_overwritten());
121   EXPECT_EQ(0u, buf_stats.chunks_rewritten());
122   EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
123   EXPECT_EQ(0u, buf_stats.write_wrap_count());
124   EXPECT_EQ(0u, buf_stats.patches_failed());
125   EXPECT_EQ(0u, buf_stats.readaheads_failed());
126   EXPECT_EQ(0u, buf_stats.abi_violations());
127 }
128 
129 static_assert(TracingServiceImpl::kMaxTracePacketSliceSize <=
130                   ipc::kIPCBufferSize - 512,
131               "Tracing service max packet slice should be smaller than IPC "
132               "buffer size (with some headroom)");
133 
134 }  // namespace
135 
136 class TracingIntegrationTest : public ::testing::Test {
137  public:
SetUp()138   void SetUp() override {
139     kProducerSock.Destroy();
140     kConsumerSock.Destroy();
141     task_runner_.reset(new base::TestTaskRunner());
142 
143     // Create the service host.
144     svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
145     svc_->Start(kProducerSock.name(), kConsumerSock.name());
146 
147     // Create and connect a Producer.
148     producer_endpoint_ = ProducerIPCClient::Connect(
149         kProducerSock.name(), &producer_, "perfetto.mock_producer",
150         task_runner_.get(), GetProducerSMBScrapingMode());
151     auto on_producer_connect =
152         task_runner_->CreateCheckpoint("on_producer_connect");
153     EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
154     task_runner_->RunUntilCheckpoint("on_producer_connect");
155 
156     // Register a data source.
157     DataSourceDescriptor ds_desc;
158     ds_desc.set_name("perfetto.test");
159     producer_endpoint_->RegisterDataSource(ds_desc);
160 
161     // Create and connect a Consumer.
162     consumer_endpoint_ = ConsumerIPCClient::Connect(
163         kConsumerSock.name(), &consumer_, task_runner_.get());
164     auto on_consumer_connect =
165         task_runner_->CreateCheckpoint("on_consumer_connect");
166     EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
167     task_runner_->RunUntilCheckpoint("on_consumer_connect");
168 
169     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
170     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
171   }
172 
TearDown()173   void TearDown() override {
174     // Destroy the service and check that both Producer and Consumer see an
175     // OnDisconnect() call.
176 
177     auto on_producer_disconnect =
178         task_runner_->CreateCheckpoint("on_producer_disconnect");
179     EXPECT_CALL(producer_, OnDisconnect())
180         .WillOnce(Invoke(on_producer_disconnect));
181 
182     auto on_consumer_disconnect =
183         task_runner_->CreateCheckpoint("on_consumer_disconnect");
184     EXPECT_CALL(consumer_, OnDisconnect())
185         .WillOnce(Invoke(on_consumer_disconnect));
186 
187     svc_.reset();
188     task_runner_->RunUntilCheckpoint("on_producer_disconnect");
189     task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
190 
191     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
192     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
193 
194     task_runner_.reset();
195     kProducerSock.Destroy();
196     kConsumerSock.Destroy();
197   }
198 
GetProducerSMBScrapingMode()199   virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
200     return TracingService::ProducerSMBScrapingMode::kDefault;
201   }
202 
WaitForTraceWritersChanged(ProducerID producer_id)203   void WaitForTraceWritersChanged(ProducerID producer_id) {
204     static int i = 0;
205     auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
206                            "_" + std::to_string(i++);
207     auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
208     auto writers = GetWriters(producer_id);
209     std::function<void()> task;
210     task = [&task, writers, writers_changed, producer_id, this]() {
211       if (writers != GetWriters(producer_id)) {
212         writers_changed();
213         return;
214       }
215       task_runner_->PostDelayedTask(task, 1);
216     };
217     task_runner_->PostDelayedTask(task, 1);
218     task_runner_->RunUntilCheckpoint(checkpoint_name);
219   }
220 
GetWriters(ProducerID producer_id)221   const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
222     return reinterpret_cast<TracingServiceImpl*>(svc_->service())
223         ->GetProducer(producer_id)
224         ->writers_;
225   }
226 
last_producer_id()227   ProducerID* last_producer_id() {
228     return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
229                 ->last_producer_id_;
230   }
231 
232   std::unique_ptr<base::TestTaskRunner> task_runner_;
233   std::unique_ptr<ServiceIPCHost> svc_;
234   std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
235   MockProducer producer_;
236   std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
237   MockConsumer consumer_;
238 };
239 
TEST_F(TracingIntegrationTest,WithIPCTransport)240 TEST_F(TracingIntegrationTest, WithIPCTransport) {
241   // Start tracing.
242   TraceConfig trace_config;
243   trace_config.add_buffers()->set_size_kb(4096 * 10);
244   auto* ds_config = trace_config.add_data_sources()->mutable_config();
245   ds_config->set_name("perfetto.test");
246   ds_config->set_target_buffer(0);
247   consumer_endpoint_->EnableTracing(trace_config);
248 
249   // At this point, the Producer should be asked to turn its data source on.
250   DataSourceInstanceID ds_iid = 0;
251 
252   BufferID global_buf_id = 0;
253   auto on_create_ds_instance =
254       task_runner_->CreateCheckpoint("on_create_ds_instance");
255   EXPECT_CALL(producer_, OnTracingSetup());
256 
257   // Store the arguments passed to SetupDataSource() and later check that they
258   // match the ones passed to StartDataSource().
259   DataSourceInstanceID setup_id;
260   DataSourceConfig setup_cfg_proto;
261   EXPECT_CALL(producer_, SetupDataSource(_, _))
262       .WillOnce(
263           Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
264                                                const DataSourceConfig& cfg) {
265             setup_id = id;
266             setup_cfg_proto = cfg;
267           }));
268   EXPECT_CALL(producer_, StartDataSource(_, _))
269       .WillOnce(
270           Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
271                   &setup_cfg_proto](DataSourceInstanceID id,
272                                     const DataSourceConfig& cfg) {
273             // id and config should match the ones passed to SetupDataSource.
274             ASSERT_EQ(id, setup_id);
275             ASSERT_EQ(setup_cfg_proto, cfg);
276             ASSERT_NE(0u, id);
277             ds_iid = id;
278             ASSERT_EQ("perfetto.test", cfg.name());
279             global_buf_id = static_cast<BufferID>(cfg.target_buffer());
280             ASSERT_NE(0u, global_buf_id);
281             ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
282             on_create_ds_instance();
283           }));
284   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
285 
286   // Now let the data source fill some pages within the same task.
287   // Doing so should accumulate a bunch of chunks that will be notified by the
288   // a future task in one batch.
289   std::unique_ptr<TraceWriter> writer =
290       producer_endpoint_->CreateTraceWriter(global_buf_id);
291   ASSERT_TRUE(writer);
292 
293   const size_t kNumPackets = 10;
294   for (size_t i = 0; i < kNumPackets; i++) {
295     char buf[16];
296     base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
297     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
298   }
299 
300   // Allow the service to see the CommitData() before reading back.
301   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
302   writer->Flush(on_data_committed);
303   task_runner_->RunUntilCheckpoint("on_data_committed");
304 
305   // Read the log buffer.
306   consumer_endpoint_->ReadBuffers();
307   size_t num_pack_rx = 0;
308   bool saw_clock_snapshot = false;
309   bool saw_trace_config = false;
310   bool saw_trace_stats = false;
311   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
312   EXPECT_CALL(consumer_, OnTracePackets(_, _))
313       .WillRepeatedly(
314           Invoke([&num_pack_rx, all_packets_rx, &trace_config,
315                   &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
316                      std::vector<TracePacket>* packets, bool has_more) {
317 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
318             const int kExpectedMinNumberOfClocks = 1;
319 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
320             const int kExpectedMinNumberOfClocks = 2;
321 #else
322             const int kExpectedMinNumberOfClocks = 6;
323 #endif
324 
325             for (auto& encoded_packet : *packets) {
326               protos::gen::TracePacket packet;
327               ASSERT_TRUE(packet.ParseFromString(
328                   encoded_packet.GetRawBytesForTesting()));
329               if (packet.has_for_testing()) {
330                 char buf[8];
331                 base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", num_pack_rx++);
332                 EXPECT_EQ(std::string(buf), packet.for_testing().str());
333               } else if (packet.has_clock_snapshot()) {
334                 EXPECT_GE(packet.clock_snapshot().clocks_size(),
335                           kExpectedMinNumberOfClocks);
336                 saw_clock_snapshot = true;
337               } else if (packet.has_trace_config()) {
338                 EXPECT_EQ(packet.trace_config(), trace_config);
339                 saw_trace_config = true;
340               } else if (packet.has_trace_stats()) {
341                 saw_trace_stats = true;
342                 CheckTraceStats(packet);
343               }
344             }
345             if (!has_more)
346               all_packets_rx();
347           }));
348   task_runner_->RunUntilCheckpoint("all_packets_rx");
349   ASSERT_EQ(kNumPackets, num_pack_rx);
350   EXPECT_TRUE(saw_clock_snapshot);
351   EXPECT_TRUE(saw_trace_config);
352   EXPECT_TRUE(saw_trace_stats);
353 
354   // Disable tracing.
355   consumer_endpoint_->DisableTracing();
356 
357   auto on_tracing_disabled =
358       task_runner_->CreateCheckpoint("on_tracing_disabled");
359   EXPECT_CALL(producer_, StopDataSource(_));
360   EXPECT_CALL(consumer_, OnTracingDisabled(_))
361       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
362   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
363 }
364 
365 // Regression test for b/172950370.
TEST_F(TracingIntegrationTest,ValidErrorOnDisconnection)366 TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
367   // Start tracing.
368   TraceConfig trace_config;
369   trace_config.add_buffers()->set_size_kb(4096 * 10);
370   auto* ds_config = trace_config.add_data_sources()->mutable_config();
371   ds_config->set_name("perfetto.test");
372   consumer_endpoint_->EnableTracing(trace_config);
373 
374   auto on_create_ds_instance =
375       task_runner_->CreateCheckpoint("on_create_ds_instance");
376   EXPECT_CALL(producer_, OnTracingSetup());
377 
378   // Store the arguments passed to SetupDataSource() and later check that they
379   // match the ones passed to StartDataSource().
380   EXPECT_CALL(producer_, SetupDataSource(_, _));
381   EXPECT_CALL(producer_, StartDataSource(_, _))
382       .WillOnce(InvokeWithoutArgs(on_create_ds_instance));
383   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
384 
385   EXPECT_CALL(consumer_, OnTracingDisabled(_))
386       .WillOnce(Invoke([](const std::string& err) {
387         EXPECT_THAT(err,
388                     testing::HasSubstr("EnableTracing IPC request rejected"));
389       }));
390 
391   // TearDown() will destroy the service via svc_.reset(). That will drop the
392   // connection and trigger the EXPECT_CALL(OnTracingDisabled) above.
393 }
394 
395 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
TEST_F(TracingIntegrationTest,WriteIntoFile)396 TEST_F(TracingIntegrationTest, WriteIntoFile) {
397   // Start tracing.
398   TraceConfig trace_config;
399   trace_config.add_buffers()->set_size_kb(4096 * 10);
400   auto* ds_config = trace_config.add_data_sources()->mutable_config();
401   ds_config->set_name("perfetto.test");
402   ds_config->set_target_buffer(0);
403   trace_config.set_write_into_file(true);
404   base::TempFile tmp_file = base::TempFile::CreateUnlinked();
405   consumer_endpoint_->EnableTracing(trace_config,
406                                     base::ScopedFile(dup(tmp_file.fd())));
407 
408   // At this point, the producer_ should be asked to turn its data source on.
409   BufferID global_buf_id = 0;
410   auto on_create_ds_instance =
411       task_runner_->CreateCheckpoint("on_create_ds_instance");
412   EXPECT_CALL(producer_, OnTracingSetup());
413   EXPECT_CALL(producer_, SetupDataSource(_, _));
414   EXPECT_CALL(producer_, StartDataSource(_, _))
415       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
416                            DataSourceInstanceID, const DataSourceConfig& cfg) {
417         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
418         on_create_ds_instance();
419       }));
420   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
421 
422   std::unique_ptr<TraceWriter> writer =
423       producer_endpoint_->CreateTraceWriter(global_buf_id);
424   ASSERT_TRUE(writer);
425 
426   const size_t kNumPackets = 10;
427   for (size_t i = 0; i < kNumPackets; i++) {
428     char buf[16];
429     base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
430     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
431   }
432   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
433   writer->Flush(on_data_committed);
434   task_runner_->RunUntilCheckpoint("on_data_committed");
435 
436   // Will disable tracing and will force the buffers to be written into the
437   // file before destroying them.
438   consumer_endpoint_->FreeBuffers();
439 
440   auto on_tracing_disabled =
441       task_runner_->CreateCheckpoint("on_tracing_disabled");
442   EXPECT_CALL(producer_, StopDataSource(_));
443   EXPECT_CALL(consumer_, OnTracingDisabled(_))
444       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
445   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
446 
447   // Check that |tmp_file| contains a valid trace.proto message.
448   ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
449   std::string trace_contents;
450   ASSERT_TRUE(base::ReadFileDescriptor(tmp_file.fd(), &trace_contents));
451   protos::gen::Trace tmp_trace;
452   ASSERT_TRUE(tmp_trace.ParseFromString(trace_contents));
453   size_t num_test_packet = 0;
454   size_t num_clock_snapshot_packet = 0;
455   size_t num_system_info_packet = 0;
456   bool saw_trace_stats = false;
457   for (int i = 0; i < tmp_trace.packet_size(); i++) {
458     const auto& packet = tmp_trace.packet()[static_cast<size_t>(i)];
459     if (packet.has_for_testing()) {
460       ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
461                 packet.for_testing().str());
462     } else if (packet.has_trace_stats()) {
463       saw_trace_stats = true;
464       CheckTraceStats(packet);
465     } else if (packet.has_clock_snapshot()) {
466       num_clock_snapshot_packet++;
467     } else if (packet.has_system_info()) {
468       num_system_info_packet++;
469     }
470   }
471   ASSERT_TRUE(saw_trace_stats);
472   ASSERT_GT(num_clock_snapshot_packet, 0u);
473   ASSERT_GT(num_system_info_packet, 0u);
474 }
475 #endif
476 
477 class TracingIntegrationTestWithSMBScrapingProducer
478     : public TracingIntegrationTest {
479  public:
GetProducerSMBScrapingMode()480   TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
481       override {
482     return TracingService::ProducerSMBScrapingMode::kEnabled;
483   }
484 };
485 
TEST_F(TracingIntegrationTestWithSMBScrapingProducer,ScrapeOnFlush)486 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
487   // Start tracing.
488   TraceConfig trace_config;
489   trace_config.add_buffers()->set_size_kb(4096 * 10);
490   auto* ds_config = trace_config.add_data_sources()->mutable_config();
491   ds_config->set_name("perfetto.test");
492   ds_config->set_target_buffer(0);
493   consumer_endpoint_->EnableTracing(trace_config);
494 
495   // At this point, the Producer should be asked to turn its data source on.
496 
497   BufferID global_buf_id = 0;
498   auto on_create_ds_instance =
499       task_runner_->CreateCheckpoint("on_create_ds_instance");
500   EXPECT_CALL(producer_, OnTracingSetup());
501 
502   EXPECT_CALL(producer_, SetupDataSource(_, _));
503   EXPECT_CALL(producer_, StartDataSource(_, _))
504       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
505                            DataSourceInstanceID, const DataSourceConfig& cfg) {
506         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
507         on_create_ds_instance();
508       }));
509   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
510 
511   // Create writer, which will post a task to register the writer with the
512   // service.
513   std::unique_ptr<TraceWriter> writer =
514       producer_endpoint_->CreateTraceWriter(global_buf_id);
515   ASSERT_TRUE(writer);
516 
517   // Wait for the writer to be registered.
518   WaitForTraceWritersChanged(*last_producer_id());
519 
520   // Write a few trace packets.
521   writer->NewTracePacket()->set_for_testing()->set_str("payload1");
522   writer->NewTracePacket()->set_for_testing()->set_str("payload2");
523   writer->NewTracePacket()->set_for_testing()->set_str("payload3");
524 
525   // Ask the service to flush, but don't flush our trace writer. This should
526   // cause our uncommitted SMB chunk to be scraped.
527   auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
528   consumer_endpoint_->Flush(5000, [on_flush_complete](bool success) {
529     EXPECT_TRUE(success);
530     on_flush_complete();
531   });
532   EXPECT_CALL(producer_, Flush(_, _, _))
533       .WillOnce(Invoke([this](FlushRequestID flush_req_id,
534                               const DataSourceInstanceID*, size_t) {
535         producer_endpoint_->NotifyFlushComplete(flush_req_id);
536       }));
537   task_runner_->RunUntilCheckpoint("on_flush_complete");
538 
539   // Read the log buffer. We should only see the first two written trace
540   // packets, because the service can't be sure the last one was written
541   // completely by the trace writer.
542   consumer_endpoint_->ReadBuffers();
543 
544   size_t num_test_pack_rx = 0;
545   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
546   EXPECT_CALL(consumer_, OnTracePackets(_, _))
547       .WillRepeatedly(
548           Invoke([&num_test_pack_rx, all_packets_rx](
549                      std::vector<TracePacket>* packets, bool has_more) {
550             for (auto& encoded_packet : *packets) {
551               protos::gen::TracePacket packet;
552               ASSERT_TRUE(packet.ParseFromString(
553                   encoded_packet.GetRawBytesForTesting()));
554               if (packet.has_for_testing()) {
555                 num_test_pack_rx++;
556               }
557             }
558             if (!has_more)
559               all_packets_rx();
560           }));
561   task_runner_->RunUntilCheckpoint("all_packets_rx");
562   ASSERT_EQ(2u, num_test_pack_rx);
563 
564   // Disable tracing.
565   consumer_endpoint_->DisableTracing();
566 
567   auto on_tracing_disabled =
568       task_runner_->CreateCheckpoint("on_tracing_disabled");
569   auto on_stop_ds = task_runner_->CreateCheckpoint("on_stop_ds");
570   EXPECT_CALL(producer_, StopDataSource(_))
571       .WillOnce(InvokeWithoutArgs(on_stop_ds));
572   EXPECT_CALL(consumer_, OnTracingDisabled(_))
573       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
574   task_runner_->RunUntilCheckpoint("on_stop_ds");
575   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
576 }
577 
578 // TODO(primiano): add tests to cover:
579 // - unknown fields preserved end-to-end.
580 // - >1 data source.
581 // - >1 data consumer sharing the same data source, with different TraceBuffers.
582 // - >1 consumer with > 1 buffer each.
583 // - Consumer disconnecting in the middle of a ReadBuffers() call.
584 // - Multiple calls to DisableTracing.
585 // - Out of order Enable/Disable/FreeBuffers calls.
586 // - DisableTracing does actually freeze the buffers.
587 
588 }  // namespace perfetto
589