1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18
19 #include <cinttypes>
20
21 #include <string.h>
22
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/ext/base/unix_socket.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/ext/ipc/client.h"
28 #include "perfetto/ext/tracing/core/commit_data_request.h"
29 #include "perfetto/ext/tracing/core/producer.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/ext/tracing/core/trace_writer.h"
33 #include "perfetto/tracing/core/data_source_config.h"
34 #include "perfetto/tracing/core/data_source_descriptor.h"
35 #include "perfetto/tracing/core/trace_config.h"
36 #include "src/tracing/core/in_process_shared_memory.h"
37
38 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
39 #include "src/tracing/ipc/shared_memory_windows.h"
40 #else
41 #include "src/tracing/ipc/posix_shared_memory.h"
42 #endif
43
44 // TODO(fmayer): think to what happens when ProducerIPCClientImpl gets destroyed
45 // w.r.t. the Producer pointer. Also think to lifetime of the Producer* during
46 // the callbacks.
47
48 namespace perfetto {
49
50 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,ConnectionFlags conn_flags)51 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
52 const char* service_sock_name,
53 Producer* producer,
54 const std::string& producer_name,
55 base::TaskRunner* task_runner,
56 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
57 size_t shared_memory_size_hint_bytes,
58 size_t shared_memory_page_size_hint_bytes,
59 std::unique_ptr<SharedMemory> shm,
60 std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
61 ConnectionFlags conn_flags) {
62 return std::unique_ptr<TracingService::ProducerEndpoint>(
63 new ProducerIPCClientImpl(
64 {service_sock_name,
65 conn_flags ==
66 ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable},
67 producer, producer_name, task_runner, smb_scraping_mode,
68 shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes,
69 std::move(shm), std::move(shm_arbiter), nullptr));
70 }
71
72 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,CreateSocketAsync create_socket_async)73 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
74 ipc::Client::ConnArgs conn_args,
75 Producer* producer,
76 const std::string& producer_name,
77 base::TaskRunner* task_runner,
78 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
79 size_t shared_memory_size_hint_bytes,
80 size_t shared_memory_page_size_hint_bytes,
81 std::unique_ptr<SharedMemory> shm,
82 std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
83 CreateSocketAsync create_socket_async) {
84 return std::unique_ptr<TracingService::ProducerEndpoint>(
85 new ProducerIPCClientImpl(
86 std::move(conn_args), producer, producer_name, task_runner,
87 smb_scraping_mode, shared_memory_size_hint_bytes,
88 shared_memory_page_size_hint_bytes, std::move(shm),
89 std::move(shm_arbiter), create_socket_async));
90 }
91
ProducerIPCClientImpl(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,CreateSocketAsync create_socket_async)92 ProducerIPCClientImpl::ProducerIPCClientImpl(
93 ipc::Client::ConnArgs conn_args,
94 Producer* producer,
95 const std::string& producer_name,
96 base::TaskRunner* task_runner,
97 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
98 size_t shared_memory_size_hint_bytes,
99 size_t shared_memory_page_size_hint_bytes,
100 std::unique_ptr<SharedMemory> shm,
101 std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
102 CreateSocketAsync create_socket_async)
103 : producer_(producer),
104 task_runner_(task_runner),
105 receive_shmem_fd_cb_fuchsia_(
106 std::move(conn_args.receive_shmem_fd_cb_fuchsia)),
107 producer_port_(
108 new protos::gen::ProducerPortProxy(this /* event_listener */)),
109 shared_memory_(std::move(shm)),
110 shared_memory_arbiter_(std::move(shm_arbiter)),
111 name_(producer_name),
112 shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes),
113 shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes),
114 smb_scraping_mode_(smb_scraping_mode) {
115 // Check for producer-provided SMB (used by Chrome for startup tracing).
116 if (shared_memory_) {
117 // We also expect a valid (unbound) arbiter. Bind it to this endpoint now.
118 PERFETTO_CHECK(shared_memory_arbiter_);
119 shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_);
120
121 // If the service accepts our SMB, then it must match our requested page
122 // layout. The protocol doesn't allow the service to change the size and
123 // layout when the SMB is provided by the producer.
124 shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024;
125 }
126
127 if (create_socket_async) {
128 PERFETTO_DCHECK(conn_args.socket_name);
129 auto weak_this = weak_factory_.GetWeakPtr();
130 create_socket_async(
131 [weak_this, task_runner = task_runner_](base::SocketHandle fd) {
132 task_runner->PostTask([weak_this, fd] {
133 base::ScopedSocketHandle handle(fd);
134 if (!weak_this) {
135 return;
136 }
137 ipc::Client::ConnArgs args(std::move(handle));
138 weak_this->ipc_channel_ = ipc::Client::CreateInstance(
139 std::move(args), weak_this->task_runner_);
140 weak_this->ipc_channel_->BindService(
141 weak_this->producer_port_->GetWeakPtr());
142 });
143 });
144 } else {
145 ipc_channel_ =
146 ipc::Client::CreateInstance(std::move(conn_args), task_runner);
147 ipc_channel_->BindService(producer_port_->GetWeakPtr());
148 }
149 PERFETTO_DCHECK_THREAD(thread_checker_);
150 }
151
~ProducerIPCClientImpl()152 ProducerIPCClientImpl::~ProducerIPCClientImpl() {
153 PERFETTO_DCHECK_THREAD(thread_checker_);
154 }
155
Disconnect()156 void ProducerIPCClientImpl::Disconnect() {
157 PERFETTO_DCHECK_THREAD(thread_checker_);
158 if (!producer_port_)
159 return;
160 // Reset the producer port so that no further IPCs are received and IPC
161 // callbacks are no longer executed. Also reset the IPC channel so that the
162 // service is notified of the disconnection.
163 producer_port_.reset();
164 ipc_channel_.reset();
165 // Perform disconnect synchronously.
166 OnDisconnect();
167 }
168
169 // Called by the IPC layer if the BindService() succeeds.
OnConnect()170 void ProducerIPCClientImpl::OnConnect() {
171 PERFETTO_DCHECK_THREAD(thread_checker_);
172 connected_ = true;
173
174 // The IPC layer guarantees that any outstanding callback will be dropped on
175 // the floor if producer_port_ is destroyed between the request and the reply.
176 // Binding |this| is hence safe.
177 ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
178 on_init.Bind(
179 [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
180 OnConnectionInitialized(
181 resp.success(),
182 resp.success() ? resp->using_shmem_provided_by_producer() : false,
183 resp.success() ? resp->direct_smb_patching_supported() : false,
184 resp.success() ? resp->use_shmem_emulation() : false);
185 });
186 protos::gen::InitializeConnectionRequest req;
187 req.set_producer_name(name_);
188 req.set_shared_memory_size_hint_bytes(
189 static_cast<uint32_t>(shared_memory_size_hint_bytes_));
190 req.set_shared_memory_page_size_hint_bytes(
191 static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
192 switch (smb_scraping_mode_) {
193 case TracingService::ProducerSMBScrapingMode::kDefault:
194 // No need to set the mode, it defaults to use the service default if
195 // unspecified.
196 break;
197 case TracingService::ProducerSMBScrapingMode::kEnabled:
198 req.set_smb_scraping_mode(
199 protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
200 break;
201 case TracingService::ProducerSMBScrapingMode::kDisabled:
202 req.set_smb_scraping_mode(
203 protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
204 break;
205 }
206
207 int shm_fd = -1;
208 if (shared_memory_) {
209 req.set_producer_provided_shmem(true);
210 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
211 auto key = static_cast<SharedMemoryWindows*>(shared_memory_.get())->key();
212 req.set_shm_key_windows(key);
213 #else
214 shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd();
215 #endif
216 }
217
218 req.set_sdk_version(base::GetVersionString());
219 producer_port_->InitializeConnection(req, std::move(on_init), shm_fd);
220
221 // Create the back channel to receive commands from the Service.
222 ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
223 on_cmd.Bind(
224 [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
225 if (!resp)
226 return; // The IPC channel was closed and |resp| was auto-rejected.
227 OnServiceRequest(*resp);
228 });
229 producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
230 std::move(on_cmd));
231
232 // If there are pending Sync() requests, send them now.
233 for (auto& pending_sync : pending_sync_reqs_)
234 Sync(std::move(pending_sync));
235 pending_sync_reqs_.clear();
236 }
237
OnDisconnect()238 void ProducerIPCClientImpl::OnDisconnect() {
239 PERFETTO_DCHECK_THREAD(thread_checker_);
240 PERFETTO_DLOG("Tracing service connection failure");
241 connected_ = false;
242 data_sources_setup_.clear();
243 producer_->OnDisconnect(); // Note: may delete |this|.
244 }
245
ScheduleDisconnect()246 void ProducerIPCClientImpl::ScheduleDisconnect() {
247 // |ipc_channel| doesn't allow disconnection in the middle of handling
248 // an IPC call, so the connection drop must take place over two phases.
249
250 // First, synchronously drop the |producer_port_| so that no more IPC
251 // messages are handled.
252 producer_port_.reset();
253
254 // Then schedule an async task for performing the remainder of the
255 // disconnection operations outside the context of the IPC method handler.
256 auto weak_this = weak_factory_.GetWeakPtr();
257 task_runner_->PostTask([weak_this]() {
258 if (weak_this) {
259 weak_this->Disconnect();
260 }
261 });
262 }
263
OnConnectionInitialized(bool connection_succeeded,bool using_shmem_provided_by_producer,bool direct_smb_patching_supported,bool use_shmem_emulation)264 void ProducerIPCClientImpl::OnConnectionInitialized(
265 bool connection_succeeded,
266 bool using_shmem_provided_by_producer,
267 bool direct_smb_patching_supported,
268 bool use_shmem_emulation) {
269 PERFETTO_DCHECK_THREAD(thread_checker_);
270 // If connection_succeeded == false, the OnDisconnect() call will follow next
271 // and there we'll notify the |producer_|. TODO: add a test for this.
272 if (!connection_succeeded)
273 return;
274 is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
275 direct_smb_patching_supported_ = direct_smb_patching_supported;
276 // The tracing service may reject using shared memory and tell the client to
277 // commit data over the socket. This can happen when the client connects to
278 // the service via a relay service:
279 // client <-Unix socket-> relay service <- vsock -> tracing service.
280 use_shmem_emulation_ = use_shmem_emulation;
281 producer_->OnConnect();
282
283 // Bail out if the service failed to adopt our producer-allocated SMB.
284 // TODO(eseckler): Handle adoption failure more gracefully.
285 if (shared_memory_ && !is_shmem_provided_by_producer_) {
286 PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
287 Disconnect();
288 return;
289 }
290 }
291
OnServiceRequest(const protos::gen::GetAsyncCommandResponse & cmd)292 void ProducerIPCClientImpl::OnServiceRequest(
293 const protos::gen::GetAsyncCommandResponse& cmd) {
294 PERFETTO_DCHECK_THREAD(thread_checker_);
295
296 // This message is sent only when connecting to a service running Android Q+.
297 // See comment below in kStartDataSource.
298 if (cmd.has_setup_data_source()) {
299 const auto& req = cmd.setup_data_source();
300 const DataSourceInstanceID dsid = req.new_instance_id();
301 data_sources_setup_.insert(dsid);
302 producer_->SetupDataSource(dsid, req.config());
303 return;
304 }
305
306 if (cmd.has_start_data_source()) {
307 const auto& req = cmd.start_data_source();
308 const DataSourceInstanceID dsid = req.new_instance_id();
309 const DataSourceConfig& cfg = req.config();
310 if (!data_sources_setup_.count(dsid)) {
311 // When connecting with an older (Android P) service, the service will not
312 // send a SetupDataSource message. We synthesize it here in that case.
313 producer_->SetupDataSource(dsid, cfg);
314 }
315 producer_->StartDataSource(dsid, cfg);
316 return;
317 }
318
319 if (cmd.has_stop_data_source()) {
320 const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
321 producer_->StopDataSource(dsid);
322 data_sources_setup_.erase(dsid);
323 return;
324 }
325
326 if (cmd.has_setup_tracing()) {
327 std::unique_ptr<SharedMemory> ipc_shared_memory;
328 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
329 const std::string& shm_key = cmd.setup_tracing().shm_key_windows();
330 if (!shm_key.empty())
331 ipc_shared_memory = SharedMemoryWindows::Attach(shm_key);
332 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_FUCHSIA)
333 // On Fuchsia, the embedder is responsible for routing the shared memory
334 // FD, which is provided to this code via a blocking callback.
335 PERFETTO_CHECK(receive_shmem_fd_cb_fuchsia_);
336
337 base::ScopedFile shmem_fd(receive_shmem_fd_cb_fuchsia_());
338 if (!shmem_fd) {
339 // Failure to get a shared memory buffer is a protocol violation and
340 // therefore we should drop the Protocol connection.
341 PERFETTO_ELOG("Could not get shared memory FD from embedder.");
342 ScheduleDisconnect();
343 return;
344 }
345
346 ipc_shared_memory =
347 PosixSharedMemory::AttachToFd(std::move(shmem_fd),
348 /*require_seals_if_supported=*/false);
349 #else
350 base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
351 if (shmem_fd) {
352 // TODO(primiano): handle mmap failure in case of OOM.
353 ipc_shared_memory =
354 PosixSharedMemory::AttachToFd(std::move(shmem_fd),
355 /*require_seals_if_supported=*/false);
356 }
357 #endif
358 if (use_shmem_emulation_) {
359 PERFETTO_CHECK(!ipc_shared_memory);
360 // Need to create an emulated shmem buffer when the transport deosn't
361 // support it.
362 ipc_shared_memory = InProcessSharedMemory::Create(
363 /*size=*/InProcessSharedMemory::kShmemEmulationSize);
364 }
365 if (ipc_shared_memory) {
366 auto shmem_mode = use_shmem_emulation_
367 ? SharedMemoryABI::ShmemMode::kShmemEmulation
368 : SharedMemoryABI::ShmemMode::kDefault;
369 // This is the nominal case used in most configurations, where the service
370 // provides the SMB.
371 PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_);
372 shared_memory_ = std::move(ipc_shared_memory);
373 shared_buffer_page_size_kb_ =
374 cmd.setup_tracing().shared_buffer_page_size_kb();
375 shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
376 shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, shmem_mode,
377 this, task_runner_);
378 if (direct_smb_patching_supported_)
379 shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService();
380 } else {
381 // Producer-provided SMB (used by Chrome for startup tracing).
382 PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ &&
383 shared_memory_arbiter_);
384 }
385 producer_->OnTracingSetup();
386 return;
387 }
388
389 if (cmd.has_flush()) {
390 // This cast boilerplate is required only because protobuf uses its own
391 // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
392 // type (long vs long long) even though they have the same size.
393 const auto* data_source_ids = cmd.flush().data_source_ids().data();
394 static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
395 "data_source_ids should be 64-bit");
396
397 FlushFlags flags(cmd.flush().flags());
398 producer_->Flush(
399 cmd.flush().request_id(),
400 reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
401 static_cast<size_t>(cmd.flush().data_source_ids().size()), flags);
402 return;
403 }
404
405 if (cmd.has_clear_incremental_state()) {
406 const auto* data_source_ids =
407 cmd.clear_incremental_state().data_source_ids().data();
408 static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
409 "data_source_ids should be 64-bit");
410 producer_->ClearIncrementalState(
411 reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
412 static_cast<size_t>(
413 cmd.clear_incremental_state().data_source_ids().size()));
414 return;
415 }
416
417 PERFETTO_DFATAL("Unknown async request received from tracing service");
418 }
419
RegisterDataSource(const DataSourceDescriptor & descriptor)420 void ProducerIPCClientImpl::RegisterDataSource(
421 const DataSourceDescriptor& descriptor) {
422 PERFETTO_DCHECK_THREAD(thread_checker_);
423 if (!connected_) {
424 PERFETTO_DLOG(
425 "Cannot RegisterDataSource(), not connected to tracing service");
426 }
427 protos::gen::RegisterDataSourceRequest req;
428 *req.mutable_data_source_descriptor() = descriptor;
429 ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
430 async_response.Bind(
431 [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
432 if (!response)
433 PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
434 });
435 producer_port_->RegisterDataSource(req, std::move(async_response));
436 }
437
UpdateDataSource(const DataSourceDescriptor & descriptor)438 void ProducerIPCClientImpl::UpdateDataSource(
439 const DataSourceDescriptor& descriptor) {
440 PERFETTO_DCHECK_THREAD(thread_checker_);
441 if (!connected_) {
442 PERFETTO_DLOG(
443 "Cannot UpdateDataSource(), not connected to tracing service");
444 }
445 protos::gen::UpdateDataSourceRequest req;
446 *req.mutable_data_source_descriptor() = descriptor;
447 ipc::Deferred<protos::gen::UpdateDataSourceResponse> async_response;
448 async_response.Bind(
449 [](ipc::AsyncResult<protos::gen::UpdateDataSourceResponse> response) {
450 if (!response)
451 PERFETTO_DLOG("UpdateDataSource() failed: connection reset");
452 });
453 producer_port_->UpdateDataSource(req, std::move(async_response));
454 }
455
UnregisterDataSource(const std::string & name)456 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
457 PERFETTO_DCHECK_THREAD(thread_checker_);
458 if (!connected_) {
459 PERFETTO_DLOG(
460 "Cannot UnregisterDataSource(), not connected to tracing service");
461 return;
462 }
463 protos::gen::UnregisterDataSourceRequest req;
464 req.set_data_source_name(name);
465 producer_port_->UnregisterDataSource(
466 req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>());
467 }
468
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)469 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
470 uint32_t target_buffer) {
471 PERFETTO_DCHECK_THREAD(thread_checker_);
472 if (!connected_) {
473 PERFETTO_DLOG(
474 "Cannot RegisterTraceWriter(), not connected to tracing service");
475 return;
476 }
477 protos::gen::RegisterTraceWriterRequest req;
478 req.set_trace_writer_id(writer_id);
479 req.set_target_buffer(target_buffer);
480 producer_port_->RegisterTraceWriter(
481 req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>());
482 }
483
UnregisterTraceWriter(uint32_t writer_id)484 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
485 PERFETTO_DCHECK_THREAD(thread_checker_);
486 if (!connected_) {
487 PERFETTO_DLOG(
488 "Cannot UnregisterTraceWriter(), not connected to tracing service");
489 return;
490 }
491 protos::gen::UnregisterTraceWriterRequest req;
492 req.set_trace_writer_id(writer_id);
493 producer_port_->UnregisterTraceWriter(
494 req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>());
495 }
496
CommitData(const CommitDataRequest & req,CommitDataCallback callback)497 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
498 CommitDataCallback callback) {
499 PERFETTO_DCHECK_THREAD(thread_checker_);
500 if (!connected_) {
501 PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
502 return;
503 }
504 ipc::Deferred<protos::gen::CommitDataResponse> async_response;
505 // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
506 // this call and checks that the callback is dropped.
507 if (callback) {
508 async_response.Bind(
509 [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) {
510 if (!response) {
511 PERFETTO_DLOG("CommitData() failed: connection reset");
512 return;
513 }
514 callback();
515 });
516 }
517 producer_port_->CommitData(req, std::move(async_response));
518 }
519
NotifyDataSourceStarted(DataSourceInstanceID id)520 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
521 PERFETTO_DCHECK_THREAD(thread_checker_);
522 if (!connected_) {
523 PERFETTO_DLOG(
524 "Cannot NotifyDataSourceStarted(), not connected to tracing service");
525 return;
526 }
527 protos::gen::NotifyDataSourceStartedRequest req;
528 req.set_data_source_id(id);
529 producer_port_->NotifyDataSourceStarted(
530 req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>());
531 }
532
NotifyDataSourceStopped(DataSourceInstanceID id)533 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
534 PERFETTO_DCHECK_THREAD(thread_checker_);
535 if (!connected_) {
536 PERFETTO_DLOG(
537 "Cannot NotifyDataSourceStopped(), not connected to tracing service");
538 return;
539 }
540 protos::gen::NotifyDataSourceStoppedRequest req;
541 req.set_data_source_id(id);
542 producer_port_->NotifyDataSourceStopped(
543 req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>());
544 }
545
ActivateTriggers(const std::vector<std::string> & triggers)546 void ProducerIPCClientImpl::ActivateTriggers(
547 const std::vector<std::string>& triggers) {
548 PERFETTO_DCHECK_THREAD(thread_checker_);
549 if (!connected_) {
550 PERFETTO_DLOG(
551 "Cannot ActivateTriggers(), not connected to tracing service");
552 return;
553 }
554 protos::gen::ActivateTriggersRequest proto_req;
555 for (const auto& name : triggers) {
556 *proto_req.add_trigger_names() = name;
557 }
558 producer_port_->ActivateTriggers(
559 proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>());
560 }
561
Sync(std::function<void ()> callback)562 void ProducerIPCClientImpl::Sync(std::function<void()> callback) {
563 PERFETTO_DCHECK_THREAD(thread_checker_);
564 if (!connected_) {
565 pending_sync_reqs_.emplace_back(std::move(callback));
566 return;
567 }
568 ipc::Deferred<protos::gen::SyncResponse> resp;
569 resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) {
570 // Here we ACK the callback even if the service replies with a failure
571 // (i.e. the service is too old and doesn't understand Sync()). In that
572 // case the service has still seen the request, the IPC roundtrip is
573 // still a (weaker) linearization fence.
574 callback();
575 });
576 producer_port_->Sync(protos::gen::SyncRequest(), std::move(resp));
577 }
578
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)579 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
580 BufferID target_buffer,
581 BufferExhaustedPolicy buffer_exhausted_policy) {
582 // This method can be called by different threads. |shared_memory_arbiter_| is
583 // thread-safe but be aware of accessing any other state in this function.
584 return shared_memory_arbiter_->CreateTraceWriter(target_buffer,
585 buffer_exhausted_policy);
586 }
587
MaybeSharedMemoryArbiter()588 SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() {
589 return shared_memory_arbiter_.get();
590 }
591
IsShmemProvidedByProducer() const592 bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const {
593 return is_shmem_provided_by_producer_;
594 }
595
NotifyFlushComplete(FlushRequestID req_id)596 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
597 return shared_memory_arbiter_->NotifyFlushComplete(req_id);
598 }
599
shared_memory() const600 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
601 return shared_memory_.get();
602 }
603
shared_buffer_page_size_kb() const604 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
605 return shared_buffer_page_size_kb_;
606 }
607
608 } // namespace perfetto
609