• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18 
19 #include <cinttypes>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/base/scoped_file.h"
24 #include "perfetto/ext/ipc/basic_types.h"
25 #include "perfetto/ext/ipc/host.h"
26 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
27 #include "perfetto/ext/tracing/core/slice.h"
28 #include "perfetto/ext/tracing/core/trace_packet.h"
29 #include "perfetto/ext/tracing/core/trace_stats.h"
30 #include "perfetto/ext/tracing/core/tracing_service.h"
31 #include "perfetto/tracing/core/trace_config.h"
32 #include "perfetto/tracing/core/tracing_service_capabilities.h"
33 #include "perfetto/tracing/core/tracing_service_state.h"
34 
35 namespace perfetto {
36 
ConsumerIPCService(TracingService * core_service)37 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
38     : core_service_(core_service), weak_ptr_factory_(this) {}
39 
40 ConsumerIPCService::~ConsumerIPCService() = default;
41 
42 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()43 ConsumerIPCService::GetConsumerForCurrentRequest() {
44   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
45   const uid_t uid = ipc::Service::client_info().uid();
46   PERFETTO_CHECK(ipc_client_id);
47   auto it = consumers_.find(ipc_client_id);
48   if (it == consumers_.end()) {
49     auto* remote_consumer = new RemoteConsumer();
50     consumers_[ipc_client_id].reset(remote_consumer);
51     remote_consumer->service_endpoint =
52         core_service_->ConnectConsumer(remote_consumer, uid);
53     return remote_consumer;
54   }
55   return it->second.get();
56 }
57 
58 // Called by the IPC layer.
OnClientDisconnected()59 void ConsumerIPCService::OnClientDisconnected() {
60   ipc::ClientID client_id = ipc::Service::client_info().client_id();
61   consumers_.erase(client_id);
62 }
63 
64 // Called by the IPC layer.
EnableTracing(const protos::gen::EnableTracingRequest & req,DeferredEnableTracingResponse resp)65 void ConsumerIPCService::EnableTracing(
66     const protos::gen::EnableTracingRequest& req,
67     DeferredEnableTracingResponse resp) {
68   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
69   if (req.attach_notification_only()) {
70     remote_consumer->enable_tracing_response = std::move(resp);
71     return;
72   }
73   const TraceConfig& trace_config = req.trace_config();
74   base::ScopedFile fd;
75   if (trace_config.write_into_file() && trace_config.output_path().empty())
76     fd = ipc::Service::TakeReceivedFD();
77   remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
78   remote_consumer->enable_tracing_response = std::move(resp);
79 }
80 
81 // Called by the IPC layer.
StartTracing(const protos::gen::StartTracingRequest &,DeferredStartTracingResponse resp)82 void ConsumerIPCService::StartTracing(const protos::gen::StartTracingRequest&,
83                                       DeferredStartTracingResponse resp) {
84   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
85   remote_consumer->service_endpoint->StartTracing();
86   resp.Resolve(ipc::AsyncResult<protos::gen::StartTracingResponse>::Create());
87 }
88 
89 // Called by the IPC layer.
ChangeTraceConfig(const protos::gen::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)90 void ConsumerIPCService::ChangeTraceConfig(
91     const protos::gen::ChangeTraceConfigRequest& req,
92     DeferredChangeTraceConfigResponse resp) {
93   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
94   remote_consumer->service_endpoint->ChangeTraceConfig(req.trace_config());
95   resp.Resolve(
96       ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse>::Create());
97 }
98 
99 // Called by the IPC layer.
DisableTracing(const protos::gen::DisableTracingRequest &,DeferredDisableTracingResponse resp)100 void ConsumerIPCService::DisableTracing(
101     const protos::gen::DisableTracingRequest&,
102     DeferredDisableTracingResponse resp) {
103   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
104   resp.Resolve(ipc::AsyncResult<protos::gen::DisableTracingResponse>::Create());
105 }
106 
107 // Called by the IPC layer.
ReadBuffers(const protos::gen::ReadBuffersRequest &,DeferredReadBuffersResponse resp)108 void ConsumerIPCService::ReadBuffers(const protos::gen::ReadBuffersRequest&,
109                                      DeferredReadBuffersResponse resp) {
110   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
111   remote_consumer->read_buffers_response = std::move(resp);
112   remote_consumer->service_endpoint->ReadBuffers();
113 }
114 
115 // Called by the IPC layer.
FreeBuffers(const protos::gen::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)116 void ConsumerIPCService::FreeBuffers(const protos::gen::FreeBuffersRequest&,
117                                      DeferredFreeBuffersResponse resp) {
118   GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
119   resp.Resolve(ipc::AsyncResult<protos::gen::FreeBuffersResponse>::Create());
120 }
121 
122 // Called by the IPC layer.
Flush(const protos::gen::FlushRequest & req,DeferredFlushResponse resp)123 void ConsumerIPCService::Flush(const protos::gen::FlushRequest& req,
124                                DeferredFlushResponse resp) {
125   auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
126                                             std::move(resp));
127   auto weak_this = weak_ptr_factory_.GetWeakPtr();
128   auto callback = [weak_this, it](bool success) {
129     if (weak_this)
130       weak_this->OnFlushCallback(success, std::move(it));
131   };
132   FlushFlags flags(req.flags());
133   GetConsumerForCurrentRequest()->service_endpoint->Flush(
134       req.timeout_ms(), std::move(callback), flags);
135 }
136 
137 // Called by the IPC layer.
Detach(const protos::gen::DetachRequest & req,DeferredDetachResponse resp)138 void ConsumerIPCService::Detach(const protos::gen::DetachRequest& req,
139                                 DeferredDetachResponse resp) {
140   // OnDetach() will resolve the |detach_response|.
141   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
142   remote_consumer->detach_response = std::move(resp);
143   remote_consumer->service_endpoint->Detach(req.key());
144 }
145 
146 // Called by the IPC layer.
Attach(const protos::gen::AttachRequest & req,DeferredAttachResponse resp)147 void ConsumerIPCService::Attach(const protos::gen::AttachRequest& req,
148                                 DeferredAttachResponse resp) {
149   // OnAttach() will resolve the |attach_response|.
150   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
151   remote_consumer->attach_response = std::move(resp);
152   remote_consumer->service_endpoint->Attach(req.key());
153 }
154 
155 // Called by the IPC layer.
GetTraceStats(const protos::gen::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)156 void ConsumerIPCService::GetTraceStats(const protos::gen::GetTraceStatsRequest&,
157                                        DeferredGetTraceStatsResponse resp) {
158   // OnTraceStats() will resolve the |get_trace_stats_response|.
159   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
160   remote_consumer->get_trace_stats_response = std::move(resp);
161   remote_consumer->service_endpoint->GetTraceStats();
162 }
163 
164 // Called by the IPC layer.
ObserveEvents(const protos::gen::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)165 void ConsumerIPCService::ObserveEvents(
166     const protos::gen::ObserveEventsRequest& req,
167     DeferredObserveEventsResponse resp) {
168   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
169 
170   // If there's a prior stream, close it so that client can clean it up.
171   remote_consumer->CloseObserveEventsResponseStream();
172 
173   remote_consumer->observe_events_response = std::move(resp);
174 
175   uint32_t events_mask = 0;
176   for (const auto& type : req.events_to_observe()) {
177     events_mask |= static_cast<uint32_t>(type);
178   }
179   remote_consumer->service_endpoint->ObserveEvents(events_mask);
180 
181   // If no events are to be observed, close the stream immediately so that the
182   // client can clean up.
183   if (events_mask == 0)
184     remote_consumer->CloseObserveEventsResponseStream();
185 }
186 
187 // Called by the IPC layer.
QueryServiceState(const protos::gen::QueryServiceStateRequest & req,DeferredQueryServiceStateResponse resp)188 void ConsumerIPCService::QueryServiceState(
189     const protos::gen::QueryServiceStateRequest& req,
190     DeferredQueryServiceStateResponse resp) {
191   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
192   auto it = pending_query_service_responses_.insert(
193       pending_query_service_responses_.end(), std::move(resp));
194   auto weak_this = weak_ptr_factory_.GetWeakPtr();
195   auto callback = [weak_this, it](bool success,
196                                   const TracingServiceState& svc_state) {
197     if (weak_this)
198       weak_this->OnQueryServiceCallback(success, svc_state, std::move(it));
199   };
200   ConsumerEndpoint::QueryServiceStateArgs args;
201   args.sessions_only = req.sessions_only();
202   remote_consumer->service_endpoint->QueryServiceState(args, callback);
203 }
204 
205 // Called by the service in response to service_endpoint->QueryServiceState().
OnQueryServiceCallback(bool success,const TracingServiceState & svc_state,PendingQuerySvcResponses::iterator pending_response_it)206 void ConsumerIPCService::OnQueryServiceCallback(
207     bool success,
208     const TracingServiceState& svc_state,
209     PendingQuerySvcResponses::iterator pending_response_it) {
210   DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
211   pending_query_service_responses_.erase(pending_response_it);
212   if (!success) {
213     response.Reject();
214     return;
215   }
216 
217   // The TracingServiceState object might be too big to fit into a single IPC
218   // message because it contains the DataSourceDescriptor of each data source.
219   // Here we split it in chunks to fit in the IPC limit, observing the
220   // following rule: each chunk must be invididually a valid TracingServiceState
221   // message; all the chunks concatenated together must form the original
222   // message. This is to deal with the legacy API that was just sending one
223   // whole message (failing in presence of too many data sources, b/153142114).
224   // The message is split as follows: we take the whole TracingServiceState,
225   // take out the data sources section (which is a top-level repeated field)
226   // and re-add them one-by-one. If, in the process of appending, the IPC msg
227   // size is reached, a new chunk is created. This assumes that the rest of
228   // TracingServiceState fits in one IPC message and each DataSourceDescriptor
229   // fits in the worst case in a dedicated message (which is true, because
230   // otherwise the RegisterDataSource() which passes the descriptor in the first
231   // place would fail).
232 
233   std::vector<uint8_t> chunked_reply;
234 
235   // Transmits the current chunk and starts a new one.
236   bool sent_eof = false;
237   auto send_chunked_reply = [&chunked_reply, &response,
238                              &sent_eof](bool has_more) {
239     PERFETTO_CHECK(!sent_eof);
240     sent_eof = !has_more;
241     auto resp =
242         ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
243     resp.set_has_more(has_more);
244     PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
245         chunked_reply.data(), chunked_reply.size()));
246     chunked_reply.clear();
247     response.Resolve(std::move(resp));
248   };
249 
250   // Create a copy of the whole response and cut away the data_sources section.
251   protos::gen::TracingServiceState svc_state_copy = svc_state;
252   auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
253   chunked_reply = svc_state_copy.SerializeAsArray();
254 
255   // Now re-add them fitting within the IPC message limits (- some margin for
256   // the outer IPC frame).
257   constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
258   for (const auto& data_source : data_sources) {
259     protos::gen::TracingServiceState tmp;
260     tmp.mutable_data_sources()->emplace_back(std::move(data_source));
261     std::vector<uint8_t> chunk = tmp.SerializeAsArray();
262     if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
263       chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
264     } else {
265       send_chunked_reply(/*has_more=*/true);
266       chunked_reply = std::move(chunk);
267     }
268   }
269 
270   PERFETTO_DCHECK(!chunked_reply.empty());
271   send_chunked_reply(/*has_more=*/false);
272   PERFETTO_CHECK(sent_eof);
273 }
274 
275 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)276 void ConsumerIPCService::OnFlushCallback(
277     bool success,
278     PendingFlushResponses::iterator pending_response_it) {
279   DeferredFlushResponse response(std::move(*pending_response_it));
280   pending_flush_responses_.erase(pending_response_it);
281   if (success) {
282     response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
283   } else {
284     response.Reject();
285   }
286 }
287 
QueryCapabilities(const protos::gen::QueryCapabilitiesRequest &,DeferredQueryCapabilitiesResponse resp)288 void ConsumerIPCService::QueryCapabilities(
289     const protos::gen::QueryCapabilitiesRequest&,
290     DeferredQueryCapabilitiesResponse resp) {
291   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
292   auto it = pending_query_capabilities_responses_.insert(
293       pending_query_capabilities_responses_.end(), std::move(resp));
294   auto weak_this = weak_ptr_factory_.GetWeakPtr();
295   auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
296     if (weak_this)
297       weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
298   };
299   remote_consumer->service_endpoint->QueryCapabilities(callback);
300 }
301 
302 // Called by the service in response to service_endpoint->QueryCapabilities().
OnQueryCapabilitiesCallback(const TracingServiceCapabilities & caps,PendingQueryCapabilitiesResponses::iterator pending_response_it)303 void ConsumerIPCService::OnQueryCapabilitiesCallback(
304     const TracingServiceCapabilities& caps,
305     PendingQueryCapabilitiesResponses::iterator pending_response_it) {
306   DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
307   pending_query_capabilities_responses_.erase(pending_response_it);
308   auto resp =
309       ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
310   *resp->mutable_capabilities() = caps;
311   response.Resolve(std::move(resp));
312 }
313 
SaveTraceForBugreport(const protos::gen::SaveTraceForBugreportRequest &,DeferredSaveTraceForBugreportResponse resp)314 void ConsumerIPCService::SaveTraceForBugreport(
315     const protos::gen::SaveTraceForBugreportRequest&,
316     DeferredSaveTraceForBugreportResponse resp) {
317   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
318   auto it = pending_bugreport_responses_.insert(
319       pending_bugreport_responses_.end(), std::move(resp));
320   auto weak_this = weak_ptr_factory_.GetWeakPtr();
321   auto callback = [weak_this, it](bool success, const std::string& msg) {
322     if (weak_this)
323       weak_this->OnSaveTraceForBugreportCallback(success, msg, std::move(it));
324   };
325   remote_consumer->service_endpoint->SaveTraceForBugreport(callback);
326 }
327 
CloneSession(const protos::gen::CloneSessionRequest & req,DeferredCloneSessionResponse resp)328 void ConsumerIPCService::CloneSession(
329     const protos::gen::CloneSessionRequest& req,
330     DeferredCloneSessionResponse resp) {
331   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
332   remote_consumer->clone_session_response = std::move(resp);
333   ConsumerEndpoint::CloneSessionArgs args;
334   args.skip_trace_filter = req.skip_trace_filter();
335   args.for_bugreport = req.for_bugreport();
336   remote_consumer->service_endpoint->CloneSession(req.session_id(),
337                                                   std::move(args));
338 }
339 
340 // Called by the service in response to
341 // service_endpoint->SaveTraceForBugreport().
OnSaveTraceForBugreportCallback(bool success,const std::string & msg,PendingSaveTraceForBugreportResponses::iterator pending_response_it)342 void ConsumerIPCService::OnSaveTraceForBugreportCallback(
343     bool success,
344     const std::string& msg,
345     PendingSaveTraceForBugreportResponses::iterator pending_response_it) {
346   DeferredSaveTraceForBugreportResponse response(
347       std::move(*pending_response_it));
348   pending_bugreport_responses_.erase(pending_response_it);
349   auto resp =
350       ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>::Create();
351   resp->set_success(success);
352   resp->set_msg(msg);
353   response.Resolve(std::move(resp));
354 }
355 
356 ////////////////////////////////////////////////////////////////////////////////
357 // RemoteConsumer methods
358 ////////////////////////////////////////////////////////////////////////////////
359 
360 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
361 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
362 
363 // Invoked by the |core_service_| business logic after the ConnectConsumer()
364 // call. There is nothing to do here, we really expected the ConnectConsumer()
365 // to just work in the local case.
OnConnect()366 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
367 
368 // Invoked by the |core_service_| business logic after we destroy the
369 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()370 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
371 
OnTracingDisabled(const std::string & error)372 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled(
373     const std::string& error) {
374   if (enable_tracing_response.IsBound()) {
375     auto result =
376         ipc::AsyncResult<protos::gen::EnableTracingResponse>::Create();
377     result->set_disabled(true);
378     if (!error.empty())
379       result->set_error(error);
380     enable_tracing_response.Resolve(std::move(result));
381   }
382 }
383 
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)384 void ConsumerIPCService::RemoteConsumer::OnTraceData(
385     std::vector<TracePacket> trace_packets,
386     bool has_more) {
387   if (!read_buffers_response.IsBound())
388     return;
389 
390   auto result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
391 
392   // A TracePacket might be too big to fit into a single IPC message (max
393   // kIPCBufferSize). However a TracePacket is made of slices and each slice
394   // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
395   // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
396   // if its slices don't fit within one IPC, chunk them over several contiguous
397   // IPCs using the |last_slice_for_packet| for glueing on the other side.
398   static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
399                 "kIPCBufferSize too small given the max possible slice size");
400 
401   auto send_ipc_reply = [this, &result](bool more) {
402     result.set_has_more(more);
403     read_buffers_response.Resolve(std::move(result));
404     result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
405   };
406 
407   size_t approx_reply_size = 0;
408   for (const TracePacket& trace_packet : trace_packets) {
409     size_t num_slices_left_for_packet = trace_packet.slices().size();
410     for (const Slice& slice : trace_packet.slices()) {
411       // Check if this slice would cause the IPC to overflow its max size and,
412       // if that is the case, split the IPCs. The "16" and "64" below are
413       // over-estimations of, respectively:
414       // 16: the preamble that prefixes each slice (there are 2 x size fields
415       //     in the proto + the |last_slice_for_packet| bool).
416       // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
417       // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
418       // will hit a DCHECK anyways.
419       const size_t approx_slice_size = slice.size + 16;
420       if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
421         // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
422         PERFETTO_CHECK(result->slices_size() > 0);
423         send_ipc_reply(/*has_more=*/true);
424         approx_reply_size = 0;
425       }
426       approx_reply_size += approx_slice_size;
427 
428       auto* res_slice = result->add_slices();
429       res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
430       res_slice->set_data(slice.start, slice.size);
431     }
432   }
433   send_ipc_reply(has_more);
434 }
435 
OnDetach(bool success)436 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
437   if (!success) {
438     std::move(detach_response).Reject();
439     return;
440   }
441   auto resp = ipc::AsyncResult<protos::gen::DetachResponse>::Create();
442   std::move(detach_response).Resolve(std::move(resp));
443 }
444 
OnAttach(bool success,const TraceConfig & trace_config)445 void ConsumerIPCService::RemoteConsumer::OnAttach(
446     bool success,
447     const TraceConfig& trace_config) {
448   if (!success) {
449     std::move(attach_response).Reject();
450     return;
451   }
452   auto response = ipc::AsyncResult<protos::gen::AttachResponse>::Create();
453   *response->mutable_trace_config() = trace_config;
454   std::move(attach_response).Resolve(std::move(response));
455 }
456 
OnTraceStats(bool success,const TraceStats & stats)457 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
458                                                       const TraceStats& stats) {
459   if (!success) {
460     std::move(get_trace_stats_response).Reject();
461     return;
462   }
463   auto response =
464       ipc::AsyncResult<protos::gen::GetTraceStatsResponse>::Create();
465   *response->mutable_trace_stats() = stats;
466   std::move(get_trace_stats_response).Resolve(std::move(response));
467 }
468 
OnObservableEvents(const ObservableEvents & events)469 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
470     const ObservableEvents& events) {
471   if (!observe_events_response.IsBound())
472     return;
473 
474   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
475   result.set_has_more(true);
476   *result->mutable_events() = events;
477   observe_events_response.Resolve(std::move(result));
478 }
479 
CloseObserveEventsResponseStream()480 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
481   if (!observe_events_response.IsBound())
482     return;
483 
484   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
485   result.set_has_more(false);
486   observe_events_response.Resolve(std::move(result));
487 }
488 
OnSessionCloned(const OnSessionClonedArgs & args)489 void ConsumerIPCService::RemoteConsumer::OnSessionCloned(
490     const OnSessionClonedArgs& args) {
491   if (!clone_session_response.IsBound())
492     return;
493 
494   auto resp = ipc::AsyncResult<protos::gen::CloneSessionResponse>::Create();
495   resp->set_success(args.success);
496   resp->set_error(args.error);
497   resp->set_uuid_msb(args.uuid.msb());
498   resp->set_uuid_lsb(args.uuid.lsb());
499   std::move(clone_session_response).Resolve(std::move(resp));
500 }
501 
502 }  // namespace perfetto
503