1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include <sys/types.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include "db/db_impl.h"
9 #include "db/version_set.h"
10 #include "leveldb/cache.h"
11 #include "leveldb/db.h"
12 #include "leveldb/env.h"
13 #include "leveldb/write_batch.h"
14 #include "port/port.h"
15 #include "util/crc32c.h"
16 #include "util/histogram.h"
17 #include "util/mutexlock.h"
18 #include "util/random.h"
19 #include "util/testutil.h"
20
// Comma-separated list of operations to run in the specified order
// Actual benchmarks:
// fillseq -- write N values in sequential key order in async mode
// fillrandom -- write N values in random key order in async mode
// overwrite -- overwrite N values in random key order in async mode
// fillsync -- write N/1000 values in random key order in sync mode
// fill100K -- write N/1000 100K values in random order in async mode
// deleteseq -- delete N keys in sequential order
// deleterandom -- delete N keys in random order
// readseq -- read N times sequentially
// readreverse -- read N times in reverse order
// readrandom -- read N times in random order
// readmissing -- read N missing keys in random order
// readhot -- read N times in random order from 1% section of DB
// seekrandom -- N random seeks
// crc32c -- repeated crc32c of 4K of data
// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
// sstables -- Print sstable info
// heapprofile -- Dump a heap profile (if supported by this port)
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,"
    "acquireload,"
    ;

// Number of key/values to place in database
static int FLAGS_num = 1000000;

// Number of read operations to do.  If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;

// Number of concurrent threads to run.
static int FLAGS_threads = 1;

// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
static double FLAGS_compression_ratio = 0.5;

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;

// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

