1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18
19 #include <inttypes.h>
20 #include <string.h>
21
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/ipc/client.h"
24 #include "perfetto/ext/tracing/core/consumer.h"
25 #include "perfetto/ext/tracing/core/observable_events.h"
26 #include "perfetto/ext/tracing/core/trace_stats.h"
27 #include "perfetto/tracing/core/trace_config.h"
28 #include "perfetto/tracing/core/tracing_service_state.h"
29
// TODO(fmayer): Add a test to check what happens to the Consumer pointer when
// ConsumerIPCClientImpl gets destroyed. Also think about the lifetime of the
// Consumer* during the callbacks.
33
34 namespace perfetto {
35
36 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)37 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
38 const char* service_sock_name,
39 Consumer* consumer,
40 base::TaskRunner* task_runner) {
41 return std::unique_ptr<TracingService::ConsumerEndpoint>(
42 new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
43 }
44
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)45 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
46 Consumer* consumer,
47 base::TaskRunner* task_runner)
48 : consumer_(consumer),
49 ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
50 consumer_port_(this /* event_listener */),
51 weak_ptr_factory_(this) {
52 ipc_channel_->BindService(consumer_port_.GetWeakPtr());
53 }
54
55 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
56
57 // Called by the IPC layer if the BindService() succeeds.
OnConnect()58 void ConsumerIPCClientImpl::OnConnect() {
59 connected_ = true;
60 consumer_->OnConnect();
61 }
62
OnDisconnect()63 void ConsumerIPCClientImpl::OnDisconnect() {
64 PERFETTO_DLOG("Tracing service connection failure");
65 connected_ = false;
66 consumer_->OnDisconnect();
67 }
68
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)69 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
70 base::ScopedFile fd) {
71 if (!connected_) {
72 PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
73 return;
74 }
75
76 protos::gen::EnableTracingRequest req;
77 *req.mutable_trace_config() = trace_config;
78 ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
79 auto weak_this = weak_ptr_factory_.GetWeakPtr();
80 async_response.Bind(
81 [weak_this](
82 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
83 if (weak_this)
84 weak_this->OnEnableTracingResponse(std::move(response));
85 });
86
87 // |fd| will be closed when this function returns, but it's fine because the
88 // IPC layer dup()'s it when sending the IPC.
89 consumer_port_.EnableTracing(req, std::move(async_response), *fd);
90 }
91
ChangeTraceConfig(const TraceConfig &)92 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig&) {
93 if (!connected_) {
94 PERFETTO_DLOG(
95 "Cannot ChangeTraceConfig(), not connected to tracing service");
96 return;
97 }
98
99 ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
100 async_response.Bind(
101 [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
102 if (!response)
103 PERFETTO_DLOG("ChangeTraceConfig() failed");
104 });
105 protos::gen::ChangeTraceConfigRequest req;
106 consumer_port_.ChangeTraceConfig(req, std::move(async_response));
107 }
108
StartTracing()109 void ConsumerIPCClientImpl::StartTracing() {
110 if (!connected_) {
111 PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
112 return;
113 }
114
115 ipc::Deferred<protos::gen::StartTracingResponse> async_response;
116 async_response.Bind(
117 [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
118 if (!response)
119 PERFETTO_DLOG("StartTracing() failed");
120 });
121 protos::gen::StartTracingRequest req;
122 consumer_port_.StartTracing(req, std::move(async_response));
123 }
124
DisableTracing()125 void ConsumerIPCClientImpl::DisableTracing() {
126 if (!connected_) {
127 PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
128 return;
129 }
130
131 ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
132 async_response.Bind(
133 [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
134 if (!response)
135 PERFETTO_DLOG("DisableTracing() failed");
136 });
137 consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
138 std::move(async_response));
139 }
140
ReadBuffers()141 void ConsumerIPCClientImpl::ReadBuffers() {
142 if (!connected_) {
143 PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
144 return;
145 }
146
147 ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
148
149 // The IPC layer guarantees that callbacks are destroyed after this object
150 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
151 // contract of this class expects the caller to not destroy the Consumer class
152 // before having destroyed this class. Hence binding |this| here is safe.
153 async_response.Bind(
154 [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
155 OnReadBuffersResponse(std::move(response));
156 });
157 consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
158 std::move(async_response));
159 }
160
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)161 void ConsumerIPCClientImpl::OnReadBuffersResponse(
162 ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
163 if (!response) {
164 PERFETTO_DLOG("ReadBuffers() failed");
165 return;
166 }
167 std::vector<TracePacket> trace_packets;
168 for (auto& resp_slice : response->slices()) {
169 const std::string& slice_data = resp_slice.data();
170 Slice slice = Slice::Allocate(slice_data.size());
171 memcpy(slice.own_data(), slice_data.data(), slice.size);
172 partial_packet_.AddSlice(std::move(slice));
173 if (resp_slice.last_slice_for_packet())
174 trace_packets.emplace_back(std::move(partial_packet_));
175 }
176 if (!trace_packets.empty() || !response.has_more())
177 consumer_->OnTraceData(std::move(trace_packets), response.has_more());
178 }
179
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)180 void ConsumerIPCClientImpl::OnEnableTracingResponse(
181 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
182 if (!response || response->disabled())
183 consumer_->OnTracingDisabled();
184 }
185
FreeBuffers()186 void ConsumerIPCClientImpl::FreeBuffers() {
187 if (!connected_) {
188 PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
189 return;
190 }
191
192 protos::gen::FreeBuffersRequest req;
193 ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
194 async_response.Bind(
195 [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
196 if (!response)
197 PERFETTO_DLOG("FreeBuffers() failed");
198 });
199 consumer_port_.FreeBuffers(req, std::move(async_response));
200 }
201
Flush(uint32_t timeout_ms,FlushCallback callback)202 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
203 if (!connected_) {
204 PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
205 return callback(/*success=*/false);
206 }
207
208 protos::gen::FlushRequest req;
209 req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
210 ipc::Deferred<protos::gen::FlushResponse> async_response;
211 async_response.Bind(
212 [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
213 callback(!!response);
214 });
215 consumer_port_.Flush(req, std::move(async_response));
216 }
217
Detach(const std::string & key)218 void ConsumerIPCClientImpl::Detach(const std::string& key) {
219 if (!connected_) {
220 PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
221 return;
222 }
223
224 protos::gen::DetachRequest req;
225 req.set_key(key);
226 ipc::Deferred<protos::gen::DetachResponse> async_response;
227 auto weak_this = weak_ptr_factory_.GetWeakPtr();
228
229 async_response.Bind(
230 [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
231 if (weak_this)
232 weak_this->consumer_->OnDetach(!!response);
233 });
234 consumer_port_.Detach(req, std::move(async_response));
235 }
236
Attach(const std::string & key)237 void ConsumerIPCClientImpl::Attach(const std::string& key) {
238 if (!connected_) {
239 PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
240 return;
241 }
242
243 {
244 protos::gen::AttachRequest req;
245 req.set_key(key);
246 ipc::Deferred<protos::gen::AttachResponse> async_response;
247 auto weak_this = weak_ptr_factory_.GetWeakPtr();
248
249 async_response.Bind(
250 [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
251 if (!weak_this)
252 return;
253 if (!response) {
254 weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
255 return;
256 }
257 const TraceConfig& trace_config = response->trace_config();
258
259 // If attached succesfully, also attach to the end-of-trace
260 // notificaton callback, via EnableTracing(attach_notification_only).
261 protos::gen::EnableTracingRequest enable_req;
262 enable_req.set_attach_notification_only(true);
263 ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
264 enable_resp.Bind(
265 [weak_this](
266 ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
267 if (weak_this)
268 weak_this->OnEnableTracingResponse(std::move(resp));
269 });
270 weak_this->consumer_port_.EnableTracing(enable_req,
271 std::move(enable_resp));
272
273 weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
274 });
275 consumer_port_.Attach(req, std::move(async_response));
276 }
277 }
278
GetTraceStats()279 void ConsumerIPCClientImpl::GetTraceStats() {
280 if (!connected_) {
281 PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
282 return;
283 }
284
285 protos::gen::GetTraceStatsRequest req;
286 ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
287
288 // The IPC layer guarantees that callbacks are destroyed after this object
289 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
290 // contract of this class expects the caller to not destroy the Consumer class
291 // before having destroyed this class. Hence binding |this| here is safe.
292 async_response.Bind(
293 [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
294 if (!response) {
295 consumer_->OnTraceStats(/*success=*/false, TraceStats());
296 return;
297 }
298 consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
299 });
300 consumer_port_.GetTraceStats(req, std::move(async_response));
301 }
302
ObserveEvents(uint32_t enabled_event_types)303 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
304 if (!connected_) {
305 PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
306 return;
307 }
308
309 protos::gen::ObserveEventsRequest req;
310 for (uint32_t i = 0; i < 32; i++) {
311 const uint32_t event_id = 1u << i;
312 if (enabled_event_types & event_id)
313 req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
314 }
315
316 ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
317 // The IPC layer guarantees that callbacks are destroyed after this object
318 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
319 // contract of this class expects the caller to not destroy the Consumer class
320 // before having destroyed this class. Hence binding |this| here is safe.
321 async_response.Bind(
322 [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
323 // Skip empty response, which the service sends to close the stream.
324 if (!response.has_more()) {
325 PERFETTO_DCHECK(!response->events().instance_state_changes().size());
326 return;
327 }
328 consumer_->OnObservableEvents(response->events());
329 });
330 consumer_port_.ObserveEvents(req, std::move(async_response));
331 }
332
QueryServiceState(QueryServiceStateCallback callback)333 void ConsumerIPCClientImpl::QueryServiceState(
334 QueryServiceStateCallback callback) {
335 if (!connected_) {
336 PERFETTO_DLOG(
337 "Cannot QueryServiceState(), not connected to tracing service");
338 return;
339 }
340
341 auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
342 {std::move(callback), {}});
343 protos::gen::QueryServiceStateRequest req;
344 ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
345 auto weak_this = weak_ptr_factory_.GetWeakPtr();
346 async_response.Bind(
347 [weak_this,
348 it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
349 if (weak_this)
350 weak_this->OnQueryServiceStateResponse(std::move(response), it);
351 });
352 consumer_port_.QueryServiceState(req, std::move(async_response));
353 }
354
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)355 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
356 ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
357 PendingQueryServiceRequests::iterator req_it) {
358 PERFETTO_DCHECK(req_it->callback);
359
360 if (!response) {
361 auto callback = std::move(req_it->callback);
362 pending_query_svc_reqs_.erase(req_it);
363 callback(false, TracingServiceState());
364 return;
365 }
366
367 // The QueryServiceState response can be split in several chunks if the
368 // service has several data sources. The client is supposed to merge all the
369 // replies. The easiest way to achieve this is to re-serialize the partial
370 // response and then re-decode the merged result in one shot.
371 std::vector<uint8_t>& merged_resp = req_it->merged_resp;
372 std::vector<uint8_t> part = response->service_state().SerializeAsArray();
373 merged_resp.insert(merged_resp.end(), part.begin(), part.end());
374
375 if (response.has_more())
376 return;
377
378 // All replies have been received. Decode the merged result and reply to the
379 // callback.
380 protos::gen::TracingServiceState svc_state;
381 bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
382 if (!ok)
383 PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
384 auto callback = std::move(req_it->callback);
385 pending_query_svc_reqs_.erase(req_it);
386 callback(ok, std::move(svc_state));
387 }
388
QueryCapabilities(QueryCapabilitiesCallback callback)389 void ConsumerIPCClientImpl::QueryCapabilities(
390 QueryCapabilitiesCallback callback) {
391 if (!connected_) {
392 PERFETTO_DLOG(
393 "Cannot QueryCapabilities(), not connected to tracing service");
394 return;
395 }
396
397 protos::gen::QueryCapabilitiesRequest req;
398 ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
399 async_response.Bind(
400 [callback](
401 ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
402 if (!response) {
403 // If the IPC fails, we are talking to an older version of the service
404 // that didn't support QueryCapabilities at all. In this case return
405 // an empty capabilities message.
406 callback(TracingServiceCapabilities());
407 } else {
408 callback(response->capabilities());
409 }
410 });
411 consumer_port_.QueryCapabilities(req, std::move(async_response));
412 }
413
414 } // namespace perfetto
415