// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/util/latent_see.h"

#ifdef GRPC_ENABLE_LATENT_SEE
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

#include "absl/functional/function_ref.h"
#include "absl/log/check.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "src/core/util/ring_buffer.h"
#include "src/core/util/sync.h"

namespace grpc_core {
namespace latent_see {

thread_local uint64_t Log::thread_id_ = Log::Get().next_thread_id_.fetch_add(1);
thread_local Bin* Log::bin_ = nullptr;
thread_local void* Log::bin_owner_ = nullptr;
std::atomic<uint64_t> Flow::next_flow_id_{1};
std::atomic<uintptr_t> Log::free_bins_{0};
const std::chrono::steady_clock::time_point start_time =
    std::chrono::steady_clock::now();

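// A note on the thread id scheme above: a thread_local initialized from an
// atomic counter hands each thread a unique, stable id the first time that
// thread touches it. A minimal standalone sketch of the same idiom (purely
// illustrative, not part of this file):
//
//   std::atomic<uint64_t> next_id{0};
//   thread_local uint64_t my_id = next_id.fetch_add(1);
//   // Each thread that reads my_id observes a distinct value, assigned
//   // lazily on first use and constant for the thread's lifetime.
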
void Log::TryPullEventsAndFlush(
    absl::FunctionRef<void(absl::Span<const RecordedEvent>)> callback) {
  // Try to take the flush lock; if another thread already holds it, clear
  // the active events and return instead. This guarantees we free up memory
  // even if we're still serializing the previous pull.
  if (!mu_flushing_.TryLock()) {
    for (auto& fragment : fragments_) {
      MutexLock lock(&fragment.mu_active);
      fragment.active.clear();
    }
    return;
  }
  // Now we hold the lock; swap all active fragments to flushing.
  // This is relatively quick and ensures that we don't stall capture for
  // long.
  for (auto& fragment : fragments_) {
    CHECK_EQ(fragment.flushing.size(), 0);
    MutexLock lock(&fragment.mu_active);
    fragment.flushing.swap(fragment.active);
  }
  // Now that we've swapped out, call the callback once per fragment.
  // This is the slow part - there's a lot of copying and transformation that
  // happens here.
  // We keep mu_flushing_ held so that at most one thread at a time is tied
  // up doing this work.
  // Once we've called the callback for a fragment we clear it, so that the
  // next time it's swapped with the active fragment it will be empty.
  // Clearing also retains the memory, so if we're serializing at a regular
  // cadence latent_see's memory usage tends to stabilize quickly.
  for (auto& fragment : fragments_) {
    callback(fragment.flushing);
    fragment.flushing.clear();
  }
  mu_flushing_.Unlock();
}

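// Usage sketch (hypothetical caller, not part of this file): pull whatever
// events have accumulated and hand them to some sink. If another thread is
// already flushing, the call drops the currently active events instead of
// blocking, so callers must tolerate lossy capture.
//
//   Log::Get().TryPullEventsAndFlush(
//       [&](absl::Span<const RecordedEvent> events) {
//         for (const auto& event : events) sink.Write(event);  // sink assumed
//       });
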
absl::optional<std::string> Log::TryGenerateJson() {
  using Nanos = std::chrono::duration<unsigned long long, std::nano>;
  std::string json = "[\n";
  bool first = true;
  int callbacks = 0;
  TryPullEventsAndFlush([&](absl::Span<const RecordedEvent> events) {
    ++callbacks;
    for (const auto& event : events) {
      // Map the event type onto a Chrome Trace Event Format phase; flow
      // events additionally carry an id linking the start to the end.
      absl::string_view phase;
      bool has_id;
      switch (event.event.type) {
        case EventType::kBegin:
          phase = "B";
          has_id = false;
          break;
        case EventType::kEnd:
          phase = "E";
          has_id = false;
          break;
        case EventType::kFlowStart:
          phase = "s";
          has_id = true;
          break;
        case EventType::kFlowEnd:
          phase = "f";
          has_id = true;
          break;
        case EventType::kMark:
          phase = "i";
          has_id = false;
          break;
      }
      if (!first) {
        absl::StrAppend(&json, ",\n");
      }
      first = false;
      // Metadata names that already begin with '"' are assumed to be
      // pre-quoted JSON and are emitted verbatim; all other names are
      // wrapped in quotes.
      if (event.event.metadata->name[0] != '"') {
        absl::StrAppend(
            &json, "{\"name\": \"", event.event.metadata->name,
            "\", \"ph\": \"", phase, "\", \"ts\": ",
            Nanos(event.event.timestamp - start_time).count() / 1000.0,
            ", \"pid\": 0, \"tid\": ", event.thread_id);
      } else {
        absl::StrAppend(
            &json, "{\"name\": ", event.event.metadata->name, ", \"ph\": \"",
            phase, "\", \"ts\": ",
            Nanos(event.event.timestamp - start_time).count() / 1000.0,
            ", \"pid\": 0, \"tid\": ", event.thread_id);
      }

      if (has_id) {
        absl::StrAppend(&json, ", \"id\": ", event.event.id);
      }
      if (event.event.type == EventType::kFlowEnd) {
        absl::StrAppend(&json, ", \"bp\": \"e\"");
      }
      absl::StrAppend(&json, ", \"args\": {\"file\": \"",
                      event.event.metadata->file,
                      "\", \"line\": ", event.event.metadata->line,
                      ", \"batch\": ", event.batch_id, "}}");
    }
  });
  if (callbacks == 0) return absl::nullopt;
  absl::StrAppend(&json, "\n]");
  return json;
}

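// The JSON assembled above follows the Chrome Trace Event Format (viewable
// in chrome://tracing or Perfetto). A single duration-begin record emitted
// by the loop looks roughly like this (all values illustrative):
//
//   {"name": "WriteHeader", "ph": "B", "ts": 12.345, "pid": 0, "tid": 3,
//    "args": {"file": "src/core/foo.cc", "line": 42, "batch": 7}}
//
// where "ts" is microseconds since start_time. A hypothetical caller might
// poll and persist the trace (WriteFile is an assumed helper):
//
//   if (absl::optional<std::string> trace = Log::Get().TryGenerateJson()) {
//     WriteFile("/tmp/latent_see.json", *trace);
//   }
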
// Moves a thread-local bin's events into this CPU's active fragment under a
// freshly allocated batch id, then clears the bin for reuse.
void Log::FlushBin(Bin* bin) {
  if (bin->events.empty()) return;
  auto& log = Get();
  const auto batch_id =
      log.next_batch_id_.fetch_add(1, std::memory_order_relaxed);
  auto& fragment = log.fragments_.this_cpu();
  const auto thread_id = thread_id_;
  {
    MutexLock lock(&fragment.mu_active);
    for (auto event : bin->events) {
      fragment.active.push_back(RecordedEvent{thread_id, batch_id, event});
    }
  }
  bin->events.clear();
}
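
// FlushBin illustrates per-CPU sharding: each CPU owns a fragment guarded by
// its own mutex, so threads flushing on different CPUs rarely contend. A
// minimal sketch of the same pattern, assuming a PerCpu<T> container with a
// this_cpu() accessor like the one used above (names illustrative):
//
//   struct Shard {
//     Mutex mu;
//     std::vector<RecordedEvent> items;
//   };
//   PerCpu<Shard> shards;
//
//   void Add(RecordedEvent event) {
//     auto& shard = shards.this_cpu();
//     MutexLock lock(&shard.mu);
//     shard.items.push_back(event);
//   }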

}  // namespace latent_see
}  // namespace grpc_core
#endif  // GRPC_ENABLE_LATENT_SEE