• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ipc/client.h"
24 #include "perfetto/tracing/core/commit_data_request.h"
25 #include "perfetto/tracing/core/data_source_config.h"
26 #include "perfetto/tracing/core/data_source_descriptor.h"
27 #include "perfetto/tracing/core/producer.h"
28 #include "perfetto/tracing/core/shared_memory_arbiter.h"
29 #include "perfetto/tracing/core/trace_config.h"
30 #include "perfetto/tracing/core/trace_writer.h"
31 #include "src/tracing/ipc/posix_shared_memory.h"
32 
// TODO(fmayer): think about what happens when ProducerIPCClientImpl gets
// destroyed w.r.t. the Producer pointer. Also think about the lifetime of the
// Producer* during the callbacks.
36 
37 namespace perfetto {
38 
39 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode)40 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
41     const char* service_sock_name,
42     Producer* producer,
43     const std::string& producer_name,
44     base::TaskRunner* task_runner,
45     TracingService::ProducerSMBScrapingMode smb_scraping_mode) {
46   return std::unique_ptr<TracingService::ProducerEndpoint>(
47       new ProducerIPCClientImpl(service_sock_name, producer, producer_name,
48                                 task_runner, smb_scraping_mode));
49 }
50 
ProducerIPCClientImpl(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode)51 ProducerIPCClientImpl::ProducerIPCClientImpl(
52     const char* service_sock_name,
53     Producer* producer,
54     const std::string& producer_name,
55     base::TaskRunner* task_runner,
56     TracingService::ProducerSMBScrapingMode smb_scraping_mode)
57     : producer_(producer),
58       task_runner_(task_runner),
59       ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
60       producer_port_(this /* event_listener */),
61       name_(producer_name),
62       smb_scraping_mode_(smb_scraping_mode) {
63   ipc_channel_->BindService(producer_port_.GetWeakPtr());
64   PERFETTO_DCHECK_THREAD(thread_checker_);
65 }
66 
67 ProducerIPCClientImpl::~ProducerIPCClientImpl() = default;
68 
69 // Called by the IPC layer if the BindService() succeeds.
OnConnect()70 void ProducerIPCClientImpl::OnConnect() {
71   PERFETTO_DCHECK_THREAD(thread_checker_);
72   connected_ = true;
73 
74   // The IPC layer guarantees that any outstanding callback will be dropped on
75   // the floor if producer_port_ is destroyed between the request and the reply.
76   // Binding |this| is hence safe.
77   ipc::Deferred<protos::InitializeConnectionResponse> on_init;
78   on_init.Bind(
79       [this](ipc::AsyncResult<protos::InitializeConnectionResponse> resp) {
80         OnConnectionInitialized(resp.success());
81       });
82   protos::InitializeConnectionRequest req;
83   req.set_producer_name(name_);
84   switch (smb_scraping_mode_) {
85     case TracingService::ProducerSMBScrapingMode::kDefault:
86       // No need to set the mode, it defaults to use the service default if
87       // unspecified.
88       break;
89     case TracingService::ProducerSMBScrapingMode::kEnabled:
90       req.set_smb_scraping_mode(
91           protos::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
92       break;
93     case TracingService::ProducerSMBScrapingMode::kDisabled:
94       req.set_smb_scraping_mode(
95           protos::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
96       break;
97   }
98   producer_port_.InitializeConnection(req, std::move(on_init));
99 
100   // Create the back channel to receive commands from the Service.
101   ipc::Deferred<protos::GetAsyncCommandResponse> on_cmd;
102   on_cmd.Bind([this](ipc::AsyncResult<protos::GetAsyncCommandResponse> resp) {
103     if (!resp)
104       return;  // The IPC channel was closed and |resp| was auto-rejected.
105     OnServiceRequest(*resp);
106   });
107   producer_port_.GetAsyncCommand(protos::GetAsyncCommandRequest(),
108                                  std::move(on_cmd));
109 }
110 
OnDisconnect()111 void ProducerIPCClientImpl::OnDisconnect() {
112   PERFETTO_DCHECK_THREAD(thread_checker_);
113   PERFETTO_DLOG("Tracing service connection failure");
114   connected_ = false;
115   producer_->OnDisconnect();
116   data_sources_setup_.clear();
117 }
118 
OnConnectionInitialized(bool connection_succeeded)119 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
120   PERFETTO_DCHECK_THREAD(thread_checker_);
121   // If connection_succeeded == false, the OnDisconnect() call will follow next
122   // and there we'll notify the |producer_|. TODO: add a test for this.
123   if (!connection_succeeded)
124     return;
125   producer_->OnConnect();
126 }
127 
OnServiceRequest(const protos::GetAsyncCommandResponse & cmd)128 void ProducerIPCClientImpl::OnServiceRequest(
129     const protos::GetAsyncCommandResponse& cmd) {
130   PERFETTO_DCHECK_THREAD(thread_checker_);
131 
132   // This message is sent only when connecting to a service running Android Q+.
133   // See comment below in kStartDataSource.
134   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupDataSource) {
135     const auto& req = cmd.setup_data_source();
136     const DataSourceInstanceID dsid = req.new_instance_id();
137     DataSourceConfig cfg;
138     cfg.FromProto(req.config());
139     data_sources_setup_.insert(dsid);
140     producer_->SetupDataSource(dsid, cfg);
141     return;
142   }
143 
144   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
145     const auto& req = cmd.start_data_source();
146     const DataSourceInstanceID dsid = req.new_instance_id();
147     DataSourceConfig cfg;
148     cfg.FromProto(req.config());
149     if (!data_sources_setup_.count(dsid)) {
150       // When connecting with an older (Android P) service, the service will not
151       // send a SetupDataSource message. We synthesize it here in that case.
152       producer_->SetupDataSource(dsid, cfg);
153     }
154     producer_->StartDataSource(dsid, cfg);
155     return;
156   }
157 
158   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
159     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
160     producer_->StopDataSource(dsid);
161     data_sources_setup_.erase(dsid);
162     return;
163   }
164 
165   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupTracing) {
166     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
167     PERFETTO_CHECK(shmem_fd);
168 
169     // TODO(primiano): handle mmap failure in case of OOM.
170     shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
171     shared_buffer_page_size_kb_ =
172         cmd.setup_tracing().shared_buffer_page_size_kb();
173     shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
174         shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
175         task_runner_);
176     producer_->OnTracingSetup();
177     return;
178   }
179 
180   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
181     // This cast boilerplate is required only because protobuf uses its own
182     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
183     // type (long vs long long) even though they have the same size.
184     const auto* data_source_ids = cmd.flush().data_source_ids().data();
185     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
186                   "data_source_ids should be 64-bit");
187     producer_->Flush(
188         cmd.flush().request_id(),
189         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
190         static_cast<size_t>(cmd.flush().data_source_ids().size()));
191     return;
192   }
193 
194   if (cmd.cmd_case() ==
195       protos::GetAsyncCommandResponse::kClearIncrementalState) {
196     const auto* data_source_ids =
197         cmd.clear_incremental_state().data_source_ids().data();
198     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
199                   "data_source_ids should be 64-bit");
200     producer_->ClearIncrementalState(
201         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
202         static_cast<size_t>(
203             cmd.clear_incremental_state().data_source_ids().size()));
204     return;
205   }
206 
207   PERFETTO_DFATAL("Unknown async request %d received from tracing service",
208                   cmd.cmd_case());
209 }
210 
RegisterDataSource(const DataSourceDescriptor & descriptor)211 void ProducerIPCClientImpl::RegisterDataSource(
212     const DataSourceDescriptor& descriptor) {
213   PERFETTO_DCHECK_THREAD(thread_checker_);
214   if (!connected_) {
215     PERFETTO_DLOG(
216         "Cannot RegisterDataSource(), not connected to tracing service");
217   }
218   protos::RegisterDataSourceRequest req;
219   descriptor.ToProto(req.mutable_data_source_descriptor());
220   ipc::Deferred<protos::RegisterDataSourceResponse> async_response;
221   async_response.Bind(
222       [](ipc::AsyncResult<protos::RegisterDataSourceResponse> response) {
223         if (!response)
224           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
225       });
226   producer_port_.RegisterDataSource(req, std::move(async_response));
227 }
228 
UnregisterDataSource(const std::string & name)229 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
230   PERFETTO_DCHECK_THREAD(thread_checker_);
231   if (!connected_) {
232     PERFETTO_DLOG(
233         "Cannot UnregisterDataSource(), not connected to tracing service");
234     return;
235   }
236   protos::UnregisterDataSourceRequest req;
237   req.set_data_source_name(name);
238   producer_port_.UnregisterDataSource(
239       req, ipc::Deferred<protos::UnregisterDataSourceResponse>());
240 }
241 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)242 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
243                                                 uint32_t target_buffer) {
244   PERFETTO_DCHECK_THREAD(thread_checker_);
245   if (!connected_) {
246     PERFETTO_DLOG(
247         "Cannot RegisterTraceWriter(), not connected to tracing service");
248     return;
249   }
250   protos::RegisterTraceWriterRequest req;
251   req.set_trace_writer_id(writer_id);
252   req.set_target_buffer(target_buffer);
253   producer_port_.RegisterTraceWriter(
254       req, ipc::Deferred<protos::RegisterTraceWriterResponse>());
255 }
256 
UnregisterTraceWriter(uint32_t writer_id)257 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
258   PERFETTO_DCHECK_THREAD(thread_checker_);
259   if (!connected_) {
260     PERFETTO_DLOG(
261         "Cannot UnregisterTraceWriter(), not connected to tracing service");
262     return;
263   }
264   protos::UnregisterTraceWriterRequest req;
265   req.set_trace_writer_id(writer_id);
266   producer_port_.UnregisterTraceWriter(
267       req, ipc::Deferred<protos::UnregisterTraceWriterResponse>());
268 }
269 
CommitData(const CommitDataRequest & req,CommitDataCallback callback)270 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
271                                        CommitDataCallback callback) {
272   PERFETTO_DCHECK_THREAD(thread_checker_);
273   if (!connected_) {
274     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
275     return;
276   }
277   protos::CommitDataRequest proto_req;
278   req.ToProto(&proto_req);
279   ipc::Deferred<protos::CommitDataResponse> async_response;
280   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
281   // this call and checks that the callback is dropped.
282   if (callback) {
283     async_response.Bind(
284         [callback](ipc::AsyncResult<protos::CommitDataResponse> response) {
285           if (!response) {
286             PERFETTO_DLOG("CommitData() failed: connection reset");
287             return;
288           }
289           callback();
290         });
291   }
292   producer_port_.CommitData(proto_req, std::move(async_response));
293 }
294 
NotifyDataSourceStarted(DataSourceInstanceID id)295 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
296   PERFETTO_DCHECK_THREAD(thread_checker_);
297   if (!connected_) {
298     PERFETTO_DLOG(
299         "Cannot NotifyDataSourceStarted(), not connected to tracing service");
300     return;
301   }
302   protos::NotifyDataSourceStartedRequest req;
303   req.set_data_source_id(id);
304   producer_port_.NotifyDataSourceStarted(
305       req, ipc::Deferred<protos::NotifyDataSourceStartedResponse>());
306 }
307 
NotifyDataSourceStopped(DataSourceInstanceID id)308 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
309   PERFETTO_DCHECK_THREAD(thread_checker_);
310   if (!connected_) {
311     PERFETTO_DLOG(
312         "Cannot NotifyDataSourceStopped(), not connected to tracing service");
313     return;
314   }
315   protos::NotifyDataSourceStoppedRequest req;
316   req.set_data_source_id(id);
317   producer_port_.NotifyDataSourceStopped(
318       req, ipc::Deferred<protos::NotifyDataSourceStoppedResponse>());
319 }
320 
ActivateTriggers(const std::vector<std::string> & triggers)321 void ProducerIPCClientImpl::ActivateTriggers(
322     const std::vector<std::string>& triggers) {
323   PERFETTO_DCHECK_THREAD(thread_checker_);
324   if (!connected_) {
325     PERFETTO_DLOG(
326         "Cannot ActivateTriggers(), not connected to tracing service");
327     return;
328   }
329   protos::ActivateTriggersRequest proto_req;
330   for (const auto& name : triggers) {
331     *proto_req.add_trigger_names() = name;
332   }
333   producer_port_.ActivateTriggers(
334       proto_req, ipc::Deferred<protos::ActivateTriggersResponse>());
335 }
336 
CreateTraceWriter(BufferID target_buffer)337 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
338     BufferID target_buffer) {
339   // This method can be called by different threads. |shared_memory_arbiter_| is
340   // thread-safe but be aware of accessing any other state in this function.
341   return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
342 }
343 
GetInProcessShmemArbiter()344 SharedMemoryArbiter* ProducerIPCClientImpl::GetInProcessShmemArbiter() {
345   PERFETTO_DLOG("Cannot GetInProcessShmemArbiter() via the IPC layer.");
346   return nullptr;
347 }
348 
NotifyFlushComplete(FlushRequestID req_id)349 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
350   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
351 }
352 
shared_memory() const353 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
354   return shared_memory_.get();
355 }
356 
shared_buffer_page_size_kb() const357 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
358   return shared_buffer_page_size_kb_;
359 }
360 
361 }  // namespace perfetto
362