1 /* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/python/profiler/internal/profiler_pywrap_impl.h"
17 
18 #include <string>
19 #include <variant>
20 
21 #include "absl/container/flat_hash_map.h"
22 #include "absl/memory/memory.h"
23 #include "absl/strings/match.h"
24 #include "absl/strings/numbers.h"
25 #include "absl/strings/str_split.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/strings/strip.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 #include "absl/types/variant.h"
31 #include "tensorflow/core/platform/env.h"
32 #include "tensorflow/core/platform/errors.h"
33 #include "tensorflow/core/platform/status.h"
34 #include "tensorflow/core/platform/types.h"
35 #include "tensorflow/core/profiler/convert/xplane_to_tools_data.h"
36 #include "tensorflow/core/profiler/convert/xplane_to_trace_events.h"
37 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
38 #include "tensorflow/core/profiler/rpc/client/capture_profile.h"
39 #include "tensorflow/core/profiler/rpc/client/save_profile.h"
40 #include "tensorflow/core/profiler/rpc/profiler_server.h"
41 
42 namespace tensorflow {
43 namespace profiler {
44 namespace pywrap {
45 
46 namespace {
47 
48 using ::tensorflow::RemoteProfilerSessionManagerOptions;
49 
50 // Profiler gives grace after profiling duration to terminate.
51 constexpr absl::Duration kMinSessionGraceTime = absl::Seconds(60);
52 
ValidateHostPortPair(absl::string_view host_port)53 tensorflow::Status ValidateHostPortPair(absl::string_view host_port) {
54   tensorflow::uint32 port;
55   std::vector<absl::string_view> parts = absl::StrSplit(host_port, ':');
56   // Must be host:port, port must be a number, host must not contain a '/',
57   // host also must not be empty.
58   if (parts.size() != 2 || !absl::SimpleAtoi(parts[1], &port) ||
59       absl::StrContains(parts[0], "/") || parts[0].empty()) {
60     return tensorflow::errors::InvalidArgument(
61         "Could not interpret \"", host_port, "\" as a host-port pair.");
62   }
63   return OkStatus();
64 }
65 
ValidateOptions(const RemoteProfilerSessionManagerOptions & options)66 tensorflow::Status ValidateOptions(
67     const RemoteProfilerSessionManagerOptions& options) {
68   if (options.service_addresses().empty()) {
69     return tensorflow::errors::InvalidArgument("No service address provided.");
70   }
71 
72   if (options.profiler_options().duration_ms() == 0) {
73     return tensorflow::errors::InvalidArgument(
74         "duration_ms must be greater than zero.");
75   }
76 
77   for (absl::string_view host_port : options.service_addresses()) {
78     TF_RETURN_IF_ERROR(ValidateHostPortPair(host_port));
79   }
80 
81   if (options.max_session_duration_ms() <
82       options.profiler_options().duration_ms()) {
83     return tensorflow::errors::InvalidArgument(
84         "The maximum profiling session duration must be greater than or equal "
85         "to the local profiler duration.");
86   }
87 
88   return OkStatus();
89 }
90 
91 // Receives a comma delimited list of service_addresses and adds them to
92 // RemoteProfilerSessionManagerOptions::service_addresses.
AddServiceAddresses(absl::string_view service_addresses,RemoteProfilerSessionManagerOptions * options)93 void AddServiceAddresses(absl::string_view service_addresses,
94                          RemoteProfilerSessionManagerOptions* options) {
95   for (absl::string_view server : absl::StrSplit(service_addresses, ',')) {
96     options->add_service_addresses(server.data(), server.size());
97   }
98 }
99 
100 // Sets gRPC deadline to a grace period based on the profiling duration.
UpdateMaxSessionDuration(RemoteProfilerSessionManagerOptions & options)101 void UpdateMaxSessionDuration(RemoteProfilerSessionManagerOptions& options) {
102   auto local_profiler_duration = options.profiler_options().duration_ms();
103   auto session_creation_ts = options.session_creation_timestamp_ns();
104   auto requested_start_ts = options.profiler_options().start_timestamp_ns();
105   // User only needs to set maximal session duration if the profiling duration
106   // is bounded.
107   DCHECK_GT(local_profiler_duration, 0);
108   VLOG(3) << "duration_ms was given as " << local_profiler_duration;
109   // Max session duration is the profiling session with grace time.
110   auto profile_duration = std::max(
111       kMinSessionGraceTime, absl::Milliseconds(local_profiler_duration) * 2);
112   absl::Duration delay_duration;
113   // When requested start timestamp is 0, profiling starts immediately.
114   if (requested_start_ts > 0) {
115     delay_duration =
116         absl::Nanoseconds(requested_start_ts - session_creation_ts);
117   }
118 
119   auto max_session_duration = profile_duration + delay_duration;
120   options.set_max_session_duration_ms(
121       absl::ToInt64Milliseconds(max_session_duration));
122   VLOG(1) << "max_session_duration set to " << max_session_duration;
123 }
124 
125 // Takes profiler options in absl::flat_hash_map and returns a
126 // RemoteProfilerSessionManagerOptions.
GetOptionsLocked(absl::string_view logdir,const absl::flat_hash_map<std::string,std::variant<int,std::string>> & opts)127 RemoteProfilerSessionManagerOptions GetOptionsLocked(
128     absl::string_view logdir,
129     const absl::flat_hash_map<std::string, std::variant<int, std::string>>&
130         opts) {
131   RemoteProfilerSessionManagerOptions options;
132   *options.mutable_profiler_options() =
133       tensorflow::ProfilerSession::DefaultOptions();
134   // Store a timestamp of when this session was created. This will be the basis
135   // of gRPC deadline afterwards.
136   auto now = absl::Now();
137   options.set_session_creation_timestamp_ns(absl::ToUnixNanos(now));
138   VLOG(2) << "set_session_creation_timestamp_ns set to "
139           << options.session_creation_timestamp_ns() << " [" << now << "]";
140 
141   // Set the path of where to store XSpaces.
142   options.mutable_profiler_options()->set_repository_path(logdir.data(),
143                                                           logdir.size());
144   VLOG(2) << "repository_path set to "
145           << options.profiler_options().repository_path();
146 
147   for (const auto& kw : opts) {
148     absl::string_view key = kw.first;
149     if (key == "host_tracer_level") {
150       int value = std::get<int>(kw.second);
151       options.mutable_profiler_options()->set_host_tracer_level(value);
152       VLOG(1) << "host_tracer_level set to " << value;
153     } else if (key == "device_tracer_level") {
154       int value = std::get<int>(kw.second);
155       options.mutable_profiler_options()->set_device_tracer_level(value);
156       VLOG(1) << "device_tracer_level set to " << value;
157     } else if (key == "python_tracer_level") {
158       int value = std::get<int>(kw.second);
159       options.mutable_profiler_options()->set_python_tracer_level(value);
160       VLOG(1) << "python_tracer_level set to " << value;
161     } else if (key == "delay_ms") {
162       int value = std::get<int>(kw.second);
163       options.set_delay_ms(value);
164       VLOG(1) << "delay_ms was set to " << value;
165     } else {
166       LOG(WARNING) << "Unrecognised key: " << key;
167     }
168   }
169 
170   return options;
171 }
172 
GetOptionsLocked(absl::string_view service_addresses,absl::string_view logdir,absl::string_view worker_list,bool include_dataset_ops,int32_t duration_ms,const absl::flat_hash_map<std::string,std::variant<int,std::string>> & opts,bool * is_cloud_tpu_session)173 RemoteProfilerSessionManagerOptions GetOptionsLocked(
174     absl::string_view service_addresses, absl::string_view logdir,
175     absl::string_view worker_list, bool include_dataset_ops,
176     int32_t duration_ms,
177     const absl::flat_hash_map<std::string, std::variant<int, std::string>>&
178         opts,
179     bool* is_cloud_tpu_session) {
180   auto options = GetOptionsLocked(logdir, opts);
181 
182   // Remote profiling does not support any use cases where the following options
183   // are set by `opts`. e.g. `opts['service_addrs']` will not happen.
184   DCHECK(options.service_addresses().empty());
185   // In remote profiling, duration is always passed by value explicitly and not
186   // set in opts.
187   DCHECK_EQ(options.profiler_options().duration_ms(), 0);
188   // Because duration_ms is not set from opts, it follows that
189   // max_session_duration_ms must be unset as well.
190   DCHECK_EQ(options.max_session_duration_ms(), 0);
191 
192   // Worker_list is only used for TensorBoard TPU capture cases. For a TPU
193   // cluster, service_address is the Master, which can already be found in the
194   // list of workers. These sessions will be used with the ProfileAnalysis
195   // service.
196   *is_cloud_tpu_session = !worker_list.empty();
197   AddServiceAddresses(*is_cloud_tpu_session ? worker_list : service_addresses,
198                       &options);
199 
200   // Set local profiler duration and profiler session durations.
201   options.mutable_profiler_options()->set_include_dataset_ops(
202       include_dataset_ops);
203   options.mutable_profiler_options()->set_duration_ms(duration_ms);
204   UpdateMaxSessionDuration(options);
205 
206   for (int idx = 0; idx < options.service_addresses_size(); ++idx) {
207     VLOG(1) << "service_addr " << idx << " set to "
208             << options.service_addresses(idx);
209   }
210   VLOG(1) << "include_dataset_ops set to " << include_dataset_ops;
211   VLOG(1) << "duration_ms set to " << duration_ms;
212 
213   return options;
214 }
215 
216 }  // namespace
217 
Trace(const char * service_addr,const char * logdir,const char * worker_list,bool include_dataset_ops,int duration_ms,int num_tracing_attempts,const absl::flat_hash_map<std::string,std::variant<int,std::string>> & options)218 tensorflow::Status Trace(
219     const char* service_addr, const char* logdir, const char* worker_list,
220     bool include_dataset_ops, int duration_ms, int num_tracing_attempts,
221     const absl::flat_hash_map<std::string, std::variant<int, std::string>>&
222         options) {
223   // TPU capture is true if the user sets worker_list.
224   bool is_cloud_tpu_session = false;
225   RemoteProfilerSessionManagerOptions opts =
226       GetOptionsLocked(service_addr, logdir, worker_list, include_dataset_ops,
227                        duration_ms, options, &is_cloud_tpu_session);
228   TF_RETURN_IF_ERROR(ValidateOptions(opts));
229 
230   {
231     TF_RETURN_IF_ERROR(tensorflow::profiler::Trace(logdir, num_tracing_attempts,
232                                                    opts, is_cloud_tpu_session));
233   }
234   return OkStatus();
235 }
236 
Monitor(const char * service_addr,int duration_ms,int monitoring_level,bool display_timestamp,tensorflow::string * result)237 tensorflow::Status Monitor(const char* service_addr, int duration_ms,
238                            int monitoring_level, bool display_timestamp,
239                            tensorflow::string* result) {
240   TF_RETURN_IF_ERROR(ValidateHostPortPair(service_addr));
241   {
242     TF_RETURN_IF_ERROR(tensorflow::profiler::Monitor(
243         service_addr, duration_ms, monitoring_level, display_timestamp,
244         result));
245   }
246   return OkStatus();
247 }
248 
Start(const char * logdir,const absl::flat_hash_map<std::string,std::variant<int,std::string>> & options)249 tensorflow::Status ProfilerSessionWrapper::Start(
250     const char* logdir,
251     const absl::flat_hash_map<std::string, std::variant<int, std::string>>&
252         options) {
253   auto opts = GetOptionsLocked(logdir, options);
254   session_ = tensorflow::ProfilerSession::Create(opts.profiler_options());
255   logdir_ = logdir;
256   return session_->Status();
257 }
258 
Stop(tensorflow::string * result)259 tensorflow::Status ProfilerSessionWrapper::Stop(tensorflow::string* result) {
260   if (session_ != nullptr) {
261     tensorflow::profiler::XSpace xspace;
262     tensorflow::Status status = session_->CollectData(&xspace);
263     session_.reset();
264     tensorflow::profiler::ConvertXSpaceToTraceEventsString(xspace, result);
265     TF_RETURN_IF_ERROR(status);
266   }
267   return OkStatus();
268 }
269 
ExportToTensorBoard()270 tensorflow::Status ProfilerSessionWrapper::ExportToTensorBoard() {
271   if (!session_ || logdir_.empty()) {
272     return OkStatus();
273   }
274   tensorflow::profiler::XSpace xspace;
275   tensorflow::Status status;
276   status = session_->CollectData(&xspace);
277   session_.reset();
278   status = tensorflow::profiler::ExportToTensorBoard(xspace, logdir_);
279   return status;
280 }
281 
282 }  // namespace pywrap
283 }  // namespace profiler
284 }  // namespace tensorflow
285