• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_
17 #define TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_
18 
19 #include <string>
20 #include <unordered_map>
21 
22 #include "tensorflow/core/common_runtime/device_mgr.h"
23 #include "tensorflow/core/framework/rendezvous.h"
24 #include "tensorflow/core/framework/tensor.h"
25 #include "tensorflow/core/lib/core/status.h"
26 #include "tensorflow/core/platform/macros.h"
27 #include "tensorflow/core/platform/mutex.h"
28 #include "tensorflow/core/platform/types.h"
29 
30 namespace tensorflow {
31 
32 // IntraProcessRendezvous is a Rendezvous which expects all producers
33 // and consumers to be devices immediately accessible within the
34 // process.  That is, it will never be necessary to perform an RPC to
35 // communicate with either.
36 //
37 // Buffering of Tensor values is delegated to a "local" Rendezvous
38 // obtained from NewLocalRendezvous().  This class just adds
39 // functionality to coordinate multiple process-local devices.
40 class IntraProcessRendezvous : public Rendezvous {
41  public:
42   explicit IntraProcessRendezvous(const DeviceMgr* device_mgr);
43 
44   // Forwards to local_, where the Tensor "val" will be buffered and
45   // any waiting callback stored.
46   Status Send(const ParsedKey& key, const Rendezvous::Args& args,
47               const Tensor& val, const bool is_dead) override;
48 
49   // This method is called only by the RecvOp.  It tests to see
50   // whether the value will be produced by a local or remote device
51   // and handles accordingly.  In the local case it forwards to
52   // local_, in the remote case it initiates an RPC request.
53   void RecvAsync(const ParsedKey& key, const Rendezvous::Args& args,
54                  DoneCallback done) override;
55 
56   void StartAbort(const Status& status) override;
57 
58  private:
59   const DeviceMgr* device_mgr_;
60   Rendezvous* local_;  // Owns a Ref on this object.
61 
62   mutable mutex mu_;
63 
64   // Status given by StartAbort() if any.
65   Status status_ GUARDED_BY(mu_);
66 
67   ~IntraProcessRendezvous() override;
68 
69   // Parses "key" into "parsed". If "is_src" is true, checks that the
70   // rendezvous key's source is in this process. If "is_src" is false,
71   // checks that the rendezvous key's destination is in this process.
72   Status ParseKey(const string& key, bool is_src,
73                   Rendezvous::ParsedKey* parsed);
74 
75   // Callback handling the case when a rendezvous has been
76   // accomplished in local_ and the consumer is local to this process.
77   // Tensor "in" will be copied into "out". The key "parsed" encodes
78   // the src and dst devices.
79   typedef std::function<void(const Status&)> StatusCallback;
80   void SameWorkerRecvDone(const Rendezvous::ParsedKey& parsed,
81                           const Rendezvous::Args& send_args,
82                           const Rendezvous::Args& recv_args, const Tensor& in,
83                           Tensor* out, StatusCallback done);
84 
85   TF_DISALLOW_COPY_AND_ASSIGN(IntraProcessRendezvous);
86 };
87 
88 }  // end namespace tensorflow
89 
90 #endif  // TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_
91