1 // std
2 #include <algorithm>
3 #include <cstdlib>
4 #include <iostream>
5 #include <mutex>
6 #include <queue>
7 #include <random>
8 #include <sstream>
9 #include <string>
10 #include <thread>
11
12 // gnu-c
13 #include <sys/types.h>
14 #include <unistd.h>
15
16 // usdt_sample_lib1
17 #include "usdt_sample_lib1/lib1.h"
18
// Prints the command-line synopsis, followed by a one-line description (with
// its default) of every positional argument, to standard output.
//
// argc: number of entries in argv; only used to check that argv[0] exists.
// argv: program arguments; argv[0] is printed as the program name.
void print_usage(int argc, char** argv)
{
    // argv[0] is absent when the process was started with an empty argument
    // vector. Streaming a null char* is undefined behaviour, so fall back to
    // a placeholder instead of losing the rest of the usage text.
    const char* programName = "<program>";
    if (argc > 0 && argv != nullptr && argv[0] != nullptr) {
        programName = argv[0];
    }

    std::cout << "Usage:" << std::endl;
    std::cout << programName
              << " <InputPrefix> <InputMinimum (1-50)> <InputMaximum (1-50)> <CallsPerSec (1-50)> <MinimumLatencyMs (1-50)> <MaximumLatencyMs (1-50)>"
              << std::endl;
    std::cout << "InputPrefix: Prefix of the input string to the operation. Default: dummy" << std::endl;
    std::cout << "InputMinimum: Minimum number to make the input string to the operation somewhat unique. Default: 1" << std::endl;
    std::cout << "InputMaximum: Maximum number to make the input string to the operation somewhat unique. Default: 50" << std::endl;
    std::cout << "CallsPerSec: Rate of calls to the operation. Default: 10" << std::endl;
    std::cout << "MinimumLatencyMs: Minimum latency to apply to the operation. Default: 20" << std::endl;
    std::cout << "MaximumLatencyMs: Maximum latency to apply to the operation. Default: 40" << std::endl;
}
32
main(int argc,char ** argv)33 int main(int argc, char** argv)
34 {
35 std::string inputPrefix("dummy");
36 std::uint32_t inputMinimum = 1;
37 std::uint32_t inputMaximum = 50;
38 std::uint32_t callsPerSec = 10;
39 std::uint32_t minLatMs = 20;
40 std::uint32_t maxLatMs = 40;
41
42 try {
43 if (argc > 1) {
44 inputPrefix = argv[1];
45 }
46
47 if (argc > 2) {
48 inputMinimum = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[2]))));
49 }
50
51 if (argc > 3) {
52 inputMaximum = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[3]))));
53 }
54
55 if (argc > 4) {
56 callsPerSec = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[4]))));
57 }
58
59 if (argc > 5) {
60 minLatMs = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[5]))));
61 }
62
63 if (argc > 6) {
64 maxLatMs = static_cast<std::uint32_t>(std::max(1, std::min(50, std::atoi(argv[6]))));
65 }
66 }
67 catch (const std::exception& exc) {
68 std::cout << "Exception while reading arguments: " << exc.what() << std::endl;
69 print_usage(argc, argv);
70 return -1;
71 }
72 catch (...) {
73 std::cout << "Unknown exception while reading arguments." << std::endl;
74 print_usage(argc, argv);
75 return -1;
76 }
77
78 if (inputMinimum > inputMaximum) {
79 std::cout << "InputMinimum must be smaller than InputMaximum." << std::endl;
80 print_usage(argc, argv);
81 return -1;
82 }
83
84 if (minLatMs > maxLatMs) {
85 std::cout << "MinimumLatencyMs must be smaller than MaximumLatencyMs." << std::endl;
86 print_usage(argc, argv);
87 return -1;
88 }
89
90 std::cout << "Applying the following parameters:" << std::endl
91 << "Input prefix: " << inputPrefix << "." << std::endl
92 << "Input range: [" << inputMinimum << ", " << inputMaximum << "]." << std::endl
93 << "Calls Per Second: " << callsPerSec << "." << std::endl
94 << "Latency range: [" << minLatMs << ", " << maxLatMs << "] ms." << std::endl;
95
96 const int sleepTimeMs = 1000 / callsPerSec;
97 OperationProvider op(minLatMs, maxLatMs);
98
99 std::mutex queueMutex;
100 std::queue<std::shared_future<OperationResponse>> responseQueue;
101
102 auto dequeueFuture = std::async(std::launch::async, [&]() {
103 while (true) {
104 bool empty = false;
105 {
106 std::lock_guard<std::mutex> lg(queueMutex);
107 empty = responseQueue.empty();
108 }
109
110 if (empty) {
111 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
112 continue;
113 }
114
115 responseQueue.front().get();
116
117 // std::cout << "Removing item from queue." << std::endl;
118 std::lock_guard<std::mutex> lg(queueMutex);
119 responseQueue.pop();
120 }
121 });
122
123 std::random_device rd;
124 std::uniform_int_distribution<> dis(inputMinimum, inputMaximum);
125
126 std::cout << "You can now run the bcc scripts, see usdt_sample.md for examples." << std::endl;
127 std::cout << "pid: " << ::getpid() << std::endl;
128 std::cout << "Press ctrl-c to exit." << std::endl;
129 while (true) {
130 std::ostringstream inputOss;
131 inputOss << inputPrefix << "_" << dis(rd);
132 auto responseFuture = op.executeAsync(OperationRequest(inputOss.str()));
133
134 {
135 std::lock_guard<std::mutex> lg(queueMutex);
136 responseQueue.push(responseFuture);
137 }
138
139 // For a sample application, this is good enough to simulate callsPerSec.
140 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
141 }
142
143 dequeueFuture.get();
144 return 0;
145 }
146