1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
#include "tensorflow/core/platform/status.h"

#include <stdio.h>

#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "absl/base/call_once.h"
#include "absl/strings/cord.h"
#include "absl/strings/escaping.h"
#include "absl/strings/match.h"
#include "absl/types/optional.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stacktrace.h"
#include "tensorflow/core/platform/str_util.h"
#include "tensorflow/core/platform/strcat.h"
#include "tensorflow/core/platform/stringprintf.h"
#include "tensorflow/core/protobuf/error_codes.pb.h"
#include "tensorflow/core/protobuf/status.pb.h"
37
38 namespace tensorflow {
39
40 namespace {
41
42 // Log sink is used to collect recent warning and error log messages to be
43 // attached to the error status.
44 class StatusLogSink : public TFLogSink {
45 public:
GetInstance()46 static StatusLogSink* GetInstance() {
47 static StatusLogSink* sink = new StatusLogSink();
48 return sink;
49 }
50
enable()51 void enable() {
52 absl::call_once(flag_, [this] {
53 num_messages_ = 5; // default to 5 messages
54
55 if (const char* num_msgs_str =
56 getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
57 if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
58 LOG(WARNING) << "Failed to parse env variable "
59 "TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
60 << num_msgs_str << " as int. Using the default value "
61 << num_messages_ << ".";
62 }
63 }
64
65 if (num_messages_ > 0) {
66 TFAddLogSink(this);
67 }
68 });
69 }
70
GetMessages(std::vector<std::string> * logs)71 void GetMessages(std::vector<std::string>* logs) TF_LOCKS_EXCLUDED(mu_) {
72 mutex_lock lock(mu_);
73
74 for (auto& msg : messages_) {
75 logs->push_back(msg);
76 }
77 }
78
Send(const TFLogEntry & entry)79 void Send(const TFLogEntry& entry) override TF_LOCKS_EXCLUDED(mu_) {
80 if (entry.log_severity() < absl::LogSeverity::kWarning) return;
81
82 mutex_lock lock(mu_);
83 messages_.emplace_back(entry.ToString());
84 if (messages_.size() > static_cast<size_t>(num_messages_)) {
85 messages_.pop_front();
86 }
87 }
88
89 private:
90 mutex mu_;
91 // for allowing repeated/concurrent calls to enable()
92 absl::once_flag flag_;
93 int num_messages_ = 0;
94 std::deque<std::string> messages_ TF_GUARDED_BY(mu_);
95 };
96
97 } // namespace
98
99 // TODO(b/197552541) Move this namespace to errors.h after absl migration.
100 namespace errors {
101 static constexpr const char kStackTraceProtoUrl[] =
102 "type.googleapis.com/tensorflow.StackTracePayload";
103
SetStackTrace(::tensorflow::Status & status,std::vector<StackFrame> stack_trace)104 void SetStackTrace(::tensorflow::Status& status,
105 std::vector<StackFrame> stack_trace) {
106 status.SetStackTrace(stack_trace);
107 }
108
GetStackTrace(const::tensorflow::Status & status)109 std::vector<StackFrame> GetStackTrace(const ::tensorflow::Status& status) {
110 return status.GetStackTrace();
111 }
112
113 } // namespace errors
114
SetStackTrace(std::vector<StackFrame> stack_trace)115 void Status::SetStackTrace(std::vector<StackFrame> stack_trace) {
116 stack_trace_ = stack_trace;
117 }
118
GetStackTrace() const119 std::vector<StackFrame> Status::GetStackTrace() const { return stack_trace_; }
120
GetSourceLocations() const121 absl::Span<const SourceLocation> Status::GetSourceLocations() const {
122 return state_ != nullptr ? state_->source_locations
123 : absl::Span<const SourceLocation>();
124 }
125
MaybeAddSourceLocation(SourceLocation loc)126 void Status::MaybeAddSourceLocation(SourceLocation loc) {
127 if (state_ == nullptr) {
128 return;
129 }
130 if (loc.line <= 0) {
131 return;
132 }
133 if (loc.file_name == nullptr) {
134 return;
135 }
136 if (loc.file_name[0] == '\0') {
137 return;
138 }
139 state_->source_locations.push_back(loc);
140 }
141
Status(tensorflow::error::Code code,absl::string_view msg,SourceLocation loc)142 Status::Status(tensorflow::error::Code code, absl::string_view msg,
143 SourceLocation loc) {
144 assert(code != tensorflow::error::OK);
145 state_ = std::make_unique<State>();
146 state_->code = code;
147 state_->msg = std::string(msg);
148 MaybeAddSourceLocation(loc);
149 VLOG(5) << "Generated non-OK status: \"" << *this << "\". "
150 << CurrentStackTrace();
151 }
152
Update(const Status & new_status)153 void Status::Update(const Status& new_status) {
154 if (ok()) {
155 *this = new_status;
156 }
157 }
158
SlowCopyFrom(const State * src)159 void Status::SlowCopyFrom(const State* src) {
160 if (src == nullptr) {
161 state_ = nullptr;
162 } else {
163 state_ = std::make_unique<State>(*src);
164 }
165 }
166
empty_string()167 const std::string& Status::empty_string() {
168 static string* empty = new string;
169 return *empty;
170 }
171
error_name(error::Code code)172 std::string error_name(error::Code code) {
173 switch (code) {
174 case tensorflow::error::OK:
175 return "OK";
176 break;
177 case tensorflow::error::CANCELLED:
178 return "CANCELLED";
179 break;
180 case tensorflow::error::UNKNOWN:
181 return "UNKNOWN";
182 break;
183 case tensorflow::error::INVALID_ARGUMENT:
184 return "INVALID_ARGUMENT";
185 break;
186 case tensorflow::error::DEADLINE_EXCEEDED:
187 return "DEADLINE_EXCEEDED";
188 break;
189 case tensorflow::error::NOT_FOUND:
190 return "NOT_FOUND";
191 break;
192 case tensorflow::error::ALREADY_EXISTS:
193 return "ALREADY_EXISTS";
194 break;
195 case tensorflow::error::PERMISSION_DENIED:
196 return "PERMISSION_DENIED";
197 break;
198 case tensorflow::error::UNAUTHENTICATED:
199 return "UNAUTHENTICATED";
200 break;
201 case tensorflow::error::RESOURCE_EXHAUSTED:
202 return "RESOURCE_EXHAUSTED";
203 break;
204 case tensorflow::error::FAILED_PRECONDITION:
205 return "FAILED_PRECONDITION";
206 break;
207 case tensorflow::error::ABORTED:
208 return "ABORTED";
209 break;
210 case tensorflow::error::OUT_OF_RANGE:
211 return "OUT_OF_RANGE";
212 break;
213 case tensorflow::error::UNIMPLEMENTED:
214 return "UNIMPLEMENTED";
215 break;
216 case tensorflow::error::INTERNAL:
217 return "INTERNAL";
218 break;
219 case tensorflow::error::UNAVAILABLE:
220 return "UNAVAILABLE";
221 break;
222 case tensorflow::error::DATA_LOSS:
223 return "DATA_LOSS";
224 break;
225 default:
226 char tmp[30];
227 snprintf(tmp, sizeof(tmp), "UNKNOWN_CODE(%d)", static_cast<int>(code));
228 return tmp;
229 break;
230 }
231 }
232
ToString() const233 std::string Status::ToString() const {
234 if (state_ == nullptr) {
235 return "OK";
236 } else {
237 std::string result(error_name(state_->code));
238 result += ": ";
239 result += state_->msg;
240
241 for (const std::pair<const std::string, std::string>& element :
242 state_->payloads) {
243 absl::StrAppend(&result, " [", element.first, "='",
244 absl::CHexEscape(element.second), "']");
245 }
246
247 return result;
248 }
249 }
250
IgnoreError() const251 void Status::IgnoreError() const {
252 // no-op
253 }
254
SetPayload(absl::string_view type_url,absl::string_view payload)255 void Status::SetPayload(absl::string_view type_url, absl::string_view payload) {
256 if (ok()) return;
257 state_->payloads[std::string(type_url)] = std::string(payload);
258 }
259
GetPayload(absl::string_view type_url) const260 absl::optional<absl::Cord> Status::GetPayload(
261 absl::string_view type_url) const {
262 if (ok()) return absl::nullopt;
263 auto payload_iter = state_->payloads.find(std::string(type_url));
264 if (payload_iter == state_->payloads.end()) return absl::nullopt;
265 return absl::Cord(payload_iter->second);
266 }
267
ErasePayload(absl::string_view type_url)268 bool Status::ErasePayload(absl::string_view type_url) {
269 if (ok()) return false;
270 auto payload_iter = state_->payloads.find(std::string(type_url));
271 if (payload_iter == state_->payloads.end()) return false;
272 state_->payloads.erase(payload_iter);
273 return true;
274 }
275
ForEachPayload(const std::function<void (absl::string_view,absl::string_view)> & visitor) const276 void Status::ForEachPayload(
277 const std::function<void(absl::string_view, absl::string_view)>& visitor)
278 const {
279 if (ok()) return;
280 for (const auto& payload : state_->payloads) {
281 visitor(payload.first, payload.second);
282 }
283 }
284
operator <<(std::ostream & os,const Status & x)285 std::ostream& operator<<(std::ostream& os, const Status& x) {
286 os << x.ToString();
287 return os;
288 }
289
OkStatus()290 Status OkStatus() { return Status(); }
291
FromAbslStatus(const absl::Status & s)292 Status FromAbslStatus(const absl::Status& s) {
293 if (s.ok()) {
294 return Status();
295 }
296 Status converted(static_cast<tensorflow::error::Code>(s.code()), s.message());
297 s.ForEachPayload(
298 [&converted](absl::string_view key, const absl::Cord& value) {
299 converted.SetPayload(key, std::string(value));
300 });
301
302 return converted;
303 }
304
ToAbslStatus(const::tensorflow::Status & s)305 absl::Status ToAbslStatus(const ::tensorflow::Status& s) {
306 if (s.ok()) {
307 return absl::OkStatus();
308 }
309
310 absl::Status converted(static_cast<absl::StatusCode>(s.code()),
311 s.error_message());
312 s.ForEachPayload(
313 [&converted](tensorflow::StringPiece key, tensorflow::StringPiece value) {
314 converted.SetPayload(key, absl::Cord(value));
315 });
316
317 return converted;
318 }
319
TfCheckOpHelperOutOfLine(const::tensorflow::Status & v,const char * msg)320 std::string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
321 const char* msg) {
322 std::string r("Non-OK-status: ");
323 r += msg;
324 r += " status: ";
325 r += v.ToString();
326 // Leaks string but this is only to be used in a fatal error message
327 return new std::string(r);
328 }
329
StatusGroup()330 StatusGroup::StatusGroup() {}
331
StatusGroup(std::initializer_list<Status> statuses)332 StatusGroup::StatusGroup(std::initializer_list<Status> statuses) {
333 for (const Status& s : statuses) {
334 Update(s);
335 }
336 }
337
// Payload key whose presence marks a status as derived.
static constexpr char kDerivedStatusProtoUrl[] =
    "type.googleapis.com/tensorflow.DerivedStatus";
