• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cinttypes>
18 
19 #include "perfetto/ext/base/temp_file.h"
20 #include "perfetto/ext/tracing/core/consumer.h"
21 #include "perfetto/ext/tracing/core/producer.h"
22 #include "perfetto/ext/tracing/core/trace_packet.h"
23 #include "perfetto/ext/tracing/core/trace_stats.h"
24 #include "perfetto/ext/tracing/core/trace_writer.h"
25 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
26 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
27 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 #include "perfetto/tracing/core/trace_config.h"
31 #include "src/base/test/test_task_runner.h"
32 #include "src/ipc/test/test_socket.h"
33 #include "src/tracing/core/tracing_service_impl.h"
34 #include "test/gtest_and_gmock.h"
35 
36 #include "protos/perfetto/config/trace_config.gen.h"
37 #include "protos/perfetto/trace/clock_snapshot.gen.h"
38 #include "protos/perfetto/trace/test_event.gen.h"
39 #include "protos/perfetto/trace/test_event.pbzero.h"
40 #include "protos/perfetto/trace/trace.gen.h"
41 #include "protos/perfetto/trace/trace_packet.gen.h"
42 #include "protos/perfetto/trace/trace_packet.pbzero.h"
43 
44 namespace perfetto {
45 namespace {
46 
47 using testing::_;
48 using testing::Invoke;
49 using testing::InvokeWithoutArgs;
50 
51 ipc::TestSocket kProducerSock{"tracing_test-producer"};
52 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
53 
54 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
55 class MockProducer : public Producer {
56  public:
~MockProducer()57   ~MockProducer() override {}
58 
59   // Producer implementation.
60   MOCK_METHOD0(OnConnect, void());
61   MOCK_METHOD0(OnDisconnect, void());
62   MOCK_METHOD2(SetupDataSource,
63                void(DataSourceInstanceID, const DataSourceConfig&));
64   MOCK_METHOD2(StartDataSource,
65                void(DataSourceInstanceID, const DataSourceConfig&));
66   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
67   MOCK_METHOD0(uid, uid_t());
68   MOCK_METHOD0(OnTracingSetup, void());
69   MOCK_METHOD3(Flush,
70                void(FlushRequestID, const DataSourceInstanceID*, size_t));
71   MOCK_METHOD2(ClearIncrementalState,
72                void(const DataSourceInstanceID*, size_t));
73 };
74 
75 class MockConsumer : public Consumer {
76  public:
~MockConsumer()77   ~MockConsumer() override {}
78 
79   // Producer implementation.
80   MOCK_METHOD0(OnConnect, void());
81   MOCK_METHOD0(OnDisconnect, void());
82   MOCK_METHOD1(OnTracingDisabled, void(const std::string& /*error*/));
83   MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
84   MOCK_METHOD1(OnDetach, void(bool));
85   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
86   MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
87   MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
88 
89   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)90   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
91     OnTracePackets(&packets, has_more);
92   }
93 };
94 
CheckTraceStats(const protos::gen::TracePacket & packet)95 void CheckTraceStats(const protos::gen::TracePacket& packet) {
96   EXPECT_TRUE(packet.has_trace_stats());
97   EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
98   EXPECT_EQ(1u, packet.trace_stats().producers_connected());
99   EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
100   EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
101   EXPECT_EQ(1u, packet.trace_stats().total_buffers());
102   EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
103 
104   const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
105   EXPECT_GT(buf_stats.bytes_written(), 0u);
106   EXPECT_GT(buf_stats.chunks_written(), 0u);
107   EXPECT_EQ(0u, buf_stats.chunks_overwritten());
108   EXPECT_EQ(0u, buf_stats.chunks_rewritten());
109   EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
110   EXPECT_EQ(0u, buf_stats.write_wrap_count());
111   EXPECT_EQ(0u, buf_stats.patches_failed());
112   EXPECT_EQ(0u, buf_stats.readaheads_failed());
113   EXPECT_EQ(0u, buf_stats.abi_violations());
114 }
115 
116 static_assert(TracingServiceImpl::kMaxTracePacketSliceSize <=
117                   ipc::kIPCBufferSize - 512,
118               "Tracing service max packet slice should be smaller than IPC "
119               "buffer size (with some headroom)");
120 
121 }  // namespace
122 
123 class TracingIntegrationTest : public ::testing::Test {
124  public:
SetUp()125   void SetUp() override {
126     kProducerSock.Destroy();
127     kConsumerSock.Destroy();
128     task_runner_.reset(new base::TestTaskRunner());
129 
130     // Create the service host.
131     svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
132     svc_->Start(kProducerSock.name(), kConsumerSock.name());
133 
134     // Create and connect a Producer.
135     producer_endpoint_ = ProducerIPCClient::Connect(
136         kProducerSock.name(), &producer_, "perfetto.mock_producer",
137         task_runner_.get(), GetProducerSMBScrapingMode());
138     auto on_producer_connect =
139         task_runner_->CreateCheckpoint("on_producer_connect");
140     EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
141     task_runner_->RunUntilCheckpoint("on_producer_connect");
142 
143     // Register a data source.
144     DataSourceDescriptor ds_desc;
145     ds_desc.set_name("perfetto.test");
146     producer_endpoint_->RegisterDataSource(ds_desc);
147 
148     // Create and connect a Consumer.
149     consumer_endpoint_ = ConsumerIPCClient::Connect(
150         kConsumerSock.name(), &consumer_, task_runner_.get());
151     auto on_consumer_connect =
152         task_runner_->CreateCheckpoint("on_consumer_connect");
153     EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
154     task_runner_->RunUntilCheckpoint("on_consumer_connect");
155 
156     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
157     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
158   }
159 
TearDown()160   void TearDown() override {
161     // Destroy the service and check that both Producer and Consumer see an
162     // OnDisconnect() call.
163 
164     auto on_producer_disconnect =
165         task_runner_->CreateCheckpoint("on_producer_disconnect");
166     EXPECT_CALL(producer_, OnDisconnect())
167         .WillOnce(Invoke(on_producer_disconnect));
168 
169     auto on_consumer_disconnect =
170         task_runner_->CreateCheckpoint("on_consumer_disconnect");
171     EXPECT_CALL(consumer_, OnDisconnect())
172         .WillOnce(Invoke(on_consumer_disconnect));
173 
174     svc_.reset();
175     task_runner_->RunUntilCheckpoint("on_producer_disconnect");
176     task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
177 
178     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
179     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
180 
181     task_runner_.reset();
182     kProducerSock.Destroy();
183     kConsumerSock.Destroy();
184   }
185 
GetProducerSMBScrapingMode()186   virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
187     return TracingService::ProducerSMBScrapingMode::kDefault;
188   }
189 
WaitForTraceWritersChanged(ProducerID producer_id)190   void WaitForTraceWritersChanged(ProducerID producer_id) {
191     static int i = 0;
192     auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
193                            "_" + std::to_string(i++);
194     auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
195     auto writers = GetWriters(producer_id);
196     std::function<void()> task;
197     task = [&task, writers, writers_changed, producer_id, this]() {
198       if (writers != GetWriters(producer_id)) {
199         writers_changed();
200         return;
201       }
202       task_runner_->PostDelayedTask(task, 1);
203     };
204     task_runner_->PostDelayedTask(task, 1);
205     task_runner_->RunUntilCheckpoint(checkpoint_name);
206   }
207 
GetWriters(ProducerID producer_id)208   const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
209     return reinterpret_cast<TracingServiceImpl*>(svc_->service())
210         ->GetProducer(producer_id)
211         ->writers_;
212   }
213 
last_producer_id()214   ProducerID* last_producer_id() {
215     return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
216                 ->last_producer_id_;
217   }
218 
219   std::unique_ptr<base::TestTaskRunner> task_runner_;
220   std::unique_ptr<ServiceIPCHost> svc_;
221   std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
222   MockProducer producer_;
223   std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
224   MockConsumer consumer_;
225 };
226 
TEST_F(TracingIntegrationTest,WithIPCTransport)227 TEST_F(TracingIntegrationTest, WithIPCTransport) {
228   // Start tracing.
229   TraceConfig trace_config;
230   trace_config.add_buffers()->set_size_kb(4096 * 10);
231   auto* ds_config = trace_config.add_data_sources()->mutable_config();
232   ds_config->set_name("perfetto.test");
233   ds_config->set_target_buffer(0);
234   consumer_endpoint_->EnableTracing(trace_config);
235 
236   // At this point, the Producer should be asked to turn its data source on.
237   DataSourceInstanceID ds_iid = 0;
238 
239   BufferID global_buf_id = 0;
240   auto on_create_ds_instance =
241       task_runner_->CreateCheckpoint("on_create_ds_instance");
242   EXPECT_CALL(producer_, OnTracingSetup());
243 
244   // Store the arguments passed to SetupDataSource() and later check that they
245   // match the ones passed to StartDataSource().
246   DataSourceInstanceID setup_id;
247   DataSourceConfig setup_cfg_proto;
248   EXPECT_CALL(producer_, SetupDataSource(_, _))
249       .WillOnce(
250           Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
251                                                const DataSourceConfig& cfg) {
252             setup_id = id;
253             setup_cfg_proto = cfg;
254           }));
255   EXPECT_CALL(producer_, StartDataSource(_, _))
256       .WillOnce(
257           Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
258                   &setup_cfg_proto](DataSourceInstanceID id,
259                                     const DataSourceConfig& cfg) {
260             // id and config should match the ones passed to SetupDataSource.
261             ASSERT_EQ(id, setup_id);
262             ASSERT_EQ(setup_cfg_proto, cfg);
263             ASSERT_NE(0u, id);
264             ds_iid = id;
265             ASSERT_EQ("perfetto.test", cfg.name());
266             global_buf_id = static_cast<BufferID>(cfg.target_buffer());
267             ASSERT_NE(0u, global_buf_id);
268             ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
269             on_create_ds_instance();
270           }));
271   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
272 
273   // Now let the data source fill some pages within the same task.
274   // Doing so should accumulate a bunch of chunks that will be notified by the
275   // a future task in one batch.
276   std::unique_ptr<TraceWriter> writer =
277       producer_endpoint_->CreateTraceWriter(global_buf_id);
278   ASSERT_TRUE(writer);
279 
280   const size_t kNumPackets = 10;
281   for (size_t i = 0; i < kNumPackets; i++) {
282     char buf[16];
283     sprintf(buf, "evt_%zu", i);
284     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
285   }
286 
287   // Allow the service to see the CommitData() before reading back.
288   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
289   writer->Flush(on_data_committed);
290   task_runner_->RunUntilCheckpoint("on_data_committed");
291 
292   // Read the log buffer.
293   consumer_endpoint_->ReadBuffers();
294   size_t num_pack_rx = 0;
295   bool saw_clock_snapshot = false;
296   bool saw_trace_config = false;
297   bool saw_trace_stats = false;
298   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
299   EXPECT_CALL(consumer_, OnTracePackets(_, _))
300       .WillRepeatedly(
301           Invoke([&num_pack_rx, all_packets_rx, &trace_config,
302                   &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
303                      std::vector<TracePacket>* packets, bool has_more) {
304 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
305             const int kExpectedMinNumberOfClocks = 1;
306 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
307             const int kExpectedMinNumberOfClocks = 2;
308 #else
309             const int kExpectedMinNumberOfClocks = 6;
310 #endif
311 
312             for (auto& encoded_packet : *packets) {
313               protos::gen::TracePacket packet;
314               ASSERT_TRUE(packet.ParseFromString(
315                   encoded_packet.GetRawBytesForTesting()));
316               if (packet.has_for_testing()) {
317                 char buf[8];
318                 sprintf(buf, "evt_%zu", num_pack_rx++);
319                 EXPECT_EQ(std::string(buf), packet.for_testing().str());
320               } else if (packet.has_clock_snapshot()) {
321                 EXPECT_GE(packet.clock_snapshot().clocks_size(),
322                           kExpectedMinNumberOfClocks);
323                 saw_clock_snapshot = true;
324               } else if (packet.has_trace_config()) {
325                 EXPECT_EQ(packet.trace_config(), trace_config);
326                 saw_trace_config = true;
327               } else if (packet.has_trace_stats()) {
328                 saw_trace_stats = true;
329                 CheckTraceStats(packet);
330               }
331             }
332             if (!has_more)
333               all_packets_rx();
334           }));
335   task_runner_->RunUntilCheckpoint("all_packets_rx");
336   ASSERT_EQ(kNumPackets, num_pack_rx);
337   EXPECT_TRUE(saw_clock_snapshot);
338   EXPECT_TRUE(saw_trace_config);
339   EXPECT_TRUE(saw_trace_stats);
340 
341   // Disable tracing.
342   consumer_endpoint_->DisableTracing();
343 
344   auto on_tracing_disabled =
345       task_runner_->CreateCheckpoint("on_tracing_disabled");
346   EXPECT_CALL(producer_, StopDataSource(_));
347   EXPECT_CALL(consumer_, OnTracingDisabled(_))
348       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
349   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
350 }
351 
352 // Regression test for b/172950370.
TEST_F(TracingIntegrationTest,ValidErrorOnDisconnection)353 TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
354   // Start tracing.
355   TraceConfig trace_config;
356   trace_config.add_buffers()->set_size_kb(4096 * 10);
357   auto* ds_config = trace_config.add_data_sources()->mutable_config();
358   ds_config->set_name("perfetto.test");
359   consumer_endpoint_->EnableTracing(trace_config);
360 
361   auto on_create_ds_instance =
362       task_runner_->CreateCheckpoint("on_create_ds_instance");
363   EXPECT_CALL(producer_, OnTracingSetup());
364 
365   // Store the arguments passed to SetupDataSource() and later check that they
366   // match the ones passed to StartDataSource().
367   EXPECT_CALL(producer_, SetupDataSource(_, _));
368   EXPECT_CALL(producer_, StartDataSource(_, _))
369       .WillOnce(InvokeWithoutArgs(on_create_ds_instance));
370   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
371 
372   EXPECT_CALL(consumer_, OnTracingDisabled(_))
373       .WillOnce(Invoke([](const std::string& err) {
374         EXPECT_THAT(err,
375                     testing::HasSubstr("EnableTracing IPC request rejected"));
376       }));
377 
378   // TearDown() will destroy the service via svc_.reset(). That will drop the
379   // connection and trigger the EXPECT_CALL(OnTracingDisabled) above.
380 }
381 
382 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
TEST_F(TracingIntegrationTest,WriteIntoFile)383 TEST_F(TracingIntegrationTest, WriteIntoFile) {
384   // Start tracing.
385   TraceConfig trace_config;
386   trace_config.add_buffers()->set_size_kb(4096 * 10);
387   auto* ds_config = trace_config.add_data_sources()->mutable_config();
388   ds_config->set_name("perfetto.test");
389   ds_config->set_target_buffer(0);
390   trace_config.set_write_into_file(true);
391   base::TempFile tmp_file = base::TempFile::CreateUnlinked();
392   consumer_endpoint_->EnableTracing(trace_config,
393                                     base::ScopedFile(dup(tmp_file.fd())));
394 
395   // At this point, the producer_ should be asked to turn its data source on.
396   BufferID global_buf_id = 0;
397   auto on_create_ds_instance =
398       task_runner_->CreateCheckpoint("on_create_ds_instance");
399   EXPECT_CALL(producer_, OnTracingSetup());
400   EXPECT_CALL(producer_, SetupDataSource(_, _));
401   EXPECT_CALL(producer_, StartDataSource(_, _))
402       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
403                            DataSourceInstanceID, const DataSourceConfig& cfg) {
404         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
405         on_create_ds_instance();
406       }));
407   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
408 
409   std::unique_ptr<TraceWriter> writer =
410       producer_endpoint_->CreateTraceWriter(global_buf_id);
411   ASSERT_TRUE(writer);
412 
413   const size_t kNumPackets = 10;
414   for (size_t i = 0; i < kNumPackets; i++) {
415     char buf[16];
416     sprintf(buf, "evt_%zu", i);
417     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
418   }
419   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
420   writer->Flush(on_data_committed);
421   task_runner_->RunUntilCheckpoint("on_data_committed");
422 
423   // Will disable tracing and will force the buffers to be written into the
424   // file before destroying them.
425   consumer_endpoint_->FreeBuffers();
426 
427   auto on_tracing_disabled =
428       task_runner_->CreateCheckpoint("on_tracing_disabled");
429   EXPECT_CALL(producer_, StopDataSource(_));
430   EXPECT_CALL(consumer_, OnTracingDisabled(_))
431       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
432   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
433 
434   // Check that |tmp_file| contains a valid trace.proto message.
435   ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
436   char tmp_buf[1024];
437   ssize_t rsize = read(tmp_file.fd(), tmp_buf, sizeof(tmp_buf));
438   ASSERT_GT(rsize, 0);
439   protos::gen::Trace tmp_trace;
440   ASSERT_TRUE(tmp_trace.ParseFromArray(tmp_buf, static_cast<size_t>(rsize)));
441   size_t num_test_packet = 0;
442   size_t num_clock_snapshot_packet = 0;
443   size_t num_system_info_packet = 0;
444   bool saw_trace_stats = false;
445   for (int i = 0; i < tmp_trace.packet_size(); i++) {
446     const auto& packet = tmp_trace.packet()[static_cast<size_t>(i)];
447     if (packet.has_for_testing()) {
448       ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
449                 packet.for_testing().str());
450     } else if (packet.has_trace_stats()) {
451       saw_trace_stats = true;
452       CheckTraceStats(packet);
453     } else if (packet.has_clock_snapshot()) {
454       num_clock_snapshot_packet++;
455     } else if (packet.has_system_info()) {
456       num_system_info_packet++;
457     }
458   }
459   ASSERT_TRUE(saw_trace_stats);
460   ASSERT_GT(num_clock_snapshot_packet, 0u);
461   ASSERT_GT(num_system_info_packet, 0u);
462 }
463 #endif
464 
465 class TracingIntegrationTestWithSMBScrapingProducer
466     : public TracingIntegrationTest {
467  public:
GetProducerSMBScrapingMode()468   TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
469       override {
470     return TracingService::ProducerSMBScrapingMode::kEnabled;
471   }
472 };
473 
TEST_F(TracingIntegrationTestWithSMBScrapingProducer,ScrapeOnFlush)474 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
475   // Start tracing.
476   TraceConfig trace_config;
477   trace_config.add_buffers()->set_size_kb(4096 * 10);
478   auto* ds_config = trace_config.add_data_sources()->mutable_config();
479   ds_config->set_name("perfetto.test");
480   ds_config->set_target_buffer(0);
481   consumer_endpoint_->EnableTracing(trace_config);
482 
483   // At this point, the Producer should be asked to turn its data source on.
484 
485   BufferID global_buf_id = 0;
486   auto on_create_ds_instance =
487       task_runner_->CreateCheckpoint("on_create_ds_instance");
488   EXPECT_CALL(producer_, OnTracingSetup());
489 
490   EXPECT_CALL(producer_, SetupDataSource(_, _));
491   EXPECT_CALL(producer_, StartDataSource(_, _))
492       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
493                            DataSourceInstanceID, const DataSourceConfig& cfg) {
494         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
495         on_create_ds_instance();
496       }));
497   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
498 
499   // Create writer, which will post a task to register the writer with the
500   // service.
501   std::unique_ptr<TraceWriter> writer =
502       producer_endpoint_->CreateTraceWriter(global_buf_id);
503   ASSERT_TRUE(writer);
504 
505   // Wait for the writer to be registered.
506   WaitForTraceWritersChanged(*last_producer_id());
507 
508   // Write a few trace packets.
509   writer->NewTracePacket()->set_for_testing()->set_str("payload1");
510   writer->NewTracePacket()->set_for_testing()->set_str("payload2");
511   writer->NewTracePacket()->set_for_testing()->set_str("payload3");
512 
513   // Ask the service to flush, but don't flush our trace writer. This should
514   // cause our uncommitted SMB chunk to be scraped.
515   auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
516   consumer_endpoint_->Flush(5000, [on_flush_complete](bool success) {
517     EXPECT_TRUE(success);
518     on_flush_complete();
519   });
520   EXPECT_CALL(producer_, Flush(_, _, _))
521       .WillOnce(Invoke([this](FlushRequestID flush_req_id,
522                               const DataSourceInstanceID*, size_t) {
523         producer_endpoint_->NotifyFlushComplete(flush_req_id);
524       }));
525   task_runner_->RunUntilCheckpoint("on_flush_complete");
526 
527   // Read the log buffer. We should only see the first two written trace
528   // packets, because the service can't be sure the last one was written
529   // completely by the trace writer.
530   consumer_endpoint_->ReadBuffers();
531 
532   size_t num_test_pack_rx = 0;
533   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
534   EXPECT_CALL(consumer_, OnTracePackets(_, _))
535       .WillRepeatedly(
536           Invoke([&num_test_pack_rx, all_packets_rx](
537                      std::vector<TracePacket>* packets, bool has_more) {
538             for (auto& encoded_packet : *packets) {
539               protos::gen::TracePacket packet;
540               ASSERT_TRUE(packet.ParseFromString(
541                   encoded_packet.GetRawBytesForTesting()));
542               if (packet.has_for_testing()) {
543                 num_test_pack_rx++;
544               }
545             }
546             if (!has_more)
547               all_packets_rx();
548           }));
549   task_runner_->RunUntilCheckpoint("all_packets_rx");
550   ASSERT_EQ(2u, num_test_pack_rx);
551 
552   // Disable tracing.
553   consumer_endpoint_->DisableTracing();
554 
555   auto on_tracing_disabled =
556       task_runner_->CreateCheckpoint("on_tracing_disabled");
557   auto on_stop_ds = task_runner_->CreateCheckpoint("on_stop_ds");
558   EXPECT_CALL(producer_, StopDataSource(_))
559       .WillOnce(InvokeWithoutArgs(on_stop_ds));
560   EXPECT_CALL(consumer_, OnTracingDisabled(_))
561       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
562   task_runner_->RunUntilCheckpoint("on_stop_ds");
563   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
564 }
565 
566 // TODO(primiano): add tests to cover:
567 // - unknown fields preserved end-to-end.
568 // - >1 data source.
569 // - >1 data consumer sharing the same data source, with different TraceBuffers.
570 // - >1 consumer with > 1 buffer each.
571 // - Consumer disconnecting in the middle of a ReadBuffers() call.
572 // - Multiple calls to DisableTracing.
573 // - Out of order Enable/Disable/FreeBuffers calls.
574 // - DisableTracing does actually freeze the buffers.
575 
576 }  // namespace perfetto
577