• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ipc/client.h"
24 #include "perfetto/tracing/core/consumer.h"
25 #include "perfetto/tracing/core/observable_events.h"
26 #include "perfetto/tracing/core/trace_config.h"
27 #include "perfetto/tracing/core/trace_stats.h"
28 
29 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
30 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
31 // Consumer* during the callbacks.
32 
33 namespace perfetto {
34 
35 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)36 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
37     const char* service_sock_name,
38     Consumer* consumer,
39     base::TaskRunner* task_runner) {
40   return std::unique_ptr<TracingService::ConsumerEndpoint>(
41       new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
42 }
43 
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)44 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
45                                              Consumer* consumer,
46                                              base::TaskRunner* task_runner)
47     : consumer_(consumer),
48       ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
49       consumer_port_(this /* event_listener */),
50       weak_ptr_factory_(this) {
51   ipc_channel_->BindService(consumer_port_.GetWeakPtr());
52 }
53 
54 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
55 
56 // Called by the IPC layer if the BindService() succeeds.
OnConnect()57 void ConsumerIPCClientImpl::OnConnect() {
58   connected_ = true;
59   consumer_->OnConnect();
60 }
61 
OnDisconnect()62 void ConsumerIPCClientImpl::OnDisconnect() {
63   PERFETTO_DLOG("Tracing service connection failure");
64   connected_ = false;
65   consumer_->OnDisconnect();
66 }
67 
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)68 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
69                                           base::ScopedFile fd) {
70   if (!connected_) {
71     PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
72     return;
73   }
74 
75   protos::EnableTracingRequest req;
76   trace_config.ToProto(req.mutable_trace_config());
77   ipc::Deferred<protos::EnableTracingResponse> async_response;
78   auto weak_this = weak_ptr_factory_.GetWeakPtr();
79   async_response.Bind(
80       [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> response) {
81         if (weak_this)
82           weak_this->OnEnableTracingResponse(std::move(response));
83       });
84 
85   // |fd| will be closed when this function returns, but it's fine because the
86   // IPC layer dup()'s it when sending the IPC.
87   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
88 }
89 
ChangeTraceConfig(const TraceConfig &)90 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig&) {
91   if (!connected_) {
92     PERFETTO_DLOG(
93         "Cannot ChangeTraceConfig(), not connected to tracing service");
94     return;
95   }
96 
97   ipc::Deferred<protos::ChangeTraceConfigResponse> async_response;
98   async_response.Bind(
99       [](ipc::AsyncResult<protos::ChangeTraceConfigResponse> response) {
100         if (!response)
101           PERFETTO_DLOG("ChangeTraceConfig() failed");
102       });
103   protos::ChangeTraceConfigRequest req;
104   consumer_port_.ChangeTraceConfig(req, std::move(async_response));
105 }
106 
StartTracing()107 void ConsumerIPCClientImpl::StartTracing() {
108   if (!connected_) {
109     PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
110     return;
111   }
112 
113   ipc::Deferred<protos::StartTracingResponse> async_response;
114   async_response.Bind(
115       [](ipc::AsyncResult<protos::StartTracingResponse> response) {
116         if (!response)
117           PERFETTO_DLOG("StartTracing() failed");
118       });
119   protos::StartTracingRequest req;
120   consumer_port_.StartTracing(req, std::move(async_response));
121 }
122 
DisableTracing()123 void ConsumerIPCClientImpl::DisableTracing() {
124   if (!connected_) {
125     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
126     return;
127   }
128 
129   ipc::Deferred<protos::DisableTracingResponse> async_response;
130   async_response.Bind(
131       [](ipc::AsyncResult<protos::DisableTracingResponse> response) {
132         if (!response)
133           PERFETTO_DLOG("DisableTracing() failed");
134       });
135   consumer_port_.DisableTracing(protos::DisableTracingRequest(),
136                                 std::move(async_response));
137 }
138 
ReadBuffers()139 void ConsumerIPCClientImpl::ReadBuffers() {
140   if (!connected_) {
141     PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
142     return;
143   }
144 
145   ipc::Deferred<protos::ReadBuffersResponse> async_response;
146 
147   // The IPC layer guarantees that callbacks are destroyed after this object
148   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
149   // contract of this class expects the caller to not destroy the Consumer class
150   // before having destroyed this class. Hence binding |this| here is safe.
151   async_response.Bind(
152       [this](ipc::AsyncResult<protos::ReadBuffersResponse> response) {
153         OnReadBuffersResponse(std::move(response));
154       });
155   consumer_port_.ReadBuffers(protos::ReadBuffersRequest(),
156                              std::move(async_response));
157 }
158 
OnReadBuffersResponse(ipc::AsyncResult<protos::ReadBuffersResponse> response)159 void ConsumerIPCClientImpl::OnReadBuffersResponse(
160     ipc::AsyncResult<protos::ReadBuffersResponse> response) {
161   if (!response) {
162     PERFETTO_DLOG("ReadBuffers() failed");
163     return;
164   }
165   std::vector<TracePacket> trace_packets;
166   for (auto& resp_slice : *response->mutable_slices()) {
167     partial_packet_.AddSlice(
168         Slice(std::unique_ptr<std::string>(resp_slice.release_data())));
169     if (resp_slice.last_slice_for_packet())
170       trace_packets.emplace_back(std::move(partial_packet_));
171   }
172   if (!trace_packets.empty() || !response.has_more())
173     consumer_->OnTraceData(std::move(trace_packets), response.has_more());
174 }
175 
OnEnableTracingResponse(ipc::AsyncResult<protos::EnableTracingResponse> response)176 void ConsumerIPCClientImpl::OnEnableTracingResponse(
177     ipc::AsyncResult<protos::EnableTracingResponse> response) {
178   if (!response || response->disabled())
179     consumer_->OnTracingDisabled();
180 }
181 
FreeBuffers()182 void ConsumerIPCClientImpl::FreeBuffers() {
183   if (!connected_) {
184     PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
185     return;
186   }
187 
188   protos::FreeBuffersRequest req;
189   ipc::Deferred<protos::FreeBuffersResponse> async_response;
190   async_response.Bind(
191       [](ipc::AsyncResult<protos::FreeBuffersResponse> response) {
192         if (!response)
193           PERFETTO_DLOG("FreeBuffers() failed");
194       });
195   consumer_port_.FreeBuffers(req, std::move(async_response));
196 }
197 
Flush(uint32_t timeout_ms,FlushCallback callback)198 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
199   if (!connected_) {
200     PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
201     return callback(/*success=*/false);
202   }
203 
204   protos::FlushRequest req;
205   req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
206   ipc::Deferred<protos::FlushResponse> async_response;
207   async_response.Bind(
208       [callback](ipc::AsyncResult<protos::FlushResponse> response) {
209         callback(!!response);
210       });
211   consumer_port_.Flush(req, std::move(async_response));
212 }
213 
Detach(const std::string & key)214 void ConsumerIPCClientImpl::Detach(const std::string& key) {
215   if (!connected_) {
216     PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
217     return;
218   }
219 
220   protos::DetachRequest req;
221   req.set_key(key);
222   ipc::Deferred<protos::DetachResponse> async_response;
223   auto weak_this = weak_ptr_factory_.GetWeakPtr();
224 
225   async_response.Bind(
226       [weak_this](ipc::AsyncResult<protos::DetachResponse> response) {
227         if (weak_this)
228           weak_this->consumer_->OnDetach(!!response);
229       });
230   consumer_port_.Detach(req, std::move(async_response));
231 }
232 
Attach(const std::string & key)233 void ConsumerIPCClientImpl::Attach(const std::string& key) {
234   if (!connected_) {
235     PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
236     return;
237   }
238 
239   {
240     protos::AttachRequest req;
241     req.set_key(key);
242     ipc::Deferred<protos::AttachResponse> async_response;
243     auto weak_this = weak_ptr_factory_.GetWeakPtr();
244 
245     async_response.Bind([weak_this](
246                             ipc::AsyncResult<protos::AttachResponse> response) {
247       if (!weak_this)
248         return;
249       TraceConfig trace_config;
250       if (!response) {
251         weak_this->consumer_->OnAttach(/*success=*/false, trace_config);
252         return;
253       }
254       trace_config.FromProto(response->trace_config());
255 
256       // If attached succesfully, also attach to the end-of-trace
257       // notificaton callback, via EnableTracing(attach_notification_only).
258       protos::EnableTracingRequest enable_req;
259       enable_req.set_attach_notification_only(true);
260       ipc::Deferred<protos::EnableTracingResponse> enable_resp;
261       enable_resp.Bind(
262           [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> resp) {
263             if (weak_this)
264               weak_this->OnEnableTracingResponse(std::move(resp));
265           });
266       weak_this->consumer_port_.EnableTracing(enable_req,
267                                               std::move(enable_resp));
268 
269       weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
270     });
271     consumer_port_.Attach(req, std::move(async_response));
272   }
273 }
274 
GetTraceStats()275 void ConsumerIPCClientImpl::GetTraceStats() {
276   if (!connected_) {
277     PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
278     return;
279   }
280 
281   protos::GetTraceStatsRequest req;
282   ipc::Deferred<protos::GetTraceStatsResponse> async_response;
283 
284   // The IPC layer guarantees that callbacks are destroyed after this object
285   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
286   // contract of this class expects the caller to not destroy the Consumer class
287   // before having destroyed this class. Hence binding |this| here is safe.
288   async_response.Bind(
289       [this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) {
290         TraceStats trace_stats;
291         if (!response) {
292           consumer_->OnTraceStats(/*success=*/false, trace_stats);
293           return;
294         }
295         trace_stats.FromProto(response->trace_stats());
296         consumer_->OnTraceStats(/*success=*/true, trace_stats);
297       });
298   consumer_port_.GetTraceStats(req, std::move(async_response));
299 }
300 
ObserveEvents(uint32_t enabled_event_types)301 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
302   if (!connected_) {
303     PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
304     return;
305   }
306 
307   protos::ObserveEventsRequest req;
308   if (enabled_event_types & ObservableEventType::kDataSourceInstances) {
309     req.add_events_to_observe(
310         protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
311   }
312   ipc::Deferred<protos::ObserveEventsResponse> async_response;
313   // The IPC layer guarantees that callbacks are destroyed after this object
314   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
315   // contract of this class expects the caller to not destroy the Consumer class
316   // before having destroyed this class. Hence binding |this| here is safe.
317   async_response.Bind(
318       [this](ipc::AsyncResult<protos::ObserveEventsResponse> response) {
319         // Skip empty response, which the service sends to close the stream.
320         if (!response->events().instance_state_changes().size()) {
321           PERFETTO_DCHECK(!response.has_more());
322           return;
323         }
324         ObservableEvents events;
325         events.FromProto(response->events());
326         consumer_->OnObservableEvents(events);
327       });
328   consumer_port_.ObserveEvents(req, std::move(async_response));
329 }
330 
331 }  // namespace perfetto
332