• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
18 #define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
19 
#include <array>
#include <cinttypes>
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/unix_task_runner.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/forward_decls.h"
#include "src/profiling/common/interning_output.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/bookkeeping_dump.h"
#include "src/profiling/memory/log_histogram.h"
#include "src/profiling/memory/shared_ring_buffer.h"
#include "src/profiling/memory/system_property.h"
#include "src/profiling/memory/unwinding.h"
#include "src/profiling/memory/unwound_messages.h"

#include "protos/perfetto/config/profiling/heapprofd_config.gen.h"
48 
49 namespace perfetto {
50 namespace profiling {
51 
52 using HeapprofdConfig = protos::gen::HeapprofdConfig;
53 
// A process, identified by its pid, together with its command line.
struct Process {
  // Value-initialized so that a default-constructed Process never carries an
  // indeterminate pid. Aggregate initialization such as Process{pid, cmdline}
  // keeps working (default member initializers are allowed in aggregates
  // since C++14).
  pid_t pid{};
  std::string cmdline;
};
58 
// Operating mode of a HeapprofdProducer.
//
// TODO(rsavitski): central daemon can do less work if it knows that the global
// operating mode is fork-based, as it then will not be interacting with the
// clients. This can be implemented as an additional mode here.
enum class HeapprofdMode {
  // Producer owned by the system heapprofd daemon.
  kCentral,
  // Producer owned by a private heapprofd that the profiled process
  // fork-execs for itself ("fork mode").
  kChild,
};
63 
64 bool HeapprofdConfigToClientConfiguration(
65     const HeapprofdConfig& heapprofd_config,
66     ClientConfiguration* cli_config);
67 
68 // Heap profiling producer. Can be instantiated in two modes, central and
69 // child (also referred to as fork mode).
70 //
71 // The central mode producer is instantiated by the system heapprofd daemon. Its
72 // primary responsibility is activating profiling (via system properties and
73 // signals) in targets identified by profiling configs. On debug platform
74 // builds, the central producer can also handle the out-of-process unwinding &
75 // writing of the profiles for all client processes.
76 //
77 // An alternative model is where the central heapprofd triggers the profiling in
78 // the target process, but the latter fork-execs a private heapprofd binary to
79 // handle unwinding only for that process. The forked heapprofd instantiates
80 // this producer in the "child" mode. In this scenario, the profiled process
81 // never talks to the system daemon.
82 //
83 // TODO(fmayer||rsavitski): cover interesting invariants/structure of the
84 // implementation (e.g. number of data sources in child mode), including
85 // threading structure.
86 class HeapprofdProducer : public Producer, public UnwindingWorker::Delegate {
87  public:
88   friend class SocketDelegate;
89 
90   // TODO(fmayer): Split into two delegates for the listening socket in kCentral
91   // and for the per-client sockets to make this easier to understand?
92   // Alternatively, find a better name for this.
93   class SocketDelegate : public base::UnixSocket::EventListener {
94    public:
SocketDelegate(HeapprofdProducer * producer)95     explicit SocketDelegate(HeapprofdProducer* producer)
96         : producer_(producer) {}
97 
98     void OnDisconnect(base::UnixSocket* self) override;
99     void OnNewIncomingConnection(
100         base::UnixSocket* self,
101         std::unique_ptr<base::UnixSocket> new_connection) override;
102     void OnDataAvailable(base::UnixSocket* self) override;
103 
104    private:
105     HeapprofdProducer* producer_;
106   };
107 
108   HeapprofdProducer(HeapprofdMode mode,
109                     base::TaskRunner* task_runner,
110                     bool exit_when_done);
111   ~HeapprofdProducer() override;
112 
113   // Producer Impl:
114   void OnConnect() override;
115   void OnDisconnect() override;
116   void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
117   void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
118   void StopDataSource(DataSourceInstanceID) override;
119   void OnTracingSetup() override;
120   void Flush(FlushRequestID,
121              const DataSourceInstanceID* data_source_ids,
122              size_t num_data_sources) override;
ClearIncrementalState(const DataSourceInstanceID *,size_t)123   void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
124                              size_t /*num_data_sources*/) override {}
125 
126   // TODO(fmayer): Refactor once/if we have generic reconnect logic.
127   void ConnectWithRetries(const char* socket_name);
128   void DumpAll();
129 
130   // UnwindingWorker::Delegate impl:
131   void PostAllocRecord(UnwindingWorker*, std::unique_ptr<AllocRecord>) override;
132   void PostFreeRecord(UnwindingWorker*, std::vector<FreeRecord>) override;
133   void PostHeapNameRecord(UnwindingWorker*, HeapNameRecord) override;
134   void PostSocketDisconnected(UnwindingWorker*,
135                               DataSourceInstanceID,
136                               pid_t,
137                               SharedRingBuffer::Stats) override;
138   void PostDrainDone(UnwindingWorker*, DataSourceInstanceID) override;
139 
140   void HandleAllocRecord(AllocRecord*);
141   void HandleFreeRecord(FreeRecord);
142   void HandleHeapNameRecord(HeapNameRecord);
143   void HandleSocketDisconnected(DataSourceInstanceID,
144                                 pid_t,
145                                 SharedRingBuffer::Stats);
146 
147   // Valid only if mode_ == kChild.
148   void SetTargetProcess(pid_t target_pid, std::string target_cmdline);
149   void SetDataSourceCallback(std::function<void()> fn);
150 
151   // Exposed for testing.
152   void SetProducerEndpoint(
153       std::unique_ptr<TracingService::ProducerEndpoint> endpoint);
154 
socket_delegate()155   base::UnixSocket::EventListener& socket_delegate() {
156     return socket_delegate_;
157   }
158 
159   // Adopts the (connected) sockets inherited from the target process, invoking
160   // the on-connection callback.
161   // Specific to mode_ == kChild
162   void AdoptSocket(base::ScopedFile fd);
163 
164   void TerminateWhenDone();
165 
166  private:
167   // State of the connection to tracing service (traced).
168   enum State {
169     kNotStarted = 0,
170     kNotConnected,
171     kConnecting,
172     kConnected,
173   };
174 
175   struct ProcessState {
176     struct HeapInfo {
HeapInfoProcessState::HeapInfo177       HeapInfo(GlobalCallstackTrie* cs, bool dam) : heap_tracker(cs, dam) {}
178 
179       HeapTracker heap_tracker;
180       std::string heap_name;
181       uint64_t sampling_interval = 0u;
182       uint64_t orig_sampling_interval = 0u;
183     };
ProcessStateProcessState184     ProcessState(GlobalCallstackTrie* c, bool d)
185         : callsites(c), dump_at_max_mode(d) {}
186     bool disconnected = false;
187     SharedRingBuffer::ErrorState error_state =
188         SharedRingBuffer::ErrorState::kNoError;
189     bool buffer_corrupted = false;
190 
191     uint64_t heap_samples = 0;
192     uint64_t map_reparses = 0;
193     uint64_t unwinding_errors = 0;
194 
195     uint64_t total_unwinding_time_us = 0;
196     uint64_t client_spinlock_blocked_us = 0;
197     GlobalCallstackTrie* callsites;
198     bool dump_at_max_mode;
199     LogHistogram unwinding_time_us;
200     std::map<uint32_t, HeapInfo> heap_infos;
201 
GetHeapInfoProcessState202     HeapInfo& GetHeapInfo(uint32_t heap_id) {
203       auto it = heap_infos.find(heap_id);
204       if (it == heap_infos.end()) {
205         it = heap_infos.emplace_hint(
206             it, std::piecewise_construct, std::forward_as_tuple(heap_id),
207             std::forward_as_tuple(callsites, dump_at_max_mode));
208       }
209       return it->second;
210     }
211 
GetHeapTrackerProcessState212     HeapTracker& GetHeapTracker(uint32_t heap_id) {
213       return GetHeapInfo(heap_id).heap_tracker;
214     }
215   };
216 
217   struct DataSource {
DataSourceDataSource218     explicit DataSource(std::unique_ptr<TraceWriter> tw)
219         : trace_writer(std::move(tw)) {
220       // Make MSAN happy.
221       memset(&client_configuration, 0, sizeof(client_configuration));
222     }
223 
224     DataSourceInstanceID id;
225     std::unique_ptr<TraceWriter> trace_writer;
226     DataSourceConfig ds_config;
227     HeapprofdConfig config;
228     ClientConfiguration client_configuration;
229     std::vector<SystemProperties::Handle> properties;
230     std::set<pid_t> signaled_pids;
231     std::set<pid_t> rejected_pids;
232     std::map<pid_t, ProcessState> process_states;
233     std::vector<std::string> normalized_cmdlines;
234     InterningOutputTracker intern_state;
235     bool shutting_down = false;
236     bool started = false;
237     bool hit_guardrail = false;
238     bool was_stopped = false;
239     uint32_t stop_timeout_ms;
240     uint32_t dump_interval_ms = 0;
241     size_t pending_free_drains = 0;
242     GuardrailConfig guardrail_config;
243   };
244 
245   struct PendingProcess {
246     std::unique_ptr<base::UnixSocket> sock;
247     DataSourceInstanceID data_source_instance_id;
248     SharedRingBuffer shmem;
249   };
250 
251   void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
252                               Process process);
253 
254   void ConnectService();
255   void Restart();
256   void ResetConnectionBackoff();
257   void IncreaseConnectionBackoff();
258 
259   void CheckDataSourceMemoryTask();
260   void CheckDataSourceCpuTask();
261 
262   void FinishDataSourceFlush(FlushRequestID flush_id);
263   void DumpProcessesInDataSource(DataSource* ds);
264   void DumpProcessState(DataSource* ds, pid_t pid, ProcessState* process);
265   static void SetStats(protos::pbzero::ProfilePacket::ProcessStats* stats,
266                        const ProcessState& process_state);
267 
268   void DoDrainAndContinuousDump(DataSourceInstanceID id);
269   void DoContinuousDump(DataSource* ds);
270   void DrainDone(DataSourceInstanceID);
271 
272   UnwindingWorker& UnwinderForPID(pid_t);
273   bool IsPidProfiled(pid_t);
274   DataSource* GetDataSourceForProcess(const Process& proc);
275   void RecordOtherSourcesAsRejected(DataSource* active_ds, const Process& proc);
276 
277   void SetStartupProperties(DataSource* data_source);
278   void SignalRunningProcesses(DataSource* data_source);
279 
280   // Specific to mode_ == kChild
281   void TerminateProcess(int exit_status);
282 
283   void ShutdownDataSource(DataSource* ds);
284   bool MaybeFinishDataSource(DataSource* ds);
285 
286   void WriteRejectedConcurrentSession(BufferID buffer_id, pid_t pid);
287 
288   // Class state:
289 
290   // Task runner is owned by the main thread.
291   base::TaskRunner* const task_runner_;
292   const HeapprofdMode mode_;
293   // TODO(fmayer): Refactor to make this boolean unnecessary.
294   // Whether to terminate this producer after the first data-source has
295   // finished.
296   bool exit_when_done_;
297 
298   // State of connection to the tracing service.
299   State state_ = kNotStarted;
300   uint32_t connection_backoff_ms_ = 0;
301   const char* producer_sock_name_ = nullptr;
302 
303   // Client processes that have connected, but with which we have not yet
304   // finished the handshake.
305   std::map<pid_t, PendingProcess> pending_processes_;
306 
307   // Must outlive data_sources_ - owns at least the shared memory referenced by
308   // TraceWriters.
309   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
310 
311   // Must outlive data_sources_ - HeapTracker references the trie.
312   GlobalCallstackTrie callsites_;
313 
314   // Must outlive data_sources_ - DataSource can hold
315   // SystemProperties::Handle-s.
316   // Specific to mode_ == kCentral
317   SystemProperties properties_;
318 
319   std::map<FlushRequestID, size_t> flushes_in_progress_;
320   std::map<DataSourceInstanceID, DataSource> data_sources_;
321   std::vector<UnwindingWorker> unwinding_workers_;
322 
323   // Specific to mode_ == kChild
324   Process target_process_{base::kInvalidPid, ""};
325   std::optional<std::function<void()>> data_source_callback_;
326 
327   SocketDelegate socket_delegate_;
328 
329   base::WeakPtrFactory<HeapprofdProducer> weak_factory_;  // Keep last.
330 };
331 
332 }  // namespace profiling
333 }  // namespace perfetto
334 
335 #endif  // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
336