/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/framework/run_handler_util.h"

#include <cmath>

#include "tensorflow/core/lib/strings/numbers.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/str_util.h"

namespace tensorflow {

double ParamFromEnvWithDefault(const char* var_name, double default_value) {
  const char* val = std::getenv(var_name);
  double num;
  return (val && strings::safe_strtod(val, &num)) ? num : default_value;
}

std::vector<double> ParamFromEnvWithDefault(const char* var_name,
                                            std::vector<double> default_value) {
  const char* val = std::getenv(var_name);
  if (!val) {
    return default_value;
  }
  std::vector<string> splits = str_util::Split(val, ",");
  std::vector<double> result;
  result.reserve(splits.size());
  for (auto& split : splits) {
    double num;
    if (strings::safe_strtod(split, &num)) {
      result.push_back(num);
    } else {
      LOG(ERROR) << "Wrong format for " << var_name << ". Using default value.";
      return default_value;
    }
  }
  return result;
}

std::vector<int> ParamFromEnvWithDefault(const char* var_name,
                                         std::vector<int> default_value) {
  const char* val = std::getenv(var_name);
  if (!val) {
    return default_value;
  }
  std::vector<string> splits = str_util::Split(val, ",");
  std::vector<int> result;
  result.reserve(splits.size());
  for (auto& split : splits) {
    int num;
    if (strings::safe_strto32(split, &num)) {
      result.push_back(num);
    } else {
      LOG(ERROR) << "Wrong format for " << var_name << ". Using default value.";
      return default_value;
    }
  }
  return result;
}

bool ParamFromEnvBoolWithDefault(const char* var_name, bool default_value) {
  const char* val = std::getenv(var_name);
  return (val) ? str_util::Lowercase(val) == "true" : default_value;
}
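// For example, ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_POWER_BASE",
// 2.0) below returns 2.0 unless that environment variable is set to a
// parsable double. The vector overloads expect comma-separated values and
// fall back to the default on any malformed element. For
// ParamFromEnvBoolWithDefault, a set variable yields true only when its value
// equals "true" (case-insensitive); an unset variable yields `default_value`.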

void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads,
                                    int min_threads_per_request,
                                    std::vector<std::uint_fast32_t>* start_vec,
                                    std::vector<std::uint_fast32_t>* end_vec) {
  // Each request is expected to have weight W[i] = num_active_requests - i,
  // so request i carries more weight than request i + 1. Therefore,
  // total_weight = sum of all request weights
  //              = num_active_requests * (num_active_requests + 1) / 2.
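  // For example (an illustrative sketch; note that start_vec and end_vec must
  // be pre-sized to num_active_requests, since the function writes via at()):
  //   std::vector<std::uint_fast32_t> start(3), end(3);
  //   ComputeInterOpSchedulingRanges(/*num_active_requests=*/3,
  //                                  /*num_threads=*/6,
  //                                  /*min_threads_per_request=*/1,
  //                                  &start, &end);
  //   // Yields start = {0, 3, 5} and end = {3, 5, 6}: the heaviest request
  //   // may be scheduled on threads [0, 3), the next on [3, 5), and the
  //   // last on [5, 6).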
  float total_weight = 0.5f * num_active_requests * (num_active_requests + 1);
  float demand_factor = static_cast<float>(num_threads) / total_weight;
  float last_cumulative_weight = 0.0;
  min_threads_per_request = std::max(1, min_threads_per_request);
  for (int i = 0; i != num_active_requests; i++) {
    float cumulative_weight =
        static_cast<float>(i + 1) *
        (num_active_requests - static_cast<float>(i) * 0.5f);
    float weight = cumulative_weight - last_cumulative_weight;
    // Quantize the thread demand by rounding up, while also satisfying the
    // `min_threads_per_request` constraint.
    // Note: a small epsilon (0.00001) is subtracted so that floating-point
    // error does not make ceil(..) round an exact weight such as 4.0 up to 5.
    int demand = std::max(
        min_threads_per_request,
        static_cast<int>(std::ceil(weight * demand_factor - 0.00001f)));
    // For the quantized range [start, end): take the floor of the real start,
    // cap end = start + demand at num_threads, and if the cap shortened the
    // range, shift start back down (but not below 0) to restore length
    // `demand`.
    int start = last_cumulative_weight * demand_factor;
    int end = std::min(num_threads, start + demand);
    start = std::max(0, std::min(start, end - demand));
    start_vec->at(i) = start;
    end_vec->at(i) = end;
    last_cumulative_weight = cumulative_weight;
  }
}

void ComputeInterOpStealingRanges(int num_threads, int min_threads_per_domain,
                                  std::vector<std::uint_fast32_t>* start_vec,
                                  std::vector<std::uint_fast32_t>* end_vec) {
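  // For example (an illustrative sketch): with num_threads = 6 and
  // min_threads_per_domain = 2, threads are grouped into stealing domains of
  // two consecutive threads, so threads 0-1 are assigned the range [0, 2),
  // threads 2-3 the range [2, 4), and threads 4-5 the range [4, 6). As above,
  // start_vec and end_vec must be pre-sized to num_threads.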
  int steal_domain_size = std::min(min_threads_per_domain, num_threads);
  unsigned steal_start = 0, steal_end = steal_domain_size;
  for (int i = 0; i < num_threads; ++i) {
    if (i >= steal_end) {
      if (steal_end + steal_domain_size < num_threads) {
        steal_start = steal_end;
        steal_end += steal_domain_size;
      } else {
        steal_end = num_threads;
        steal_start = steal_end - steal_domain_size;
      }
    }
    start_vec->at(i) = steal_start;
    end_vec->at(i) = steal_end;
  }
}

std::vector<int> ChooseRequestsWithExponentialDistribution(
    int num_active_requests, int num_threads) {
  // Fraction of the total threads that will be evenly distributed across
  // requests. The rest of the threads will be exponentially distributed
  // across requests.
  static const double kCapacityFractionForEvenDistribution =
      ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_EVEN_FRACTION", 0.5);

  // For the threads that will be exponentially distributed across requests,
  // a request will be allocated (kPowerBase - 1) times as many threads as
  // are allocated to all requests that arrive after it. For example, the
  // oldest request will be allocated num_threads*(kPowerBase-1)/kPowerBase
  // threads.
  static const double kPowerBase =
      ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_POWER_BASE", 2.0);

  static const int kMinEvenThreadsFromEnv = static_cast<int>(
      ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MIN_EVEN_THREADS", 1));
  static const int kMaxEvenThreadsFromEnv = static_cast<int>(
      ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MAX_EVEN_THREADS", 3));
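  // For example (an illustrative sketch, using the default settings above):
  // with num_active_requests = 3 and num_threads = 12, each request is first
  // given min_threads_per_request = 2 threads, and the remaining 6 threads
  // are split exponentially as 3, 2 and 1, so the returned list assigns 5, 4
  // and 3 threads to requests 0, 1 and 2, respectively.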

  std::vector<int> request_idx_list;
  request_idx_list.resize(num_threads);
  // Each request gets at least this number of threads that steal from it
  // first.
  int min_threads_per_request =
      num_threads * kCapacityFractionForEvenDistribution / num_active_requests;
  min_threads_per_request =
      std::max(kMinEvenThreadsFromEnv, min_threads_per_request);
  min_threads_per_request =
      std::min(kMaxEvenThreadsFromEnv, min_threads_per_request);

  int num_remaining_threads =
      std::max(0, num_threads - num_active_requests * min_threads_per_request);
  int request_idx = -1;
  int num_threads_next_request = 0;

  for (int tid = 0; tid < num_threads; ++tid) {
    if (num_threads_next_request <= 0) {
      request_idx = std::min(num_active_requests - 1, request_idx + 1);
      int num_extra_threads_next_request =
          std::ceil(num_remaining_threads * (kPowerBase - 1.0) / kPowerBase);
      num_remaining_threads -= num_extra_threads_next_request;
      num_threads_next_request =
          num_extra_threads_next_request + min_threads_per_request;
    }
    num_threads_next_request--;
    request_idx_list[tid] = request_idx;
  }
  return request_idx_list;
}

}  // namespace tensorflow