340
// Returns `s` marked as derived. A status that is already derived is returned
// unchanged; otherwise a copy is tagged with an empty derived-status payload.
Status StatusGroup::MakeDerived(const Status& s) {
  if (IsDerived(s)) {
    return s;
  }
  Status marked(s);
  // TODO(b/200167936): Serialize an instance of DerivedStatus proto instead
  // of using the string directly. The string is never used so it is not
  // causing any issues at the moment.
  marked.SetPayload(kDerivedStatusProtoUrl, "");
  return marked;
}
353
IsDerived(const Status & s)354 bool StatusGroup::IsDerived(const Status& s) {
355 return s.GetPayload(kDerivedStatusProtoUrl).has_value();
356 }
357
ConfigureLogHistory()358 void StatusGroup::ConfigureLogHistory() {
359 StatusLogSink::GetInstance()->enable();
360 }
361
Update(const Status & s)362 void StatusGroup::Update(const Status& s) {
363 if (s.ok()) {
364 ++num_ok_;
365 } else {
366 ok_ = false;
367 if (IsDerived(s)) {
368 derived_.insert(s);
369 } else {
370 non_derived_.insert(s);
371 }
372 }
373 }
374
// Upper bound, in characters, applied to an aggregated status message.
static constexpr int kMaxAggregatedStatusMessageSize = 8 * 1024;
// Upper bound, in characters, applied to each attached log message.
static constexpr int kMaxAttachedLogMessageSize = 512;
377
GetPayloads() const378 std::unordered_map<std::string, std::string> StatusGroup::GetPayloads() const {
379 std::unordered_map<std::string, std::string> payloads;
380 auto capture_payload = [&payloads](absl::string_view key,
381 absl::string_view value) {
382 payloads[std::string(key)] = std::string(value);
383 };
384
385 for (const auto& status : derived_) {
386 status.ForEachPayload(capture_payload);
387 }
388
389 // If a key appears in both derived_ and non_derived_ payloads, then the
390 // non_derived_ payload receives priority.
391 for (const auto& status : non_derived_) {
392 status.ForEachPayload(capture_payload);
393 }
394
395 payloads.erase(kDerivedStatusProtoUrl);
396
397 return payloads;
398 }
399
MakeStatus(tensorflow::error::Code code,absl::string_view message,const std::unordered_map<std::string,std::string> & payloads)400 Status MakeStatus(
401 tensorflow::error::Code code, absl::string_view message,
402 const std::unordered_map<std::string, std::string>& payloads) {
403 Status status(code, message);
404 for (const auto& payload : payloads) {
405 status.SetPayload(payload.first, payload.second);
406 }
407 return status;
408 }
409
MakeString(const Status & status)410 std::string MakeString(const Status& status) {
411 return absl::StrCat(error_name(status.code()), ": ", status.error_message());
412 }
413
414 // Summarize all the status objects in the StatusGroup. This is used when
415 // individual Status objects in the StatusGroup are not already summarized.
as_summary_status() const416 Status StatusGroup::as_summary_status() const {
417 if (ok_) {
418 return OkStatus();
419 }
420
421 // Gather recent logs as a string
422 auto get_recent_logs = [this]() -> std::string {
423 if (!recent_logs_.empty()) {
424 std::vector<std::string> fmt;
425 fmt.push_back("\nRecent warning and error logs:");
426 for (auto& log : recent_logs_) {
427 // Add an indentation to make it look nicer.
428 fmt.push_back(" " + log.substr(0, kMaxAttachedLogMessageSize));
429 }
430 return absl::StrJoin(fmt, "\n");
431 } else {
432 return "";
433 }
434 };
435
436 // If only one root status is found, do not add summary header and footer.
437 if (non_derived_.size() == 1) {
438 return MakeStatus(non_derived_.begin()->code(),
439 strings::StrCat(non_derived_.begin()->error_message(),
440 get_recent_logs()),
441 GetPayloads());
442 }
443
444 if (!non_derived_.empty()) {
445 std::vector<std::string> fmt;
446
447 fmt.push_back(
448 strings::Printf("%zu root error(s) found.", non_derived_.size()));
449
450 int index = 0;
451 auto code = tensorflow::error::CANCELLED;
452 for (const auto& s : non_derived_) {
453 // NOTE: Avoid using CANCELLED as the code of summary status if the group
454 // contains other error code.
455 if (code == tensorflow::error::CANCELLED &&
456 s.code() != tensorflow::error::CANCELLED) {
457 code = s.code();
458 }
459 fmt.emplace_back(strings::StrCat(" (", index, ") ", MakeString(s)));
460 ++index;
461 }
462
463 fmt.push_back(strings::Printf("%zu successful operations.", num_ok_));
464 fmt.push_back(
465 strings::Printf("%zu derived errors ignored.", derived_.size()));
466
467 std::string error_msg =
468 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
469
470 return MakeStatus(code, strings::StrCat(error_msg, get_recent_logs()),
471 GetPayloads());
472 } else {
473 // All statuses are derived. Pick the first available status to return.
474 return MakeDerived(MakeStatus(derived_.begin()->code(),
475 derived_.begin()->error_message(),
476 GetPayloads()));
477 }
478 }
479
480 // Concatenate all the status objects in the StatusGroup. This is used when
481 // individual Status objects in the StatusGroup are already summarized Status.
as_concatenated_status() const482 Status StatusGroup::as_concatenated_status() const {
483 if (ok_) {
484 return OkStatus();
485 }
486
487 // If only one root status is found, return it directly.
488 if (non_derived_.size() == 1) {
489 return MakeStatus(non_derived_.begin()->code(),
490 non_derived_.begin()->error_message(), GetPayloads());
491 }
492
493 if (!non_derived_.empty()) {
494 std::vector<string> fmt;
495 fmt.emplace_back("\n=====================");
496 for (const auto& s : non_derived_) {
497 fmt.emplace_back(MakeString(s));
498 }
499 fmt.emplace_back("=====================\n");
500 return MakeStatus(
501 non_derived_.begin()->code(),
502 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize),
503 GetPayloads());
504 } else {
505 // All statuses are derived. Pick the first available status to return.
506 // This should not happen in normal execution.
507 return MakeDerived(MakeStatus(derived_.begin()->code(),
508 derived_.begin()->error_message(),
509 GetPayloads()));
510 }
511 }
512
AttachLogMessages()513 void StatusGroup::AttachLogMessages() {
514 recent_logs_.clear();
515 StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
516 }
517
518 } // namespace tensorflow
519