1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_ 17 #define TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_ 18 19 #include <string> 20 21 #include "tensorflow/core/framework/control_flow.h" 22 #include "tensorflow/core/framework/device_base.h" 23 #include "tensorflow/core/framework/tensor.h" 24 #include "tensorflow/core/lib/core/refcount.h" 25 #include "tensorflow/core/lib/core/status.h" 26 #include "tensorflow/core/util/device_name_utils.h" 27 28 namespace tensorflow { 29 30 // A Rendezvous is an abstraction for passing tensors from producers 31 // to consumers. A rendezvous is a table of channels. Each channel is 32 // keyed by a rendezvous key. The key encodes a pair of <producer, 33 // consumer>, where the producer and the consumer are tensorflow 34 // devices. 35 // 36 // The producer calls the Send() method to send one tensor over one 37 // named channel. The consumer calls the Recv() method to receive one 38 // tensor from a named channel. A sequence of tensors can be passed 39 // from the producer to the consumer. The consumer receives them in 40 // the order as the producer sends them. 41 // 42 // A consumer may safely request the tensor before or after it has 43 // been produced. A consumer has the choice of making a blocking call 44 // or providing a callback: in either case, the consumer receives the 45 // Tensor as soon as it is available. A producer never blocks. 46 class Rendezvous : public core::RefCounted { 47 public: 48 struct Args { 49 DeviceContext* device_context = nullptr; 50 AllocatorAttributes alloc_attrs; 51 }; 52 53 // Constructs a rendezvous key for the tensor of "name" sent from 54 // "src_device" to "dst_device". The tensor is generated in the frame 55 // and iteration specified by "frame_iter". 56 static string CreateKey(const string& src_device, uint64 src_incarnation, 57 const string& dst_device, const string& name, 58 const FrameAndIter& frame_iter); 59 60 // Parses the key constructed by CreateKey and parse src/dst device 61 // names into structures respectively. 62 struct ParsedKey { 63 StringPiece src_device; 64 DeviceNameUtils::ParsedName src; 65 uint64 src_incarnation = 0; 66 StringPiece dst_device; 67 DeviceNameUtils::ParsedName dst; 68 StringPiece edge_name; 69 ParsedKeyParsedKey70 ParsedKey() {} ParsedKeyParsedKey71 ParsedKey(const ParsedKey& b) { *this = b; } 72 73 ParsedKey& operator=(const ParsedKey& b); FullKeyParsedKey74 StringPiece FullKey() const { return buf_; } 75 76 private: 77 friend class Rendezvous; 78 friend class SendOp; 79 friend class RecvOp; 80 string buf_; 81 }; 82 static Status ParseKey(StringPiece key, ParsedKey* out); 83 84 // The caller is a tensor producer and it sends a message (a tensor 85 // "val" and a bool "is_dead") under the given "key". 86 // 87 // {val, is_dead} is bundled as a message sent and received. 88 // Typically, is_dead is set by some control flow nodes 89 // (e.g., a not-taken branch). args is passed by Send to the 90 // Recv function to communicate any information that the Recv 91 // function might need. This is typically only necessary for 92 // Send/Recv on the same worker. 93 // 94 // Send() never blocks. 95 virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val, 96 const bool is_dead) = 0; 97 98 // Callback provided by a tensor consumer waiting on the rendezvous. 99 // It will be invoked when the tensor is available, or when a non-OK 100 // status arises in the production of that tensor. It also gets 101 // two Rendezvous::Args, one provided by the sender, the other by the 102 // receiver, which may be needed when a non-CPU device is in use 103 // by either side. 104 typedef std::function<void(const Status&, const Args&, const Args&, 105 const Tensor&, const bool)> 106 DoneCallback; 107 108 virtual void RecvAsync(const ParsedKey& key, const Args& args, 109 DoneCallback done) = 0; 110 111 // Synchronous wrapper for RecvAsync. 112 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, 113 bool* is_dead, int64 timeout_ms); 114 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, 115 bool* is_dead); 116 117 // Aborts all pending and future Send/Recv with the given "status". 118 // 119 // StartAbort() does not wait for ongoing calls to finish. 120 // REQUIRES: !status.ok() 121 virtual void StartAbort(const Status& status) = 0; 122 123 protected: 124 ~Rendezvous() override; 125 }; 126 127 // Returns a Rendezvous instance that is limited to use only by 128 // producers and consumers in the local process. The caller assumes 129 // ownership of one Ref() on the returned object. 130 Rendezvous* NewLocalRendezvous(); 131 132 } // end namespace tensorflow 133 134 #endif // TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_ 135