/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
#define TENSORFLOW_CORE_FRAMEWORK_MODEL_H_

#include <list>
#include <memory>
#include <string>
// TODO(b/114492873): Move this include into core/platform.
#include <thread>  // NOLINT
#include <utility>
#include <vector>

#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/gtl/cleanup.h"
#include "tensorflow/core/lib/gtl/map_util.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"

namespace tensorflow {
namespace data {
namespace model {

// A constant that can be used to enable auto-tuning.
constexpr int kAutoTune = -1;

// Represents thread-safe state that can be shared between an input pipeline
// and the performance model.
struct SharedState {
 public:
  SharedState(int64 value, std::shared_ptr<mutex> mu,
              std::shared_ptr<condition_variable> cond_var)
      : value(value),
        mu(std::move(mu)),
        cond_var(std::move(cond_var)),
        tunable(value == kAutoTune) {}

  int64 value;
  std::shared_ptr<mutex> mu;
  std::shared_ptr<condition_variable> cond_var;
  const bool tunable;
};

// Represents a parameter.
struct Parameter {
  Parameter(const string& name, std::shared_ptr<SharedState> state, int64 min,
            int64 max)
      : name(name),
        value(state->value),
        min(min),
        max(max),
        state(std::move(state)) {}

  // Human-readable name of the parameter.
  string name;

  // Identifies the model value of the parameter. This can be different from
  // the actual value (e.g. during optimization search).
  int64 value;

  // Identifies the minimum value of the parameter.
  int64 min;

  // Identifies the maximum value of the parameter.
  int64 max;

  // Shared state of the parameter.
  std::shared_ptr<SharedState> state;
};

std::shared_ptr<Parameter> MakeParameter(const string& name,
                                         std::shared_ptr<SharedState> state,
                                         int64 min, int64 max);
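
// A minimal sketch of how a tunable parameter might be constructed
// (illustrative only; the surrounding transformation code is assumed):
//
//   auto mu = std::make_shared<mutex>();
//   auto cond_var = std::make_shared<condition_variable>();
//   auto state = std::make_shared<SharedState>(kAutoTune, mu, cond_var);
//   auto parallelism = MakeParameter("parallelism", state, /*min=*/1,
//                                    /*max=*/port::NumSchedulableCPUs());
//
// Because the initial value is `kAutoTune`, the shared state is marked
// tunable and `Model::Optimize` is free to choose the parameter's value.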

// Abstract representation of a TensorFlow input pipeline node. It collects
// information about inputs to this node, processing time spent executing the
// node logic, the number of elements produced by the node, and various other
// information (e.g. batch size or execution parallelism).
//
// Developers of tf.data transformations are not expected to interact with
// this class directly. Boilerplate code for creating the abstract
// representation of the input pipeline and collecting common information has
// been added to the implementation of `DatasetBase` and `DatasetBaseIterator`
// respectively.
//
// In addition, `DatasetBaseIterator` provides wrappers that can be used for
// transformation-specific information collection. The `SetMetadata` wrapper
// can be used to pass arbitrary metadata to the modeling framework, while the
// `StartWork` and `StopWork` wrappers should be used to correctly account for
// the processing time of multi-threaded transformations that yield the CPU;
// such transformations should invoke `StartWork()` when a transformation
// thread starts executing (e.g. when created or woken up) and `StopWork()`
// when a transformation thread stops executing (e.g. when returning or
// waiting).
class Node {
 public:
  // Arguments for `Node` constructor.
  struct Args {
    int64 id;
    string name;
    std::shared_ptr<Node> output;
  };

  using Factory = std::function<std::shared_ptr<Node>(Args)>;

  explicit Node(Args args)
      : id_(args.id), name_(args.name), output_(args.output.get()) {}

  // Increments the bytes buffered by the given delta.
  void add_buffered_bytes(int64 delta) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    buffered_bytes_ += delta;
  }

  // Adds an input.
  void add_input(std::shared_ptr<Node> node) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    inputs_.push_back(node);
  }

  // Increments the aggregate processing time by the given delta.
  void add_processing_time(int64 delta) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    processing_time_ += delta;
  }

  // Returns the number of bytes stored in this node's buffer.
  int64 buffered_bytes() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return buffered_bytes_;
  }

  // Indicates whether the node has tunable parameters.
  bool has_tunable_parameters() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    for (const auto& pair : parameters_) {
      if (pair.second->state->tunable) return true;
    }
    return false;
  }

  // Returns the unique node ID.
  int64 id() const LOCKS_EXCLUDED(mu_) { return id_; }

  // Returns the node inputs.
  std::list<std::shared_ptr<Node>> inputs() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return inputs_;
  }

  // Returns the node name.
  const string& name() const { return name_; }

  // Returns the number of elements produced by the node.
  int64 num_elements() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return num_elements_;
  }

  // Returns the node output.
  Node* output() const { return output_; }

  // Returns the aggregate processing time.
  int64 processing_time() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return processing_time_;
  }

  // Records that the node produced an element.
  void record_element() LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    num_elements_++;
  }
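
  // The next two methods implement the start/stop protocol used to attribute
  // wall-clock time to a node. A sketch of the expected call pattern from a
  // transformation thread (illustrative; `node` and the element production
  // logic are assumed):
  //
  //   node->record_start(Env::Default()->NowNanos());
  //   // ... produce an element ...
  //   node->record_stop(Env::Default()->NowNanos());
  //   // ... wait, e.g. for buffer space, without being charged for it ...
  //
  // Only the intervals between a start and the matching stop are added to
  // the node's aggregate processing time.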

  // Records that a node thread has started executing.
  void record_start(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    work_start_[std::this_thread::get_id()] = time_nanos;
  }

  // Records that a node thread has stopped executing.
  void record_stop(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    std::thread::id tid = std::this_thread::get_id();
    auto iter = work_start_.find(tid);
    if (iter != work_start_.end()) {
      processing_time_ += time_nanos - iter->second;
      work_start_.erase(iter);
    } else {
      LOG(WARNING)
          << "Encountered a stop event that was not preceded by a start "
             "event.";
    }
  }

  // Removes an input.
  void remove_input(std::shared_ptr<Node> input) LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    inputs_.remove(input);
  }

  // Collects tunable parameters in the subtree rooted in this node.
  void CollectTunableParameters(
      std::vector<std::shared_ptr<Parameter>>* parameters) const
      LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    for (auto& pair : parameters_) {
      if (pair.second->state->tunable) {
        parameters->push_back(pair.second);
      }
    }
    for (auto& input : inputs_) {
      input->CollectTunableParameters(parameters);
    }
  }

  // Returns the per-element output time for this node.
  int64 OutputTime(std::vector<int64>* input_times) const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return OutputTimeLocked(input_times);
  }

  // Returns the per-element processing time spent in the subtree rooted in
  // this node.
  int64 ProcessingTime() const LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    return ProcessingTimeLocked();
  }

  // Returns a copy of this node, making a deep copy of its inputs and a
  // shallow copy of its tunable parameters.
  //
  // The purpose of this method is to allow the model optimization logic to
  // operate over immutable state while allowing concurrent model updates.
  std::shared_ptr<Node> Snapshot(std::shared_ptr<Node> output)
      LOCKS_EXCLUDED(mu_) {
    tf_shared_lock l(mu_);
    std::shared_ptr<Node> result = Clone(output);
    {
      mutex_lock l2(result->mu_);
      result->buffered_bytes_ = buffered_bytes_;
      result->processing_time_ = processing_time_;
      result->num_elements_ = num_elements_;
      result->parameters_ = parameters_;
    }
    for (auto& input : inputs_) {
      result->add_input(input->Snapshot(result));
    }
    return result;
  }
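
  // A sketch of how the optimization logic can use `Snapshot` (illustrative;
  // `root` is assumed to be the output node of the input pipeline):
  //
  //   std::shared_ptr<Node> snapshot = root->Snapshot(nullptr);
  //   int64 processing_time = snapshot->ProcessingTime();
  //
  // The snapshot's counters are copies, so traversals such as
  // `ProcessingTime` do not contend with pipeline threads, while the
  // shallow-copied parameters still share state with the live pipeline.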

 protected:
  // Creates a clone of this node.
  virtual std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const
      SHARED_LOCKS_REQUIRED(mu_) = 0;

  // Returns the per-element processing time spent in this node.
  int64 NanosPerElementLocked() const SHARED_LOCKS_REQUIRED(mu_) {
    if (num_elements_ == 0) {
      return 0;
    }
    return static_cast<int64>(static_cast<double>(processing_time_) /
                              static_cast<double>(num_elements_));
  }

  // Returns the sum of per-element output times for the inputs of this node.
  int64 OutputTimeForInputs(std::vector<int64>* input_times) const
      SHARED_LOCKS_REQUIRED(mu_) {
    int64 sum = 0;
    for (auto& input : inputs_) {
      sum += input->OutputTime(input_times);
    }
    return sum;
  }

  // Returns the per-element output time for this node.
  virtual int64 OutputTimeLocked(std::vector<int64>* input_times) const
      SHARED_LOCKS_REQUIRED(mu_) = 0;

  // Returns the sum of per-element processing times for the inputs of this
  // node.
  //
  // TODO(jsimsa): use processing time history as a prior for future inputs
  int64 ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) {
    int64 sum = 0;
    for (auto& input : inputs_) {
      sum += input->ProcessingTime();
    }
    return sum;
  }

  // Returns the per-element processing time spent in the subtree rooted in
  // this node.
  virtual int64 ProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) = 0;

  mutable mutex mu_;
  const int64 id_;
  const string name_;
  int64 buffered_bytes_ GUARDED_BY(mu_) = 0;
  int64 processing_time_ GUARDED_BY(mu_) = 0;
  int64 num_elements_ GUARDED_BY(mu_) = 0;
  std::map<std::thread::id, int64> work_start_ GUARDED_BY(mu_);
  std::map<string, std::shared_ptr<Parameter>> parameters_ GUARDED_BY(mu_);
  std::list<std::shared_ptr<Node>> inputs_ GUARDED_BY(mu_);

  // The reference to the output node is not owned so that deletion of a
  // node results in recursive deletion of the subtree rooted in the node.
  Node* const output_;
};

// InterleaveMany is used to model datasets whose inputs are used to create
// datasets whose elements are then interleaved.
std::shared_ptr<Node> MakeInterleaveManyNode(Node::Args args);

// AsyncInterleaveMany nodes are the asynchronous version of InterleaveMany
// nodes.
std::shared_ptr<Node> MakeAsyncInterleaveManyNode(
    Node::Args args, std::vector<std::shared_ptr<Parameter>> parameters);

// KnownRatio nodes model datasets that synchronously consume a known number
// of input elements per output element.
std::shared_ptr<Node> MakeKnownRatioNode(Node::Args args, double ratio);

// AsyncKnownRatio nodes are the asynchronous version of KnownRatio nodes.
std::shared_ptr<Node> MakeAsyncKnownRatioNode(
    Node::Args args, double ratio,
    std::vector<std::shared_ptr<Parameter>> parameters);

// Source nodes represent data sources.
std::shared_ptr<Node> MakeSourceNode(Node::Args args);

// UnknownRatio nodes represent datasets that synchronously consume an
// unknown number of input elements per output element.
//
// Unlike KnownRatio nodes, which expect the ratio between inputs and outputs
// to be specified as a parameter, UnknownRatio nodes estimate the ratio
// empirically.
std::shared_ptr<Node> MakeUnknownRatioNode(Node::Args args);

// Unknown nodes represent datasets for which we do not have a model. They
// act as pass-through between inputs and output.
std::shared_ptr<Node> MakeUnknownNode(Node::Args args);
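
// A sketch of how an iterator implementation might register itself with the
// model using one of the factories above (illustrative; the names `model`,
// `batch_size`, and `prefix` are assumptions, not part of this API). For
// example, a batching dataset, which consumes `batch_size` input elements
// per output element, could be registered via `Model::AddNode` below:
//
//   model->AddNode(
//       [batch_size](Node::Args args) {
//         return MakeKnownRatioNode(std::move(args), /*ratio=*/batch_size);
//       },
//       /*name=*/"BatchDataset", /*output_name=*/prefix);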

// Abstract representation of a TensorFlow input pipeline that can be used
// for collecting runtime information and optimizing performance. It collects
// runtime information about execution of the input pipeline that is used to
// create a performance model, which is in turn used to identify optimal
// values of tunable parameters.
//
// Developers of tf.data transformations are not expected to interact with
// this class directly. Boilerplate code for creating the abstract
// representation of the input pipeline and collecting runtime information
// has been added to the implementation of `DatasetBase` and
// `DatasetBaseIterator` respectively.
class Model {
 public:
  using NodeHook = std::function<void(std::shared_ptr<Node>)>;

  // Creates a new model.
  //
  // The `remove_node_hook` argument can be used to specify functionality that
  // should be invoked before a node is removed from the model. The hook can
  // be used for dependency injection -- to allow the model to invoke
  // functionality from modules that it could not depend on statically.
  Model(NodeHook remove_node_hook)
      : collect_resource_usage_(false),
        remove_node_hook_(std::move(remove_node_hook)) {
    DCHECK(remove_node_hook_ != nullptr);
  }

  // Indicates whether to collect resource usage.
  bool collect_resource_usage() const { return collect_resource_usage_; }

  // Adds a node with the given name and given output.
  std::shared_ptr<Node> AddNode(Node::Factory factory, const string& name,
                                const string& output_name) LOCKS_EXCLUDED(mu_);

  // Increments the processing time for the given node.
  void AddProcessingTime(const string& name, int64 delta) LOCKS_EXCLUDED(mu_);

  // Runs optimization.
  void Optimize(int64 cpu_budget) LOCKS_EXCLUDED(mu_);

  // Records that a node has produced an element.
  void RecordElement(const string& name) LOCKS_EXCLUDED(mu_);

  // Records that the given node has started work. If `stop_output` is set, it
  // also records that the output of the given node has stopped work.
  void RecordStart(const string& name, bool stop_output) LOCKS_EXCLUDED(mu_);

  // Records that the given node has stopped work. If `start_output` is set,
  // it also records that the output of the given node has started work.
  void RecordStop(const string& name, bool start_output) LOCKS_EXCLUDED(mu_);

  // Removes the given node.
  void RemoveNode(const string& name) LOCKS_EXCLUDED(mu_);

 private:
  // Collects tunable parameters in the tree rooted in the given node.
  std::vector<std::shared_ptr<Parameter>> CollectTunableParameters(
      std::shared_ptr<Node> node);

  // Collects the output time for the given node.
  int64 OutputTime(std::shared_ptr<Node> node);

  // Collects the processing time for the given node.
  int64 ProcessingTime(std::shared_ptr<Node> node);

  // Used for coordination between different input pipeline threads. Exclusive
  // access is required only when adding or removing nodes. Concurrent access
  // to existing nodes is protected by a node mutex.
  mutex mu_;
  int64 id_counter_ GUARDED_BY(mu_) = 1;
  std::shared_ptr<Node> output_ GUARDED_BY(mu_);
  std::map<string, std::shared_ptr<Node>> lookup_table_ GUARDED_BY(mu_);

  // Indicates whether the modeling framework should collect resource usage
  // (e.g. CPU, memory). The logic for collecting this information assumes
  // that the collection is not repeatedly disabled and enabled. As a
  // consequence, the implementation starts collecting resource usage when it
  // encounters a tunable parameter (because the information is used for
  // tuning the value of the parameter) and never stops.
  std::atomic<bool> collect_resource_usage_;

  // A hook invoked immediately before a node is removed from the model.
  const NodeHook remove_node_hook_;
};

}  // namespace model
}  // namespace data
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_FRAMEWORK_MODEL_H_