• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_
17 #define TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_
18 
19 #include <string>
20 
21 #include "tensorflow/core/framework/cancellation.h"
22 #include "tensorflow/core/framework/control_flow.h"
23 #include "tensorflow/core/framework/device_base.h"
24 #include "tensorflow/core/framework/tensor.h"
25 #include "tensorflow/core/lib/core/refcount.h"
26 #include "tensorflow/core/lib/core/status.h"
27 #include "tensorflow/core/util/device_name_utils.h"
28 
29 namespace tensorflow {
30 
31 // A Rendezvous is an abstraction for passing tensors from producers
32 // to consumers. A rendezvous is a table of channels. Each channel is
33 // keyed by a rendezvous key. The key encodes a pair of <producer,
34 // consumer>, where the producer and the consumer are tensorflow
35 // devices.
36 //
37 // The producer calls the Send() method to send one tensor over one
38 // named channel. The consumer calls the Recv() method to receive one
39 // tensor from a named channel. A sequence of tensors can be passed
40 // from the producer to the consumer.  The consumer receives them in
41 // the order as the producer sends them.
42 //
43 // A consumer may safely request the tensor before or after it has
44 // been produced.  A consumer has the choice of making a blocking call
45 // or providing a callback: in either case, the consumer receives the
46 // Tensor as soon as it is available.  A producer never blocks.
47 class RendezvousInterface {
48  public:
49   struct Args {
50     DeviceContext* device_context = nullptr;
51     AllocatorAttributes alloc_attrs;
52     CancellationManager* cancellation_manager = nullptr;  // not owned.
53   };
54 
55   // Parses the key constructed by CreateKey and parse src/dst device
56   // names into structures respectively.
57   struct ParsedKey {
58     StringPiece src_device;
59     DeviceNameUtils::ParsedName src;
60     uint64 src_incarnation = 0;
61     StringPiece dst_device;
62     DeviceNameUtils::ParsedName dst;
63     StringPiece edge_name;
64 
ParsedKeyParsedKey65     ParsedKey() {}
ParsedKeyParsedKey66     ParsedKey(const ParsedKey& b) { *this = b; }
67 
68     ParsedKey& operator=(const ParsedKey& b);
FullKeyParsedKey69     StringPiece FullKey() const { return buf_; }
70 
71    private:
72     friend class Rendezvous;
73     friend class SendOp;
74     friend class RecvOp;
75     string buf_;
76   };
77 
78   // The caller is a tensor producer and it sends a message (a tensor
79   // "val" and a bool "is_dead") under the given "key".
80   //
81   // {val, is_dead} is bundled as a message sent and received.
82   // Typically, is_dead is set by some control flow nodes
83   // (e.g., a not-taken branch).  args is passed by Send to the
84   // Recv function to communicate any information that the Recv
85   // function might need.  This is typically only necessary for
86   // Send/Recv on the same worker.
87   //
88   // Send() never blocks.
89   virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val,
90                       const bool is_dead) = 0;
91 
92   // Callback provided by a tensor consumer waiting on the rendezvous.
93   // It will be invoked when the tensor is available, or when a non-OK
94   // status arises in the production of that tensor.  It also gets
95   // two Rendezvous::Args, one provided by the sender, the other by the
96   // receiver, which may be needed when a non-CPU device is in use
97   // by either side.
98   typedef std::function<void(const Status&, const Args&, const Args&,
99                              const Tensor&, const bool)>
100       DoneCallback;
101 
102   virtual void RecvAsync(const ParsedKey& key, const Args& args,
103                          DoneCallback done) = 0;
104 
105   // Synchronous wrapper for RecvAsync.
106   Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
107               bool* is_dead, int64 timeout_ms);
108   Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
109               bool* is_dead);
110 
111   // Aborts all pending and future Send/Recv with the given "status".
112   //
113   // StartAbort() does not wait for ongoing calls to finish.
114   // REQUIRES: !status.ok()
115   virtual void StartAbort(const Status& status) = 0;
116 
117  protected:
118   virtual ~RendezvousInterface();
119 
is_cross_process()120   virtual bool is_cross_process() { return false; }
121   friend class ProcessFunctionLibraryRuntime;
122 };
123 
124 // A reference-counted implementation of RendezvousInterface.
125 //
126 // This class is used in cases where a rendezvous may be shared between multiple
127 // threads with no clear owner.
128 class Rendezvous : public RendezvousInterface, public core::RefCounted {
129  public:
130   // Constructs a rendezvous key for the tensor of "name" sent from
131   // "src_device" to "dst_device". The tensor is generated in the frame
132   // and iteration specified by "frame_iter".
133   static string CreateKey(const string& src_device, uint64 src_incarnation,
134                           const string& dst_device, const string& name,
135                           const FrameAndIter& frame_iter);
136 
137   static Status ParseKey(StringPiece key, ParsedKey* out);
138 };
139 
140 // Returns a Rendezvous instance that is limited to use only by
141 // producers and consumers in the local process.  The caller assumes
142 // ownership of one Ref() on the returned object.
143 Rendezvous* NewLocalRendezvous();
144 
145 }  // end namespace tensorflow
146 
147 #endif  // TENSORFLOW_CORE_FRAMEWORK_RENDEZVOUS_H_
148