• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
#include "tensorflow/core/framework/rendezvous.h"

#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "tensorflow/core/framework/local_rendezvous.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/gtl/flatmap.h"
#include "tensorflow/core/lib/gtl/manual_constructor.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
35 
36 namespace tensorflow {
37 
operator =(const ParsedKey & b)38 Rendezvous::ParsedKey& Rendezvous::ParsedKey::operator=(const ParsedKey& b) {
39   const char* b_base = b.buf_.data();
40   buf_ = b.buf_;
41   src_device = StringPiece(buf_.data() + (b.src_device.data() - b_base),
42                            b.src_device.size());
43   src = b.src;
44   src_incarnation = b.src_incarnation;
45   dst_device = StringPiece(buf_.data() + (b.dst_device.data() - b_base),
46                            b.dst_device.size());
47   dst = b.dst;
48   edge_name = StringPiece(buf_.data() + (b.edge_name.data() - b_base),
49                           b.edge_name.size());
50   return *this;
51 }
52 
53 /*  static */
CreateKey(const string & src_device,uint64 src_incarnation,const string & dst_device,const string & name,const FrameAndIter & frame_iter)54 string Rendezvous::CreateKey(const string& src_device, uint64 src_incarnation,
55                              const string& dst_device, const string& name,
56                              const FrameAndIter& frame_iter) {
57   // NOTE: ';' is not used in the device name's job name.
58   //
59   // We include both sender and receiver in the key to facilitate
60   // debugging. For correctness, we only need to encode the receiver.
61   //
62   // "src_incarnation" is used to distinguish a worker when it
63   // restarts.
64   char buf[strings::kFastToBufferSize];
65   return strings::StrCat(
66       src_device, ";", strings::Uint64ToHexString(src_incarnation, buf), ";",
67       dst_device, ";", name, ";", frame_iter.frame_id, ":", frame_iter.iter_id);
68 }
69 
70 // Return the prefix of "*s" up to the next occurrence of "delim", or
71 // the whole remaining string if "delim" is not found.  "*s" is advanced
72 // past the string returned plus the delimiter (if found).
ConsumeNextPart(StringPiece * s,char delim)73 static StringPiece ConsumeNextPart(StringPiece* s, char delim) {
74   for (size_t offset = 0; offset < s->size(); offset++) {
75     if ((*s)[offset] == delim) {
76       StringPiece result(s->data(), offset);
77       s->remove_prefix(offset + 1);  // +1: remove delim, as well
78       return result;
79     }
80   }
81   // No delimiter found: return rest of string
82   StringPiece result(s->data(), s->size());
83   s->remove_prefix(s->size());
84   return result;
85 }
86 
87 /* static */
ParseKey(StringPiece key,ParsedKey * out)88 Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
89   if (key.data() == out->buf_.data()) {
90     // Caller used our buf_ string directly, so we don't need to copy.  (The
91     // SendOp and RecvOp implementations do this, for example).
92     DCHECK_EQ(key.size(), out->buf_.size());
93   } else {
94     // Make a copy that our StringPieces can point at a copy that will persist
95     // for the lifetime of the ParsedKey object.
96     out->buf_.assign(key.data(), key.size());
97   }
98   StringPiece s(out->buf_);
99   StringPiece parts[5];
100   for (int i = 0; i < 5; i++) {
101     parts[i] = ConsumeNextPart(&s, ';');
102   }
103   if (s.empty() &&          // Consumed the whole string
104       !parts[4].empty() &&  // Exactly five parts
105       DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
106       strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
107       DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
108       !parts[3].empty()) {
109     out->src_device = StringPiece(parts[0].data(), parts[0].size());
110     out->dst_device = StringPiece(parts[2].data(), parts[2].size());
111     out->edge_name = StringPiece(parts[3].data(), parts[3].size());
112     return OkStatus();
113   }
114   return errors::InvalidArgument("Invalid  rendezvous key: ", key);
115 }
116 
~RendezvousInterface()117 RendezvousInterface::~RendezvousInterface() {}
118 
Recv(const ParsedKey & key,const Args & recv_args,Tensor * val,bool * is_dead,int64_t timeout_ms)119 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& recv_args,
120                                  Tensor* val, bool* is_dead,
121                                  int64_t timeout_ms) {
122   Status ret;
123   Notification n;
124   RecvAsync(key, recv_args,
125             [&ret, &n, val, is_dead](const Status& s, const Args& send_args,
126                                      const Args& recv_args, const Tensor& v,
127                                      const bool dead) {
128               ret = s;
129               *val = v;
130               *is_dead = dead;
131               n.Notify();
132             });
133   if (timeout_ms > 0) {
134     int64_t timeout_us = timeout_ms * 1000;
135     bool notified = WaitForNotificationWithTimeout(&n, timeout_us);
136     if (!notified) {
137       return Status(error::DEADLINE_EXCEEDED,
138                     "Timed out waiting for notification");
139     }
140   } else {
141     n.WaitForNotification();
142   }
143   return ret;
144 }
145 
Recv(const ParsedKey & key,const Args & args,Tensor * val,bool * is_dead)146 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& args,
147                                  Tensor* val, bool* is_dead) {
148   const int64_t no_timeout = 0;
149   return Recv(key, args, val, is_dead, no_timeout);
150 }
151 
152 namespace {
153 class LocalRendezvousWrapper : public Rendezvous {
154  public:
LocalRendezvousWrapper()155   LocalRendezvousWrapper() : impl_(this) {}
156 
Send(const ParsedKey & key,const Args & send_args,const Tensor & val,const bool is_dead)157   Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
158               const bool is_dead) override {
159     return impl_.Send(key, send_args, val, is_dead);
160   }
161 
RecvAsync(const ParsedKey & key,const Args & recv_args,DoneCallback done)162   void RecvAsync(const ParsedKey& key, const Args& recv_args,
163                  DoneCallback done) override {
164     impl_.RecvAsync(key, recv_args, std::move(done));
165   }
166 
StartAbort(const Status & status)167   void StartAbort(const Status& status) override { impl_.StartAbort(status); }
168 
169  private:
170   LocalRendezvous impl_;
171 
172   TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvousWrapper);
173 };
174 }  // namespace
175 
NewLocalRendezvous()176 Rendezvous* NewLocalRendezvous() { return new LocalRendezvousWrapper; }
177 
178 }  // end namespace tensorflow
179