1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/framework/rendezvous.h"
17
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
22
23 #include "tensorflow/core/framework/local_rendezvous.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/core/notification.h"
26 #include "tensorflow/core/lib/gtl/flatmap.h"
27 #include "tensorflow/core/lib/gtl/manual_constructor.h"
28 #include "tensorflow/core/lib/hash/hash.h"
29 #include "tensorflow/core/lib/strings/str_util.h"
30 #include "tensorflow/core/platform/logging.h"
31 #include "tensorflow/core/platform/macros.h"
32 #include "tensorflow/core/platform/mutex.h"
33 #include "tensorflow/core/platform/thread_annotations.h"
34 #include "tensorflow/core/platform/types.h"
35
36 namespace tensorflow {
37
operator =(const ParsedKey & b)38 Rendezvous::ParsedKey& Rendezvous::ParsedKey::operator=(const ParsedKey& b) {
39 const char* b_base = b.buf_.data();
40 buf_ = b.buf_;
41 src_device = StringPiece(buf_.data() + (b.src_device.data() - b_base),
42 b.src_device.size());
43 src = b.src;
44 src_incarnation = b.src_incarnation;
45 dst_device = StringPiece(buf_.data() + (b.dst_device.data() - b_base),
46 b.dst_device.size());
47 dst = b.dst;
48 edge_name = StringPiece(buf_.data() + (b.edge_name.data() - b_base),
49 b.edge_name.size());
50 return *this;
51 }
52
53 /* static */
CreateKey(const string & src_device,uint64 src_incarnation,const string & dst_device,const string & name,const FrameAndIter & frame_iter)54 string Rendezvous::CreateKey(const string& src_device, uint64 src_incarnation,
55 const string& dst_device, const string& name,
56 const FrameAndIter& frame_iter) {
57 // NOTE: ';' is not used in the device name's job name.
58 //
59 // We include both sender and receiver in the key to facilitate
60 // debugging. For correctness, we only need to encode the receiver.
61 //
62 // "src_incarnation" is used to distinguish a worker when it
63 // restarts.
64 char buf[strings::kFastToBufferSize];
65 return strings::StrCat(
66 src_device, ";", strings::Uint64ToHexString(src_incarnation, buf), ";",
67 dst_device, ";", name, ";", frame_iter.frame_id, ":", frame_iter.iter_id);
68 }
69
70 // Return the prefix of "*s" up to the next occurrence of "delim", or
71 // the whole remaining string if "delim" is not found. "*s" is advanced
72 // past the string returned plus the delimiter (if found).
ConsumeNextPart(StringPiece * s,char delim)73 static StringPiece ConsumeNextPart(StringPiece* s, char delim) {
74 for (size_t offset = 0; offset < s->size(); offset++) {
75 if ((*s)[offset] == delim) {
76 StringPiece result(s->data(), offset);
77 s->remove_prefix(offset + 1); // +1: remove delim, as well
78 return result;
79 }
80 }
81 // No delimiter found: return rest of string
82 StringPiece result(s->data(), s->size());
83 s->remove_prefix(s->size());
84 return result;
85 }
86
87 /* static */
ParseKey(StringPiece key,ParsedKey * out)88 Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
89 if (key.data() == out->buf_.data()) {
90 // Caller used our buf_ string directly, so we don't need to copy. (The
91 // SendOp and RecvOp implementations do this, for example).
92 DCHECK_EQ(key.size(), out->buf_.size());
93 } else {
94 // Make a copy that our StringPieces can point at a copy that will persist
95 // for the lifetime of the ParsedKey object.
96 out->buf_.assign(key.data(), key.size());
97 }
98 StringPiece s(out->buf_);
99 StringPiece parts[5];
100 for (int i = 0; i < 5; i++) {
101 parts[i] = ConsumeNextPart(&s, ';');
102 }
103 if (s.empty() && // Consumed the whole string
104 !parts[4].empty() && // Exactly five parts
105 DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
106 strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
107 DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
108 !parts[3].empty()) {
109 out->src_device = StringPiece(parts[0].data(), parts[0].size());
110 out->dst_device = StringPiece(parts[2].data(), parts[2].size());
111 out->edge_name = StringPiece(parts[3].data(), parts[3].size());
112 return Status::OK();
113 }
114 return errors::InvalidArgument("Invalid rendezvous key: ", key);
115 }
116
~RendezvousInterface()117 RendezvousInterface::~RendezvousInterface() {}
118
Recv(const ParsedKey & key,const Args & recv_args,Tensor * val,bool * is_dead,int64 timeout_ms)119 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& recv_args,
120 Tensor* val, bool* is_dead, int64 timeout_ms) {
121 Status ret;
122 Notification n;
123 RecvAsync(key, recv_args,
124 [&ret, &n, val, is_dead](const Status& s, const Args& send_args,
125 const Args& recv_args, const Tensor& v,
126 const bool dead) {
127 ret = s;
128 *val = v;
129 *is_dead = dead;
130 n.Notify();
131 });
132 if (timeout_ms > 0) {
133 int64 timeout_us = timeout_ms * 1000;
134 bool notified = WaitForNotificationWithTimeout(&n, timeout_us);
135 if (!notified) {
136 return Status(error::DEADLINE_EXCEEDED,
137 "Timed out waiting for notification");
138 }
139 } else {
140 n.WaitForNotification();
141 }
142 return ret;
143 }
144
Recv(const ParsedKey & key,const Args & args,Tensor * val,bool * is_dead)145 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& args,
146 Tensor* val, bool* is_dead) {
147 const int64 no_timeout = 0;
148 return Recv(key, args, val, is_dead, no_timeout);
149 }
150
151 namespace {
152 class LocalRendezvousWrapper : public Rendezvous {
153 public:
154 LocalRendezvousWrapper() = default;
155
Send(const ParsedKey & key,const Args & send_args,const Tensor & val,const bool is_dead)156 Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
157 const bool is_dead) override {
158 return impl_.Send(key, send_args, val, is_dead);
159 }
160
RecvAsync(const ParsedKey & key,const Args & recv_args,DoneCallback done)161 void RecvAsync(const ParsedKey& key, const Args& recv_args,
162 DoneCallback done) override {
163 impl_.RecvAsync(key, recv_args, std::move(done));
164 }
165
StartAbort(const Status & status)166 void StartAbort(const Status& status) override { impl_.StartAbort(status); }
167
168 private:
169 LocalRendezvous impl_;
170
171 TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvousWrapper);
172 };
173 } // namespace
174
NewLocalRendezvous()175 Rendezvous* NewLocalRendezvous() { return new LocalRendezvousWrapper; }
176
177 } // end namespace tensorflow
178