1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <pthread.h>
#include <sched.h>
#include <string>
#include <sys/syscall.h>
#include <thread>
#include <unistd.h>
#include <vector>
29
namespace {
// How long a worker spins in its busy loop each time it is woken up.
constexpr std::chrono::milliseconds RUNNING_TIME{1000};
} // namespace
33
/**
 * Returns the id of the calling thread as reported by the gettid system call.
 * Used as the "[tid]" prefix in the log lines printed by this file.
 */
static long GetTid()
{
    const long tid = syscall(SYS_gettid);
    return tid;
}
38
/**
 * Counting semaphore built from a mutex and a condition variable.
 *
 * The counter is only read or written while mutex_ is held, so it is a plain
 * int: the mutex already provides the required synchronization and `volatile`
 * would add nothing. Instances can be neither copied nor moved.
 */
struct Semaphore {
public:
    /** @param count initial value of the counter; Down() blocks while it is <= 0. */
    explicit Semaphore(int count = 0) : count_(count) {}
    ~Semaphore() = default;

    Semaphore(const Semaphore&) = delete;
    Semaphore& operator=(const Semaphore&) = delete;
    Semaphore(Semaphore&&) = delete;
    Semaphore& operator=(Semaphore&&) = delete;

    /** Blocks the caller until the counter is positive, then decrements it. */
    void Down()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-evaluated after every wakeup, so a spurious
        // wakeup simply goes back to waiting.
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    /** Increments the counter and wakes one thread waiting in Down(), if any. */
    void Up()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_; // guarded by mutex_
};
70
71 struct Controller {
72 using Clock = std::chrono::steady_clock;
73
GetInstanceController74 static Controller& GetInstance()
75 {
76 static Controller instance;
77 return instance;
78 }
79
StartWorkersController80 void StartWorkers(int n)
81 {
82 running_ = true;
83 printf("[%ld] start %d workers...\n", GetTid(), n);
84 for (int i = 0; i < n; i++) {
85 workerSems_.push_back(std::make_shared<Semaphore>());
86 threads_.emplace_back(std::thread(&Controller::Work, this, i));
87 }
88 doneSem_ = std::make_shared<Semaphore>();
89 workerSems_[0]->Up();
90 }
91
SetSwitchCountController92 void SetSwitchCount(int count)
93 {
94 swtichCount_ = count;
95 }
96
WaitForDoneController97 void WaitForDone()
98 {
99 if (doneSem_) {
100 doneSem_->Down();
101 }
102 }
103
StopWorkersController104 void StopWorkers(int n)
105 {
106 running_ = false;
107 for (auto& sem : workerSems_) {
108 sem->Up();
109 }
110
111 printf("[%ld] wating %d workers...\n", GetTid(), n);
112 for (int i = 0; i < n; i++) {
113 threads_[i].join();
114 }
115 }
116
117 private:
WorkController118 void Work(int id)
119 {
120 std::string name = "worker-" + std::to_string(id);
121 pthread_setname_np(pthread_self(), name.c_str());
122 printf("[%ld] %s start!\n", GetTid(), name.c_str());
123
124 cpu_set_t cpuSet;
125 CPU_ZERO(&cpuSet);
126 CPU_SET(id, &cpuSet);
127 sched_setaffinity(GetTid(), sizeof(cpuSet), &cpuSet);
128
129 while (running_) {
130 auto sem = workerSems_[id];
131 if (!sem) {
132 break;
133 }
134
135 // wait for swith to this thread
136 sem->Down();
137 if (!running_) {
138 break;
139 }
140
141 // busy loop
142 uint64_t count = 0;
143 auto stopTime = Clock::now() + RUNNING_TIME;
144 while (Clock::now() < stopTime) {
145 count++;
146 }
147 printf("[%ld] busy loop count = %" PRIu64 "!\n", GetTid(), count);
148 if (--swtichCount_ == 0) {
149 doneSem_->Up();
150 break;
151 }
152
153 // wakeup next thread
154 auto nextId = static_cast<unsigned int>(id + 1) % workerSems_.size();
155 auto nextSem = workerSems_[nextId];
156 if (nextSem) {
157 nextSem->Up();
158 }
159 }
160 printf("[%ld] %s exit!\n", GetTid(), name.c_str());
161 }
162
163 private:
164 std::vector<std::thread> threads_;
165 std::vector<std::shared_ptr<Semaphore>> workerSems_;
166 std::atomic<bool> running_ = false;
167 std::atomic<int> swtichCount_ = false;
168 std::shared_ptr<Semaphore> doneSem_;
169 };
170
main(int argc,char * argv[])171 int main(int argc, char* argv[])
172 {
173 const int n = std::thread::hardware_concurrency();
174 const int m = ((argc > 1) ? atoi(argv[1]) : n); // switch times
175 if (n <= 1 || m <= 1) {
176 printf("unexpected m or n: %d, %d\n", m, n);
177 return 1;
178 }
179
180 pthread_setname_np(pthread_self(), "main");
181 Controller::GetInstance().SetSwitchCount(m);
182 Controller::GetInstance().StartWorkers(n);
183
184 printf("[%ld] perform %d times switch...\n", GetTid(), m);
185 Controller::GetInstance().WaitForDone();
186
187 Controller::GetInstance().StopWorkers(n);
188 printf("all %d workers done!\n", n);
189 return 0;
190 }
191