• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ipc/client.h"
24 #include "perfetto/tracing/core/commit_data_request.h"
25 #include "perfetto/tracing/core/data_source_config.h"
26 #include "perfetto/tracing/core/data_source_descriptor.h"
27 #include "perfetto/tracing/core/producer.h"
28 #include "perfetto/tracing/core/shared_memory_arbiter.h"
29 #include "perfetto/tracing/core/trace_config.h"
30 #include "perfetto/tracing/core/trace_writer.h"
31 #include "src/tracing/ipc/posix_shared_memory.h"
32 
// TODO(fmayer): think about what happens to the Producer pointer when
// ProducerIPCClientImpl gets destroyed. Also think about the lifetime of the
// Producer* during the callbacks.
36 
37 namespace perfetto {
38 
39 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner)40 std::unique_ptr<Service::ProducerEndpoint> ProducerIPCClient::Connect(
41     const char* service_sock_name,
42     Producer* producer,
43     const std::string& producer_name,
44     base::TaskRunner* task_runner) {
45   return std::unique_ptr<Service::ProducerEndpoint>(new ProducerIPCClientImpl(
46       service_sock_name, producer, producer_name, task_runner));
47 }
48 
ProducerIPCClientImpl(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner)49 ProducerIPCClientImpl::ProducerIPCClientImpl(const char* service_sock_name,
50                                              Producer* producer,
51                                              const std::string& producer_name,
52                                              base::TaskRunner* task_runner)
53     : producer_(producer),
54       task_runner_(task_runner),
55       ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
56       producer_port_(this /* event_listener */),
57       name_(producer_name) {
58   ipc_channel_->BindService(producer_port_.GetWeakPtr());
59   PERFETTO_DCHECK_THREAD(thread_checker_);
60 }
61 
62 ProducerIPCClientImpl::~ProducerIPCClientImpl() = default;
63 
64 // Called by the IPC layer if the BindService() succeeds.
OnConnect()65 void ProducerIPCClientImpl::OnConnect() {
66   PERFETTO_DCHECK_THREAD(thread_checker_);
67   connected_ = true;
68 
69   // The IPC layer guarantees that any outstanding callback will be dropped on
70   // the floor if producer_port_ is destroyed between the request and the reply.
71   // Binding |this| is hence safe.
72   ipc::Deferred<protos::InitializeConnectionResponse> on_init;
73   on_init.Bind(
74       [this](ipc::AsyncResult<protos::InitializeConnectionResponse> resp) {
75         OnConnectionInitialized(resp.success());
76       });
77   protos::InitializeConnectionRequest req;
78   req.set_producer_name(name_);
79   producer_port_.InitializeConnection(req, std::move(on_init));
80 
81   // Create the back channel to receive commands from the Service.
82   ipc::Deferred<protos::GetAsyncCommandResponse> on_cmd;
83   on_cmd.Bind([this](ipc::AsyncResult<protos::GetAsyncCommandResponse> resp) {
84     if (!resp)
85       return;  // The IPC channel was closed and |resp| was auto-rejected.
86     OnServiceRequest(*resp);
87   });
88   producer_port_.GetAsyncCommand(protos::GetAsyncCommandRequest(),
89                                  std::move(on_cmd));
90 }
91 
OnDisconnect()92 void ProducerIPCClientImpl::OnDisconnect() {
93   PERFETTO_DCHECK_THREAD(thread_checker_);
94   PERFETTO_DLOG("Tracing service connection failure");
95   connected_ = false;
96   producer_->OnDisconnect();
97 }
98 
OnConnectionInitialized(bool connection_succeeded)99 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
100   PERFETTO_DCHECK_THREAD(thread_checker_);
101   // If connection_succeeded == false, the OnDisconnect() call will follow next
102   // and there we'll notify the |producer_|. TODO: add a test for this.
103   if (!connection_succeeded)
104     return;
105   producer_->OnConnect();
106 }
107 
OnServiceRequest(const protos::GetAsyncCommandResponse & cmd)108 void ProducerIPCClientImpl::OnServiceRequest(
109     const protos::GetAsyncCommandResponse& cmd) {
110   PERFETTO_DCHECK_THREAD(thread_checker_);
111   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
112     const auto& req = cmd.start_data_source();
113     const DataSourceInstanceID dsid = req.new_instance_id();
114     DataSourceConfig cfg;
115     cfg.FromProto(req.config());
116     producer_->CreateDataSourceInstance(dsid, cfg);
117     return;
118   }
119 
120   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
121     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
122     producer_->TearDownDataSourceInstance(dsid);
123     return;
124   }
125 
126   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupTracing) {
127     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
128     PERFETTO_CHECK(shmem_fd);
129 
130     // TODO(primiano): handle mmap failure in case of OOM.
131     shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
132     shared_buffer_page_size_kb_ =
133         cmd.setup_tracing().shared_buffer_page_size_kb();
134     shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
135         shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
136         task_runner_);
137     producer_->OnTracingSetup();
138     return;
139   }
140 
141   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
142     // This cast boilerplate is required only because protobuf uses its own
143     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
144     // type (long vs long long) even though they have the same size.
145     const auto* data_source_ids = cmd.flush().data_source_ids().data();
146     static_assert(sizeof(data_source_ids[0]) == sizeof(FlushRequestID),
147                   "data_source_ids should be 64-bit");
148     producer_->Flush(cmd.flush().request_id(),
149                      reinterpret_cast<const FlushRequestID*>(data_source_ids),
150                      static_cast<size_t>(cmd.flush().data_source_ids().size()));
151     return;
152   }
153 
154   PERFETTO_DLOG("Unknown async request %d received from tracing service",
155                 cmd.cmd_case());
156   PERFETTO_DCHECK(false);
157 }
158 
RegisterDataSource(const DataSourceDescriptor & descriptor)159 void ProducerIPCClientImpl::RegisterDataSource(
160     const DataSourceDescriptor& descriptor) {
161   PERFETTO_DCHECK_THREAD(thread_checker_);
162   if (!connected_) {
163     PERFETTO_DLOG(
164         "Cannot RegisterDataSource(), not connected to tracing service");
165   }
166   protos::RegisterDataSourceRequest req;
167   descriptor.ToProto(req.mutable_data_source_descriptor());
168   ipc::Deferred<protos::RegisterDataSourceResponse> async_response;
169   async_response.Bind(
170       [](ipc::AsyncResult<protos::RegisterDataSourceResponse> response) {
171         if (!response)
172           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
173       });
174   producer_port_.RegisterDataSource(req, std::move(async_response));
175 }
176 
UnregisterDataSource(const std::string & name)177 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
178   PERFETTO_DCHECK_THREAD(thread_checker_);
179   if (!connected_) {
180     PERFETTO_DLOG(
181         "Cannot UnregisterDataSource(), not connected to tracing service");
182     return;
183   }
184   protos::UnregisterDataSourceRequest req;
185   req.set_data_source_name(name);
186   producer_port_.UnregisterDataSource(
187       req, ipc::Deferred<protos::UnregisterDataSourceResponse>());
188 }
189 
CommitData(const CommitDataRequest & req,CommitDataCallback callback)190 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
191                                        CommitDataCallback callback) {
192   PERFETTO_DCHECK_THREAD(thread_checker_);
193   if (!connected_) {
194     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
195     return;
196   }
197   protos::CommitDataRequest proto_req;
198   req.ToProto(&proto_req);
199   ipc::Deferred<protos::CommitDataResponse> async_response;
200   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
201   // this call and checks that the callback is dropped.
202   if (callback) {
203     async_response.Bind(
204         [callback](ipc::AsyncResult<protos::CommitDataResponse> response) {
205           if (!response) {
206             PERFETTO_DLOG("CommitData() failed: connection reset");
207             return;
208           }
209           callback();
210         });
211   }
212   producer_port_.CommitData(proto_req, std::move(async_response));
213 }
214 
CreateTraceWriter(BufferID target_buffer)215 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
216     BufferID target_buffer) {
217   // This method can be called by different threads. |shared_memory_arbiter_| is
218   // thread-safe but be aware of accessing any other state in this function.
219   return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
220 }
221 
NotifyFlushComplete(FlushRequestID req_id)222 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
223   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
224 }
225 
shared_memory() const226 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
227   return shared_memory_.get();
228 }
229 
shared_buffer_page_size_kb() const230 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
231   return shared_buffer_page_size_kb_;
232 }
233 
234 }  // namespace perfetto
235