1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18
19 #include <inttypes.h>
20 #include <string.h>
21
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ipc/client.h"
24 #include "perfetto/tracing/core/consumer.h"
25 #include "perfetto/tracing/core/observable_events.h"
26 #include "perfetto/tracing/core/trace_config.h"
27 #include "perfetto/tracing/core/trace_stats.h"
28
29 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
30 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
31 // Consumer* during the callbacks.
32
33 namespace perfetto {
34
35 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)36 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
37 const char* service_sock_name,
38 Consumer* consumer,
39 base::TaskRunner* task_runner) {
40 return std::unique_ptr<TracingService::ConsumerEndpoint>(
41 new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
42 }
43
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)44 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
45 Consumer* consumer,
46 base::TaskRunner* task_runner)
47 : consumer_(consumer),
48 ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
49 consumer_port_(this /* event_listener */),
50 weak_ptr_factory_(this) {
51 ipc_channel_->BindService(consumer_port_.GetWeakPtr());
52 }
53
54 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
55
56 // Called by the IPC layer if the BindService() succeeds.
OnConnect()57 void ConsumerIPCClientImpl::OnConnect() {
58 connected_ = true;
59 consumer_->OnConnect();
60 }
61
OnDisconnect()62 void ConsumerIPCClientImpl::OnDisconnect() {
63 PERFETTO_DLOG("Tracing service connection failure");
64 connected_ = false;
65 consumer_->OnDisconnect();
66 }
67
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)68 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
69 base::ScopedFile fd) {
70 if (!connected_) {
71 PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
72 return;
73 }
74
75 protos::EnableTracingRequest req;
76 trace_config.ToProto(req.mutable_trace_config());
77 ipc::Deferred<protos::EnableTracingResponse> async_response;
78 auto weak_this = weak_ptr_factory_.GetWeakPtr();
79 async_response.Bind(
80 [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> response) {
81 if (weak_this)
82 weak_this->OnEnableTracingResponse(std::move(response));
83 });
84
85 // |fd| will be closed when this function returns, but it's fine because the
86 // IPC layer dup()'s it when sending the IPC.
87 consumer_port_.EnableTracing(req, std::move(async_response), *fd);
88 }
89
ChangeTraceConfig(const TraceConfig &)90 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig&) {
91 if (!connected_) {
92 PERFETTO_DLOG(
93 "Cannot ChangeTraceConfig(), not connected to tracing service");
94 return;
95 }
96
97 ipc::Deferred<protos::ChangeTraceConfigResponse> async_response;
98 async_response.Bind(
99 [](ipc::AsyncResult<protos::ChangeTraceConfigResponse> response) {
100 if (!response)
101 PERFETTO_DLOG("ChangeTraceConfig() failed");
102 });
103 protos::ChangeTraceConfigRequest req;
104 consumer_port_.ChangeTraceConfig(req, std::move(async_response));
105 }
106
StartTracing()107 void ConsumerIPCClientImpl::StartTracing() {
108 if (!connected_) {
109 PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
110 return;
111 }
112
113 ipc::Deferred<protos::StartTracingResponse> async_response;
114 async_response.Bind(
115 [](ipc::AsyncResult<protos::StartTracingResponse> response) {
116 if (!response)
117 PERFETTO_DLOG("StartTracing() failed");
118 });
119 protos::StartTracingRequest req;
120 consumer_port_.StartTracing(req, std::move(async_response));
121 }
122
DisableTracing()123 void ConsumerIPCClientImpl::DisableTracing() {
124 if (!connected_) {
125 PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
126 return;
127 }
128
129 ipc::Deferred<protos::DisableTracingResponse> async_response;
130 async_response.Bind(
131 [](ipc::AsyncResult<protos::DisableTracingResponse> response) {
132 if (!response)
133 PERFETTO_DLOG("DisableTracing() failed");
134 });
135 consumer_port_.DisableTracing(protos::DisableTracingRequest(),
136 std::move(async_response));
137 }
138
ReadBuffers()139 void ConsumerIPCClientImpl::ReadBuffers() {
140 if (!connected_) {
141 PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
142 return;
143 }
144
145 ipc::Deferred<protos::ReadBuffersResponse> async_response;
146
147 // The IPC layer guarantees that callbacks are destroyed after this object
148 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
149 // contract of this class expects the caller to not destroy the Consumer class
150 // before having destroyed this class. Hence binding |this| here is safe.
151 async_response.Bind(
152 [this](ipc::AsyncResult<protos::ReadBuffersResponse> response) {
153 OnReadBuffersResponse(std::move(response));
154 });
155 consumer_port_.ReadBuffers(protos::ReadBuffersRequest(),
156 std::move(async_response));
157 }
158
OnReadBuffersResponse(ipc::AsyncResult<protos::ReadBuffersResponse> response)159 void ConsumerIPCClientImpl::OnReadBuffersResponse(
160 ipc::AsyncResult<protos::ReadBuffersResponse> response) {
161 if (!response) {
162 PERFETTO_DLOG("ReadBuffers() failed");
163 return;
164 }
165 std::vector<TracePacket> trace_packets;
166 for (auto& resp_slice : *response->mutable_slices()) {
167 partial_packet_.AddSlice(
168 Slice(std::unique_ptr<std::string>(resp_slice.release_data())));
169 if (resp_slice.last_slice_for_packet())
170 trace_packets.emplace_back(std::move(partial_packet_));
171 }
172 if (!trace_packets.empty() || !response.has_more())
173 consumer_->OnTraceData(std::move(trace_packets), response.has_more());
174 }
175
OnEnableTracingResponse(ipc::AsyncResult<protos::EnableTracingResponse> response)176 void ConsumerIPCClientImpl::OnEnableTracingResponse(
177 ipc::AsyncResult<protos::EnableTracingResponse> response) {
178 if (!response || response->disabled())
179 consumer_->OnTracingDisabled();
180 }
181
FreeBuffers()182 void ConsumerIPCClientImpl::FreeBuffers() {
183 if (!connected_) {
184 PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
185 return;
186 }
187
188 protos::FreeBuffersRequest req;
189 ipc::Deferred<protos::FreeBuffersResponse> async_response;
190 async_response.Bind(
191 [](ipc::AsyncResult<protos::FreeBuffersResponse> response) {
192 if (!response)
193 PERFETTO_DLOG("FreeBuffers() failed");
194 });
195 consumer_port_.FreeBuffers(req, std::move(async_response));
196 }
197
Flush(uint32_t timeout_ms,FlushCallback callback)198 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
199 if (!connected_) {
200 PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
201 return callback(/*success=*/false);
202 }
203
204 protos::FlushRequest req;
205 req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
206 ipc::Deferred<protos::FlushResponse> async_response;
207 async_response.Bind(
208 [callback](ipc::AsyncResult<protos::FlushResponse> response) {
209 callback(!!response);
210 });
211 consumer_port_.Flush(req, std::move(async_response));
212 }
213
Detach(const std::string & key)214 void ConsumerIPCClientImpl::Detach(const std::string& key) {
215 if (!connected_) {
216 PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
217 return;
218 }
219
220 protos::DetachRequest req;
221 req.set_key(key);
222 ipc::Deferred<protos::DetachResponse> async_response;
223 auto weak_this = weak_ptr_factory_.GetWeakPtr();
224
225 async_response.Bind(
226 [weak_this](ipc::AsyncResult<protos::DetachResponse> response) {
227 if (weak_this)
228 weak_this->consumer_->OnDetach(!!response);
229 });
230 consumer_port_.Detach(req, std::move(async_response));
231 }
232
Attach(const std::string & key)233 void ConsumerIPCClientImpl::Attach(const std::string& key) {
234 if (!connected_) {
235 PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
236 return;
237 }
238
239 {
240 protos::AttachRequest req;
241 req.set_key(key);
242 ipc::Deferred<protos::AttachResponse> async_response;
243 auto weak_this = weak_ptr_factory_.GetWeakPtr();
244
245 async_response.Bind([weak_this](
246 ipc::AsyncResult<protos::AttachResponse> response) {
247 if (!weak_this)
248 return;
249 TraceConfig trace_config;
250 if (!response) {
251 weak_this->consumer_->OnAttach(/*success=*/false, trace_config);
252 return;
253 }
254 trace_config.FromProto(response->trace_config());
255
256 // If attached succesfully, also attach to the end-of-trace
257 // notificaton callback, via EnableTracing(attach_notification_only).
258 protos::EnableTracingRequest enable_req;
259 enable_req.set_attach_notification_only(true);
260 ipc::Deferred<protos::EnableTracingResponse> enable_resp;
261 enable_resp.Bind(
262 [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> resp) {
263 if (weak_this)
264 weak_this->OnEnableTracingResponse(std::move(resp));
265 });
266 weak_this->consumer_port_.EnableTracing(enable_req,
267 std::move(enable_resp));
268
269 weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
270 });
271 consumer_port_.Attach(req, std::move(async_response));
272 }
273 }
274
GetTraceStats()275 void ConsumerIPCClientImpl::GetTraceStats() {
276 if (!connected_) {
277 PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
278 return;
279 }
280
281 protos::GetTraceStatsRequest req;
282 ipc::Deferred<protos::GetTraceStatsResponse> async_response;
283
284 // The IPC layer guarantees that callbacks are destroyed after this object
285 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
286 // contract of this class expects the caller to not destroy the Consumer class
287 // before having destroyed this class. Hence binding |this| here is safe.
288 async_response.Bind(
289 [this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) {
290 TraceStats trace_stats;
291 if (!response) {
292 consumer_->OnTraceStats(/*success=*/false, trace_stats);
293 return;
294 }
295 trace_stats.FromProto(response->trace_stats());
296 consumer_->OnTraceStats(/*success=*/true, trace_stats);
297 });
298 consumer_port_.GetTraceStats(req, std::move(async_response));
299 }
300
ObserveEvents(uint32_t enabled_event_types)301 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
302 if (!connected_) {
303 PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
304 return;
305 }
306
307 protos::ObserveEventsRequest req;
308 if (enabled_event_types & ObservableEventType::kDataSourceInstances) {
309 req.add_events_to_observe(
310 protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
311 }
312 ipc::Deferred<protos::ObserveEventsResponse> async_response;
313 // The IPC layer guarantees that callbacks are destroyed after this object
314 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
315 // contract of this class expects the caller to not destroy the Consumer class
316 // before having destroyed this class. Hence binding |this| here is safe.
317 async_response.Bind(
318 [this](ipc::AsyncResult<protos::ObserveEventsResponse> response) {
319 // Skip empty response, which the service sends to close the stream.
320 if (!response->events().instance_state_changes().size()) {
321 PERFETTO_DCHECK(!response.has_more());
322 return;
323 }
324 ObservableEvents events;
325 events.FromProto(response->events());
326 consumer_->OnObservableEvents(events);
327 });
328 consumer_port_.ObserveEvents(req, std::move(async_response));
329 }
330
331 } // namespace perfetto
332