1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18
19 #include <inttypes.h>
20
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/scoped_file.h"
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ipc/basic_types.h"
25 #include "perfetto/ipc/host.h"
26 #include "perfetto/tracing/core/shared_memory_abi.h"
27 #include "perfetto/tracing/core/slice.h"
28 #include "perfetto/tracing/core/trace_config.h"
29 #include "perfetto/tracing/core/trace_packet.h"
30 #include "perfetto/tracing/core/trace_stats.h"
31 #include "perfetto/tracing/core/tracing_service.h"
32
33 namespace perfetto {
34
ConsumerIPCService(TracingService * core_service)35 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
36 : core_service_(core_service), weak_ptr_factory_(this) {}
37
38 ConsumerIPCService::~ConsumerIPCService() = default;
39
40 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()41 ConsumerIPCService::GetConsumerForCurrentRequest() {
42 const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
43 const uid_t uid = ipc::Service::client_info().uid();
44 PERFETTO_CHECK(ipc_client_id);
45 auto it = consumers_.find(ipc_client_id);
46 if (it == consumers_.end()) {
47 auto* remote_consumer = new RemoteConsumer();
48 consumers_[ipc_client_id].reset(remote_consumer);
49 remote_consumer->service_endpoint =
50 core_service_->ConnectConsumer(remote_consumer, uid);
51 return remote_consumer;
52 }
53 return it->second.get();
54 }
55
56 // Called by the IPC layer.
OnClientDisconnected()57 void ConsumerIPCService::OnClientDisconnected() {
58 ipc::ClientID client_id = ipc::Service::client_info().client_id();
59 consumers_.erase(client_id);
60 }
61
62 // Called by the IPC layer.
EnableTracing(const protos::EnableTracingRequest & req,DeferredEnableTracingResponse resp)63 void ConsumerIPCService::EnableTracing(const protos::EnableTracingRequest& req,
64 DeferredEnableTracingResponse resp) {
65 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
66 if (req.attach_notification_only()) {
67 remote_consumer->enable_tracing_response = std::move(resp);
68 return;
69 }
70 TraceConfig trace_config;
71 trace_config.FromProto(req.trace_config());
72 base::ScopedFile fd;
73 if (trace_config.write_into_file())
74 fd = ipc::Service::TakeReceivedFD();
75 remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
76 remote_consumer->enable_tracing_response = std::move(resp);
77 }
78
79 // Called by the IPC layer.
StartTracing(const protos::StartTracingRequest &,DeferredStartTracingResponse resp)80 void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&,
81 DeferredStartTracingResponse resp) {
82 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
83 remote_consumer->service_endpoint->StartTracing();
84 resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create());
85 }
86
87 // Called by the IPC layer.
ChangeTraceConfig(const protos::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)88 void ConsumerIPCService::ChangeTraceConfig(
89 const protos::ChangeTraceConfigRequest& req,
90 DeferredChangeTraceConfigResponse resp) {
91 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
92 TraceConfig trace_config;
93 trace_config.FromProto(req.trace_config());
94 remote_consumer->service_endpoint->ChangeTraceConfig(trace_config);
95 resp.Resolve(ipc::AsyncResult<protos::ChangeTraceConfigResponse>::Create());
96 }
97
98 // Called by the IPC layer.
DisableTracing(const protos::DisableTracingRequest &,DeferredDisableTracingResponse resp)99 void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
100 DeferredDisableTracingResponse resp) {
101 GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
102 resp.Resolve(ipc::AsyncResult<protos::DisableTracingResponse>::Create());
103 }
104
105 // Called by the IPC layer.
ReadBuffers(const protos::ReadBuffersRequest &,DeferredReadBuffersResponse resp)106 void ConsumerIPCService::ReadBuffers(const protos::ReadBuffersRequest&,
107 DeferredReadBuffersResponse resp) {
108 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
109 remote_consumer->read_buffers_response = std::move(resp);
110 remote_consumer->service_endpoint->ReadBuffers();
111 }
112
113 // Called by the IPC layer.
FreeBuffers(const protos::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)114 void ConsumerIPCService::FreeBuffers(const protos::FreeBuffersRequest&,
115 DeferredFreeBuffersResponse resp) {
116 GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
117 resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create());
118 }
119
120 // Called by the IPC layer.
Flush(const protos::FlushRequest & req,DeferredFlushResponse resp)121 void ConsumerIPCService::Flush(const protos::FlushRequest& req,
122 DeferredFlushResponse resp) {
123 auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
124 std::move(resp));
125 auto weak_this = weak_ptr_factory_.GetWeakPtr();
126 auto callback = [weak_this, it](bool success) {
127 if (weak_this)
128 weak_this->OnFlushCallback(success, std::move(it));
129 };
130 GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
131 std::move(callback));
132 }
133
134 // Called by the IPC layer.
Detach(const protos::DetachRequest & req,DeferredDetachResponse resp)135 void ConsumerIPCService::Detach(const protos::DetachRequest& req,
136 DeferredDetachResponse resp) {
137 // OnDetach() will resolve the |detach_response|.
138 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
139 remote_consumer->detach_response = std::move(resp);
140 remote_consumer->service_endpoint->Detach(req.key());
141 }
142
143 // Called by the IPC layer.
Attach(const protos::AttachRequest & req,DeferredAttachResponse resp)144 void ConsumerIPCService::Attach(const protos::AttachRequest& req,
145 DeferredAttachResponse resp) {
146 // OnAttach() will resolve the |attach_response|.
147 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
148 remote_consumer->attach_response = std::move(resp);
149 remote_consumer->service_endpoint->Attach(req.key());
150 }
151
152 // Called by the IPC layer.
GetTraceStats(const protos::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)153 void ConsumerIPCService::GetTraceStats(const protos::GetTraceStatsRequest&,
154 DeferredGetTraceStatsResponse resp) {
155 // OnTraceStats() will resolve the |get_trace_stats_response|.
156 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
157 remote_consumer->get_trace_stats_response = std::move(resp);
158 remote_consumer->service_endpoint->GetTraceStats();
159 }
160
161 // Called by the IPC layer.
ObserveEvents(const protos::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)162 void ConsumerIPCService::ObserveEvents(const protos::ObserveEventsRequest& req,
163 DeferredObserveEventsResponse resp) {
164 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
165
166 // If there's a prior stream, close it so that client can clean it up.
167 remote_consumer->CloseObserveEventsResponseStream();
168
169 remote_consumer->observe_events_response = std::move(resp);
170
171 bool observe_instances = false;
172 for (const auto& type : req.events_to_observe()) {
173 switch (type) {
174 case protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES:
175 observe_instances = true;
176 break;
177 default:
178 PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type);
179 break;
180 }
181 }
182 remote_consumer->service_endpoint->ObserveEvents(observe_instances);
183
184 // If no events are to be observed, close the stream immediately so that the
185 // client can clean up.
186 if (req.events_to_observe().size() == 0)
187 remote_consumer->CloseObserveEventsResponseStream();
188 }
189
190 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)191 void ConsumerIPCService::OnFlushCallback(
192 bool success,
193 PendingFlushResponses::iterator pending_response_it) {
194 DeferredFlushResponse response(std::move(*pending_response_it));
195 pending_flush_responses_.erase(pending_response_it);
196 if (success) {
197 response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create());
198 } else {
199 response.Reject();
200 }
201 }
202
203 ////////////////////////////////////////////////////////////////////////////////
204 // RemoteConsumer methods
205 ////////////////////////////////////////////////////////////////////////////////
206
207 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
208 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
209
210 // Invoked by the |core_service_| business logic after the ConnectConsumer()
211 // call. There is nothing to do here, we really expected the ConnectConsumer()
212 // to just work in the local case.
OnConnect()213 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
214
215 // Invoked by the |core_service_| business logic after we destroy the
216 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()217 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
218
OnTracingDisabled()219 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled() {
220 if (enable_tracing_response.IsBound()) {
221 auto result = ipc::AsyncResult<protos::EnableTracingResponse>::Create();
222 result->set_disabled(true);
223 enable_tracing_response.Resolve(std::move(result));
224 }
225 }
226
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)227 void ConsumerIPCService::RemoteConsumer::OnTraceData(
228 std::vector<TracePacket> trace_packets,
229 bool has_more) {
230 if (!read_buffers_response.IsBound())
231 return;
232
233 auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
234
235 // A TracePacket might be too big to fit into a single IPC message (max
236 // kIPCBufferSize). However a TracePacket is made of slices and each slice
237 // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
238 // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
239 // if its slices don't fit within one IPC, chunk them over several contiguous
240 // IPCs using the |last_slice_for_packet| for glueing on the other side.
241 static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
242 "kIPCBufferSize too small given the max possible slice size");
243
244 auto send_ipc_reply = [this, &result](bool more) {
245 result.set_has_more(more);
246 read_buffers_response.Resolve(std::move(result));
247 result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
248 };
249
250 size_t approx_reply_size = 0;
251 for (const TracePacket& trace_packet : trace_packets) {
252 size_t num_slices_left_for_packet = trace_packet.slices().size();
253 for (const Slice& slice : trace_packet.slices()) {
254 // Check if this slice would cause the IPC to overflow its max size and,
255 // if that is the case, split the IPCs. The "16" and "64" below are
256 // over-estimations of, respectively:
257 // 16: the preamble that prefixes each slice (there are 2 x size fields
258 // in the proto + the |last_slice_for_packet| bool).
259 // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
260 // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
261 // will hit a DCHECK anyways.
262 const size_t approx_slice_size = slice.size + 16;
263 if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
264 // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
265 PERFETTO_CHECK(result->slices_size() > 0);
266 send_ipc_reply(/*has_more=*/true);
267 approx_reply_size = 0;
268 }
269 approx_reply_size += approx_slice_size;
270
271 auto* res_slice = result->add_slices();
272 res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
273 res_slice->set_data(slice.start, slice.size);
274 }
275 }
276 send_ipc_reply(has_more);
277 }
278
OnDetach(bool success)279 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
280 if (!success) {
281 std::move(detach_response).Reject();
282 return;
283 }
284 auto resp = ipc::AsyncResult<protos::DetachResponse>::Create();
285 std::move(detach_response).Resolve(std::move(resp));
286 }
287
OnAttach(bool success,const TraceConfig & trace_config)288 void ConsumerIPCService::RemoteConsumer::OnAttach(
289 bool success,
290 const TraceConfig& trace_config) {
291 if (!success) {
292 std::move(attach_response).Reject();
293 return;
294 }
295 auto response = ipc::AsyncResult<protos::AttachResponse>::Create();
296 trace_config.ToProto(response->mutable_trace_config());
297 std::move(attach_response).Resolve(std::move(response));
298 }
299
OnTraceStats(bool success,const TraceStats & stats)300 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
301 const TraceStats& stats) {
302 if (!success) {
303 std::move(get_trace_stats_response).Reject();
304 return;
305 }
306 auto response = ipc::AsyncResult<protos::GetTraceStatsResponse>::Create();
307 stats.ToProto(response->mutable_trace_stats());
308 std::move(get_trace_stats_response).Resolve(std::move(response));
309 }
310
OnObservableEvents(const ObservableEvents & events)311 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
312 const ObservableEvents& events) {
313 if (!observe_events_response.IsBound())
314 return;
315
316 auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
317 result.set_has_more(true);
318 events.ToProto(result->mutable_events());
319 observe_events_response.Resolve(std::move(result));
320 }
321
CloseObserveEventsResponseStream()322 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
323 if (!observe_events_response.IsBound())
324 return;
325
326 auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
327 result.set_has_more(false);
328 observe_events_response.Resolve(std::move(result));
329 }
330
331 } // namespace perfetto
332