1 // Use ALWAYS at the tag level. Control is performed manually during command
2 // line processing.
#define ATRACE_TAG ATRACE_TAG_ALWAYS
#include <utils/Trace.h>

#include <base/files/file_util.h>
#include <base/logging.h>
#include <base/strings/string_split.h>
#include <errno.h>
#include <getopt.h>
#include <pdx/client.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/default_transport/service_endpoint.h>
#include <pdx/rpc/buffer_wrapper.h>
#include <pdx/rpc/default_initialization_allocator.h>
#include <pdx/rpc/message_buffer.h>
#include <pdx/rpc/remote_method.h>
#include <pdx/rpc/serializable.h>
#include <pdx/service.h>
#include <sys/prctl.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <cmath>
#include <cstdlib>
#include <functional>
#include <future>
#include <iomanip>
#include <ios>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
37
38 using android::pdx::Channel;
39 using android::pdx::ClientBase;
40 using android::pdx::Endpoint;
41 using android::pdx::ErrorStatus;
42 using android::pdx::Message;
43 using android::pdx::Service;
44 using android::pdx::ServiceBase;
45 using android::pdx::default_transport::ClientChannelFactory;
46 using android::pdx::Status;
47 using android::pdx::Transaction;
48 using android::pdx::rpc::BufferWrapper;
49 using android::pdx::rpc::DefaultInitializationAllocator;
50 using android::pdx::rpc::MessageBuffer;
51 using android::pdx::rpc::DispatchRemoteMethod;
52 using android::pdx::rpc::RemoteMethodReturn;
53 using android::pdx::rpc::ReplyBuffer;
54 using android::pdx::rpc::Void;
55 using android::pdx::rpc::WrapBuffer;
56
57 namespace {
58
59 constexpr size_t kMaxMessageSize = 4096 * 1024;
60
GetServicePath(const std::string & path,int instance_id)61 std::string GetServicePath(const std::string& path, int instance_id) {
62 return path + std::to_string(instance_id);
63 }
64
SetThreadName(const std::string & name)65 void SetThreadName(const std::string& name) {
66 prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(name.c_str()), 0, 0, 0);
67 }
68
69 constexpr uint64_t kNanosPerSecond = 1000000000LLU;
70
GetClockNs()71 uint64_t GetClockNs() {
72 timespec t;
73 clock_gettime(CLOCK_MONOTONIC, &t);
74 return kNanosPerSecond * t.tv_sec + t.tv_nsec;
75 }
76
77 template <typename T>
ssizeof(const T &)78 ssize_t ssizeof(const T&) {
79 return static_cast<ssize_t>(sizeof(T));
80 }
81
82 class SchedStats {
83 public:
SchedStats()84 SchedStats() : SchedStats(gettid()) {}
SchedStats(pid_t task_id)85 explicit SchedStats(pid_t task_id) : task_id_(task_id) {}
86 SchedStats(const SchedStats&) = default;
87 SchedStats& operator=(const SchedStats&) = default;
88
Update()89 void Update() {
90 const std::string stats_path =
91 "/proc/" + std::to_string(task_id_) + "/schedstat";
92
93 std::string line;
94 base::ReadFileToString(base::FilePath{stats_path}, &line);
95 std::vector<std::string> stats = base::SplitString(
96 line, " ", base::TRIM_WHITESPACE, base::SPLIT_WANT_ALL);
97
98 CHECK_EQ(3u, stats.size());
99
100 // Calculate the deltas since the last update. Each value is absolute since
101 // the task started.
102 uint64_t current_cpu_time_ns = std::stoull(stats[0]);
103 uint64_t current_wait_ns = std::stoull(stats[1]);
104 uint64_t current_timeslices = std::stoull(stats[2]);
105 cpu_time_ns_ = current_cpu_time_ns - last_cpu_time_ns_;
106 wait_ns_ = current_wait_ns - last_wait_ns_;
107 timeslices_ = current_timeslices - last_timeslices_;
108 last_cpu_time_ns_ = current_cpu_time_ns;
109 last_wait_ns_ = current_wait_ns;
110 last_timeslices_ = current_timeslices;
111 }
112
task_id() const113 pid_t task_id() const { return task_id_; }
cpu_time_ns() const114 uint64_t cpu_time_ns() const { return cpu_time_ns_; }
wait_ns() const115 uint64_t wait_ns() const { return wait_ns_; }
timeslices() const116 uint64_t timeslices() const { return timeslices_; }
117
cpu_time_s() const118 double cpu_time_s() const {
119 return static_cast<double>(cpu_time_ns_) / kNanosPerSecond;
120 }
wait_s() const121 double wait_s() const {
122 return static_cast<double>(wait_ns_) / kNanosPerSecond;
123 }
124
125 private:
126 int32_t task_id_;
127 uint64_t cpu_time_ns_ = 0;
128 uint64_t last_cpu_time_ns_ = 0;
129 uint64_t wait_ns_ = 0;
130 uint64_t last_wait_ns_ = 0;
131 uint64_t timeslices_ = 0;
132 uint64_t last_timeslices_ = 0;
133
134 PDX_SERIALIZABLE_MEMBERS(SchedStats, task_id_, cpu_time_ns_, wait_ns_,
135 timeslices_);
136 };
137
138 // Opcodes for client/service protocol.
139 struct BenchmarkOps {
140 enum : int {
141 Nop,
142 Read,
143 Write,
144 Echo,
145 Stats,
146 WriteVector,
147 EchoVector,
148 Quit,
149 };
150 };
151
152 struct BenchmarkRPC {
153 PDX_REMOTE_METHOD(Stats, BenchmarkOps::Stats,
154 std::tuple<uint64_t, uint64_t, SchedStats>(Void));
155 PDX_REMOTE_METHOD(WriteVector, BenchmarkOps::WriteVector,
156 int(const BufferWrapper<std::vector<uint8_t>> data));
157 PDX_REMOTE_METHOD(EchoVector, BenchmarkOps::EchoVector,
158 BufferWrapper<std::vector<uint8_t>>(
159 const BufferWrapper<std::vector<uint8_t>> data));
160 };
161
162 struct BenchmarkResult {
163 int thread_id = 0;
164 int service_id = 0;
165 double time_delta_s = 0.0;
166 uint64_t bytes_sent = 0;
167 SchedStats sched_stats = {};
168 };
169
170 // Global command line option values.
171 struct Options {
172 bool verbose = false;
173 int threads = 1;
174 int opcode = BenchmarkOps::Read;
175 int blocksize = 1;
176 int count = 1;
177 int instances = 1;
178 int timeout = 1;
179 int warmup = 0;
180 } ProgramOptions;
181
182 // Command line option names.
183 const char kOptionService[] = "service";
184 const char kOptionClient[] = "client";
185 const char kOptionVerbose[] = "verbose";
186 const char kOptionOpcode[] = "op";
187 const char kOptionBlocksize[] = "bs";
188 const char kOptionCount[] = "count";
189 const char kOptionThreads[] = "threads";
190 const char kOptionInstances[] = "instances";
191 const char kOptionTimeout[] = "timeout";
192 const char kOptionTrace[] = "trace";
193 const char kOptionWarmup[] = "warmup";
194
195 // getopt() long options.
196 static option long_options[] = {
197 {kOptionService, required_argument, 0, 0},
198 {kOptionClient, required_argument, 0, 0},
199 {kOptionVerbose, no_argument, 0, 0},
200 {kOptionOpcode, required_argument, 0, 0},
201 {kOptionBlocksize, required_argument, 0, 0},
202 {kOptionCount, required_argument, 0, 0},
203 {kOptionThreads, required_argument, 0, 0},
204 {kOptionInstances, required_argument, 0, 0},
205 {kOptionTimeout, required_argument, 0, 0},
206 {kOptionTrace, no_argument, 0, 0},
207 {kOptionWarmup, required_argument, 0, 0},
208 {0, 0, 0, 0},
209 };
210
211 // Parses the argument for kOptionOpcode and sets the value of
212 // ProgramOptions.opcode.
ParseOpcodeOption(const std::string & argument)213 void ParseOpcodeOption(const std::string& argument) {
214 if (argument == "read") {
215 ProgramOptions.opcode = BenchmarkOps::Read;
216 } else if (argument == "write") {
217 ProgramOptions.opcode = BenchmarkOps::Write;
218 } else if (argument == "echo") {
219 ProgramOptions.opcode = BenchmarkOps::Echo;
220 } else if (argument == "writevec") {
221 ProgramOptions.opcode = BenchmarkOps::WriteVector;
222 } else if (argument == "echovec") {
223 ProgramOptions.opcode = BenchmarkOps::EchoVector;
224 } else if (argument == "quit") {
225 ProgramOptions.opcode = BenchmarkOps::Quit;
226 } else if (argument == "nop") {
227 ProgramOptions.opcode = BenchmarkOps::Nop;
228 } else if (argument == "stats") {
229 ProgramOptions.opcode = BenchmarkOps::Stats;
230 } else {
231 ProgramOptions.opcode = std::stoi(argument);
232 }
233 }
234
235 // Implements the service side of the benchmark.
236 class BenchmarkService : public ServiceBase<BenchmarkService> {
237 public:
OnChannelOpen(Message & message)238 std::shared_ptr<Channel> OnChannelOpen(Message& message) override {
239 VLOG(1) << "BenchmarkService::OnChannelCreate: cid="
240 << message.GetChannelId();
241 return nullptr;
242 }
243
OnChannelClose(Message & message,const std::shared_ptr<Channel> &)244 void OnChannelClose(Message& message,
245 const std::shared_ptr<Channel>& /*channel*/) override {
246 VLOG(1) << "BenchmarkService::OnChannelClose: cid="
247 << message.GetChannelId();
248 }
249
HandleMessage(Message & message)250 Status<void> HandleMessage(Message& message) override {
251 ATRACE_NAME("BenchmarkService::HandleMessage");
252
253 switch (message.GetOp()) {
254 case BenchmarkOps::Nop:
255 VLOG(1) << "BenchmarkService::HandleMessage: op=nop";
256 {
257 ATRACE_NAME("Reply");
258 CHECK(message.Reply(0));
259 }
260 return {};
261
262 case BenchmarkOps::Write: {
263 VLOG(1) << "BenchmarkService::HandleMessage: op=write send_length="
264 << message.GetSendLength()
265 << " receive_length=" << message.GetReceiveLength();
266
267 Status<void> status;
268 if (message.GetSendLength())
269 status = message.ReadAll(send_buffer.data(), message.GetSendLength());
270
271 {
272 ATRACE_NAME("Reply");
273 if (!status)
274 CHECK(message.ReplyError(status.error()));
275 else
276 CHECK(message.Reply(message.GetSendLength()));
277 }
278 return {};
279 }
280
281 case BenchmarkOps::Read: {
282 VLOG(1) << "BenchmarkService::HandleMessage: op=read send_length="
283 << message.GetSendLength()
284 << " receive_length=" << message.GetReceiveLength();
285
286 Status<void> status;
287 if (message.GetReceiveLength()) {
288 status = message.WriteAll(receive_buffer.data(),
289 message.GetReceiveLength());
290 }
291
292 {
293 ATRACE_NAME("Reply");
294 if (!status)
295 CHECK(message.ReplyError(status.error()));
296 else
297 CHECK(message.Reply(message.GetReceiveLength()));
298 }
299 return {};
300 }
301
302 case BenchmarkOps::Echo: {
303 VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
304 << message.GetSendLength()
305 << " receive_length=" << message.GetReceiveLength();
306
307 Status<void> status;
308 if (message.GetSendLength())
309 status = message.ReadAll(send_buffer.data(), message.GetSendLength());
310
311 if (!status) {
312 CHECK(message.ReplyError(status.error()));
313 return {};
314 }
315
316 if (message.GetSendLength()) {
317 status =
318 message.WriteAll(send_buffer.data(), message.GetSendLength());
319 }
320
321 {
322 ATRACE_NAME("Reply");
323 if (!status)
324 CHECK(message.ReplyError(status.error()));
325 else
326 CHECK(message.Reply(message.GetSendLength()));
327 }
328 return {};
329 }
330
331 case BenchmarkOps::Stats: {
332 VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
333 << message.GetSendLength()
334 << " receive_length=" << message.GetReceiveLength();
335
336 // Snapshot the stats when the message is received.
337 const uint64_t receive_time_ns = GetClockNs();
338 sched_stats_.Update();
339
340 // Use the RPC system to return the results.
341 RemoteMethodReturn<BenchmarkRPC::Stats>(
342 message, BenchmarkRPC::Stats::Return{receive_time_ns, GetClockNs(),
343 sched_stats_});
344 return {};
345 }
346
347 case BenchmarkOps::WriteVector:
348 VLOG(1) << "BenchmarkService::HandleMessage: op=writevec send_length="
349 << message.GetSendLength()
350 << " receive_length=" << message.GetReceiveLength();
351
352 DispatchRemoteMethod<BenchmarkRPC::WriteVector>(
353 *this, &BenchmarkService::OnWriteVector, message, kMaxMessageSize);
354 return {};
355
356 case BenchmarkOps::EchoVector:
357 VLOG(1) << "BenchmarkService::HandleMessage: op=echovec send_length="
358 << message.GetSendLength()
359 << " receive_length=" << message.GetReceiveLength();
360
361 DispatchRemoteMethod<BenchmarkRPC::EchoVector>(
362 *this, &BenchmarkService::OnEchoVector, message, kMaxMessageSize);
363 return {};
364
365 case BenchmarkOps::Quit:
366 Cancel();
367 return ErrorStatus{ESHUTDOWN};
368
369 default:
370 VLOG(1) << "BenchmarkService::HandleMessage: default case; op="
371 << message.GetOp();
372 return Service::DefaultHandleMessage(message);
373 }
374 }
375
376 // Updates the scheduler stats from procfs for this thread.
UpdateSchedStats()377 void UpdateSchedStats() { sched_stats_.Update(); }
378
379 private:
380 friend BASE;
381
BenchmarkService(std::unique_ptr<Endpoint> endpoint)382 explicit BenchmarkService(std::unique_ptr<Endpoint> endpoint)
383 : BASE("BenchmarkService", std::move(endpoint)),
384 send_buffer(kMaxMessageSize),
385 receive_buffer(kMaxMessageSize) {}
386
387 std::vector<uint8_t> send_buffer;
388 std::vector<uint8_t> receive_buffer;
389
390 // Each service thread has its own scheduler stats object.
391 static thread_local SchedStats sched_stats_;
392
393 using BufferType = BufferWrapper<
394 std::vector<uint8_t, DefaultInitializationAllocator<uint8_t>>>;
395
OnWriteVector(Message &,const BufferType & data)396 int OnWriteVector(Message&, const BufferType& data) { return data.size(); }
OnEchoVector(Message &,BufferType && data)397 BufferType OnEchoVector(Message&, BufferType&& data) {
398 return std::move(data);
399 }
400
401 BenchmarkService(const BenchmarkService&) = delete;
402 void operator=(const BenchmarkService&) = delete;
403 };
404
405 thread_local SchedStats BenchmarkService::sched_stats_;
406
407 // Implements the client side of the benchmark.
408 class BenchmarkClient : public ClientBase<BenchmarkClient> {
409 public:
Nop()410 int Nop() {
411 ATRACE_NAME("BenchmarkClient::Nop");
412 VLOG(1) << "BenchmarkClient::Nop";
413 Transaction transaction{*this};
414 return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Nop));
415 }
416
Write(const void * buffer,size_t length)417 int Write(const void* buffer, size_t length) {
418 ATRACE_NAME("BenchmarkClient::Write");
419 VLOG(1) << "BenchmarkClient::Write: buffer=" << buffer
420 << " length=" << length;
421 Transaction transaction{*this};
422 return ReturnStatusOrError(
423 transaction.Send<int>(BenchmarkOps::Write, buffer, length, nullptr, 0));
424 // return write(endpoint_fd(), buffer, length);
425 }
426
Read(void * buffer,size_t length)427 int Read(void* buffer, size_t length) {
428 ATRACE_NAME("BenchmarkClient::Read");
429 VLOG(1) << "BenchmarkClient::Read: buffer=" << buffer
430 << " length=" << length;
431 Transaction transaction{*this};
432 return ReturnStatusOrError(
433 transaction.Send<int>(BenchmarkOps::Read, nullptr, 0, buffer, length));
434 // return read(endpoint_fd(), buffer, length);
435 }
436
Echo(const void * send_buffer,size_t send_length,void * receive_buffer,size_t receive_length)437 int Echo(const void* send_buffer, size_t send_length, void* receive_buffer,
438 size_t receive_length) {
439 ATRACE_NAME("BenchmarkClient::Echo");
440 VLOG(1) << "BenchmarkClient::Echo: send_buffer=" << send_buffer
441 << " send_length=" << send_length
442 << " receive_buffer=" << receive_buffer
443 << " receive_length=" << receive_length;
444 Transaction transaction{*this};
445 return ReturnStatusOrError(
446 transaction.Send<int>(BenchmarkOps::Echo, send_buffer, send_length,
447 receive_buffer, receive_length));
448 }
449
Stats(std::tuple<uint64_t,uint64_t,SchedStats> * stats_out)450 int Stats(std::tuple<uint64_t, uint64_t, SchedStats>* stats_out) {
451 ATRACE_NAME("BenchmarkClient::Stats");
452 VLOG(1) << "BenchmarkClient::Stats";
453
454 auto status = InvokeRemoteMethodInPlace<BenchmarkRPC::Stats>(stats_out);
455 return status ? 0 : -status.error();
456 }
457
WriteVector(const BufferWrapper<std::vector<uint8_t>> & data)458 int WriteVector(const BufferWrapper<std::vector<uint8_t>>& data) {
459 ATRACE_NAME("BenchmarkClient::Stats");
460 VLOG(1) << "BenchmarkClient::Stats";
461
462 auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
463 return ReturnStatusOrError(status);
464 }
465
466 template <typename T>
WriteVector(const BufferWrapper<T> & data)467 int WriteVector(const BufferWrapper<T>& data) {
468 ATRACE_NAME("BenchmarkClient::WriteVector");
469 VLOG(1) << "BenchmarkClient::WriteVector";
470
471 auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
472 return ReturnStatusOrError(status);
473 }
474
475 template <typename T, typename U>
EchoVector(const BufferWrapper<T> & data,BufferWrapper<U> * data_out)476 int EchoVector(const BufferWrapper<T>& data, BufferWrapper<U>* data_out) {
477 ATRACE_NAME("BenchmarkClient::EchoVector");
478 VLOG(1) << "BenchmarkClient::EchoVector";
479
480 MessageBuffer<ReplyBuffer>::Reserve(kMaxMessageSize - 1);
481 auto status =
482 InvokeRemoteMethodInPlace<BenchmarkRPC::EchoVector>(data_out, data);
483 return status ? 0 : -status.error();
484 }
485
Quit()486 int Quit() {
487 VLOG(1) << "BenchmarkClient::Quit";
488 Transaction transaction{*this};
489 return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Echo));
490 }
491
492 private:
493 friend BASE;
494
BenchmarkClient(const std::string & service_path)495 explicit BenchmarkClient(const std::string& service_path)
496 : BASE(ClientChannelFactory::Create(service_path),
497 ProgramOptions.timeout) {}
498
499 BenchmarkClient(const BenchmarkClient&) = delete;
500 void operator=(const BenchmarkClient&) = delete;
501 };
502
503 // Creates a benchmark service at |path| and dispatches messages.
ServiceCommand(const std::string & path)504 int ServiceCommand(const std::string& path) {
505 if (path.empty())
506 return -EINVAL;
507
508 // Start the requested number of dispatch threads.
509 std::vector<std::thread> dispatch_threads;
510 int service_count = ProgramOptions.instances;
511 int service_id_counter = 0;
512 int thread_id_counter = 0;
513 std::atomic<bool> done(false);
514
515 while (service_count--) {
516 std::cerr << "Starting service instance " << service_id_counter
517 << std::endl;
518 auto service = BenchmarkService::Create(
519 android::pdx::default_transport::Endpoint::CreateAndBindSocket(
520 GetServicePath(path, service_id_counter),
521 android::pdx::default_transport::Endpoint::kBlocking));
522 if (!service) {
523 std::cerr << "Failed to create service instance!!" << std::endl;
524 done = true;
525 break;
526 }
527
528 int thread_count = ProgramOptions.threads;
529 while (thread_count--) {
530 std::cerr << "Starting dispatch thread " << thread_id_counter
531 << " service " << service_id_counter << std::endl;
532
533 dispatch_threads.emplace_back(
534 [&](const int thread_id, const int service_id,
535 const std::shared_ptr<BenchmarkService>& local_service) {
536 SetThreadName("service" + std::to_string(service_id));
537
538 // Read the initial schedstats for this thread from procfs.
539 local_service->UpdateSchedStats();
540
541 ATRACE_NAME("BenchmarkService::Dispatch");
542 while (!done) {
543 auto ret = local_service->ReceiveAndDispatch();
544 if (!ret) {
545 if (ret.error() != ESHUTDOWN) {
546 std::cerr << "Error while dispatching message on thread "
547 << thread_id << " service " << service_id << ": "
548 << ret.GetErrorMessage() << std::endl;
549 } else {
550 std::cerr << "Quitting thread " << thread_id << " service "
551 << service_id << std::endl;
552 }
553 done = true;
554 return;
555 }
556 }
557 },
558 thread_id_counter++, service_id_counter, service);
559 }
560
561 service_id_counter++;
562 }
563
564 // Wait for the dispatch threads to exit.
565 for (auto& thread : dispatch_threads) {
566 thread.join();
567 }
568
569 return 0;
570 }
571
ClientCommand(const std::string & path)572 int ClientCommand(const std::string& path) {
573 // Start the requested number of client threads.
574 std::vector<std::thread> client_threads;
575 std::vector<std::future<BenchmarkResult>> client_results;
576 int service_count = ProgramOptions.instances;
577 int thread_id_counter = 0;
578 int service_id_counter = 0;
579
580 // Aggregate statistics, updated when worker threads exit.
581 std::atomic<uint64_t> total_bytes(0);
582 std::atomic<uint64_t> total_time_ns(0);
583
584 // Samples for variance calculation.
585 std::vector<uint64_t> latency_samples_ns(
586 ProgramOptions.instances * ProgramOptions.threads * ProgramOptions.count);
587 const size_t samples_per_thread = ProgramOptions.count;
588
589 std::vector<uint8_t> send_buffer(ProgramOptions.blocksize);
590 std::vector<uint8_t> receive_buffer(kMaxMessageSize);
591
592 // Barriers for synchronizing thread start.
593 std::vector<std::future<void>> ready_barrier_futures;
594 std::promise<void> go_barrier_promise;
595 std::future<void> go_barrier_future = go_barrier_promise.get_future();
596
597 // Barrier for synchronizing thread tear down.
598 std::promise<void> done_barrier_promise;
599 std::future<void> done_barrier_future = done_barrier_promise.get_future();
600
601 while (service_count--) {
602 int thread_count = ProgramOptions.threads;
603 while (thread_count--) {
604 std::cerr << "Starting client thread " << thread_id_counter << " service "
605 << service_id_counter << std::endl;
606
607 std::promise<BenchmarkResult> result_promise;
608 client_results.push_back(result_promise.get_future());
609
610 std::promise<void> ready_barrier_promise;
611 ready_barrier_futures.push_back(ready_barrier_promise.get_future());
612
613 client_threads.emplace_back(
614 [&](const int thread_id, const int service_id,
615 std::promise<BenchmarkResult> result, std::promise<void> ready) {
616 SetThreadName("client" + std::to_string(thread_id) + "/" +
617 std::to_string(service_id));
618
619 ATRACE_NAME("BenchmarkClient::Dispatch");
620
621 auto client =
622 BenchmarkClient::Create(GetServicePath(path, service_id));
623 if (!client) {
624 std::cerr << "Failed to create client for service " << service_id
625 << std::endl;
626 return -ENOMEM;
627 }
628
629 uint64_t* thread_samples =
630 &latency_samples_ns[samples_per_thread * thread_id];
631
632 // Per-thread statistics.
633 uint64_t bytes_sent = 0;
634 uint64_t time_start_ns;
635 uint64_t time_end_ns;
636 SchedStats sched_stats;
637
638 // Signal ready and wait for go.
639 ready.set_value();
640 go_barrier_future.wait();
641
642 // Warmup the scheduler.
643 int warmup = ProgramOptions.warmup;
644 while (warmup--) {
645 for (int i = 0; i < 1000000; i++)
646 ;
647 }
648
649 sched_stats.Update();
650 time_start_ns = GetClockNs();
651
652 int count = ProgramOptions.count;
653 while (count--) {
654 uint64_t iteration_start_ns = GetClockNs();
655
656 switch (ProgramOptions.opcode) {
657 case BenchmarkOps::Nop: {
658 const int ret = client->Nop();
659 if (ret < 0) {
660 std::cerr << "Failed to send nop: " << strerror(-ret)
661 << std::endl;
662 return ret;
663 } else {
664 VLOG(1) << "Success";
665 }
666 break;
667 }
668
669 case BenchmarkOps::Read: {
670 const int ret = client->Read(receive_buffer.data(),
671 ProgramOptions.blocksize);
672 if (ret < 0) {
673 std::cerr << "Failed to read: " << strerror(-ret)
674 << std::endl;
675 return ret;
676 } else if (ret != ProgramOptions.blocksize) {
677 std::cerr << "Expected ret=" << ProgramOptions.blocksize
678 << "; actual ret=" << ret << std::endl;
679 return -EINVAL;
680 } else {
681 VLOG(1) << "Success";
682 bytes_sent += ret;
683 }
684 break;
685 }
686
687 case BenchmarkOps::Write: {
688 const int ret =
689 client->Write(send_buffer.data(), send_buffer.size());
690 if (ret < 0) {
691 std::cerr << "Failed to write: " << strerror(-ret)
692 << std::endl;
693 return ret;
694 } else if (ret != ProgramOptions.blocksize) {
695 std::cerr << "Expected ret=" << ProgramOptions.blocksize
696 << "; actual ret=" << ret << std::endl;
697 return -EINVAL;
698 } else {
699 VLOG(1) << "Success";
700 bytes_sent += ret;
701 }
702 break;
703 }
704
705 case BenchmarkOps::Echo: {
706 const int ret = client->Echo(
707 send_buffer.data(), send_buffer.size(),
708 receive_buffer.data(), receive_buffer.size());
709 if (ret < 0) {
710 std::cerr << "Failed to echo: " << strerror(-ret)
711 << std::endl;
712 return ret;
713 } else if (ret != ProgramOptions.blocksize) {
714 std::cerr << "Expected ret=" << ProgramOptions.blocksize
715 << "; actual ret=" << ret << std::endl;
716 return -EINVAL;
717 } else {
718 VLOG(1) << "Success";
719 bytes_sent += ret * 2;
720 }
721 break;
722 }
723
724 case BenchmarkOps::Stats: {
725 std::tuple<uint64_t, uint64_t, SchedStats> stats;
726 const int ret = client->Stats(&stats);
727 if (ret < 0) {
728 std::cerr << "Failed to get stats: " << strerror(-ret)
729 << std::endl;
730 return ret;
731 } else {
732 VLOG(1) << "Success";
733 std::cerr
734 << "Round trip: receive_time_ns=" << std::get<0>(stats)
735 << " reply_time_ns=" << std::get<1>(stats)
736 << " cpu_time_s=" << std::get<2>(stats).cpu_time_s()
737 << " wait_s=" << std::get<2>(stats).wait_s()
738 << std::endl;
739 }
740 break;
741 }
742
743 case BenchmarkOps::WriteVector: {
744 const int ret = client->WriteVector(
745 WrapBuffer(send_buffer.data(), ProgramOptions.blocksize));
746 if (ret < 0) {
747 std::cerr << "Failed to write vector: " << strerror(-ret)
748 << std::endl;
749 return ret;
750 } else {
751 VLOG(1) << "Success";
752 bytes_sent += ret;
753 }
754 break;
755 }
756
757 case BenchmarkOps::EchoVector: {
758 thread_local BufferWrapper<std::vector<
759 uint8_t, DefaultInitializationAllocator<uint8_t>>>
760 response_buffer;
761 const int ret = client->EchoVector(
762 WrapBuffer(send_buffer.data(), ProgramOptions.blocksize),
763 &response_buffer);
764 if (ret < 0) {
765 std::cerr << "Failed to echo vector: " << strerror(-ret)
766 << std::endl;
767 return ret;
768 } else {
769 VLOG(1) << "Success";
770 bytes_sent += send_buffer.size() + response_buffer.size();
771 }
772 break;
773 }
774
775 case BenchmarkOps::Quit: {
776 const int ret = client->Quit();
777 if (ret < 0 && ret != -ESHUTDOWN) {
778 std::cerr << "Failed to send quit: " << strerror(-ret);
779 return ret;
780 } else {
781 VLOG(1) << "Success";
782 }
783 break;
784 }
785
786 default:
787 std::cerr
788 << "Invalid client operation: " << ProgramOptions.opcode
789 << std::endl;
790 return -EINVAL;
791 }
792
793 uint64_t iteration_end_ns = GetClockNs();
794 uint64_t iteration_delta_ns =
795 iteration_end_ns - iteration_start_ns;
796 thread_samples[count] = iteration_delta_ns;
797
798 if (iteration_delta_ns > (kNanosPerSecond / 100)) {
799 SchedStats stats = sched_stats;
800 stats.Update();
801 std::cerr << "Thread " << thread_id << " iteration_delta_s="
802 << (static_cast<double>(iteration_delta_ns) /
803 kNanosPerSecond)
804 << " " << stats.cpu_time_s() << " " << stats.wait_s()
805 << std::endl;
806 }
807 }
808
809 time_end_ns = GetClockNs();
810 sched_stats.Update();
811
812 const double time_delta_s =
813 static_cast<double>(time_end_ns - time_start_ns) /
814 kNanosPerSecond;
815
816 total_bytes += bytes_sent;
817 total_time_ns += time_end_ns - time_start_ns;
818
819 result.set_value(
820 {thread_id, service_id, time_delta_s, bytes_sent, sched_stats});
821 done_barrier_future.wait();
822
823 return 0;
824 },
825 thread_id_counter++, service_id_counter, std::move(result_promise),
826 std::move(ready_barrier_promise));
827 }
828
829 service_id_counter++;
830 }
831
832 // Wait for workers to be ready.
833 std::cerr << "Waiting for workers to be ready..." << std::endl;
834 for (auto& ready : ready_barrier_futures)
835 ready.wait();
836
837 // Signal workers to go.
838 std::cerr << "Kicking off benchmark." << std::endl;
839 go_barrier_promise.set_value();
840
841 // Wait for all the worker threas to finish.
842 for (auto& result : client_results)
843 result.wait();
844
845 // Report worker thread results.
846 for (auto& result : client_results) {
847 BenchmarkResult benchmark_result = result.get();
848 std::cerr << std::fixed << "Thread " << benchmark_result.thread_id
849 << " service " << benchmark_result.service_id << ":" << std::endl;
850 std::cerr << "\t " << benchmark_result.bytes_sent << " bytes in "
851 << benchmark_result.time_delta_s << " seconds ("
852 << std::setprecision(0) << (benchmark_result.bytes_sent / 1024.0 /
853 benchmark_result.time_delta_s)
854 << " K/s; " << std::setprecision(3)
855 << (ProgramOptions.count / benchmark_result.time_delta_s)
856 << " txn/s; " << std::setprecision(9)
857 << (benchmark_result.time_delta_s / ProgramOptions.count)
858 << " s/txn)" << std::endl;
859 std::cerr << "\tStats: " << benchmark_result.sched_stats.cpu_time_s() << " "
860 << (benchmark_result.sched_stats.cpu_time_s() /
861 ProgramOptions.count)
862 << " " << benchmark_result.sched_stats.wait_s() << " "
863 << (benchmark_result.sched_stats.wait_s() / ProgramOptions.count)
864 << " " << benchmark_result.sched_stats.timeslices() << std::endl;
865 }
866
867 // Signal worker threads to exit.
868 done_barrier_promise.set_value();
869
870 // Wait for the worker threads to exit.
871 for (auto& thread : client_threads) {
872 thread.join();
873 }
874
875 // Report aggregate results.
876 const int total_threads = ProgramOptions.threads * ProgramOptions.instances;
877 const int iterations = ProgramOptions.count;
878 const double total_time_s =
879 static_cast<double>(total_time_ns) / kNanosPerSecond;
880 // This is about how much wall time it took to completely transfer all the
881 // paylaods.
882 const double average_time_s = total_time_s / total_threads;
883
884 const uint64_t min_sample_time_ns =
885 *std::min_element(latency_samples_ns.begin(), latency_samples_ns.end());
886 const double min_sample_time_s =
887 static_cast<double>(min_sample_time_ns) / kNanosPerSecond;
888
889 const uint64_t max_sample_time_ns =
890 *std::max_element(latency_samples_ns.begin(), latency_samples_ns.end());
891 const double max_sample_time_s =
892 static_cast<double>(max_sample_time_ns) / kNanosPerSecond;
893
894 const double total_sample_time_s =
895 std::accumulate(latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
896 [](double s, uint64_t ns) {
897 return s + static_cast<double>(ns) / kNanosPerSecond;
898 });
899 const double average_sample_time_s =
900 total_sample_time_s / latency_samples_ns.size();
901
902 const double sum_of_squared_deviations = std::accumulate(
903 latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
904 [&](double s, uint64_t ns) {
905 const double delta =
906 static_cast<double>(ns) / kNanosPerSecond - average_sample_time_s;
907 return s + delta * delta;
908 });
909 const double variance = sum_of_squared_deviations / latency_samples_ns.size();
910 const double standard_deviation = std::sqrt(variance);
911
912 const int num_buckets = 200;
913 const uint64_t sample_range_ns = max_sample_time_ns - min_sample_time_ns;
914 const uint64_t ns_per_bucket = sample_range_ns / num_buckets;
915 std::array<uint64_t, num_buckets> sample_buckets = {{0}};
916
917 // Count samples in each bucket range.
918 for (uint64_t sample_ns : latency_samples_ns) {
919 sample_buckets[(sample_ns - min_sample_time_ns) / (ns_per_bucket + 1)] += 1;
920 }
921
922 // Calculate population percentiles.
923 const uint64_t percent_50 =
924 static_cast<uint64_t>(latency_samples_ns.size() * 0.5);
925 const uint64_t percent_90 =
926 static_cast<uint64_t>(latency_samples_ns.size() * 0.9);
927 const uint64_t percent_95 =
928 static_cast<uint64_t>(latency_samples_ns.size() * 0.95);
929 const uint64_t percent_99 =
930 static_cast<uint64_t>(latency_samples_ns.size() * 0.99);
931
932 uint64_t sample_count = 0;
933 double latency_50th_percentile_s, latency_90th_percentile_s,
934 latency_95th_percentile_s, latency_99th_percentile_s;
935 for (int i = 0; i < num_buckets; i++) {
936 // Report the midpoint of the bucket range as the value of the
937 // corresponding
938 // percentile.
939 const double bucket_midpoint_time_s =
940 (ns_per_bucket * i + 0.5 * ns_per_bucket + min_sample_time_ns) /
941 kNanosPerSecond;
942 if (sample_count < percent_50 &&
943 (sample_count + sample_buckets[i]) >= percent_50) {
944 latency_50th_percentile_s = bucket_midpoint_time_s;
945 }
946 if (sample_count < percent_90 &&
947 (sample_count + sample_buckets[i]) >= percent_90) {
948 latency_90th_percentile_s = bucket_midpoint_time_s;
949 }
950 if (sample_count < percent_95 &&
951 (sample_count + sample_buckets[i]) >= percent_95) {
952 latency_95th_percentile_s = bucket_midpoint_time_s;
953 }
954 if (sample_count < percent_99 &&
955 (sample_count + sample_buckets[i]) >= percent_99) {
956 latency_99th_percentile_s = bucket_midpoint_time_s;
957 }
958 sample_count += sample_buckets[i];
959 }
960
961 std::cerr << std::fixed << "Total throughput over " << total_threads
962 << " threads:\n\t " << total_bytes << " bytes in " << average_time_s
963 << " seconds (" << std::setprecision(0)
964 << (total_bytes / 1024.0 / average_time_s) << " K/s; "
965 << std::setprecision(3)
966 << (iterations * total_threads / average_time_s)
967 << std::setprecision(9) << " txn/s; "
968 << (average_time_s / (iterations * total_threads)) << " s/txn)"
969 << std::endl;
970 std::cerr << "Sample statistics: " << std::endl;
971 std::cerr << total_sample_time_s << " s total sample time" << std::endl;
972 std::cerr << average_sample_time_s << " s avg" << std::endl;
973 std::cerr << standard_deviation << " s std dev" << std::endl;
974 std::cerr << min_sample_time_s << " s min" << std::endl;
975 std::cerr << max_sample_time_s << " s max" << std::endl;
976 std::cerr << "Latency percentiles:" << std::endl;
977 std::cerr << "50th: " << latency_50th_percentile_s << " s" << std::endl;
978 std::cerr << "90th: " << latency_90th_percentile_s << " s" << std::endl;
979 std::cerr << "95th: " << latency_95th_percentile_s << " s" << std::endl;
980 std::cerr << "99th: " << latency_99th_percentile_s << " s" << std::endl;
981
982 std::cout << total_time_ns << " " << std::fixed << std::setprecision(9)
983 << average_sample_time_s << " " << std::fixed
984 << std::setprecision(9) << standard_deviation << std::endl;
985 return 0;
986 }
987
// Prints command-line usage help for this benchmark tool to stdout.
//
// |command_name| is the program name to show in the usage line (argv[0]).
// Always returns -1 so callers can write `return Usage(argv[0]);` to both
// print help and exit with an error status.
int Usage(const std::string& command_name) {
  // clang-format off
  std::cout << "Usage: " << command_name << " [options]" << std::endl;
  std::cout << "\t--verbose : Use verbose messages." << std::endl;
  std::cout << "\t--service <endpoint path> : Start service at the given path." << std::endl;
  std::cout << "\t--client <endpoint path> : Start client to the given path." << std::endl;
  std::cout << "\t--op <read | write | echo> : Specify client operation mode." << std::endl;
  std::cout << "\t--bs <block size bytes> : Specify block size to use." << std::endl;
  std::cout << "\t--count <count> : Specify number of transactions to make." << std::endl;
  std::cout << "\t--instances <count> : Specify number of service instances." << std::endl;
  std::cout << "\t--threads <count> : Specify number of threads per instance." << std::endl;
  std::cout << "\t--timeout <timeout ms | -1> : Timeout to wait for services." << std::endl;
  std::cout << "\t--trace : Enable systrace logging." << std::endl;
  std::cout << "\t--warmup <iterations> : Busy loops before running benchmarks." << std::endl;
  // clang-format on
  return -1;
}
1005
1006 } // anonymous namespace
1007
main(int argc,char ** argv)1008 int main(int argc, char** argv) {
1009 logging::LoggingSettings logging_settings;
1010 logging_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
1011 logging::InitLogging(logging_settings);
1012
1013 int getopt_code;
1014 int option_index;
1015 std::string option = "";
1016 std::string command = "";
1017 std::string command_argument = "";
1018 bool tracing_enabled = false;
1019
1020 // Process command line options.
1021 while ((getopt_code =
1022 getopt_long(argc, argv, "", long_options, &option_index)) != -1) {
1023 option = long_options[option_index].name;
1024 VLOG(1) << "option=" << option;
1025 switch (getopt_code) {
1026 case 0:
1027 if (option == kOptionVerbose) {
1028 ProgramOptions.verbose = true;
1029 logging::SetMinLogLevel(-1);
1030 } else if (option == kOptionOpcode) {
1031 ParseOpcodeOption(optarg);
1032 } else if (option == kOptionBlocksize) {
1033 ProgramOptions.blocksize = std::stoi(optarg);
1034 if (ProgramOptions.blocksize < 0) {
1035 std::cerr << "Invalid blocksize argument: "
1036 << ProgramOptions.blocksize << std::endl;
1037 return -EINVAL;
1038 }
1039 } else if (option == kOptionCount) {
1040 ProgramOptions.count = std::stoi(optarg);
1041 if (ProgramOptions.count < 1) {
1042 std::cerr << "Invalid count argument: " << ProgramOptions.count
1043 << std::endl;
1044 return -EINVAL;
1045 }
1046 } else if (option == kOptionThreads) {
1047 ProgramOptions.threads = std::stoi(optarg);
1048 if (ProgramOptions.threads < 1) {
1049 std::cerr << "Invalid threads argument: " << ProgramOptions.threads
1050 << std::endl;
1051 return -EINVAL;
1052 }
1053 } else if (option == kOptionInstances) {
1054 ProgramOptions.instances = std::stoi(optarg);
1055 if (ProgramOptions.instances < 1) {
1056 std::cerr << "Invalid instances argument: "
1057 << ProgramOptions.instances << std::endl;
1058 return -EINVAL;
1059 }
1060 } else if (option == kOptionTimeout) {
1061 ProgramOptions.timeout = std::stoi(optarg);
1062 } else if (option == kOptionTrace) {
1063 tracing_enabled = true;
1064 } else if (option == kOptionWarmup) {
1065 ProgramOptions.warmup = std::stoi(optarg);
1066 } else {
1067 command = option;
1068 if (optarg)
1069 command_argument = optarg;
1070 }
1071 break;
1072 }
1073 }
1074
1075 // Setup ATRACE/systrace based on command line.
1076 atrace_setup();
1077 atrace_set_tracing_enabled(tracing_enabled);
1078
1079 VLOG(1) << "command=" << command << " command_argument=" << command_argument;
1080
1081 if (command == "") {
1082 return Usage(argv[0]);
1083 } else if (command == kOptionService) {
1084 return ServiceCommand(command_argument);
1085 } else if (command == kOptionClient) {
1086 return ClientCommand(command_argument);
1087 } else {
1088 return Usage(argv[0]);
1089 }
1090 }
1091