• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18 
19 #include <inttypes.h>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/scoped_file.h"
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ipc/basic_types.h"
25 #include "perfetto/ipc/host.h"
26 #include "perfetto/tracing/core/service.h"
27 #include "perfetto/tracing/core/shared_memory_abi.h"
28 #include "perfetto/tracing/core/slice.h"
29 #include "perfetto/tracing/core/trace_config.h"
30 #include "perfetto/tracing/core/trace_packet.h"
31 
32 namespace perfetto {
33 
ConsumerIPCService(Service * core_service)34 ConsumerIPCService::ConsumerIPCService(Service* core_service)
35     : core_service_(core_service), weak_ptr_factory_(this) {}
36 
37 ConsumerIPCService::~ConsumerIPCService() = default;
38 
39 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()40 ConsumerIPCService::GetConsumerForCurrentRequest() {
41   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
42   PERFETTO_CHECK(ipc_client_id);
43   auto it = consumers_.find(ipc_client_id);
44   if (it == consumers_.end()) {
45     auto* remote_consumer = new RemoteConsumer();
46     consumers_[ipc_client_id].reset(remote_consumer);
47     remote_consumer->service_endpoint =
48         core_service_->ConnectConsumer(remote_consumer);
49     return remote_consumer;
50   }
51   return it->second.get();
52 }
53 
54 // Called by the IPC layer.
OnClientDisconnected()55 void ConsumerIPCService::OnClientDisconnected() {
56   ipc::ClientID client_id = ipc::Service::client_info().client_id();
57   consumers_.erase(client_id);
58 }
59 
60 // Called by the IPC layer.
EnableTracing(const protos::EnableTracingRequest & req,DeferredEnableTracingResponse resp)61 void ConsumerIPCService::EnableTracing(const protos::EnableTracingRequest& req,
62                                        DeferredEnableTracingResponse resp) {
63   TraceConfig trace_config;
64   trace_config.FromProto(req.trace_config());
65   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
66   base::ScopedFile fd;
67   if (trace_config.write_into_file())
68     fd = ipc::Service::TakeReceivedFD();
69   remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
70   remote_consumer->enable_tracing_response = std::move(resp);
71 }
72 
73 // Called by the IPC layer.
DisableTracing(const protos::DisableTracingRequest &,DeferredDisableTracingResponse resp)74 void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
75                                         DeferredDisableTracingResponse resp) {
76   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
77   resp.Resolve(ipc::AsyncResult<protos::DisableTracingResponse>::Create());
78 }
79 
80 // Called by the IPC layer.
ReadBuffers(const protos::ReadBuffersRequest &,DeferredReadBuffersResponse resp)81 void ConsumerIPCService::ReadBuffers(const protos::ReadBuffersRequest&,
82                                      DeferredReadBuffersResponse resp) {
83   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
84   remote_consumer->read_buffers_response = std::move(resp);
85   remote_consumer->service_endpoint->ReadBuffers();
86 }
87 
88 // Called by the IPC layer.
FreeBuffers(const protos::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)89 void ConsumerIPCService::FreeBuffers(const protos::FreeBuffersRequest&,
90                                      DeferredFreeBuffersResponse resp) {
91   GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
92   resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create());
93 }
94 
95 // Called by the IPC layer.
Flush(const protos::FlushRequest & req,DeferredFlushResponse resp)96 void ConsumerIPCService::Flush(const protos::FlushRequest& req,
97                                DeferredFlushResponse resp) {
98   auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
99                                             std::move(resp));
100   auto weak_this = weak_ptr_factory_.GetWeakPtr();
101   auto callback = [weak_this, it](bool success) {
102     if (weak_this)
103       weak_this->OnFlushCallback(success, std::move(it));
104   };
105   GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
106                                                           std::move(callback));
107 }
108 
109 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)110 void ConsumerIPCService::OnFlushCallback(
111     bool success,
112     PendingFlushResponses::iterator pending_response_it) {
113   DeferredFlushResponse response(std::move(*pending_response_it));
114   pending_flush_responses_.erase(pending_response_it);
115   if (success) {
116     response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create());
117   } else {
118     response.Reject();
119   }
120 }
121 
122 ////////////////////////////////////////////////////////////////////////////////
123 // RemoteConsumer methods
124 ////////////////////////////////////////////////////////////////////////////////
125 
126 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
127 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
128 
129 // Invoked by the |core_service_| business logic after the ConnectConsumer()
130 // call. There is nothing to do here, we really expected the ConnectConsumer()
131 // to just work in the local case.
OnConnect()132 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
133 
134 // Invoked by the |core_service_| business logic after we destroy the
135 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()136 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
137 
OnTracingDisabled()138 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled() {
139   auto result = ipc::AsyncResult<protos::EnableTracingResponse>::Create();
140   result->set_disabled(true);
141   enable_tracing_response.Resolve(std::move(result));
142 }
143 
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)144 void ConsumerIPCService::RemoteConsumer::OnTraceData(
145     std::vector<TracePacket> trace_packets,
146     bool has_more) {
147   if (!read_buffers_response.IsBound())
148     return;
149 
150   auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
151 
152   // A TracePacket might be too big to fit into a single IPC message (max
153   // kIPCBufferSize). However a TracePacket is made of slices and each slice
154   // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
155   // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
156   // if its slices don't fit within one IPC, chunk them over several contiguous
157   // IPCs using the |last_slice_for_packet| for glueing on the other side.
158   static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
159                 "kIPCBufferSize too small given the max possible slice size");
160 
161   auto send_ipc_reply = [this, &result](bool more) {
162     result.set_has_more(more);
163     read_buffers_response.Resolve(std::move(result));
164     result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
165   };
166 
167   size_t approx_reply_size = 0;
168   for (const TracePacket& trace_packet : trace_packets) {
169     size_t num_slices_left_for_packet = trace_packet.slices().size();
170     for (const Slice& slice : trace_packet.slices()) {
171       // Check if this slice would cause the IPC to overflow its max size and,
172       // if that is the case, split the IPCs. The "16" and "64" below are
173       // over-estimations of, respectively:
174       // 16: the preamble that prefixes each slice (there are 2 x size fields
175       //     in the proto + the |last_slice_for_packet| bool).
176       // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
177       // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
178       // will hit a DCHECK anyways.
179       const size_t approx_slice_size = slice.size + 16;
180       if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
181         // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
182         PERFETTO_CHECK(result->slices_size() > 0);
183         send_ipc_reply(/*has_more=*/true);
184         approx_reply_size = 0;
185       }
186       approx_reply_size += approx_slice_size;
187 
188       auto* res_slice = result->add_slices();
189       res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
190       res_slice->set_data(slice.start, slice.size);
191     }
192   }
193   send_ipc_reply(has_more);
194 }
195 
196 }  // namespace perfetto
197