• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_REMOTE_MGR_H_
17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_REMOTE_MGR_H_
18 
19 #include <unordered_map>
20 
21 #include "tensorflow/core/common_runtime/eager/eager_executor.h"
22 #include "tensorflow/core/common_runtime/eager/tensor_handle.h"
23 #include "tensorflow/core/distributed_runtime/eager/remote_tensor_handle.h"
24 #include "tensorflow/core/platform/mutex.h"
25 
26 namespace tensorflow {
27 namespace eager {
28 
29 const int64 kInvalidRemoteOpId = -1;
30 
31 // This class manages the states required to setup an eager cluster.
32 // TODO(fishx): Move remote state from context to this class.
33 class RemoteMgr {
34  public:
RemoteMgr(bool is_master,EagerContext * ctx)35   RemoteMgr(bool is_master, EagerContext* ctx)
36       : is_master_(is_master), parent_(ctx) {}
37 
~RemoteMgr()38   ~RemoteMgr() {
39     for (const auto& entry : remote_tensor_handle_map_) {
40       entry.second->Unref();
41     }
42   }
43 
IsMaster()44   bool IsMaster() { return is_master_; }
45 
46   void AddOperationOutputs(
47       const gtl::ArraySlice<tensorflow::TensorHandle*> handles,
48       int64 operation_id);
49 
50   void AddOperationOutput(tensorflow::TensorHandle* handles, int64 operation_id,
51                           int32 output_num);
52 
53   Status GetTensorHandle(const RemoteTensorHandleInternal& remote_handle,
54                          tensorflow::TensorHandle** handle);
55 
56   Status DeleteTensorHandle(const RemoteTensorHandleInternal& remote_handle);
57 
58   // Helper function to create monotonically increasing ids unique to this
59   // context.
NextOpId()60   uint64 NextOpId() {
61     DCHECK(is_master_);
62     mutex_lock l(next_id_mutex_);
63     return next_op_id_++;
64   }
65 
66   // Serialize a remote TensorHandle to a RemoteTensorHandle.
67   // If wait_until_ready is true, block until the remote handle is ready on a
68   // remote worker.
69   Status SerializeRemoteTensorHandle(
70       TensorHandle* in, const bool wait_until_ready, RemoteTensorHandle* out,
71       Device* device, const string& device_name,
72       const bool serialize_resource_dtype_and_shape = false);
73 
74   // Deserialize a RemoteTensorHandle to a TensorHandle(local/remote).
75   // The output holds a reference to the TensorHandle.
76   Status DeserializeRemoteTensorHandle(const RemoteTensorHandle& in,
77                                        TensorHandle** out);
78 
79   EagerExecutor& GetOrCreateExecutorForStream(uint64 stream_id);
80 
81   void DeleteExecutorForStream(uint64 stream_id);
82 
83  protected:
84   mutex next_id_mutex_;
85   uint64 next_op_id_ TF_GUARDED_BY(next_id_mutex_) = 1;
86 
87  private:
88   // Returns the op_id and output_num if the given local TensorHandle exists in
89   // remote_tensor_handle_map_.
90   Status GetRemoteTensorHandle(const tensorflow::TensorHandle* handle,
91                                const bool wait_until_ready, int64* op_id,
92                                int32* output_num)
93       TF_SHARED_LOCKS_REQUIRED(remote_tensor_handle_mu_);
94 
95   Status GetTensorHandleImpl(const RemoteTensorHandleInternal& remote_handle,
96                              tensorflow::TensorHandle** handle)
97       TF_SHARED_LOCKS_REQUIRED(remote_tensor_handle_mu_);
98 
99   Status GetMirroredResourceShape(
100       const RemoteTensorHandleInternal& remote_handle,
101       std::vector<DtypeAndPartialTensorShape>* handle);
102 
103   bool is_master_;
104 
105   using RemoteTensorHandleMap =
106       gtl::FlatMap<RemoteTensorHandleInternal, tensorflow::TensorHandle*,
107                    RemoteTensorHandleInternalHash,
108                    RemoteTensorHandleInternalEquals>;
109   using MirroredResourceShapeMap = gtl::FlatMap<
110       RemoteTensorHandleInternal, std::vector<DtypeAndPartialTensorShape>,
111       RemoteTensorHandleInternalHash, RemoteTensorHandleInternalEquals>;
112 
113   mutex remote_tensor_handle_mu_;
114   // This map maintains the TensorHandles that are required by remote workers
115   // in the cluster. Each map key is generated by the master, so it should be
116   // globally unique. This map owns references on the handles it contains.
117   RemoteTensorHandleMap remote_tensor_handle_map_
118       TF_GUARDED_BY(remote_tensor_handle_mu_);
119 
120   mutex mirrored_resource_shape_mu_;
121   // This map maintains the data types and shapes of resource variables required
122   // by remote workers in the cluster. Each map key is generated by the master,
123   // so it should be globally unique.
124   MirroredResourceShapeMap mirrored_resource_shape_map_
125       TF_GUARDED_BY(mirrored_resource_shape_mu_);
126 
127   EagerContext* parent_;  // not owned.
128 
129   mutex executor_map_mu_;
130   std::unordered_map<uint64, EagerExecutor> executor_map_
131       TF_GUARDED_BY(executor_map_mu_);
132 };
133 
134 }  // namespace eager
135 }  // namespace tensorflow
136 
137 #endif  // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_EAGER_REMOTE_MGR_H_
138