// Use the db with the following name.
// (NULL means "pick a default path under the test directory"; see main().)
static const char* FLAGS_db = NULL;
104
105 namespace leveldb {
106
107 namespace {
108
109 // Helper for quickly generating random data.
110 class RandomGenerator {
111 private:
112 std::string data_;
113 int pos_;
114
115 public:
RandomGenerator()116 RandomGenerator() {
117 // We use a limited amount of data over and over again and ensure
118 // that it is larger than the compression window (32KB), and also
119 // large enough to serve all typical value sizes we want to write.
120 Random rnd(301);
121 std::string piece;
122 while (data_.size() < 1048576) {
123 // Add a short fragment that is as compressible as specified
124 // by FLAGS_compression_ratio.
125 test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
126 data_.append(piece);
127 }
128 pos_ = 0;
129 }
130
Generate(size_t len)131 Slice Generate(size_t len) {
132 if (pos_ + len > data_.size()) {
133 pos_ = 0;
134 assert(len < data_.size());
135 }
136 pos_ += len;
137 return Slice(data_.data() + pos_ - len, len);
138 }
139 };
140
TrimSpace(Slice s)141 static Slice TrimSpace(Slice s) {
142 size_t start = 0;
143 while (start < s.size() && isspace(s[start])) {
144 start++;
145 }
146 size_t limit = s.size();
147 while (limit > start && isspace(s[limit-1])) {
148 limit--;
149 }
150 return Slice(s.data() + start, limit - start);
151 }
152
AppendWithSpace(std::string * str,Slice msg)153 static void AppendWithSpace(std::string* str, Slice msg) {
154 if (msg.empty()) return;
155 if (!str->empty()) {
156 str->push_back(' ');
157 }
158 str->append(msg.data(), msg.size());
159 }
160
161 class Stats {
162 private:
163 double start_;
164 double finish_;
165 double seconds_;
166 int done_;
167 int next_report_;
168 int64_t bytes_;
169 double last_op_finish_;
170 Histogram hist_;
171 std::string message_;
172
173 public:
Stats()174 Stats() { Start(); }
175
Start()176 void Start() {
177 next_report_ = 100;
178 last_op_finish_ = start_;
179 hist_.Clear();
180 done_ = 0;
181 bytes_ = 0;
182 seconds_ = 0;
183 start_ = Env::Default()->NowMicros();
184 finish_ = start_;
185 message_.clear();
186 }
187
Merge(const Stats & other)188 void Merge(const Stats& other) {
189 hist_.Merge(other.hist_);
190 done_ += other.done_;
191 bytes_ += other.bytes_;
192 seconds_ += other.seconds_;
193 if (other.start_ < start_) start_ = other.start_;
194 if (other.finish_ > finish_) finish_ = other.finish_;
195
196 // Just keep the messages from one thread
197 if (message_.empty()) message_ = other.message_;
198 }
199
Stop()200 void Stop() {
201 finish_ = Env::Default()->NowMicros();
202 seconds_ = (finish_ - start_) * 1e-6;
203 }
204
AddMessage(Slice msg)205 void AddMessage(Slice msg) {
206 AppendWithSpace(&message_, msg);
207 }
208
FinishedSingleOp()209 void FinishedSingleOp() {
210 if (FLAGS_histogram) {
211 double now = Env::Default()->NowMicros();
212 double micros = now - last_op_finish_;
213 hist_.Add(micros);
214 if (micros > 20000) {
215 fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
216 fflush(stderr);
217 }
218 last_op_finish_ = now;
219 }
220
221 done_++;
222 if (done_ >= next_report_) {
223 if (next_report_ < 1000) next_report_ += 100;
224 else if (next_report_ < 5000) next_report_ += 500;
225 else if (next_report_ < 10000) next_report_ += 1000;
226 else if (next_report_ < 50000) next_report_ += 5000;
227 else if (next_report_ < 100000) next_report_ += 10000;
228 else if (next_report_ < 500000) next_report_ += 50000;
229 else next_report_ += 100000;
230 fprintf(stderr, "... finished %d ops%30s\r", done_, "");
231 fflush(stderr);
232 }
233 }
234
AddBytes(int64_t n)235 void AddBytes(int64_t n) {
236 bytes_ += n;
237 }
238
Report(const Slice & name)239 void Report(const Slice& name) {
240 // Pretend at least one op was done in case we are running a benchmark
241 // that does not call FinishedSingleOp().
242 if (done_ < 1) done_ = 1;
243
244 std::string extra;
245 if (bytes_ > 0) {
246 // Rate is computed on actual elapsed time, not the sum of per-thread
247 // elapsed times.
248 double elapsed = (finish_ - start_) * 1e-6;
249 char rate[100];
250 snprintf(rate, sizeof(rate), "%6.1f MB/s",
251 (bytes_ / 1048576.0) / elapsed);
252 extra = rate;
253 }
254 AppendWithSpace(&extra, message_);
255
256 fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
257 name.ToString().c_str(),
258 seconds_ * 1e6 / done_,
259 (extra.empty() ? "" : " "),
260 extra.c_str());
261 if (FLAGS_histogram) {
262 fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
263 }
264 fflush(stdout);
265 }
266 };
267
268 // State shared by all concurrent executions of the same benchmark.
269 struct SharedState {
270 port::Mutex mu;
271 port::CondVar cv;
272 int total;
273
274 // Each thread goes through the following states:
275 // (1) initializing
276 // (2) waiting for others to be initialized
277 // (3) running
278 // (4) done
279
280 int num_initialized;
281 int num_done;
282 bool start;
283
SharedStateleveldb::__anonb7bcf52f0111::SharedState284 SharedState() : cv(&mu) { }
285 };
286
287 // Per-thread state for concurrent executions of the same benchmark.
288 struct ThreadState {
289 int tid; // 0..n-1 when running in n threads
290 Random rand; // Has different seeds for different threads
291 Stats stats;
292 SharedState* shared;
293
ThreadStateleveldb::__anonb7bcf52f0111::ThreadState294 ThreadState(int index)
295 : tid(index),
296 rand(1000 + index) {
297 }
298 };
299
300 } // namespace
301
// Drives the whole benchmark run.  Owns the shared resources (block
// cache, filter policy, DB handle), parses the --benchmarks list, and
// dispatches each named benchmark to the matching method, running it
// on FLAGS_threads concurrent threads.
class Benchmark {
 private:
  Cache* cache_;                       // Block cache (NULL => library default).
  const FilterPolicy* filter_policy_;  // Bloom filter policy (NULL => none).
  DB* db_;                             // Currently open database.
  int num_;                // Op count for the current benchmark.
  int value_size_;         // Value size for the current benchmark.
  int entries_per_batch_;  // Writes grouped into one WriteBatch.
  WriteOptions write_options_;
  int reads_;              // Read-op count for the current benchmark.
  int heap_counter_;       // Sequence number for heap profile files.

