1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 // https://developers.google.com/protocol-buffers/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 // * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 // * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31 #include <google/protobuf/arenaz_sampler.h>
32
#include <atomic>
#include <cassert>
#include <chrono>
#include <limits>
#include <memory>
#include <random>
#include <thread>
#include <vector>
36
37 #include <gmock/gmock.h>
38 #include <gtest/gtest.h>
39 #include <google/protobuf/stubs/strutil.h>
40
41
42 // Must be included last.
43 #include <google/protobuf/port_def.inc>
44
45 namespace google {
46 namespace protobuf {
47 namespace internal {
48 #if defined(PROTOBUF_ARENAZ_SAMPLE)
49 class ThreadSafeArenaStatsHandlePeer {
50 public:
IsSampled(const ThreadSafeArenaStatsHandle & h)51 static bool IsSampled(const ThreadSafeArenaStatsHandle& h) {
52 return h.info_ != nullptr;
53 }
54
GetInfo(ThreadSafeArenaStatsHandle * h)55 static ThreadSafeArenaStats* GetInfo(ThreadSafeArenaStatsHandle* h) {
56 return h->info_;
57 }
58 };
GetBytesAllocated(ThreadSafeArenazSampler * s)59 std::vector<size_t> GetBytesAllocated(ThreadSafeArenazSampler* s) {
60 std::vector<size_t> res;
61 s->Iterate([&](const ThreadSafeArenaStats& info) {
62 res.push_back(info.bytes_allocated.load(std::memory_order_acquire));
63 });
64 return res;
65 }
66
Register(ThreadSafeArenazSampler * s,size_t size)67 ThreadSafeArenaStats* Register(ThreadSafeArenazSampler* s, size_t size) {
68 auto* info = s->Register();
69 assert(info != nullptr);
70 info->bytes_allocated.store(size);
71 return info;
72 }
73
74 #endif // defined(PROTOBUF_ARENAZ_SAMPLE)
75
76 namespace {
77
78 #if defined(PROTOBUF_ARENAZ_SAMPLE)
79
// PrepareForSampling() must leave every counter at zero, both on a fresh
// object and after the counters have been modified.
TEST(ThreadSafeArenaStatsTest, PrepareForSampling) {
  ThreadSafeArenaStats info;
  MutexLock lock(&info.init_mu);

  // Shared check: all six counters read zero.
  const auto expect_all_counters_zero = [&info]() {
    EXPECT_EQ(info.num_allocations.load(), 0);
    EXPECT_EQ(info.num_resets.load(), 0);
    EXPECT_EQ(info.bytes_requested.load(), 0);
    EXPECT_EQ(info.bytes_allocated.load(), 0);
    EXPECT_EQ(info.bytes_wasted.load(), 0);
    EXPECT_EQ(info.max_bytes_allocated.load(), 0);
  };

  info.PrepareForSampling();
  expect_all_counters_zero();

  // Dirty every counter so the second PrepareForSampling() has work to do.
  constexpr auto kRelaxed = std::memory_order_relaxed;
  info.num_allocations.store(1, kRelaxed);
  info.num_resets.store(1, kRelaxed);
  info.bytes_requested.store(1, kRelaxed);
  info.bytes_allocated.store(1, kRelaxed);
  info.bytes_wasted.store(1, kRelaxed);
  info.max_bytes_allocated.store(1, kRelaxed);

  info.PrepareForSampling();
  expect_all_counters_zero();
}
107
// RecordAllocateSlow() is expected to bump the allocation count and add to the
// requested/allocated/wasted byte totals, while leaving `num_resets` and
// `max_bytes_allocated` at zero.
TEST(ThreadSafeArenaStatsTest, RecordAllocateSlow) {
  ThreadSafeArenaStats info;
  MutexLock lock(&info.init_mu);
  info.PrepareForSampling();

  // First record: 100 bytes requested, 128 allocated, nothing wasted.
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/128,
                     /*wasted=*/0);
  EXPECT_EQ(info.num_allocations.load(), 1);
  EXPECT_EQ(info.num_resets.load(), 0);
  EXPECT_EQ(info.bytes_requested.load(), 100);
  EXPECT_EQ(info.bytes_allocated.load(), 128);
  EXPECT_EQ(info.bytes_wasted.load(), 0);
  EXPECT_EQ(info.max_bytes_allocated.load(), 0);

  // Second record: totals are cumulative (100 + 100, 128 + 256, 0 + 28).
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/256,
                     /*wasted=*/28);
  EXPECT_EQ(info.num_allocations.load(), 2);
  EXPECT_EQ(info.num_resets.load(), 0);
  EXPECT_EQ(info.bytes_requested.load(), 200);
  EXPECT_EQ(info.bytes_allocated.load(), 384);
  EXPECT_EQ(info.bytes_wasted.load(), 28);
  EXPECT_EQ(info.max_bytes_allocated.load(), 0);
}
128
// RecordResetSlow() is expected to increment `num_resets` and bring
// `bytes_allocated` back to zero.
TEST(ThreadSafeArenaStatsTest, RecordResetSlow) {
  ThreadSafeArenaStats info;
  MutexLock lock(&info.init_mu);
  info.PrepareForSampling();

  // Freshly prepared: nothing recorded yet.
  EXPECT_EQ(info.num_resets.load(), 0);
  EXPECT_EQ(info.bytes_allocated.load(), 0);

  // An allocation changes the byte total but not the reset count.
  RecordAllocateSlow(&info, /*requested=*/100, /*allocated=*/128,
                     /*wasted=*/0);
  EXPECT_EQ(info.num_resets.load(), 0);
  EXPECT_EQ(info.bytes_allocated.load(), 128);

  // A reset is counted and clears the allocated byte total.
  RecordResetSlow(&info);
  EXPECT_EQ(info.num_resets.load(), 1);
  EXPECT_EQ(info.bytes_allocated.load(), 0);
}
142
// With a sample parameter of 100, every SampleSlow() call is expected to yield
// a sample and a positive `next_sample` value.
TEST(ThreadSafeArenazSamplerTest, SmallSampleParameter) {
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(100);

  constexpr int kIterations = 1000;
  int completed = 0;
  while (completed < kIterations) {
    int64_t next_sample = 0;
    ThreadSafeArenaStats* sample = SampleSlow(&next_sample);
    EXPECT_GT(next_sample, 0);
    EXPECT_NE(sample, nullptr);
    // Hand the sample back before the next round.
    UnsampleSlow(sample);
    ++completed;
  }
}
155
// Same expectations as the small-parameter case, but with the sample parameter
// set to the largest int32_t value.
TEST(ThreadSafeArenazSamplerTest, LargeSampleParameter) {
  constexpr int32_t kLargestParameter = std::numeric_limits<int32_t>::max();
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(kLargestParameter);

  constexpr int kIterations = 1000;
  int completed = 0;
  while (completed < kIterations) {
    int64_t next_sample = 0;
    ThreadSafeArenaStats* sample = SampleSlow(&next_sample);
    EXPECT_GT(next_sample, 0);
    EXPECT_NE(sample, nullptr);
    // Hand the sample back before the next round.
    UnsampleSlow(sample);
    ++completed;
  }
}
168
// Calls Sample() repeatedly and checks that the observed fraction of sampled
// handles ends up within 0.005 of 0.01 (sample parameter 100).
TEST(ThreadSafeArenazSamplerTest, Sample) {
  SetThreadSafeArenazEnabled(true);
  SetThreadSafeArenazSampleParameter(100);
  SetThreadSafeArenazGlobalNextSample(0);

  constexpr int kMaxAttempts = 1000000;
  int64_t num_sampled = 0;
  int64_t total = 0;
  double sample_rate = 0.0;
  for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
    ThreadSafeArenaStatsHandle h = Sample();
    ++total;
    num_sampled += ThreadSafeArenaStatsHandlePeer::IsSampled(h) ? 1 : 0;
    sample_rate = static_cast<double>(num_sampled) / total;
    // Stop as soon as the running rate lands strictly inside the accepted
    // window; otherwise keep drawing until the attempt budget is exhausted.
    const bool within_window = 0.005 < sample_rate && sample_rate < 0.015;
    if (within_window) break;
  }
  EXPECT_NEAR(sample_rate, 0.01, 0.005);
}
187
// A stats object owned by a handle must be visible through Iterate() while the
// handle is alive, and must no longer show the marker value once the handle
// has been replaced by an empty one.
TEST(ThreadSafeArenazSamplerTest, Handle) {
  auto& sampler = GlobalThreadSafeArenazSampler();
  ThreadSafeArenaStatsHandle h(sampler.Register());
  auto* info = ThreadSafeArenaStatsHandlePeer::GetInfo(&h);
  // Tag the stats object with a recognizable value.
  info->bytes_allocated.store(0x12345678, std::memory_order_relaxed);

  bool found = false;
  const auto find_live = [&](const ThreadSafeArenaStats& stats) {
    if (&stats != info) return;
    EXPECT_EQ(stats.bytes_allocated.load(), 0x12345678);
    found = true;
  };
  sampler.Iterate(find_live);
  EXPECT_TRUE(found);

  // Overwrite the handle with a default-constructed one.
  h = ThreadSafeArenaStatsHandle();
  found = false;
  const auto find_stale = [&](const ThreadSafeArenaStats& stats) {
    // Seeing the same address again will only happen if some other thread has
    // resurrected the info the old handle was using; it only counts as a
    // failure when the marker value is still there.
    if (&stats == info && stats.bytes_allocated.load() == 0x12345678) {
      found = true;
    }
  };
  sampler.Iterate(find_stale);
  EXPECT_FALSE(found);
}
216
// Registered samples must be reported by Iterate(), and later writes to a
// sample's counters must be reflected in what Iterate() reports.
TEST(ThreadSafeArenazSamplerTest, Registration) {
  // The gMock matcher is spelled with its full `::testing::` qualification:
  // this file has no using-declaration for it, and anchoring at the global
  // namespace keeps lookup from finding any nested `testing` namespace.
  ThreadSafeArenazSampler sampler;
  auto* info1 = Register(&sampler, 1);
  EXPECT_THAT(GetBytesAllocated(&sampler), ::testing::UnorderedElementsAre(1));

  auto* info2 = Register(&sampler, 2);
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(1, 2));

  // Updating a registered sample is visible on the next iteration.
  info1->bytes_allocated.store(3);
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(3, 2));

  sampler.Unregister(info1);
  sampler.Unregister(info2);
}
230
// Unregistered samples must disappear from Iterate(), and registering again
// afterwards must still work.
TEST(ThreadSafeArenazSamplerTest, Unregistration) {
  // The gMock matchers are spelled with their full `::testing::`
  // qualification: this file has no using-declarations for them, and anchoring
  // at the global namespace keeps lookup from finding any nested `testing`
  // namespace.
  ThreadSafeArenazSampler sampler;
  std::vector<ThreadSafeArenaStats*> infos;
  // infos[i] is tagged with bytes_allocated == i so that each sample can be
  // identified in the iteration results.
  for (size_t i = 0; i < 3; ++i) {
    infos.push_back(Register(&sampler, i));
  }
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(0, 1, 2));

  sampler.Unregister(infos[1]);
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(0, 2));

  // Register two more samples after a removal, then drop the first of them.
  infos.push_back(Register(&sampler, 3));
  infos.push_back(Register(&sampler, 4));
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(0, 2, 3, 4));
  sampler.Unregister(infos[3]);
  EXPECT_THAT(GetBytesAllocated(&sampler),
              ::testing::UnorderedElementsAre(0, 2, 4));

  // Remove everything that is still registered.
  sampler.Unregister(infos[0]);
  sampler.Unregister(infos[2]);
  sampler.Unregister(infos[4]);
  EXPECT_THAT(GetBytesAllocated(&sampler), ::testing::IsEmpty());
}
253
// Stress test: several threads concurrently register and unregister samples on
// one sampler for a few seconds. There are no explicit expectations; the test
// exists so that a race detector has something to observe.
//
// Only standard-library threading facilities are used (std::thread, an atomic
// stop flag, std::this_thread::sleep_for), so the test does not depend on any
// threading header that this file does not include.
TEST(ThreadSafeArenazSamplerTest, MultiThreaded) {
  constexpr int kNumThreads = 10;
  ThreadSafeArenazSampler sampler;
  std::atomic<bool> stop{false};

  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int i = 0; i < kNumThreads; ++i) {
    workers.emplace_back([&sampler, &stop]() {
      std::random_device rd;
      std::mt19937 gen(rd());

      std::vector<ThreadSafeArenaStats*> infoz;
      while (!stop.load(std::memory_order_acquire)) {
        // Keep at least one registered sample so the removal branch always
        // has something to pick.
        if (infoz.empty()) {
          infoz.push_back(sampler.Register());
        }
        const bool register_new =
            std::uniform_int_distribution<>(0, 1)(gen) == 0;
        if (register_new) {
          infoz.push_back(sampler.Register());
        } else {
          // Pick a random entry, swap-remove it, then unregister it. The
          // distribution is typed on size_t so the upper bound is not
          // narrowed to int.
          const size_t p =
              std::uniform_int_distribution<size_t>(0, infoz.size() - 1)(gen);
          ThreadSafeArenaStats* info = infoz[p];
          infoz[p] = infoz.back();
          infoz.pop_back();
          sampler.Unregister(info);
        }
      }
    });
  }

  // The threads will hammer away. Give it a little bit of time for tsan to
  // spot errors.
  std::this_thread::sleep_for(std::chrono::seconds(3));
  stop.store(true, std::memory_order_release);

  // Join every worker before `sampler` and `stop` go out of scope, since the
  // workers hold references to both.
  for (std::thread& worker : workers) {
    worker.join();
  }
}
292
// The dispose callback must be invoked with the sample being unregistered
// while it is installed, and SetDisposeCallback() must return the previously
// installed callback.
TEST(ThreadSafeArenazSamplerTest, Callback) {
  ThreadSafeArenazSampler sampler;

  auto* info1 = Register(&sampler, 1);
  auto* info2 = Register(&sampler, 2);

  // Function-local static so that the capture-less lambda below can read it.
  static const ThreadSafeArenaStats* expected;

  auto callback = [](const ThreadSafeArenaStats& disposed) {
    // `disposed` must not be used outside of this callback because the object
    // will be disposed as soon as we return from here.
    EXPECT_EQ(&disposed, expected);
  };

  // Install the callback; nothing was installed before, so nullptr comes back.
  EXPECT_EQ(sampler.SetDisposeCallback(callback), nullptr);
  expected = info1;
  sampler.Unregister(info1);

  // Remove the callback; the one installed above is handed back.
  EXPECT_EQ(callback, sampler.SetDisposeCallback(nullptr));
  expected = nullptr;  // no more calls.
  sampler.Unregister(info2);
}
317
318 class ThreadSafeArenazSamplerTestThread : public Thread {
319 protected:
Run()320 void Run() override {
321 google::protobuf::ArenaSafeUniquePtr<
322 protobuf_test_messages::proto2::TestAllTypesProto2>
323 message = google::protobuf::MakeArenaSafeUnique<
324 protobuf_test_messages::proto2::TestAllTypesProto2>(arena_);
325 GOOGLE_CHECK(message != nullptr);
326 // Signal that a message on the arena has been created. This should create
327 // a SerialArena for this thread.
328 if (barrier_->Block()) {
329 delete barrier_;
330 }
331 }
332
333 public:
ThreadSafeArenazSamplerTestThread(const thread::Options & options,StringPiece name,google::protobuf::Arena * arena,absl::Barrier * barrier)334 ThreadSafeArenazSamplerTestThread(const thread::Options& options,
335 StringPiece name,
336 google::protobuf::Arena* arena,
337 absl::Barrier* barrier)
338 : Thread(options, name), arena_(arena), barrier_(barrier) {}
339
340 private:
341 google::protobuf::Arena* arena_;
342 absl::Barrier* barrier_;
343 };
344
// Runs several rounds in which a group of threads each create a message on one
// shared arena, and checks that the global sampler reports at least one sample
// across all rounds.
TEST(ThreadSafeArenazSamplerTest, MultiThread) {
  SetThreadSafeArenazEnabled(true);
  // Setting 1 as the parameter value means one in every two arenas would be
  // sampled, on average.
  SetThreadSafeArenazSampleParameter(1);
  SetThreadSafeArenazGlobalNextSample(0);
  auto& sampler = GlobalThreadSafeArenazSampler();

  int count = 0;
  for (int round = 0; round < 10; ++round) {
    const int kNumThreads = 10;
    // One extra participant: this test thread also blocks on the barrier.
    absl::Barrier* barrier = new absl::Barrier(kNumThreads + 1);
    google::protobuf::Arena arena;
    thread::Options options;
    options.set_joinable(true);

    std::vector<std::unique_ptr<ThreadSafeArenazSamplerTestThread>> threads;
    for (int t = 0; t < kNumThreads; t++) {
      auto worker = std::make_unique<ThreadSafeArenazSamplerTestThread>(
          options, StrCat("thread", t), &arena, barrier);
      worker->Start();
      threads.push_back(std::move(worker));
    }

    // Wait till each thread has created a message on the arena.
    const bool responsible_for_cleanup = barrier->Block();
    if (responsible_for_cleanup) {
      delete barrier;
    }

    // Tally every sample the global sampler currently reports.
    sampler.Iterate([&count](const ThreadSafeArenaStats&) { ++count; });

    for (auto& worker : threads) {
      worker->Join();
    }
  }
  EXPECT_GT(count, 0);
}
377 #endif // defined(PROTOBUF_ARENAZ_SAMPLE)
378
379 } // namespace
380 } // namespace internal
381 } // namespace protobuf
382 } // namespace google
383