1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/status.h"
17
18 #include <stdio.h>
19
20 #include <deque>
21 #include <map>
22
23 #include "absl/base/call_once.h"
24 #include "tensorflow/core/platform/mutex.h"
25 #include "tensorflow/core/platform/stacktrace.h"
26 #include "tensorflow/core/platform/str_util.h"
27 #include "tensorflow/core/platform/strcat.h"
28 #include "tensorflow/core/platform/stringprintf.h"
29 #include "tensorflow/core/protobuf/error_codes.pb.h"
30
31 namespace tensorflow {
32
33 namespace {
34
35 // Log sink is used to collect recent warning and error log messages to be
36 // attached to the error status.
37 class StatusLogSink : public TFLogSink {
38 public:
GetInstance()39 static StatusLogSink* GetInstance() {
40 static StatusLogSink* sink = new StatusLogSink();
41 return sink;
42 }
43
enable()44 void enable() {
45 absl::call_once(flag_, [this] {
46 num_messages_ = 5; // default to 5 messages
47
48 if (const char* num_msgs_str =
49 getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
50 if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
51 LOG(WARNING) << "Failed to parse env variable "
52 "TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
53 << num_msgs_str << " as int. Using the default value "
54 << num_messages_ << ".";
55 }
56 }
57
58 if (num_messages_ > 0) {
59 TFAddLogSink(this);
60 }
61 });
62 }
63
GetMessages(std::vector<std::string> * logs)64 void GetMessages(std::vector<std::string>* logs) LOCKS_EXCLUDED(mu_) {
65 mutex_lock lock(mu_);
66
67 for (auto& msg : messages_) {
68 logs->push_back(msg);
69 }
70 }
71
Send(const TFLogEntry & entry)72 void Send(const TFLogEntry& entry) override LOCKS_EXCLUDED(mu_) {
73 if (entry.log_severity() < absl::LogSeverity::kWarning) return;
74
75 mutex_lock lock(mu_);
76 messages_.emplace_back(entry.ToString());
77 if (messages_.size() > num_messages_) messages_.pop_front();
78 }
79
80 private:
81 mutex mu_;
82 // for allowing repeated/concurrent calls to enable()
83 absl::once_flag flag_;
84 int num_messages_ = 0;
85 std::deque<std::string> messages_ GUARDED_BY(mu_);
86 };
87
88 } // namespace
89
Status(tensorflow::error::Code code,StringPiece msg)90 Status::Status(tensorflow::error::Code code, StringPiece msg) {
91 assert(code != tensorflow::error::OK);
92 state_ = std::unique_ptr<State>(new State);
93 state_->code = code;
94 state_->msg = string(msg);
95 VLOG(5) << "Generated non-OK status: \"" << *this << "\". "
96 << CurrentStackTrace();
97 }
98
Update(const Status & new_status)99 void Status::Update(const Status& new_status) {
100 if (ok()) {
101 *this = new_status;
102 }
103 }
104
SlowCopyFrom(const State * src)105 void Status::SlowCopyFrom(const State* src) {
106 if (src == nullptr) {
107 state_ = nullptr;
108 } else {
109 state_ = std::unique_ptr<State>(new State(*src));
110 }
111 }
112
empty_string()113 const string& Status::empty_string() {
114 static string* empty = new string;
115 return *empty;
116 }
117
error_name(error::Code code)118 string error_name(error::Code code) {
119 switch (code) {
120 case tensorflow::error::OK:
121 return "OK";
122 break;
123 case tensorflow::error::CANCELLED:
124 return "Cancelled";
125 break;
126 case tensorflow::error::UNKNOWN:
127 return "Unknown";
128 break;
129 case tensorflow::error::INVALID_ARGUMENT:
130 return "Invalid argument";
131 break;
132 case tensorflow::error::DEADLINE_EXCEEDED:
133 return "Deadline exceeded";
134 break;
135 case tensorflow::error::NOT_FOUND:
136 return "Not found";
137 break;
138 case tensorflow::error::ALREADY_EXISTS:
139 return "Already exists";
140 break;
141 case tensorflow::error::PERMISSION_DENIED:
142 return "Permission denied";
143 break;
144 case tensorflow::error::UNAUTHENTICATED:
145 return "Unauthenticated";
146 break;
147 case tensorflow::error::RESOURCE_EXHAUSTED:
148 return "Resource exhausted";
149 break;
150 case tensorflow::error::FAILED_PRECONDITION:
151 return "Failed precondition";
152 break;
153 case tensorflow::error::ABORTED:
154 return "Aborted";
155 break;
156 case tensorflow::error::OUT_OF_RANGE:
157 return "Out of range";
158 break;
159 case tensorflow::error::UNIMPLEMENTED:
160 return "Unimplemented";
161 break;
162 case tensorflow::error::INTERNAL:
163 return "Internal";
164 break;
165 case tensorflow::error::UNAVAILABLE:
166 return "Unavailable";
167 break;
168 case tensorflow::error::DATA_LOSS:
169 return "Data loss";
170 break;
171 default:
172 char tmp[30];
173 snprintf(tmp, sizeof(tmp), "Unknown code(%d)", static_cast<int>(code));
174 return tmp;
175 break;
176 }
177 }
178
ToString() const179 string Status::ToString() const {
180 if (state_ == nullptr) {
181 return "OK";
182 } else {
183 string result(error_name(code()));
184 result += ": ";
185 result += state_->msg;
186 return result;
187 }
188 }
189
IgnoreError() const190 void Status::IgnoreError() const {
191 // no-op
192 }
193
operator <<(std::ostream & os,const Status & x)194 std::ostream& operator<<(std::ostream& os, const Status& x) {
195 os << x.ToString();
196 return os;
197 }
198
TfCheckOpHelperOutOfLine(const::tensorflow::Status & v,const char * msg)199 string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
200 const char* msg) {
201 string r("Non-OK-status: ");
202 r += msg;
203 r += " status: ";
204 r += v.ToString();
205 // Leaks string but this is only to be used in a fatal error message
206 return new string(r);
207 }
208
// kDerivedMarker is prepended to the Status message string to indicate that a
// Status object is not the root cause of an error but has been triggered by
// cancelling/aborting a step. A message containing the marker anywhere is
// treated as derived.
//
// Declared as a constexpr array rather than a `const char*` so the marker
// itself cannot be re-pointed at runtime.
static constexpr char kDerivedMarker[] = "[_Derived_]";
213
// Returns a derived version of `s`: same code, message prefixed with
// kDerivedMarker. A status that is already derived is returned unchanged.
Status StatusGroup::MakeDerived(const Status& s) {
  if (!IsDerived(s)) {
    return Status(s.code(), strings::StrCat(kDerivedMarker, s.error_message()));
  }
  return s;
}
221
IsDerived(const Status & s)222 bool StatusGroup::IsDerived(const Status& s) {
223 return s.error_message().find(kDerivedMarker) != std::string::npos;
224 }
225
ConfigureLogHistory()226 void StatusGroup::ConfigureLogHistory() {
227 StatusLogSink::GetInstance()->enable();
228 }
229
Update(const Status & s)230 void StatusGroup::Update(const Status& s) {
231 if (s.ok()) {
232 ++num_ok_;
233 } else {
234 ok_ = false;
235 children_.push_back(s);
236 }
237 }
238
GetNonDerivedStatuses(const std::vector<Status> & status)239 static std::vector<Status> GetNonDerivedStatuses(
240 const std::vector<Status>& status) {
241 std::vector<Status> nonderived_statuses;
242 for (auto& s : status) {
243 if (!StatusGroup::IsDerived(s)) {
244 nonderived_statuses.push_back(s);
245 }
246 }
247 return nonderived_statuses;
248 }
249
// Cap, in characters, applied to the joined text of an aggregated status
// message (8 KiB).
static constexpr int kMaxAggregatedStatusMessageSize = 8192;
// Cap, in characters, applied to each log message attached to a summary
// status.
static constexpr int kMaxAttachedLogMessageSize = 512;
252
253 // Summarize all the status objects in the StatusGroup. This is used when
254 // individual Status objects in the StatusGroup are not already summarized.
as_summary_status() const255 Status StatusGroup::as_summary_status() const {
256 if (ok_) {
257 return Status::OK();
258 }
259
260 // Gather recent logs as a string
261 auto get_recent_logs = [this]() -> std::string {
262 if (!recent_logs_.empty()) {
263 std::vector<std::string> fmt;
264 fmt.push_back("\nRecent warning and error logs:");
265 for (auto& log : recent_logs_) {
266 // Add an indentation to make it look nicer.
267 fmt.push_back(" " + log.substr(0, kMaxAttachedLogMessageSize));
268 }
269 return absl::StrJoin(fmt, "\n");
270 } else {
271 return "";
272 }
273 };
274
275 std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
276
277 // If only one root status is found, do not add summary header and footer.
278 if (nonderived_statuses.size() == 1) {
279 return Status(nonderived_statuses[0].code(),
280 strings::StrCat(nonderived_statuses[0].error_message(),
281 get_recent_logs()));
282 }
283
284 if (!nonderived_statuses.empty()) {
285 std::vector<std::string> fmt;
286
287 fmt.push_back(strings::Printf("%zu root error(s) found.",
288 nonderived_statuses.size()));
289
290 int index = 0;
291 auto code = tensorflow::error::CANCELLED;
292 for (auto& s : nonderived_statuses) {
293 // NOTE: Avoid using CANCELLED as the code of summary status if the group
294 // contains other error code.
295 if (code == tensorflow::error::CANCELLED &&
296 s.code() != tensorflow::error::CANCELLED) {
297 code = s.code();
298 }
299 fmt.emplace_back(strings::StrCat(" (", index, ") ", s.ToString()));
300 ++index;
301 }
302
303 fmt.push_back(strings::Printf("%zu successful operations.", num_ok_));
304 fmt.push_back(
305 strings::Printf("%zu derived errors ignored.",
306 children_.size() - nonderived_statuses.size()));
307
308 std::string error_msg =
309 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
310
311 return Status(code, strings::StrCat(error_msg, get_recent_logs()));
312 } else {
313 // All statuses are derived. Pick the first available status to return.
314 return children_[0];
315 }
316 }
317
318 // Concatenate all the status objects in the StatusGroup. This is used when
319 // individual Status objects in the StatusGroup are already summarized Status.
as_concatenated_status() const320 Status StatusGroup::as_concatenated_status() const {
321 if (ok_) {
322 return Status::OK();
323 }
324
325 std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
326
327 // If only one root status is found, return it directly.
328 if (nonderived_statuses.size() == 1) {
329 return nonderived_statuses[0];
330 }
331
332 if (!nonderived_statuses.empty()) {
333 std::vector<string> fmt;
334 fmt.emplace_back("\n=====================");
335 for (auto& s : nonderived_statuses) {
336 fmt.emplace_back(s.ToString());
337 }
338 fmt.emplace_back("=====================\n");
339 return Status(
340 nonderived_statuses[0].code(),
341 absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize));
342 } else {
343 // All statuses are derived. Pick the first available status to return.
344 // This should not happen in normal execution.
345 return children_[0];
346 }
347 }
348
AttachLogMessages()349 void StatusGroup::AttachLogMessages() {
350 recent_logs_.clear();
351 StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
352 }
353
354 } // namespace tensorflow
355