  // Prints key/value sizes, entry count, and estimated data sizes for
  // the upcoming run.
  void PrintHeader() {
    const int kKeySize = 16;
    PrintEnvironment();
    fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
    fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries: %d\n", num_);
    fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
             / 1048576.0));
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }

  // Warns when this binary was built in a configuration (optimization
  // off, assertions on, snappy missing/ineffective) that would skew
  // the measured numbers.
  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }

  // Prints library version, current date, and (on Linux) CPU model and
  // cache size parsed from /proc/cpuinfo.
  void PrintEnvironment() {
    fprintf(stderr, "LevelDB: version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(NULL);
    fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != NULL) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
        // Each interesting line has the form "key : value".
        const char* sep = strchr(line, ':');
        if (sep == NULL) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          // One "model name" line per logical CPU.
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
    }
#endif
  }

 public:
  Benchmark()
  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                   : NULL),
    db_(NULL),
    num_(FLAGS_num),
    value_size_(FLAGS_value_size),
    entries_per_batch_(1),
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    heap_counter_(0) {
    // Remove stale heap profiles left over from previous runs, and
    // (unless --use_existing_db) the database itself.
    std::vector<std::string> files;
    Env::Default()->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }

  ~Benchmark() {
    delete db_;
    delete cache_;
    delete filter_policy_;
  }

  // Parses FLAGS_benchmarks (comma-separated) and runs each entry in
  // order, destroying and re-opening the database first for benchmarks
  // that require a fresh one.
  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != NULL) {
      // Split off the next comma-delimited benchmark name.
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == NULL) {
        name = benchmarks;
        benchmarks = NULL;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = NULL;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("acquireload")) {
        method = &Benchmark::AcquireLoad;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        // Meta operations run inline, not through RunBenchmark().
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          // The user asked to keep the DB, so skip benchmarks that
          // would destroy it.
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = NULL;
        } else {
          delete db_;
          db_ = NULL;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != NULL) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }

 private:
  // Argument bundle handed to each worker thread.
  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  // Worker-thread entry point: registers as initialized, waits for the
  // coordinator to release all threads, runs the benchmark method with
  // timing, then registers as done.
  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        // Last thread to arrive wakes the coordinator.
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

  // Launches n threads all running `method`, releases them
  // simultaneously, waits for completion, then merges and reports the
  // combined per-thread stats under `name`.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->shared = &shared;
      Env::Default()->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    // Wait for every thread to reach the starting line before
    // releasing them, so they all run concurrently.
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Fold all per-thread stats into thread 0's and report once.
    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }

  // Benchmark: raw crc32c throughput over in-memory data (no DB).
  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp();
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  // Benchmark: cost of an atomic acquire-load (no DB).
  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    int count = 0;
    void *ptr = NULL;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      count++;
      thread->stats.FinishedSingleOp();
    }
    if (ptr == NULL) exit(1);  // Disable unused variable warning.
  }

  // Benchmark: snappy compression throughput on one block of
  // generated compressible data (no DB).
  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      // Report the achieved compression ratio alongside throughput.
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  // Benchmark: snappy decompression throughput, repeatedly
  // uncompressing one pre-compressed block (no DB).
  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  // Opens the database at FLAGS_db using the flag-derived options.
  // Exits the process on failure.
  void Open() {
    assert(db_ == NULL);
    Options options;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_open_files = FLAGS_open_files;
    options.filter_policy = filter_policy_;
    Status s = DB::Open(options, FLAGS_db, &db_);
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, true);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, false);
  }

  // Writes num_ entries (sequential or random 16-digit keys) in
  // batches of entries_per_batch_.  Exits the process on write error.
  void DoWrite(ThreadState* thread, bool seq) {
    if (num_ != FLAGS_num) {
      // Note in the report when this benchmark used a non-default
      // operation count (e.g. fillsync, fill100K).
      char msg[100];
      snprintf(msg, sizeof(msg), "(%d ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Put(key, gen.Generate(value_size_));
        bytes += value_size_ + strlen(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  // Reads up to reads_ entries by scanning forward from the start.
  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  // Reads up to reads_ entries by scanning backward from the end.
  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  // Performs reads_ point lookups of uniformly random existing keys.
  void ReadRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  // Performs reads_ point lookups of keys guaranteed to be absent
  // (the "." suffix never appears in written keys).
  void ReadMissing(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d.", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  // Performs reads_ random lookups restricted to a hot 1% of the key
  // range, so most reads hit cache.
  void ReadHot(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    const int range = (FLAGS_num + 99) / 100;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  // Performs reads_ iterator Seek() calls to random keys, counting how
  // many landed on an exact match.
  void SeekRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  // Deletes num_ keys (sequential or random) in batches of
  // entries_per_batch_.  Exits the process on write error.
  void DoDelete(ThreadState* thread, bool seq) {
    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Delete(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

  // Thread 0 writes continuously while all other threads run
  // ReadRandom; the writer stops once the readers finish.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const int k = thread->rand.Next() % FLAGS_num;
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
      }

      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }

  // Meta operation: compact the entire key range.
  void Compact(ThreadState* thread) {
    db_->CompactRange(NULL, NULL);
  }

  // Meta operation: print the value of a DB property (e.g.
  // "leveldb.stats", "leveldb.sstables").
  void PrintStats(const char* key) {
    std::string stats;
    if (!db_->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }

  // Callback for GetHeapProfile(): appends profile data to the
  // WritableFile passed via arg.
  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

  // Meta operation: dump a heap profile to FLAGS_db/heap-NNNN if the
  // port supports it; otherwise remove the empty file.
  void HeapProfile() {
    char fname[100];
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
    WritableFile* file;
    Status s = Env::Default()->NewWritableFile(fname, &file);
    if (!s.ok()) {
      fprintf(stderr, "%s\n", s.ToString().c_str());
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file);
    delete file;
    if (!ok) {
      fprintf(stderr, "heap profiling not supported\n");
      Env::Default()->DeleteFile(fname);
    }
  }
};
923
924 } // namespace leveldb
925
main(int argc,char ** argv)926 int main(int argc, char** argv) {
927 FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
928 FLAGS_open_files = leveldb::Options().max_open_files;
929 std::string default_db_path;
930
931 for (int i = 1; i < argc; i++) {
932 double d;
933 int n;
934 char junk;
935 if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
936 FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
937 } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
938 FLAGS_compression_ratio = d;
939 } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
940 (n == 0 || n == 1)) {
941 FLAGS_histogram = n;
942 } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
943 (n == 0 || n == 1)) {
944 FLAGS_use_existing_db = n;
945 } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
946 FLAGS_num = n;
947 } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
948 FLAGS_reads = n;
949 } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
950 FLAGS_threads = n;
951 } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
952 FLAGS_value_size = n;
953 } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
954 FLAGS_write_buffer_size = n;
955 } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
956 FLAGS_cache_size = n;
957 } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
958 FLAGS_bloom_bits = n;
959 } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
960 FLAGS_open_files = n;
961 } else if (strncmp(argv[i], "--db=", 5) == 0) {
962 FLAGS_db = argv[i] + 5;
963 } else {
964 fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
965 exit(1);
966 }
967 }
968
969 // Choose a location for the test database if none given with --db=<path>
970 if (FLAGS_db == NULL) {
971 leveldb::Env::Default()->GetTestDirectory(&default_db_path);
972 default_db_path += "/dbbench";
973 FLAGS_db = default_db_path.c_str();
974 }
975
976 leveldb::Benchmark benchmark;
977 benchmark.Run();
978 return 0;
979 }
980