• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/framework/run_handler_util.h"
17 
18 #include <cmath>
19 
20 #include "tensorflow/core/lib/strings/numbers.h"
21 #include "tensorflow/core/platform/logging.h"
22 #include "tensorflow/core/platform/str_util.h"
23 
24 namespace tensorflow {
25 
ParamFromEnvWithDefault(const char * var_name,double default_value)26 double ParamFromEnvWithDefault(const char* var_name, double default_value) {
27   const char* val = std::getenv(var_name);
28   double num;
29   return (val && strings::safe_strtod(val, &num)) ? num : default_value;
30 }
31 
ParamFromEnvWithDefault(const char * var_name,std::vector<double> default_value)32 std::vector<double> ParamFromEnvWithDefault(const char* var_name,
33                                             std::vector<double> default_value) {
34   const char* val = std::getenv(var_name);
35   if (!val) {
36     return default_value;
37   }
38   std::vector<string> splits = str_util::Split(val, ",");
39   std::vector<double> result;
40   result.reserve(splits.size());
41   for (auto& split : splits) {
42     double num;
43     if (strings::safe_strtod(split, &num)) {
44       result.push_back(num);
45     } else {
46       LOG(ERROR) << "Wrong format for " << var_name << ". Use default value.";
47       return default_value;
48     }
49   }
50   return result;
51 }
52 
ParamFromEnvWithDefault(const char * var_name,std::vector<int> default_value)53 std::vector<int> ParamFromEnvWithDefault(const char* var_name,
54                                          std::vector<int> default_value) {
55   const char* val = std::getenv(var_name);
56   if (!val) {
57     return default_value;
58   }
59   std::vector<string> splits = str_util::Split(val, ",");
60   std::vector<int> result;
61   result.reserve(splits.size());
62   for (auto& split : splits) {
63     int num;
64     if (strings::safe_strto32(split, &num)) {
65       result.push_back(num);
66     } else {
67       LOG(ERROR) << "Wrong format for " << var_name << ". Use default value.";
68       return default_value;
69     }
70   }
71   return result;
72 }
73 
ParamFromEnvBoolWithDefault(const char * var_name,bool default_value)74 bool ParamFromEnvBoolWithDefault(const char* var_name, bool default_value) {
75   const char* val = std::getenv(var_name);
76   return (val) ? str_util::Lowercase(val) == "true" : default_value;
77 }
78 
// Partitions `num_threads` inter-op threads across `num_active_requests`
// requests, writing per-request half-open ranges [start, end) into
// `start_vec`/`end_vec` (both must already be sized >= num_active_requests).
// Request i carries weight (num_active_requests - i), so older requests
// receive proportionally more threads; each request gets at least
// max(1, min_threads_per_request) threads, and ranges may overlap.
void ComputeInterOpSchedulingRanges(int num_active_requests, int num_threads,
                                    int min_threads_per_request,
                                    std::vector<std::uint_fast32_t>* start_vec,
                                    std::vector<std::uint_fast32_t>* end_vec) {
  // Sum of all request weights: N + (N-1) + ... + 1 = N*(N+1)/2.
  const float weight_sum =
      0.5f * num_active_requests * (num_active_requests + 1);
  // Threads available per unit of weight.
  const float threads_per_weight = static_cast<float>(num_threads) / weight_sum;
  float prev_cumulative = 0.0;
  min_threads_per_request = std::max(1, min_threads_per_request);
  for (int i = 0; i < num_active_requests; ++i) {
    // Closed-form prefix sum of the first (i+1) weights.
    const float cumulative = static_cast<float>(i + 1) *
                             (num_active_requests - static_cast<float>(i) * 0.5f);
    const float weight = cumulative - prev_cumulative;
    // Quantize the demand by rounding up, subject to the per-request minimum.
    // The small epsilon keeps ceil() from bumping exact values (e.g. 4.0 -> 5)
    // when float error pushes them just above an integer.
    const int demand = std::max(
        min_threads_per_request,
        static_cast<int>(std::ceil(weight * threads_per_weight - 0.00001f)));
    // Floor of the real start position; expand to length `demand`, clamping
    // the range into [0, num_threads).
    int range_start = prev_cumulative * threads_per_weight;
    const int range_end = std::min(num_threads, range_start + demand);
    range_start = std::max(0, std::min(range_start, range_end - demand));
    start_vec->at(i) = range_start;
    end_vec->at(i) = range_end;
    prev_cumulative = cumulative;
  }
}
112 
// Assigns each of `num_threads` threads a work-stealing domain: a half-open
// range [start, end) of thread ids it steals from first. Threads are grouped
// into consecutive domains of size min(min_threads_per_domain, num_threads);
// the final domain is anchored at num_threads and may overlap the previous
// one so that every domain keeps the full size. Both output vectors must
// already be sized >= num_threads.
void ComputeInterOpStealingRanges(int num_threads, int min_threads_per_domain,
                                  std::vector<std::uint_fast32_t>* start_vec,
                                  std::vector<std::uint_fast32_t>* end_vec) {
  const int domain_size = std::min(min_threads_per_domain, num_threads);
  unsigned domain_start = 0;
  unsigned domain_end = domain_size;
  for (int tid = 0; tid < num_threads; ++tid) {
    if (tid >= domain_end) {
      // Current thread falls past the current domain: open the next one.
      if (domain_end + domain_size < num_threads) {
        domain_start = domain_end;
        domain_end += domain_size;
      } else {
        // Last domain: pin its end to num_threads and keep the full size,
        // overlapping the previous domain if necessary.
        domain_end = num_threads;
        domain_start = domain_end - domain_size;
      }
    }
    start_vec->at(tid) = domain_start;
    end_vec->at(tid) = domain_end;
  }
}
132 
ChooseRequestsWithExponentialDistribution(int num_active_requests,int num_threads)133 std::vector<int> ChooseRequestsWithExponentialDistribution(
134     int num_active_requests, int num_threads) {
135   // Fraction of the total threads that will be evenly distributed across
136   // requests. The rest of threads will be exponentially distributed across
137   // requests.
138   static const double kCapacityFractionForEvenDistribution =
139       ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_EVEN_FRACTION", 0.5);
140 
141   // For the threads that will be exponentially distributed across requests,
142   // a request will get allocated (kPowerBase - 1) times as much threads as
143   // threads allocated to all requests that arrive after it. For example, the
144   // oldest request will be allocated num_threads*(kPowerBase-1)/kPowerBase
145   // number of threads.
146   static const double kPowerBase =
147       ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_POWER_BASE", 2.0);
148 
149   static const int kMinEvenThreadsFromEnv = static_cast<int>(
150       ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MIN_EVEN_THREADS", 1));
151   static const int kMaxEvenThreadsFromEnv = static_cast<int>(
152       ParamFromEnvWithDefault("TF_RUN_HANDLER_EXP_DIST_MAX_EVEN_THREADS", 3));
153 
154   std::vector<int> request_idx_list;
155   request_idx_list.resize(num_threads);
156   // Each request gets at least this number of threads that steal from it first.
157   int min_threads_per_request =
158       num_threads * kCapacityFractionForEvenDistribution / num_active_requests;
159   min_threads_per_request =
160       std::max(kMinEvenThreadsFromEnv, min_threads_per_request);
161   min_threads_per_request =
162       std::min(kMaxEvenThreadsFromEnv, min_threads_per_request);
163 
164   int num_remaining_threads =
165       std::max(0, num_threads - num_active_requests * min_threads_per_request);
166   int request_idx = -1;
167   int num_threads_next_request = 0;
168 
169   for (int tid = 0; tid < num_threads; ++tid) {
170     if (num_threads_next_request <= 0) {
171       request_idx = std::min(num_active_requests - 1, request_idx + 1);
172       int num_extra_threads_next_request =
173           std::ceil(num_remaining_threads * (kPowerBase - 1.0) / kPowerBase);
174       num_remaining_threads -= num_extra_threads_next_request;
175       num_threads_next_request =
176           num_extra_threads_next_request + min_threads_per_request;
177     }
178     num_threads_next_request--;
179     request_idx_list[tid] = request_idx;
180   }
181   return request_idx_list;
182 }
183 
184 }  // namespace tensorflow
185