1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/status.h"
17
18 #include <stdio.h>
19
20 #include <deque>
21 #include <map>
22
23 #include "absl/base/call_once.h"
24 #include "absl/strings/escaping.h"
25 #include "tensorflow/core/platform/mutex.h"
26 #include "tensorflow/core/platform/stacktrace.h"
27 #include "tensorflow/core/platform/str_util.h"
28 #include "tensorflow/core/platform/strcat.h"
29 #include "tensorflow/core/platform/stringprintf.h"
30 #include "tensorflow/core/protobuf/error_codes.pb.h"
31
32 namespace tensorflow {
33
34 namespace {
35
36 // Log sink is used to collect recent warning and error log messages to be
37 // attached to the error status.
38 class StatusLogSink : public TFLogSink {
39 public:
GetInstance()40 static StatusLogSink* GetInstance() {
41 static StatusLogSink* sink = new StatusLogSink();
42 return sink;
43 }
44
enable()45 void enable() {
46 absl::call_once(flag_, [this] {
47 num_messages_ = 5; // default to 5 messages
48
49 if (const char* num_msgs_str =
50 getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
51 if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
52 LOG(WARNING) << "Failed to parse env variable "
53 "TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
54 << num_msgs_str << " as int. Using the default value "
55 << num_messages_ << ".";
56 }
57 }
58
59 if (num_messages_ > 0) {
60 TFAddLogSink(this);
61 }
62 });
63 }
64
GetMessages(std::vector<std::string> * logs)65 void GetMessages(std::vector<std::string>* logs) TF_LOCKS_EXCLUDED(mu_) {
66 mutex_lock lock(mu_);
67
68 for (auto& msg : messages_) {
69 logs->push_back(msg);
70 }
71 }
72
Send(const TFLogEntry & entry)73 void Send(const TFLogEntry& entry) override TF_LOCKS_EXCLUDED(mu_) {
74 if (entry.log_severity() < absl::LogSeverity::kWarning) return;
75
76 mutex_lock lock(mu_);
77 messages_.emplace_back(entry.ToString());
78 if (messages_.size() > static_cast<size_t>(num_messages_)) {
79 messages_.pop_front();
80 }
81 }
82
83 private:
84 mutex mu_;
85 // for allowing repeated/concurrent calls to enable()
86 absl::once_flag flag_;
87 int num_messages_ = 0;
88 std::deque<std::string> messages_ TF_GUARDED_BY(mu_);
89 };
90
91 } // namespace
92
Status(tensorflow::error::Code code,tensorflow::StringPiece msg,std::vector<StackFrame> && stack_trace)93 Status::Status(tensorflow::error::Code code, tensorflow::StringPiece msg,
94 std::vector<StackFrame>&& stack_trace) {
95 assert(code != tensorflow::error::OK);
96 state_ = std::unique_ptr<State>(new State);
97 state_->code = code;
98 state_->msg = string(msg);
99 state_->stack_trace = std::move(stack_trace);
100 VLOG(5) << "Generated non-OK status: \"" << *this << "\". "
101 << CurrentStackTrace();
102 }
103
Update(const Status & new_status)104 void Status::Update(const Status& new_status) {
105 if (ok()) {
106 *this = new_status;
107 }
108 }
109
SlowCopyFrom(const State * src)110 void Status::SlowCopyFrom(const State* src) {
111 if (src == nullptr) {
112 state_ = nullptr;
113 } else {
114 state_ = std::unique_ptr<State>(new State(*src));
115 }
116 }
117
empty_string()118 const string& Status::empty_string() {
119 static string* empty = new string;
120 return *empty;
121 }
122
empty_stack_trace()123 const std::vector<StackFrame>& Status::empty_stack_trace() {
124 static std::vector<StackFrame>* empty = new std::vector<StackFrame>();
125 return *empty;
126 }
127
error_name(error::Code code)128 string error_name(error::Code code) {
129 switch (code) {
130 case tensorflow::error::OK:
131 return "OK";
132 break;
133 case tensorflow::error::CANCELLED:
134 return "Cancelled";
135 break;
136 case tensorflow::error::UNKNOWN:
137 return "Unknown";
138 break;
139 case tensorflow::error::INVALID_ARGUMENT:
140 return "Invalid argument";
141 break;
142 case tensorflow::error::DEADLINE_EXCEEDED:
143 return "Deadline exceeded";
144 break;
145 case tensorflow::error::NOT_FOUND:
146 return "Not found";
147 break;
148 case tensorflow::error::ALREADY_EXISTS:
149 return "Already exists";
150 break;
151 case tensorflow::error::PERMISSION_DENIED:
152 return "Permission denied";
153 break;
154 case tensorflow::error::UNAUTHENTICATED:
155 return "Unauthenticated";
156 break;
157 case tensorflow::error::RESOURCE_EXHAUSTED:
158 return "Resource exhausted";
159 break;
160 case tensorflow::error::FAILED_PRECONDITION:
161 return "Failed precondition";
162 break;
163 case tensorflow::error::ABORTED:
164 return "Aborted";
165 break;
166 case tensorflow::error::OUT_OF_RANGE:
167 return "Out of range";
168 break;
169 case tensorflow::error::UNIMPLEMENTED:
170 return "Unimplemented";
171 break;
172 case tensorflow::error::INTERNAL:
173 return "Internal";
174 break;
175 case tensorflow::error::UNAVAILABLE:
176 return "Unavailable";
177 break;
178 case tensorflow::error::DATA_LOSS:
179 return "Data loss";
180 break;
181 default:
182 char tmp[30];
183 snprintf(tmp, sizeof(tmp), "Unknown code(%d)", static_cast<int>(code));
184 return tmp;
185 break;
186 }
187 }
188
ToString() const189 string Status::ToString() const {
190 if (state_ == nullptr) {
191 return "OK";
192 } else {
193 string result(error_name(code()));
194 result += ": ";
195 result += state_->msg;
196
197 for (const std::pair<const std::string, std::string>& element :
198 state_->payloads) {
199 absl::StrAppend(&result, " [", element.first, "='",
200 absl::CHexEscape(element.second), "']");
201 }
202
203 return result;
204 }
205 }
206
IgnoreError() const207 void Status::IgnoreError() const {
208 // no-op
209 }
210
SetPayload(tensorflow::StringPiece type_url,tensorflow::StringPiece payload)211 void Status::SetPayload(tensorflow::StringPiece type_url,
212 tensorflow::StringPiece payload) {
213 if (ok()) return;
214 state_->payloads[std::string(type_url)] = std::string(payload);
215 }
216
GetPayload(tensorflow::StringPiece type_url) const217 tensorflow::StringPiece Status::GetPayload(
218 tensorflow::StringPiece type_url) const {
219 if (ok()) return tensorflow::StringPiece();
220 auto payload_iter = state_->payloads.find(std::string(type_url));
221 if (payload_iter == state_->payloads.end()) return tensorflow::StringPiece();
222 return tensorflow::StringPiece(payload_iter->second);
223 }
224
ErasePayload(tensorflow::StringPiece type_url)225 bool Status::ErasePayload(tensorflow::StringPiece type_url) {
226 if (ok()) return false;
227 auto payload_iter = state_->payloads.find(std::string(type_url));
228 if (payload_iter == state_->payloads.end()) return false;
229 state_->payloads.erase(payload_iter);
230 return true;
231 }
232
GetAllPayloads() const233 const std::unordered_map<std::string, std::string> Status::GetAllPayloads()
234 const {
235 if (ok()) return {};
236 return state_->payloads;
237 }
238
ReplaceAllPayloads(const std::unordered_map<std::string,std::string> & payloads)239 void Status::ReplaceAllPayloads(
240 const std::unordered_map<std::string, std::string>& payloads) {
241 if (ok() || payloads.empty()) return;
242 if (state_ == nullptr) state_ = std::make_unique<State>();
243 state_->payloads = payloads;
244 }
245
operator <<(std::ostream & os,const Status & x)246 std::ostream& operator<<(std::ostream& os, const Status& x) {
247 os << x.ToString();
248 return os;
249 }
250
TfCheckOpHelperOutOfLine(const::tensorflow::Status & v,const char * msg)251 string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
252 const char* msg) {
253 string r("Non-OK-status: ");
254 r += msg;
255 r += " status: ";
256 r += v.ToString();
257 // Leaks string but this is only to be used in a fatal error message
258 return new string(r);
259 }
260
// kDerivedMarker is prepended to a Status message to indicate that the Status
// is not the root cause of an error but was triggered by cancelling/aborting a
// step. A Status is treated as derived when the marker occurs anywhere in its
// message.
//
// Declared as a constexpr array so the marker cannot be reseated at runtime.
static constexpr char kDerivedMarker[] = "[_Derived_]";
265
// Returns `s` unchanged if it is already marked as derived; otherwise returns
// a new Status with the same code whose message is the derived marker
// followed by the original message.
Status StatusGroup::MakeDerived(const Status& s) {
  if (IsDerived(s)) return s;
  return Status(s.code(), strings::StrCat(kDerivedMarker, s.error_message()));
}
273
IsDerived(const Status & s)274 bool StatusGroup::IsDerived(const Status& s) {
275 return s.error_message().find(kDerivedMarker) != std::string::npos;
276 }
277
ConfigureLogHistory()278 void StatusGroup::ConfigureLogHistory() {
279 StatusLogSink::GetInstance()->enable();
280 }
281
Update(const Status & s)282 void StatusGroup::Update(const Status& s) {
283 if (s.ok()) {
284 ++num_ok_;
285 } else {
286 ok_ = false;
287 children_.push_back(s);
288 }
289 }
290
GetNonDerivedStatuses(const std::vector<Status> & status)291 static std::vector<Status> GetNonDerivedStatuses(
292 const std::vector<Status>& status) {
293 std::vector<Status> nonderived_statuses;
294 for (auto& s : status) {
295 if (!StatusGroup::IsDerived(s)) {
296 nonderived_statuses.push_back(s);
297 }
298 }
299 return nonderived_statuses;
300 }
301
// Upper bound, in characters, on an aggregated status message before any
// recent logs are appended.
static constexpr int kMaxAggregatedStatusMessageSize = 8 * 1024;
// Upper bound, in characters, on each individual log message attached to a
// summary status.
static constexpr int kMaxAttachedLogMessageSize = 512;
304
305 // Summarize all the status objects in the StatusGroup. This is used when
306 // individual Status objects in the StatusGroup are not already summarized.
as_summary_status() const307 Status StatusGroup::as_summary_status() const {
308 if (ok_) {
309 return Status::OK();
310 }
311
312 // Gather recent logs as a string
313 auto get_recent_logs = [this]() -> std::string {
314 if (!recent_logs_.empty()) {
315 std::vector<std::string> fmt;
316 fmt.push_back("\nRecent warning and error logs:");
317 for (auto& log : recent_logs_) {
318 // Add an indentation to make it look nicer.
319 fmt.push_back(" " + log.substr(0, kMaxAttachedLogMessageSize));
320 }
321 return absl::StrJoin(fmt, "\n");
322 } else {
323 return "";
324 }
325 };
326
327 std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
328
329 // If only one root status is found, do not add summary header and footer.
330 if (nonderived_statuses.size() == 1) {
331 return Status(nonderived_statuses[0].code(),
332 strings::StrCat(nonderived_statuses[0].error_message(),
333 get_recent_logs()));
334 }
335
336 if (!nonderived_statuses.empty()) {
337 std::vector<std::string> fmt;
338
339 fmt.push_back(strings::Printf("%zu root error(s) found.",
340 nonderived_statuses.size()));
341
342 int index = 0;
343 auto code = tensorflow::error::CANCELLED;
344 for (auto& s : nonderived_statuses) {
345 // NOTE: Avoid using CANCELLED as the code of summary status if the group
346 // contains other error code.
347 if (code == tensorflow::error::CANCELLED &&
348 s.code() != tensorflow::error::CANCELLED) {
349 code = s.code();
350 }
351 fmt.emplace_back(strings::StrCat(" (", index, ") ", s.ToString()));
352 ++index;
353 }
354
355 fmt.push_back(strings::Printf("%zu successful operations.", num_ok_));
356 fmt.push_back(
357 strings::Printf("%zu derived errors ignored.",
358 children_.size() - nonderived_statuses.size()));
359
360 std::string error_msg =
361 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
362
363 return Status(code, strings::StrCat(error_msg, get_recent_logs()));
364 } else {
365 // All statuses are derived. Pick the first available status to return.
366 return children_[0];
367 }
368 }
369
370 // Concatenate all the status objects in the StatusGroup. This is used when
371 // individual Status objects in the StatusGroup are already summarized Status.
as_concatenated_status() const372 Status StatusGroup::as_concatenated_status() const {
373 if (ok_) {
374 return Status::OK();
375 }
376
377 std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
378
379 // If only one root status is found, return it directly.
380 if (nonderived_statuses.size() == 1) {
381 return nonderived_statuses[0];
382 }
383
384 if (!nonderived_statuses.empty()) {
385 std::vector<string> fmt;
386 fmt.emplace_back("\n=====================");
387 for (auto& s : nonderived_statuses) {
388 fmt.emplace_back(s.ToString());
389 }
390 fmt.emplace_back("=====================\n");
391 return Status(
392 nonderived_statuses[0].code(),
393 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize));
394 } else {
395 // All statuses are derived. Pick the first available status to return.
396 // This should not happen in normal execution.
397 return children_[0];
398 }
399 }
400
AttachLogMessages()401 void StatusGroup::AttachLogMessages() {
402 recent_logs_.clear();
403 StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
404 }
405
406 } // namespace tensorflow
407