• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18 
19 #include <cinttypes>
20 
21 #include <string.h>
22 
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/ext/base/version.h"
26 #include "perfetto/ext/ipc/client.h"
27 #include "perfetto/ext/tracing/core/commit_data_request.h"
28 #include "perfetto/ext/tracing/core/producer.h"
29 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
30 #include "perfetto/ext/tracing/core/trace_writer.h"
31 #include "perfetto/tracing/core/data_source_config.h"
32 #include "perfetto/tracing/core/data_source_descriptor.h"
33 #include "perfetto/tracing/core/trace_config.h"
34 
35 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
36 #include "src/tracing/ipc/shared_memory_windows.h"
37 #else
38 #include "src/tracing/ipc/posix_shared_memory.h"
39 #endif
40 
41 // TODO(fmayer): think to what happens when ProducerIPCClientImpl gets destroyed
42 // w.r.t. the Producer pointer. Also think to lifetime of the Producer* during
43 // the callbacks.
44 
45 namespace perfetto {
46 
47 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,ConnectionFlags conn_flags)48 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
49     const char* service_sock_name,
50     Producer* producer,
51     const std::string& producer_name,
52     base::TaskRunner* task_runner,
53     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
54     size_t shared_memory_size_hint_bytes,
55     size_t shared_memory_page_size_hint_bytes,
56     std::unique_ptr<SharedMemory> shm,
57     std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
58     ConnectionFlags conn_flags) {
59   return std::unique_ptr<TracingService::ProducerEndpoint>(
60       new ProducerIPCClientImpl(
61           {service_sock_name,
62            conn_flags ==
63                ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable},
64           producer, producer_name, task_runner, smb_scraping_mode,
65           shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes,
66           std::move(shm), std::move(shm_arbiter)));
67 }
68 
69 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)70 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
71     ipc::Client::ConnArgs conn_args,
72     Producer* producer,
73     const std::string& producer_name,
74     base::TaskRunner* task_runner,
75     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
76     size_t shared_memory_size_hint_bytes,
77     size_t shared_memory_page_size_hint_bytes,
78     std::unique_ptr<SharedMemory> shm,
79     std::unique_ptr<SharedMemoryArbiter> shm_arbiter) {
80   return std::unique_ptr<TracingService::ProducerEndpoint>(
81       new ProducerIPCClientImpl(std::move(conn_args), producer, producer_name,
82                                 task_runner, smb_scraping_mode,
83                                 shared_memory_size_hint_bytes,
84                                 shared_memory_page_size_hint_bytes,
85                                 std::move(shm), std::move(shm_arbiter)));
86 }
87 
ProducerIPCClientImpl(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)88 ProducerIPCClientImpl::ProducerIPCClientImpl(
89     ipc::Client::ConnArgs conn_args,
90     Producer* producer,
91     const std::string& producer_name,
92     base::TaskRunner* task_runner,
93     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
94     size_t shared_memory_size_hint_bytes,
95     size_t shared_memory_page_size_hint_bytes,
96     std::unique_ptr<SharedMemory> shm,
97     std::unique_ptr<SharedMemoryArbiter> shm_arbiter)
98     : producer_(producer),
99       task_runner_(task_runner),
100       receive_shmem_fd_cb_fuchsia_(
101           std::move(conn_args.receive_shmem_fd_cb_fuchsia)),
102       ipc_channel_(
103           ipc::Client::CreateInstance(std::move(conn_args), task_runner)),
104       producer_port_(
105           new protos::gen::ProducerPortProxy(this /* event_listener */)),
106       shared_memory_(std::move(shm)),
107       shared_memory_arbiter_(std::move(shm_arbiter)),
108       name_(producer_name),
109       shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes),
110       shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes),
111       smb_scraping_mode_(smb_scraping_mode) {
112   // Check for producer-provided SMB (used by Chrome for startup tracing).
113   if (shared_memory_) {
114     // We also expect a valid (unbound) arbiter. Bind it to this endpoint now.
115     PERFETTO_CHECK(shared_memory_arbiter_);
116     shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_);
117 
118     // If the service accepts our SMB, then it must match our requested page
119     // layout. The protocol doesn't allow the service to change the size and
120     // layout when the SMB is provided by the producer.
121     shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024;
122   }
123 
124   ipc_channel_->BindService(producer_port_->GetWeakPtr());
125   PERFETTO_DCHECK_THREAD(thread_checker_);
126 }
127 
~ProducerIPCClientImpl()128 ProducerIPCClientImpl::~ProducerIPCClientImpl() {
129   PERFETTO_DCHECK_THREAD(thread_checker_);
130 }
131 
Disconnect()132 void ProducerIPCClientImpl::Disconnect() {
133   PERFETTO_DCHECK_THREAD(thread_checker_);
134   if (!producer_port_)
135     return;
136   // Reset the producer port so that no further IPCs are received and IPC
137   // callbacks are no longer executed. Also reset the IPC channel so that the
138   // service is notified of the disconnection.
139   producer_port_.reset();
140   ipc_channel_.reset();
141   // Perform disconnect synchronously.
142   OnDisconnect();
143 }
144 
145 // Called by the IPC layer if the BindService() succeeds.
OnConnect()146 void ProducerIPCClientImpl::OnConnect() {
147   PERFETTO_DCHECK_THREAD(thread_checker_);
148   connected_ = true;
149 
150   // The IPC layer guarantees that any outstanding callback will be dropped on
151   // the floor if producer_port_ is destroyed between the request and the reply.
152   // Binding |this| is hence safe.
153   ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
154   on_init.Bind(
155       [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
156         OnConnectionInitialized(
157             resp.success(),
158             resp.success() ? resp->using_shmem_provided_by_producer() : false,
159             resp.success() ? resp->direct_smb_patching_supported() : false);
160       });
161   protos::gen::InitializeConnectionRequest req;
162   req.set_producer_name(name_);
163   req.set_shared_memory_size_hint_bytes(
164       static_cast<uint32_t>(shared_memory_size_hint_bytes_));
165   req.set_shared_memory_page_size_hint_bytes(
166       static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
167   switch (smb_scraping_mode_) {
168     case TracingService::ProducerSMBScrapingMode::kDefault:
169       // No need to set the mode, it defaults to use the service default if
170       // unspecified.
171       break;
172     case TracingService::ProducerSMBScrapingMode::kEnabled:
173       req.set_smb_scraping_mode(
174           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
175       break;
176     case TracingService::ProducerSMBScrapingMode::kDisabled:
177       req.set_smb_scraping_mode(
178           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
179       break;
180   }
181 
182   int shm_fd = -1;
183   if (shared_memory_) {
184     req.set_producer_provided_shmem(true);
185 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
186     auto key = static_cast<SharedMemoryWindows*>(shared_memory_.get())->key();
187     req.set_shm_key_windows(key);
188 #else
189     shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd();
190 #endif
191   }
192 
193   req.set_sdk_version(base::GetVersionString());
194   producer_port_->InitializeConnection(req, std::move(on_init), shm_fd);
195 
196   // Create the back channel to receive commands from the Service.
197   ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
198   on_cmd.Bind(
199       [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
200         if (!resp)
201           return;  // The IPC channel was closed and |resp| was auto-rejected.
202         OnServiceRequest(*resp);
203       });
204   producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
205                                   std::move(on_cmd));
206 
207   // If there are pending Sync() requests, send them now.
208   for (const auto& pending_sync : pending_sync_reqs_)
209     Sync(std::move(pending_sync));
210   pending_sync_reqs_.clear();
211 }
212 
OnDisconnect()213 void ProducerIPCClientImpl::OnDisconnect() {
214   PERFETTO_DCHECK_THREAD(thread_checker_);
215   PERFETTO_DLOG("Tracing service connection failure");
216   connected_ = false;
217   data_sources_setup_.clear();
218   producer_->OnDisconnect();  // Note: may delete |this|.
219 }
220 
ScheduleDisconnect()221 void ProducerIPCClientImpl::ScheduleDisconnect() {
222   // |ipc_channel| doesn't allow disconnection in the middle of handling
223   // an IPC call, so the connection drop must take place over two phases.
224 
225   // First, synchronously drop the |producer_port_| so that no more IPC
226   // messages are handled.
227   producer_port_.reset();
228 
229   // Then schedule an async task for performing the remainder of the
230   // disconnection operations outside the context of the IPC method handler.
231   auto weak_this = weak_factory_.GetWeakPtr();
232   task_runner_->PostTask([weak_this]() {
233     if (weak_this) {
234       weak_this->Disconnect();
235     }
236   });
237 }
238 
OnConnectionInitialized(bool connection_succeeded,bool using_shmem_provided_by_producer,bool direct_smb_patching_supported)239 void ProducerIPCClientImpl::OnConnectionInitialized(
240     bool connection_succeeded,
241     bool using_shmem_provided_by_producer,
242     bool direct_smb_patching_supported) {
243   PERFETTO_DCHECK_THREAD(thread_checker_);
244   // If connection_succeeded == false, the OnDisconnect() call will follow next
245   // and there we'll notify the |producer_|. TODO: add a test for this.
246   if (!connection_succeeded)
247     return;
248   is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
249   direct_smb_patching_supported_ = direct_smb_patching_supported;
250   producer_->OnConnect();
251 
252   // Bail out if the service failed to adopt our producer-allocated SMB.
253   // TODO(eseckler): Handle adoption failure more gracefully.
254   if (shared_memory_ && !is_shmem_provided_by_producer_) {
255     PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
256     Disconnect();
257     return;
258   }
259 }
260 
OnServiceRequest(const protos::gen::GetAsyncCommandResponse & cmd)261 void ProducerIPCClientImpl::OnServiceRequest(
262     const protos::gen::GetAsyncCommandResponse& cmd) {
263   PERFETTO_DCHECK_THREAD(thread_checker_);
264 
265   // This message is sent only when connecting to a service running Android Q+.
266   // See comment below in kStartDataSource.
267   if (cmd.has_setup_data_source()) {
268     const auto& req = cmd.setup_data_source();
269     const DataSourceInstanceID dsid = req.new_instance_id();
270     data_sources_setup_.insert(dsid);
271     producer_->SetupDataSource(dsid, req.config());
272     return;
273   }
274 
275   if (cmd.has_start_data_source()) {
276     const auto& req = cmd.start_data_source();
277     const DataSourceInstanceID dsid = req.new_instance_id();
278     const DataSourceConfig& cfg = req.config();
279     if (!data_sources_setup_.count(dsid)) {
280       // When connecting with an older (Android P) service, the service will not
281       // send a SetupDataSource message. We synthesize it here in that case.
282       producer_->SetupDataSource(dsid, cfg);
283     }
284     producer_->StartDataSource(dsid, cfg);
285     return;
286   }
287 
288   if (cmd.has_stop_data_source()) {
289     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
290     producer_->StopDataSource(dsid);
291     data_sources_setup_.erase(dsid);
292     return;
293   }
294 
295   if (cmd.has_setup_tracing()) {
296     std::unique_ptr<SharedMemory> ipc_shared_memory;
297 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
298     const std::string& shm_key = cmd.setup_tracing().shm_key_windows();
299     if (!shm_key.empty())
300       ipc_shared_memory = SharedMemoryWindows::Attach(shm_key);
301 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_FUCHSIA)
302     // On Fuchsia, the embedder is responsible for routing the shared memory
303     // FD, which is provided to this code via a blocking callback.
304     PERFETTO_CHECK(receive_shmem_fd_cb_fuchsia_);
305 
306     base::ScopedFile shmem_fd(receive_shmem_fd_cb_fuchsia_());
307     if (!shmem_fd) {
308       // Failure to get a shared memory buffer is a protocol violation and
309       // therefore we should drop the Protocol connection.
310       PERFETTO_ELOG("Could not get shared memory FD from embedder.");
311       ScheduleDisconnect();
312       return;
313     }
314 
315     ipc_shared_memory =
316         PosixSharedMemory::AttachToFd(std::move(shmem_fd),
317                                       /*require_seals_if_supported=*/false);
318 #else
319     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
320     if (shmem_fd) {
321       // TODO(primiano): handle mmap failure in case of OOM.
322       ipc_shared_memory =
323           PosixSharedMemory::AttachToFd(std::move(shmem_fd),
324                                         /*require_seals_if_supported=*/false);
325     }
326 #endif
327     if (ipc_shared_memory) {
328       // This is the nominal case used in most configurations, where the service
329       // provides the SMB.
330       PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_);
331       shared_memory_ = std::move(ipc_shared_memory);
332       shared_buffer_page_size_kb_ =
333           cmd.setup_tracing().shared_buffer_page_size_kb();
334       shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
335           shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
336           task_runner_);
337       if (direct_smb_patching_supported_)
338         shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService();
339     } else {
340       // Producer-provided SMB (used by Chrome for startup tracing).
341       PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ &&
342                      shared_memory_arbiter_);
343     }
344     producer_->OnTracingSetup();
345     return;
346   }
347 
348   if (cmd.has_flush()) {
349     // This cast boilerplate is required only because protobuf uses its own
350     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
351     // type (long vs long long) even though they have the same size.
352     const auto* data_source_ids = cmd.flush().data_source_ids().data();
353     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
354                   "data_source_ids should be 64-bit");
355     producer_->Flush(
356         cmd.flush().request_id(),
357         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
358         static_cast<size_t>(cmd.flush().data_source_ids().size()));
359     return;
360   }
361 
362   if (cmd.has_clear_incremental_state()) {
363     const auto* data_source_ids =
364         cmd.clear_incremental_state().data_source_ids().data();
365     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
366                   "data_source_ids should be 64-bit");
367     producer_->ClearIncrementalState(
368         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
369         static_cast<size_t>(
370             cmd.clear_incremental_state().data_source_ids().size()));
371     return;
372   }
373 
374   PERFETTO_DFATAL("Unknown async request received from tracing service");
375 }
376 
RegisterDataSource(const DataSourceDescriptor & descriptor)377 void ProducerIPCClientImpl::RegisterDataSource(
378     const DataSourceDescriptor& descriptor) {
379   PERFETTO_DCHECK_THREAD(thread_checker_);
380   if (!connected_) {
381     PERFETTO_DLOG(
382         "Cannot RegisterDataSource(), not connected to tracing service");
383   }
384   protos::gen::RegisterDataSourceRequest req;
385   *req.mutable_data_source_descriptor() = descriptor;
386   ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
387   async_response.Bind(
388       [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
389         if (!response)
390           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
391       });
392   producer_port_->RegisterDataSource(req, std::move(async_response));
393 }
394 
UpdateDataSource(const DataSourceDescriptor & descriptor)395 void ProducerIPCClientImpl::UpdateDataSource(
396     const DataSourceDescriptor& descriptor) {
397   PERFETTO_DCHECK_THREAD(thread_checker_);
398   if (!connected_) {
399     PERFETTO_DLOG(
400         "Cannot UpdateDataSource(), not connected to tracing service");
401   }
402   protos::gen::UpdateDataSourceRequest req;
403   *req.mutable_data_source_descriptor() = descriptor;
404   ipc::Deferred<protos::gen::UpdateDataSourceResponse> async_response;
405   async_response.Bind(
406       [](ipc::AsyncResult<protos::gen::UpdateDataSourceResponse> response) {
407         if (!response)
408           PERFETTO_DLOG("UpdateDataSource() failed: connection reset");
409       });
410   producer_port_->UpdateDataSource(req, std::move(async_response));
411 }
412 
UnregisterDataSource(const std::string & name)413 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
414   PERFETTO_DCHECK_THREAD(thread_checker_);
415   if (!connected_) {
416     PERFETTO_DLOG(
417         "Cannot UnregisterDataSource(), not connected to tracing service");
418     return;
419   }
420   protos::gen::UnregisterDataSourceRequest req;
421   req.set_data_source_name(name);
422   producer_port_->UnregisterDataSource(
423       req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>());
424 }
425 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)426 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
427                                                 uint32_t target_buffer) {
428   PERFETTO_DCHECK_THREAD(thread_checker_);
429   if (!connected_) {
430     PERFETTO_DLOG(
431         "Cannot RegisterTraceWriter(), not connected to tracing service");
432     return;
433   }
434   protos::gen::RegisterTraceWriterRequest req;
435   req.set_trace_writer_id(writer_id);
436   req.set_target_buffer(target_buffer);
437   producer_port_->RegisterTraceWriter(
438       req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>());
439 }
440 
UnregisterTraceWriter(uint32_t writer_id)441 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
442   PERFETTO_DCHECK_THREAD(thread_checker_);
443   if (!connected_) {
444     PERFETTO_DLOG(
445         "Cannot UnregisterTraceWriter(), not connected to tracing service");
446     return;
447   }
448   protos::gen::UnregisterTraceWriterRequest req;
449   req.set_trace_writer_id(writer_id);
450   producer_port_->UnregisterTraceWriter(
451       req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>());
452 }
453 
CommitData(const CommitDataRequest & req,CommitDataCallback callback)454 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
455                                        CommitDataCallback callback) {
456   PERFETTO_DCHECK_THREAD(thread_checker_);
457   if (!connected_) {
458     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
459     return;
460   }
461   ipc::Deferred<protos::gen::CommitDataResponse> async_response;
462   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
463   // this call and checks that the callback is dropped.
464   if (callback) {
465     async_response.Bind(
466         [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) {
467           if (!response) {
468             PERFETTO_DLOG("CommitData() failed: connection reset");
469             return;
470           }
471           callback();
472         });
473   }
474   producer_port_->CommitData(req, std::move(async_response));
475 }
476 
NotifyDataSourceStarted(DataSourceInstanceID id)477 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
478   PERFETTO_DCHECK_THREAD(thread_checker_);
479   if (!connected_) {
480     PERFETTO_DLOG(
481         "Cannot NotifyDataSourceStarted(), not connected to tracing service");
482     return;
483   }
484   protos::gen::NotifyDataSourceStartedRequest req;
485   req.set_data_source_id(id);
486   producer_port_->NotifyDataSourceStarted(
487       req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>());
488 }
489 
NotifyDataSourceStopped(DataSourceInstanceID id)490 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
491   PERFETTO_DCHECK_THREAD(thread_checker_);
492   if (!connected_) {
493     PERFETTO_DLOG(
494         "Cannot NotifyDataSourceStopped(), not connected to tracing service");
495     return;
496   }
497   protos::gen::NotifyDataSourceStoppedRequest req;
498   req.set_data_source_id(id);
499   producer_port_->NotifyDataSourceStopped(
500       req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>());
501 }
502 
ActivateTriggers(const std::vector<std::string> & triggers)503 void ProducerIPCClientImpl::ActivateTriggers(
504     const std::vector<std::string>& triggers) {
505   PERFETTO_DCHECK_THREAD(thread_checker_);
506   if (!connected_) {
507     PERFETTO_DLOG(
508         "Cannot ActivateTriggers(), not connected to tracing service");
509     return;
510   }
511   protos::gen::ActivateTriggersRequest proto_req;
512   for (const auto& name : triggers) {
513     *proto_req.add_trigger_names() = name;
514   }
515   producer_port_->ActivateTriggers(
516       proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>());
517 }
518 
Sync(std::function<void ()> callback)519 void ProducerIPCClientImpl::Sync(std::function<void()> callback) {
520   PERFETTO_DCHECK_THREAD(thread_checker_);
521   if (!connected_) {
522     pending_sync_reqs_.emplace_back(std::move(callback));
523     return;
524   }
525   ipc::Deferred<protos::gen::SyncResponse> resp;
526   resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) {
527     // Here we ACK the callback even if the service replies with a failure
528     // (i.e. the service is too old and doesn't understand Sync()). In that
529     // case the service has still seen the request, the IPC roundtrip is
530     // still a (weaker) linearization fence.
531     callback();
532   });
533   producer_port_->Sync(protos::gen::SyncRequest(), std::move(resp));
534 }
535 
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)536 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
537     BufferID target_buffer,
538     BufferExhaustedPolicy buffer_exhausted_policy) {
539   // This method can be called by different threads. |shared_memory_arbiter_| is
540   // thread-safe but be aware of accessing any other state in this function.
541   return shared_memory_arbiter_->CreateTraceWriter(target_buffer,
542                                                    buffer_exhausted_policy);
543 }
544 
MaybeSharedMemoryArbiter()545 SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() {
546   return shared_memory_arbiter_.get();
547 }
548 
IsShmemProvidedByProducer() const549 bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const {
550   return is_shmem_provided_by_producer_;
551 }
552 
NotifyFlushComplete(FlushRequestID req_id)553 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
554   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
555 }
556 
shared_memory() const557 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
558   return shared_memory_.get();
559 }
560 
shared_buffer_page_size_kb() const561 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
562   return shared_buffer_page_size_kb_;
563 }
564 
565 }  // namespace perfetto
566