• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/ipc/client.h"
24 #include "perfetto/ext/tracing/core/consumer.h"
25 #include "perfetto/ext/tracing/core/observable_events.h"
26 #include "perfetto/ext/tracing/core/trace_stats.h"
27 #include "perfetto/tracing/core/trace_config.h"
28 #include "perfetto/tracing/core/tracing_service_state.h"
29 
30 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
31 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
32 // Consumer* during the callbacks.
33 
34 namespace perfetto {
35 
36 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)37 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
38     const char* service_sock_name,
39     Consumer* consumer,
40     base::TaskRunner* task_runner) {
41   return std::unique_ptr<TracingService::ConsumerEndpoint>(
42       new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
43 }
44 
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)45 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
46                                              Consumer* consumer,
47                                              base::TaskRunner* task_runner)
48     : consumer_(consumer),
49       ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
50       consumer_port_(this /* event_listener */),
51       weak_ptr_factory_(this) {
52   ipc_channel_->BindService(consumer_port_.GetWeakPtr());
53 }
54 
55 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
56 
57 // Called by the IPC layer if the BindService() succeeds.
OnConnect()58 void ConsumerIPCClientImpl::OnConnect() {
59   connected_ = true;
60   consumer_->OnConnect();
61 }
62 
OnDisconnect()63 void ConsumerIPCClientImpl::OnDisconnect() {
64   PERFETTO_DLOG("Tracing service connection failure");
65   connected_ = false;
66   consumer_->OnDisconnect();
67 }
68 
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)69 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
70                                           base::ScopedFile fd) {
71   if (!connected_) {
72     PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
73     return;
74   }
75 
76   protos::gen::EnableTracingRequest req;
77   *req.mutable_trace_config() = trace_config;
78   ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
79   auto weak_this = weak_ptr_factory_.GetWeakPtr();
80   async_response.Bind(
81       [weak_this](
82           ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
83         if (weak_this)
84           weak_this->OnEnableTracingResponse(std::move(response));
85       });
86 
87   // |fd| will be closed when this function returns, but it's fine because the
88   // IPC layer dup()'s it when sending the IPC.
89   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
90 }
91 
ChangeTraceConfig(const TraceConfig &)92 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig&) {
93   if (!connected_) {
94     PERFETTO_DLOG(
95         "Cannot ChangeTraceConfig(), not connected to tracing service");
96     return;
97   }
98 
99   ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
100   async_response.Bind(
101       [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
102         if (!response)
103           PERFETTO_DLOG("ChangeTraceConfig() failed");
104       });
105   protos::gen::ChangeTraceConfigRequest req;
106   consumer_port_.ChangeTraceConfig(req, std::move(async_response));
107 }
108 
StartTracing()109 void ConsumerIPCClientImpl::StartTracing() {
110   if (!connected_) {
111     PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
112     return;
113   }
114 
115   ipc::Deferred<protos::gen::StartTracingResponse> async_response;
116   async_response.Bind(
117       [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
118         if (!response)
119           PERFETTO_DLOG("StartTracing() failed");
120       });
121   protos::gen::StartTracingRequest req;
122   consumer_port_.StartTracing(req, std::move(async_response));
123 }
124 
DisableTracing()125 void ConsumerIPCClientImpl::DisableTracing() {
126   if (!connected_) {
127     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
128     return;
129   }
130 
131   ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
132   async_response.Bind(
133       [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
134         if (!response)
135           PERFETTO_DLOG("DisableTracing() failed");
136       });
137   consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
138                                 std::move(async_response));
139 }
140 
ReadBuffers()141 void ConsumerIPCClientImpl::ReadBuffers() {
142   if (!connected_) {
143     PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
144     return;
145   }
146 
147   ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
148 
149   // The IPC layer guarantees that callbacks are destroyed after this object
150   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
151   // contract of this class expects the caller to not destroy the Consumer class
152   // before having destroyed this class. Hence binding |this| here is safe.
153   async_response.Bind(
154       [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
155         OnReadBuffersResponse(std::move(response));
156       });
157   consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
158                              std::move(async_response));
159 }
160 
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)161 void ConsumerIPCClientImpl::OnReadBuffersResponse(
162     ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
163   if (!response) {
164     PERFETTO_DLOG("ReadBuffers() failed");
165     return;
166   }
167   std::vector<TracePacket> trace_packets;
168   for (auto& resp_slice : response->slices()) {
169     const std::string& slice_data = resp_slice.data();
170     Slice slice = Slice::Allocate(slice_data.size());
171     memcpy(slice.own_data(), slice_data.data(), slice.size);
172     partial_packet_.AddSlice(std::move(slice));
173     if (resp_slice.last_slice_for_packet())
174       trace_packets.emplace_back(std::move(partial_packet_));
175   }
176   if (!trace_packets.empty() || !response.has_more())
177     consumer_->OnTraceData(std::move(trace_packets), response.has_more());
178 }
179 
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)180 void ConsumerIPCClientImpl::OnEnableTracingResponse(
181     ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
182   if (!response || response->disabled())
183     consumer_->OnTracingDisabled();
184 }
185 
FreeBuffers()186 void ConsumerIPCClientImpl::FreeBuffers() {
187   if (!connected_) {
188     PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
189     return;
190   }
191 
192   protos::gen::FreeBuffersRequest req;
193   ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
194   async_response.Bind(
195       [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
196         if (!response)
197           PERFETTO_DLOG("FreeBuffers() failed");
198       });
199   consumer_port_.FreeBuffers(req, std::move(async_response));
200 }
201 
Flush(uint32_t timeout_ms,FlushCallback callback)202 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
203   if (!connected_) {
204     PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
205     return callback(/*success=*/false);
206   }
207 
208   protos::gen::FlushRequest req;
209   req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
210   ipc::Deferred<protos::gen::FlushResponse> async_response;
211   async_response.Bind(
212       [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
213         callback(!!response);
214       });
215   consumer_port_.Flush(req, std::move(async_response));
216 }
217 
Detach(const std::string & key)218 void ConsumerIPCClientImpl::Detach(const std::string& key) {
219   if (!connected_) {
220     PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
221     return;
222   }
223 
224   protos::gen::DetachRequest req;
225   req.set_key(key);
226   ipc::Deferred<protos::gen::DetachResponse> async_response;
227   auto weak_this = weak_ptr_factory_.GetWeakPtr();
228 
229   async_response.Bind(
230       [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
231         if (weak_this)
232           weak_this->consumer_->OnDetach(!!response);
233       });
234   consumer_port_.Detach(req, std::move(async_response));
235 }
236 
Attach(const std::string & key)237 void ConsumerIPCClientImpl::Attach(const std::string& key) {
238   if (!connected_) {
239     PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
240     return;
241   }
242 
243   {
244     protos::gen::AttachRequest req;
245     req.set_key(key);
246     ipc::Deferred<protos::gen::AttachResponse> async_response;
247     auto weak_this = weak_ptr_factory_.GetWeakPtr();
248 
249     async_response.Bind(
250         [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
251           if (!weak_this)
252             return;
253           if (!response) {
254             weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
255             return;
256           }
257           const TraceConfig& trace_config = response->trace_config();
258 
259           // If attached succesfully, also attach to the end-of-trace
260           // notificaton callback, via EnableTracing(attach_notification_only).
261           protos::gen::EnableTracingRequest enable_req;
262           enable_req.set_attach_notification_only(true);
263           ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
264           enable_resp.Bind(
265               [weak_this](
266                   ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
267                 if (weak_this)
268                   weak_this->OnEnableTracingResponse(std::move(resp));
269               });
270           weak_this->consumer_port_.EnableTracing(enable_req,
271                                                   std::move(enable_resp));
272 
273           weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
274         });
275     consumer_port_.Attach(req, std::move(async_response));
276   }
277 }
278 
GetTraceStats()279 void ConsumerIPCClientImpl::GetTraceStats() {
280   if (!connected_) {
281     PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
282     return;
283   }
284 
285   protos::gen::GetTraceStatsRequest req;
286   ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
287 
288   // The IPC layer guarantees that callbacks are destroyed after this object
289   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
290   // contract of this class expects the caller to not destroy the Consumer class
291   // before having destroyed this class. Hence binding |this| here is safe.
292   async_response.Bind(
293       [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
294         if (!response) {
295           consumer_->OnTraceStats(/*success=*/false, TraceStats());
296           return;
297         }
298         consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
299       });
300   consumer_port_.GetTraceStats(req, std::move(async_response));
301 }
302 
ObserveEvents(uint32_t enabled_event_types)303 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
304   if (!connected_) {
305     PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
306     return;
307   }
308 
309   protos::gen::ObserveEventsRequest req;
310   for (uint32_t i = 0; i < 32; i++) {
311     const uint32_t event_id = 1u << i;
312     if (enabled_event_types & event_id)
313       req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
314   }
315 
316   ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
317   // The IPC layer guarantees that callbacks are destroyed after this object
318   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
319   // contract of this class expects the caller to not destroy the Consumer class
320   // before having destroyed this class. Hence binding |this| here is safe.
321   async_response.Bind(
322       [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
323         // Skip empty response, which the service sends to close the stream.
324         if (!response.has_more()) {
325           PERFETTO_DCHECK(!response->events().instance_state_changes().size());
326           return;
327         }
328         consumer_->OnObservableEvents(response->events());
329       });
330   consumer_port_.ObserveEvents(req, std::move(async_response));
331 }
332 
QueryServiceState(QueryServiceStateCallback callback)333 void ConsumerIPCClientImpl::QueryServiceState(
334     QueryServiceStateCallback callback) {
335   if (!connected_) {
336     PERFETTO_DLOG(
337         "Cannot QueryServiceState(), not connected to tracing service");
338     return;
339   }
340 
341   auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
342                                            {std::move(callback), {}});
343   protos::gen::QueryServiceStateRequest req;
344   ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
345   auto weak_this = weak_ptr_factory_.GetWeakPtr();
346   async_response.Bind(
347       [weak_this,
348        it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
349         if (weak_this)
350           weak_this->OnQueryServiceStateResponse(std::move(response), it);
351       });
352   consumer_port_.QueryServiceState(req, std::move(async_response));
353 }
354 
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)355 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
356     ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
357     PendingQueryServiceRequests::iterator req_it) {
358   PERFETTO_DCHECK(req_it->callback);
359 
360   if (!response) {
361     auto callback = std::move(req_it->callback);
362     pending_query_svc_reqs_.erase(req_it);
363     callback(false, TracingServiceState());
364     return;
365   }
366 
367   // The QueryServiceState response can be split in several chunks if the
368   // service has several data sources. The client is supposed to merge all the
369   // replies. The easiest way to achieve this is to re-serialize the partial
370   // response and then re-decode the merged result in one shot.
371   std::vector<uint8_t>& merged_resp = req_it->merged_resp;
372   std::vector<uint8_t> part = response->service_state().SerializeAsArray();
373   merged_resp.insert(merged_resp.end(), part.begin(), part.end());
374 
375   if (response.has_more())
376     return;
377 
378   // All replies have been received. Decode the merged result and reply to the
379   // callback.
380   protos::gen::TracingServiceState svc_state;
381   bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
382   if (!ok)
383     PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
384   auto callback = std::move(req_it->callback);
385   pending_query_svc_reqs_.erase(req_it);
386   callback(ok, std::move(svc_state));
387 }
388 
QueryCapabilities(QueryCapabilitiesCallback callback)389 void ConsumerIPCClientImpl::QueryCapabilities(
390     QueryCapabilitiesCallback callback) {
391   if (!connected_) {
392     PERFETTO_DLOG(
393         "Cannot QueryCapabilities(), not connected to tracing service");
394     return;
395   }
396 
397   protos::gen::QueryCapabilitiesRequest req;
398   ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
399   async_response.Bind(
400       [callback](
401           ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
402         if (!response) {
403           // If the IPC fails, we are talking to an older version of the service
404           // that didn't support QueryCapabilities at all. In this case return
405           // an empty capabilities message.
406           callback(TracingServiceCapabilities());
407         } else {
408           callback(response->capabilities());
409         }
410       });
411   consumer_port_.QueryCapabilities(req, std::move(async_response));
412 }
413 
414 }  // namespace perfetto
415