• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_UTIL_LATENT_SEE_H
16 #define GRPC_SRC_CORE_UTIL_LATENT_SEE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #ifdef GRPC_ENABLE_LATENT_SEE
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 
24 #include <atomic>
25 #include <chrono>
26 #include <cstdint>
27 #include <cstdio>
28 #include <cstdlib>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 #include "absl/base/thread_annotations.h"
34 #include "absl/functional/any_invocable.h"
35 #include "absl/functional/function_ref.h"
36 #include "absl/log/log.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 #include "src/core/util/per_cpu.h"
40 #include "src/core/util/sync.h"
41 
42 #define TAGGED_POINTER_SIZE_BITS 48
43 
44 namespace grpc_core {
45 namespace latent_see {
46 
47 struct Metadata {
48   const char* file;
49   int line;
50   const char* name;
51 };
52 
53 enum class EventType : uint8_t { kBegin, kEnd, kFlowStart, kFlowEnd, kMark };
54 
55 // A bin collects all events that occur within a parent scope.
56 struct Bin {
57   struct Event {
58     const Metadata* metadata;
59     std::chrono::steady_clock::time_point timestamp;
60     uint64_t id;
61     EventType type;
62   };
63 
AppendBin64   void Append(const Metadata* metadata, EventType type, uint64_t id) {
65     events.push_back(
66         Event{metadata, std::chrono::steady_clock::now(), id, type});
67   }
68 
69   std::vector<Event> events;
70   uintptr_t next_free = 0;
71 };
72 
73 class Log {
74  public:
75   static constexpr uintptr_t kTagMask = (1ULL << TAGGED_POINTER_SIZE_BITS) - 1;
76 
77   struct RecordedEvent {
78     uint64_t thread_id;
79     uint64_t batch_id;
80     Bin::Event event;
81   };
82 
IncrementTag(uintptr_t input)83   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static uintptr_t IncrementTag(
84       uintptr_t input) {
85     return input + (1UL << TAGGED_POINTER_SIZE_BITS);
86   }
87 
ToBin(uintptr_t ptr)88   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Bin* ToBin(uintptr_t ptr) {
89     return reinterpret_cast<Bin*>(ptr & kTagMask);
90   }
91 
StartBin(void * owner)92   static uintptr_t StartBin(void* owner) {
93     uintptr_t bin_descriptor = free_bins_.load(std::memory_order_acquire);
94     Bin* bin;
95     do {
96       if (bin_descriptor == 0) {
97         bin = new Bin();
98         break;
99       }
100       bin = ToBin(bin_descriptor);
101     } while (!free_bins_.compare_exchange_strong(bin_descriptor, bin->next_free,
102                                                  std::memory_order_acq_rel));
103     bin_ = bin;
104     bin_owner_ = owner;
105     return reinterpret_cast<uintptr_t>(bin);
106   }
107 
EndBin(uintptr_t bin_descriptor,void * owner)108   static void EndBin(uintptr_t bin_descriptor, void* owner) {
109     if (bin_owner_ != owner || bin_descriptor == 0) return;
110     FlushBin(ToBin(bin_descriptor));
111     uintptr_t next_free = free_bins_.load(std::memory_order_acquire);
112     while (!free_bins_.compare_exchange_strong(
113         next_free, IncrementTag(bin_descriptor), std::memory_order_acq_rel)) {
114     }
115     bin_ = nullptr;
116     bin_owner_ = nullptr;
117   }
118 
CurrentThreadBin()119   static Bin* CurrentThreadBin() { return bin_; }
120 
Get()121   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Log& Get() {
122     static Log* log = []() {
123       atexit([] {
124         auto json = log->TryGenerateJson();
125         if (!json.has_value()) {
126           LOG(INFO) << "Failed to generate latent_see.json (contention with "
127                        "another writer)";
128           return;
129         }
130         if (log->stats_flusher_ != nullptr) {
131           log->stats_flusher_(*json);
132           return;
133         }
134         LOG(INFO) << "Writing latent_see.json in " << get_current_dir_name();
135         FILE* f = fopen("latent_see.json", "w");
136         if (f == nullptr) return;
137         fprintf(f, "%s", json->c_str());
138         fclose(f);
139       });
140       return new Log();
141     }();
142     return *log;
143   }
144 
145   void TryPullEventsAndFlush(
146       absl::FunctionRef<void(absl::Span<const RecordedEvent>)> callback);
147   absl::optional<std::string> TryGenerateJson();
148 
OverrideStatsFlusher(absl::AnyInvocable<void (absl::string_view)> stats_exporter)149   void OverrideStatsFlusher(
150       absl::AnyInvocable<void(absl::string_view)> stats_exporter) {
151     stats_flusher_ = std::move(stats_exporter);
152   }
153 
154  private:
155   Log() = default;
156 
157   static void FlushBin(Bin* bin);
158 
159   std::atomic<uint64_t> next_thread_id_{1};
160   std::atomic<uint64_t> next_batch_id_{1};
161   static thread_local uint64_t thread_id_;
162   static thread_local Bin* bin_;
163   static thread_local void* bin_owner_;
164   static std::atomic<uintptr_t> free_bins_;
165   absl::AnyInvocable<void(absl::string_view)> stats_flusher_ = nullptr;
166   Mutex mu_flushing_;
167   struct Fragment {
168     Mutex mu_active ABSL_ACQUIRED_AFTER(mu_flushing_);
169     std::vector<RecordedEvent> active ABSL_GUARDED_BY(mu_active);
170     std::vector<RecordedEvent> flushing ABSL_GUARDED_BY(&Log::mu_flushing_);
171   };
172   PerCpu<Fragment> fragments_{PerCpuOptions()};
173 };
174 
175 template <bool kParent>
176 class Scope {
177  public:
Scope(const Metadata * metadata)178   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Scope(const Metadata* metadata)
179       : metadata_(metadata) {
180     bin_ = Log::CurrentThreadBin();
181     if (kParent && bin_ == nullptr) {
182       bin_descriptor_ = Log::StartBin(this);
183       bin_ = Log::ToBin(bin_descriptor_);
184     }
185     CHECK_NE(bin_, nullptr);
186     bin_->Append(metadata_, EventType::kBegin, 0);
187   }
~Scope()188   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Scope() {
189     bin_->Append(metadata_, EventType::kEnd, 0);
190     if (kParent) Log::EndBin(bin_descriptor_, this);
191   }
192 
193   Scope(const Scope&) = delete;
194   Scope& operator=(const Scope&) = delete;
195 
196  private:
197   const Metadata* const metadata_;
198   uintptr_t bin_descriptor_ = 0;
199   Bin* bin_ = nullptr;
200 };
201 
202 using ParentScope = Scope<true>;
203 using InnerScope = Scope<false>;
204 
205 class Flow {
206  public:
Flow()207   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Flow() : metadata_(nullptr) {}
Flow(const Metadata * metadata)208   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Flow(const Metadata* metadata)
209       : metadata_(metadata),
210         id_(next_flow_id_.fetch_add(1, std::memory_order_relaxed)) {
211     Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowStart, id_);
212   }
~Flow()213   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Flow() {
214     if (metadata_ != nullptr) {
215       Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
216     }
217   }
218 
219   Flow(const Flow&) = delete;
220   Flow& operator=(const Flow&) = delete;
Flow(Flow && other)221   Flow(Flow&& other) noexcept
222       : metadata_(std::exchange(other.metadata_, nullptr)), id_(other.id_) {}
223   Flow& operator=(Flow&& other) noexcept {
224     if (metadata_ != nullptr) {
225       Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
226     }
227     metadata_ = std::exchange(other.metadata_, nullptr);
228     id_ = other.id_;
229     return *this;
230   }
231 
is_active()232   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION bool is_active() const {
233     return metadata_ != nullptr;
234   }
End()235   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {
236     if (metadata_ == nullptr) return;
237     Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
238     metadata_ = nullptr;
239   }
Begin(const Metadata * metadata)240   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(const Metadata* metadata) {
241     auto* bin = Log::CurrentThreadBin();
242     if (metadata_ != nullptr) {
243       bin->Append(metadata_, EventType::kFlowEnd, id_);
244     }
245     metadata_ = metadata;
246     if (metadata_ == nullptr) return;
247     id_ = next_flow_id_.fetch_add(1, std::memory_order_relaxed);
248     bin->Append(metadata_, EventType::kFlowStart, id_);
249   }
250 
251  private:
252   const Metadata* metadata_;
253   uint64_t id_;
254   static std::atomic<uint64_t> next_flow_id_;
255 };
256 
Mark(const Metadata * md)257 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
258   Log::CurrentThreadBin()->Append(md, EventType::kMark, 0);
259 }
260 
261 template <typename P>
Promise(const Metadata * md_poll,const Metadata * md_flow,P promise)262 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto Promise(const Metadata* md_poll,
263                                                   const Metadata* md_flow,
264                                                   P promise) {
265   return [md_poll, md_flow, promise = std::move(promise),
266           flow = Flow(md_flow)]() mutable {
267     InnerScope scope(md_poll);
268     flow.End();
269     auto r = promise();
270     flow.Begin(md_flow);
271     return r;
272   };
273 }
274 
275 }  // namespace latent_see
276 }  // namespace grpc_core
277 #define GRPC_LATENT_SEE_METADATA(name)                                     \
278   []() {                                                                   \
279     static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \
280                                                        name};              \
281     return &metadata;                                                      \
282   }()
283 // Parent scope: logs a begin and end event, and flushes the thread log on scope
284 // exit. Because the flush takes some time it's better to place one parent scope
285 // at the top of the stack, and use lighter weight scopes within it.
286 #define GRPC_LATENT_SEE_PARENT_SCOPE(name)                       \
287   grpc_core::latent_see::ParentScope latent_see_scope##__LINE__( \
288       GRPC_LATENT_SEE_METADATA(name))
289 // Inner scope: logs a begin and end event. Lighter weight than parent scope,
290 // but does not flush the thread state - so should only be enclosed by a parent
291 // scope.
292 #define GRPC_LATENT_SEE_INNER_SCOPE(name)                       \
293   grpc_core::latent_see::InnerScope latent_see_scope##__LINE__( \
294       GRPC_LATENT_SEE_METADATA(name))
295 // Mark: logs a single event.
296 // This is not flushed automatically, and so should only be used within a parent
297 // scope.
298 #define GRPC_LATENT_SEE_MARK(name) \
299   grpc_core::latent_see::Mark(GRPC_LATENT_SEE_METADATA(name))
300 #define GRPC_LATENT_SEE_PROMISE(name, promise)                           \
301   grpc_core::latent_see::Promise(GRPC_LATENT_SEE_METADATA("Poll:" name), \
302                                  GRPC_LATENT_SEE_METADATA(name), promise)
303 #else  // !def(GRPC_ENABLE_LATENT_SEE)
304 namespace grpc_core {
305 namespace latent_see {
306 struct Metadata {};
307 struct Flow {
is_activeFlow308   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION bool is_active() const { return false; }
EndFlow309   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {}
BeginFlow310   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(Metadata*) {}
311 };
312 struct ParentScope {
ParentScopeParentScope313   explicit ParentScope(Metadata*) {}
314 };
315 struct InnerScope {
InnerScopeInnerScope316   explicit InnerScope(Metadata*) {}
317 };
318 }  // namespace latent_see
319 }  // namespace grpc_core
320 #define GRPC_LATENT_SEE_METADATA(name) nullptr
321 #define GRPC_LATENT_SEE_METADATA_RAW(name) nullptr
322 #define GRPC_LATENT_SEE_PARENT_SCOPE(name) \
323   do {                                     \
324   } while (0)
325 #define GRPC_LATENT_SEE_INNER_SCOPE(name) \
326   do {                                    \
327   } while (0)
328 #define GRPC_LATENT_SEE_MARK(name) \
329   do {                             \
330   } while (0)
331 #define GRPC_LATENT_SEE_PROMISE(name, promise) promise
332 #endif  // GRPC_ENABLE_LATENT_SEE
333 
334 #endif  // GRPC_SRC_CORE_UTIL_LATENT_SEE_H
335