1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_ 17 #define TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_ 18 19 #include <string> 20 21 #include "tensorflow/core/framework/cancellation.h" 22 #include "tensorflow/core/framework/control_flow.h" 23 #include "tensorflow/core/framework/device_base.h" 24 #include "tensorflow/core/framework/tensor.h" 25 #include "tensorflow/core/lib/core/refcount.h" 26 #include "tensorflow/core/lib/core/status.h" 27 #include "tensorflow/core/util/device_name_utils.h" 28 29 namespace tensorflow { 30 31 // A Rendezvous is an abstraction for passing tensors from producers 32 // to consumers. A rendezvous is a table of channels. Each channel is 33 // keyed by a rendezvous key. The key encodes a pair of <producer, 34 // consumer>, where the producer and the consumer are tensorflow 35 // devices. 36 // 37 // The producer calls the Send() method to send one tensor over one 38 // named channel. The consumer calls the Recv() method to receive one 39 // tensor from a named channel. A sequence of tensors can be passed 40 // from the producer to the consumer. The consumer receives them in 41 // the order as the producer sends them. 42 // 43 // A consumer may safely request the tensor before or after it has 44 // been produced. A consumer has the choice of making a blocking call 45 // or providing a callback: in either case, the consumer receives the 46 // Tensor as soon as it is available. A producer never blocks. 47 class RendezvousInterface { 48 public: 49 struct Args { 50 DeviceContext* device_context = nullptr; 51 AllocatorAttributes alloc_attrs; 52 CancellationManager* cancellation_manager = nullptr; // not owned. 53 }; 54 55 // Parses the key constructed by CreateKey and parse src/dst device 56 // names into structures respectively. 57 struct ParsedKey { 58 StringPiece src_device; 59 DeviceNameUtils::ParsedName src; 60 uint64 src_incarnation = 0; 61 StringPiece dst_device; 62 DeviceNameUtils::ParsedName dst; 63 StringPiece edge_name; 64 ParsedKeyParsedKey65 ParsedKey() {} ParsedKeyParsedKey66 ParsedKey(const ParsedKey& b) { *this = b; } 67 68 ParsedKey& operator=(const ParsedKey& b); FullKeyParsedKey69 StringPiece FullKey() const { return buf_; } 70 71 private: 72 friend class Rendezvous; 73 friend class SendOp; 74 friend class RecvOp; 75 string buf_; 76 }; 77 78 // The caller is a tensor producer and it sends a message (a tensor 79 // "val" and a bool "is_dead") under the given "key". 80 // 81 // {val, is_dead} is bundled as a message sent and received. 82 // Typically, is_dead is set by some control flow nodes 83 // (e.g., a not-taken branch). args is passed by Send to the 84 // Recv function to communicate any information that the Recv 85 // function might need. This is typically only necessary for 86 // Send/Recv on the same worker. 87 // 88 // Send() never blocks. 89 virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val, 90 const bool is_dead) = 0; 91 92 // Callback provided by a tensor consumer waiting on the rendezvous. 93 // It will be invoked when the tensor is available, or when a non-OK 94 // status arises in the production of that tensor. It also gets 95 // two Rendezvous::Args, one provided by the sender, the other by the 96 // receiver, which may be needed when a non-CPU device is in use 97 // by either side. 98 typedef std::function<void(const Status&, const Args&, const Args&, 99 const Tensor&, const bool)> 100 DoneCallback; 101 102 virtual void RecvAsync(const ParsedKey& key, const Args& args, 103 DoneCallback done) = 0; 104 105 // Synchronous wrapper for RecvAsync. 106 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, 107 bool* is_dead, int64 timeout_ms); 108 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, 109 bool* is_dead); 110 111 // Aborts all pending and future Send/Recv with the given "status". 112 // 113 // StartAbort() does not wait for ongoing calls to finish. 114 // REQUIRES: !status.ok() 115 virtual void StartAbort(const Status& status) = 0; 116 117 protected: 118 virtual ~RendezvousInterface(); 119 is_cross_process()120 virtual bool is_cross_process() { return false; } 121 friend class ProcessFunctionLibraryRuntime; 122 }; 123 124 // A reference-counted implementation of RendezvousInterface. 125 // 126 // This class is used in cases where a rendezvous may be shared between multiple 127 // threads with no clear owner. 128 class Rendezvous : public RendezvousInterface, public core::RefCounted { 129 public: 130 // Constructs a rendezvous key for the tensor of "name" sent from 131 // "src_device" to "dst_device". The tensor is generated in the frame 132 // and iteration specified by "frame_iter". 133 static string CreateKey(const string& src_device, uint64 src_incarnation, 134 const string& dst_device, const string& name, 135 const FrameAndIter& frame_iter); 136 137 static Status ParseKey(StringPiece key, ParsedKey* out); 138 }; 139 140 // Returns a Rendezvous instance that is limited to use only by 141 // producers and consumers in the local process. The caller assumes 142 // ownership of one Ref() on the returned object. 143 Rendezvous* NewLocalRendezvous(); 144 145 } // end namespace tensorflow 146 147 #endif // TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_ 148