/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_
#define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_

#include <string>
#include <unordered_map>

#include "tensorflow/core/framework/step_stats.pb.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {
class StepStatsCollector;

// WorkerCacheLogger is a thread-safe utility for use by a WorkerCache
// to optionally log some selected RPC activity.  A single instance
// should be owned by a WorkerCache, for use by its RemoteWorker
// instances.
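//
// A minimal usage sketch from an owning WorkerCache (illustrative only; the
// variable names, step id, device names, and sizes below are hypothetical):
//
//   WorkerCacheLogger logger;
//   logger.SetLogging(true);          // a step requests RPC logging
//   logger.RecordRecvTensor(/*step_id=*/42, start_usecs, end_usecs,
//                           "edge_1/_recv", "/job:worker/task:0/cpu:0",
//                           "/job:worker/task:1/cpu:0", /*bytes=*/1024);
//   StepStats ss;
//   if (logger.RetrieveLogs(42, &ss)) {
//     // ss now holds the stats recorded for step 42; the logger no longer
//     // stores them.
//   }
//   logger.SetLogging(false);         // that step is done with logging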

class WorkerCacheLogger {
 public:
  // Start/Stop logging activity.  This function increments/decrements
  // a counter so that if two separate steps turn logging on/off,
  // logging should be on for the union of the durations of both,
  // regardless of relative timing.
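  //
  // For example, two overlapping requests interact like this (an
  // illustrative sketch of the counter semantics; step labels are
  // hypothetical):
  //   SetLogging(true);    // step A starts logging: count 0 -> 1, on
  //   SetLogging(true);    // step B starts logging: count 1 -> 2, still on
  //   SetLogging(false);   // step A finishes:       count 2 -> 1, still on
  //   SetLogging(false);   // step B finishes:       count 1 -> 0, off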
  void SetLogging(bool v);

  // Discard any saved log data.
  void ClearLogs();

  // Return logs for the identified step in *ss.  Any returned data will no
  // longer be stored.  Returns true iff *ss was modified.
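  //
  // For example (an illustrative sketch; the step id is hypothetical):
  //   StepStats ss;
  //   logger->RetrieveLogs(7, &ss);   // true iff stats were saved for step 7
  //   logger->RetrieveLogs(7, &ss);   // false unless new stats arrived, since
  //                                   // the first call handed the data off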
  bool RetrieveLogs(int64 step_id, StepStats* ss);

  // Return true if there is any outstanding request for logging on
  // the RPC channels.
  bool LoggingActive() {
    mutex_lock l(count_mu_);
    return want_logging_count_ > 0;
  }

  // Generates a NodeExecStats record with the given data, and saves it for
  // later retrieval by RetrieveLogs().
  void RecordRecvTensor(int64 step_id, int64 start_usecs, int64 end_usecs,
                        const string& tensor_name, const string& src_device,
                        const string& dst_device, int64 bytes);

  // Generates a NodeExecStats record with the given data, and saves it for
  // later retrieval by RetrieveLogs().
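  //
  // For example (an illustrative sketch; every argument value here is
  // hypothetical):
  //   logger->RecordDataTransfer(
  //       /*step_id=*/7, start_usecs, end_usecs, "edge_1/_recv",
  //       "/job:worker/task:0/cpu:0", "/job:ps/task:0/cpu:0", /*bytes=*/4096,
  //       "streamed in 2 chunks", "RDMA");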
  void RecordDataTransfer(int64 step_id, int64 start_usecs, int64 end_usecs,
                          const string& tensor_name, const string& src_device,
                          const string& dst_device, int64 bytes,
                          const string& details,
                          const string& transfer_method_name);

 private:
  mutex count_mu_;
  int32 want_logging_count_ TF_GUARDED_BY(count_mu_) = 0;

  struct StepLog {
    StepStats step_stats;
    StepStatsCollector* collector;
  };
  typedef std::unordered_map<int64, StepLog> LogMap;
  mutex mu_;
  LogMap log_map_ TF_GUARDED_BY(mu_);

  // Records "ns" in log_map_ under the given device and step.
  void Save(const string& device, int64 step_id, NodeExecStats* ns);

  void ClearLogsWithLock() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
};
}  // namespace tensorflow
#endif  // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_CACHE_LOGGER_H_