• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/distributed_runtime/recent_request_ids.h"
17 
18 #include <utility>
19 
20 #include "tensorflow/core/lib/core/errors.h"
21 #include "tensorflow/core/lib/strings/strcat.h"
22 #include "tensorflow/core/platform/logging.h"
23 
24 namespace tensorflow {
25 
RecentRequestIds(int num_tracked_request_ids)26 RecentRequestIds::RecentRequestIds(int num_tracked_request_ids)
27     : circular_buffer_(num_tracked_request_ids) {
28   set_.reserve(num_tracked_request_ids);
29 }
30 
Insert(int64_t request_id)31 bool RecentRequestIds::Insert(int64_t request_id) {
32   if (request_id == 0) {
33     // For backwards compatibility, allow all requests with request_id 0.
34     return true;
35   }
36 
37   mutex_lock l(mu_);
38   const bool inserted = set_.insert(request_id).second;
39   if (!inserted) {
40     // Note: RecentRequestIds is not strict LRU because we don't update
41     // request_id's age in the circular_buffer_ if it's tracked again. Strict
42     // LRU is not useful here because returning this error will close the
43     // current Session.
44     return false;
45   }
46 
47   // Remove the oldest request_id from the set_. circular_buffer_ is
48   // zero-initialized, and zero is never tracked, so it's safe to do this even
49   // when the buffer is not yet full.
50   set_.erase(circular_buffer_[next_index_]);
51   circular_buffer_[next_index_] = request_id;
52   next_index_ = (next_index_ + 1) % circular_buffer_.size();
53   return true;
54 }
55 
TrackUnique(int64_t request_id,const string & method_name,const protobuf::Message & request)56 Status RecentRequestIds::TrackUnique(int64_t request_id,
57                                      const string& method_name,
58                                      const protobuf::Message& request) {
59   if (Insert(request_id)) {
60     return Status::OK();
61   } else {
62     return errors::Aborted("The same ", method_name,
63                            " request was received twice. ",
64                            request.ShortDebugString());
65   }
66 }
67 
68 }  // namespace tensorflow
69