// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/util/latent_see.h"

#ifdef GRPC_ENABLE_LATENT_SEE
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

#include "absl/functional/function_ref.h"
#include "absl/log/check.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "src/core/util/ring_buffer.h"
#include "src/core/util/sync.h"

namespace grpc_core {
namespace latent_see {

// Each thread lazily claims a unique id the first time it records an event.
thread_local uint64_t Log::thread_id_ =
    Log::Get().next_thread_id_.fetch_add(1);
thread_local Bin* Log::bin_ = nullptr;
thread_local void* Log::bin_owner_ = nullptr;
std::atomic<uint64_t> Flow::next_flow_id_{1};
std::atomic<uintptr_t> Log::free_bins_{0};
// All timestamps are recorded relative to this process-wide start time.
const std::chrono::steady_clock::time_point start_time =
    std::chrono::steady_clock::now();

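// Note on thread_id_: a non-trivially-initialized thread_local is
// initialized on first use in each thread, so the fetch_add above hands out
// dense, unique per-thread ids. A minimal sketch of the same pattern in
// isolation (illustrative only, not part of this library):
//
//   std::atomic<uint64_t> next_id{0};
//   thread_local uint64_t my_id = next_id.fetch_add(1);
//   // Any two threads that read my_id observe distinct values.
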
void Log::TryPullEventsAndFlush(
    absl::FunctionRef<void(absl::Span<const RecordedEvent>)> callback) {
  // Try to take the flush lock; if another thread is still flushing, just
  // clear the active events instead. This guarantees we keep freeing memory
  // even while the previous pull is still being serialized.
  if (!mu_flushing_.TryLock()) {
    for (auto& fragment : fragments_) {
      MutexLock lock(&fragment.mu_active);
      fragment.active.clear();
    }
    return;
  }
  // Now we hold the lock; swap all active fragments to flushing.
  // This is relatively quick and ensures that we don't stall capture for
  // long.
  for (auto& fragment : fragments_) {
    CHECK_EQ(fragment.flushing.size(), 0);
    MutexLock lock(&fragment.mu_active);
    fragment.flushing.swap(fragment.active);
  }
  // Now that everything is swapped out, invoke the callback once per
  // fragment. This is the slow part: a lot of copying and transformation
  // happens here. We keep mu_flushing_ held throughout so that at most one
  // thread at a time is tied up doing this work.
  // After the callback runs for a fragment we clear it, so that the next
  // time it is swapped with the active fragment it starts out empty.
  // clear() also retains the vector's capacity, so if we serialize on a
  // regular cadence latent_see's memory usage tends to stabilize quickly.
  for (auto& fragment : fragments_) {
    callback(fragment.flushing);
    fragment.flushing.clear();
  }
  mu_flushing_.Unlock();
}
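
// Usage sketch (hypothetical caller; ExportEvent is assumed and is not part
// of this library): drain whatever has been captured so far, one fragment at
// a time.
//
//   Log::Get().TryPullEventsAndFlush(
//       [](absl::Span<const RecordedEvent> events) {
//         for (const auto& event : events) ExportEvent(event);
//       });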

absl::optional<std::string> Log::TryGenerateJson() {
  using Nanos = std::chrono::duration<unsigned long long, std::nano>;
  std::string json = "[\n";
  bool first = true;
  int callbacks = 0;
  TryPullEventsAndFlush([&](absl::Span<const RecordedEvent> events) {
    ++callbacks;
    for (const auto& event : events) {
      // Map the event type onto its Chrome Trace Event Format phase code.
      absl::string_view phase;
      bool has_id;
      switch (event.event.type) {
        case EventType::kBegin:
          phase = "B";
          has_id = false;
          break;
        case EventType::kEnd:
          phase = "E";
          has_id = false;
          break;
        case EventType::kFlowStart:
          phase = "s";
          has_id = true;
          break;
        case EventType::kFlowEnd:
          phase = "f";
          has_id = true;
          break;
        case EventType::kMark:
          phase = "i";
          has_id = false;
          break;
      }
      if (!first) {
        absl::StrAppend(&json, ",\n");
      }
      first = false;
      // "ts" is expressed in microseconds; timestamps are captured in
      // nanoseconds relative to start_time. Names that already begin with a
      // quote are assumed to be pre-serialized JSON and are emitted
      // verbatim; anything else is quoted here.
      if (event.event.metadata->name[0] != '"') {
        absl::StrAppend(
            &json, "{\"name\": \"", event.event.metadata->name,
            "\", \"ph\": \"", phase, "\", \"ts\": ",
            Nanos(event.event.timestamp - start_time).count() / 1000.0,
            ", \"pid\": 0, \"tid\": ", event.thread_id);
      } else {
        absl::StrAppend(
            &json, "{\"name\": ", event.event.metadata->name, ", \"ph\": \"",
            phase, "\", \"ts\": ",
            Nanos(event.event.timestamp - start_time).count() / 1000.0,
            ", \"pid\": 0, \"tid\": ", event.thread_id);
      }
      if (has_id) {
        absl::StrAppend(&json, ", \"id\": ", event.event.id);
      }
      // Flow-end events bind to the enclosing slice rather than the next one.
      if (event.event.type == EventType::kFlowEnd) {
        absl::StrAppend(&json, ", \"bp\": \"e\"");
      }
      absl::StrAppend(&json, ", \"args\": {\"file\": \"",
                      event.event.metadata->file,
                      "\", \"line\": ", event.event.metadata->line,
                      ", \"batch\": ", event.batch_id, "}}");
    }
  });
  // If the callback never ran, the flush lock was contended and the events
  // were dropped; report that no trace was produced.
  if (callbacks == 0) return absl::nullopt;
  absl::StrAppend(&json, "\n]");
  return json;
}
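
// Output sketch: the array built above follows the Chrome Trace Event
// Format, so a single begin event might render as (values illustrative):
//
//   {"name": "Foo", "ph": "B", "ts": 12.345, "pid": 0, "tid": 3,
//    "args": {"file": "src/core/foo.cc", "line": 42, "batch": 7}}
//
// The resulting JSON loads directly into chrome://tracing or Perfetto.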

void Log::FlushBin(Bin* bin) {
  if (bin->events.empty()) return;
  auto& log = Get();
  // Tag every event from this bin with the same batch id so they can still
  // be correlated after fragments are merged.
  const auto batch_id =
      log.next_batch_id_.fetch_add(1, std::memory_order_relaxed);
  auto& fragment = log.fragments_.this_cpu();
  const auto thread_id = thread_id_;
  {
    MutexLock lock(&fragment.mu_active);
    for (auto event : bin->events) {
      fragment.active.push_back(RecordedEvent{thread_id, batch_id, event});
    }
  }
  bin->events.clear();
}
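
// Flush-path sketch (illustrative; assumes `bin` has accumulated events via
// the recording machinery in latent_see.h):
//
//   Bin* bin = ...;      // a bin owned by the current thread
//   Log::FlushBin(bin);  // appends RecordedEvents to this CPU's fragment
//                        // under its lock, then clears bin->events for reuse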

}  // namespace latent_see
}  // namespace grpc_core
#endif  // GRPC_ENABLE_LATENT_SEE