1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18
19 #include <inttypes.h>
20 #include <string.h>
21
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ipc/client.h"
24 #include "perfetto/tracing/core/commit_data_request.h"
25 #include "perfetto/tracing/core/data_source_config.h"
26 #include "perfetto/tracing/core/data_source_descriptor.h"
27 #include "perfetto/tracing/core/producer.h"
28 #include "perfetto/tracing/core/shared_memory_arbiter.h"
29 #include "perfetto/tracing/core/trace_config.h"
30 #include "perfetto/tracing/core/trace_writer.h"
31 #include "src/tracing/ipc/posix_shared_memory.h"
32
// TODO(fmayer): think about what happens to the Producer pointer when
// ProducerIPCClientImpl gets destroyed. Also think about the lifetime of the
// Producer* during the callbacks.
36
37 namespace perfetto {
38
39 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner)40 std::unique_ptr<Service::ProducerEndpoint> ProducerIPCClient::Connect(
41 const char* service_sock_name,
42 Producer* producer,
43 const std::string& producer_name,
44 base::TaskRunner* task_runner) {
45 return std::unique_ptr<Service::ProducerEndpoint>(new ProducerIPCClientImpl(
46 service_sock_name, producer, producer_name, task_runner));
47 }
48
ProducerIPCClientImpl(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner)49 ProducerIPCClientImpl::ProducerIPCClientImpl(const char* service_sock_name,
50 Producer* producer,
51 const std::string& producer_name,
52 base::TaskRunner* task_runner)
53 : producer_(producer),
54 task_runner_(task_runner),
55 ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
56 producer_port_(this /* event_listener */),
57 name_(producer_name) {
58 ipc_channel_->BindService(producer_port_.GetWeakPtr());
59 PERFETTO_DCHECK_THREAD(thread_checker_);
60 }
61
62 ProducerIPCClientImpl::~ProducerIPCClientImpl() = default;
63
64 // Called by the IPC layer if the BindService() succeeds.
OnConnect()65 void ProducerIPCClientImpl::OnConnect() {
66 PERFETTO_DCHECK_THREAD(thread_checker_);
67 connected_ = true;
68
69 // The IPC layer guarantees that any outstanding callback will be dropped on
70 // the floor if producer_port_ is destroyed between the request and the reply.
71 // Binding |this| is hence safe.
72 ipc::Deferred<protos::InitializeConnectionResponse> on_init;
73 on_init.Bind(
74 [this](ipc::AsyncResult<protos::InitializeConnectionResponse> resp) {
75 OnConnectionInitialized(resp.success());
76 });
77 protos::InitializeConnectionRequest req;
78 req.set_producer_name(name_);
79 producer_port_.InitializeConnection(req, std::move(on_init));
80
81 // Create the back channel to receive commands from the Service.
82 ipc::Deferred<protos::GetAsyncCommandResponse> on_cmd;
83 on_cmd.Bind([this](ipc::AsyncResult<protos::GetAsyncCommandResponse> resp) {
84 if (!resp)
85 return; // The IPC channel was closed and |resp| was auto-rejected.
86 OnServiceRequest(*resp);
87 });
88 producer_port_.GetAsyncCommand(protos::GetAsyncCommandRequest(),
89 std::move(on_cmd));
90 }
91
OnDisconnect()92 void ProducerIPCClientImpl::OnDisconnect() {
93 PERFETTO_DCHECK_THREAD(thread_checker_);
94 PERFETTO_DLOG("Tracing service connection failure");
95 connected_ = false;
96 producer_->OnDisconnect();
97 }
98
OnConnectionInitialized(bool connection_succeeded)99 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
100 PERFETTO_DCHECK_THREAD(thread_checker_);
101 // If connection_succeeded == false, the OnDisconnect() call will follow next
102 // and there we'll notify the |producer_|. TODO: add a test for this.
103 if (!connection_succeeded)
104 return;
105 producer_->OnConnect();
106 }
107
OnServiceRequest(const protos::GetAsyncCommandResponse & cmd)108 void ProducerIPCClientImpl::OnServiceRequest(
109 const protos::GetAsyncCommandResponse& cmd) {
110 PERFETTO_DCHECK_THREAD(thread_checker_);
111 if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
112 const auto& req = cmd.start_data_source();
113 const DataSourceInstanceID dsid = req.new_instance_id();
114 DataSourceConfig cfg;
115 cfg.FromProto(req.config());
116 producer_->CreateDataSourceInstance(dsid, cfg);
117 return;
118 }
119
120 if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
121 const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
122 producer_->TearDownDataSourceInstance(dsid);
123 return;
124 }
125
126 if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupTracing) {
127 base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
128 PERFETTO_CHECK(shmem_fd);
129
130 // TODO(primiano): handle mmap failure in case of OOM.
131 shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
132 shared_buffer_page_size_kb_ =
133 cmd.setup_tracing().shared_buffer_page_size_kb();
134 shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
135 shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
136 task_runner_);
137 producer_->OnTracingSetup();
138 return;
139 }
140
141 if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
142 // This cast boilerplate is required only because protobuf uses its own
143 // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
144 // type (long vs long long) even though they have the same size.
145 const auto* data_source_ids = cmd.flush().data_source_ids().data();
146 static_assert(sizeof(data_source_ids[0]) == sizeof(FlushRequestID),
147 "data_source_ids should be 64-bit");
148 producer_->Flush(cmd.flush().request_id(),
149 reinterpret_cast<const FlushRequestID*>(data_source_ids),
150 static_cast<size_t>(cmd.flush().data_source_ids().size()));
151 return;
152 }
153
154 PERFETTO_DLOG("Unknown async request %d received from tracing service",
155 cmd.cmd_case());
156 PERFETTO_DCHECK(false);
157 }
158
RegisterDataSource(const DataSourceDescriptor & descriptor)159 void ProducerIPCClientImpl::RegisterDataSource(
160 const DataSourceDescriptor& descriptor) {
161 PERFETTO_DCHECK_THREAD(thread_checker_);
162 if (!connected_) {
163 PERFETTO_DLOG(
164 "Cannot RegisterDataSource(), not connected to tracing service");
165 }
166 protos::RegisterDataSourceRequest req;
167 descriptor.ToProto(req.mutable_data_source_descriptor());
168 ipc::Deferred<protos::RegisterDataSourceResponse> async_response;
169 async_response.Bind(
170 [](ipc::AsyncResult<protos::RegisterDataSourceResponse> response) {
171 if (!response)
172 PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
173 });
174 producer_port_.RegisterDataSource(req, std::move(async_response));
175 }
176
UnregisterDataSource(const std::string & name)177 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
178 PERFETTO_DCHECK_THREAD(thread_checker_);
179 if (!connected_) {
180 PERFETTO_DLOG(
181 "Cannot UnregisterDataSource(), not connected to tracing service");
182 return;
183 }
184 protos::UnregisterDataSourceRequest req;
185 req.set_data_source_name(name);
186 producer_port_.UnregisterDataSource(
187 req, ipc::Deferred<protos::UnregisterDataSourceResponse>());
188 }
189
CommitData(const CommitDataRequest & req,CommitDataCallback callback)190 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
191 CommitDataCallback callback) {
192 PERFETTO_DCHECK_THREAD(thread_checker_);
193 if (!connected_) {
194 PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
195 return;
196 }
197 protos::CommitDataRequest proto_req;
198 req.ToProto(&proto_req);
199 ipc::Deferred<protos::CommitDataResponse> async_response;
200 // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
201 // this call and checks that the callback is dropped.
202 if (callback) {
203 async_response.Bind(
204 [callback](ipc::AsyncResult<protos::CommitDataResponse> response) {
205 if (!response) {
206 PERFETTO_DLOG("CommitData() failed: connection reset");
207 return;
208 }
209 callback();
210 });
211 }
212 producer_port_.CommitData(proto_req, std::move(async_response));
213 }
214
CreateTraceWriter(BufferID target_buffer)215 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
216 BufferID target_buffer) {
217 // This method can be called by different threads. |shared_memory_arbiter_| is
218 // thread-safe but be aware of accessing any other state in this function.
219 return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
220 }
221
NotifyFlushComplete(FlushRequestID req_id)222 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
223 return shared_memory_arbiter_->NotifyFlushComplete(req_id);
224 }
225
shared_memory() const226 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
227 return shared_memory_.get();
228 }
229
shared_buffer_page_size_kb() const230 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
231 return shared_buffer_page_size_kb_;
232 }
233
234 } // namespace perfetto
235