1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_SESSION_H_ 17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_SESSION_H_ 18 19 #include <string> 20 21 #include "tensorflow/core/common_runtime/device_mgr.h" 22 #include "tensorflow/core/distributed_runtime/cluster_function_library_runtime.h" 23 #include "tensorflow/core/distributed_runtime/graph_mgr.h" 24 #include "tensorflow/core/distributed_runtime/worker_cache.h" 25 26 namespace tensorflow { 27 28 class ClusterFunctionLibraryRuntime; 29 class GraphMgr; 30 class WorkerCacheInterface; 31 32 // WorkerSession encapsulates all of the state relating to a given session. 33 class WorkerSession { 34 public: 35 // Collection of local devices. These devices are typically 36 // RenamedDevices in all except the SessionMgr.legacy_session_ and 37 // sessions created with `isolate_session_state == false`. In the 38 // those cases, this method returns a pointer to a borrowed 39 // DeviceMgr (typically the `worker_env.device_mgr`). device_mgr()40 const DeviceMgr* device_mgr() { 41 return device_mgr_ ? device_mgr_.get() : borrowed_device_mgr_; 42 } 43 remote_device_mgr()44 DynamicDeviceMgr* remote_device_mgr() { return remote_device_mgr_.get(); } 45 session_name()46 const string& session_name() const { return session_name_; } worker_name()47 const string& worker_name() const { return worker_name_; } 48 worker_cache()49 WorkerCacheInterface* worker_cache() const { 50 tf_shared_lock l(worker_session_state_mu_); 51 return worker_cache_.get(); 52 } graph_mgr()53 GraphMgr* graph_mgr() const { return graph_mgr_.get(); } 54 cluster_flr()55 ClusterFunctionLibraryRuntime* cluster_flr() const { 56 return cluster_flr_.get(); 57 } 58 59 WorkerSession(const string& session_name, const string& worker_name, 60 std::unique_ptr<WorkerCacheInterface> worker_cache, 61 std::unique_ptr<DeviceMgr> device_mgr, 62 std::unique_ptr<GraphMgr> graph_mgr, 63 std::unique_ptr<DynamicDeviceMgr> remote_device_mgr); 64 65 static std::shared_ptr<WorkerSession> CreateWithBorrowedDeviceMgr( 66 const string& session_name, const string& worker_name, 67 std::unique_ptr<WorkerCacheInterface> worker_cache, 68 const DeviceMgr* borrowed_device_mgr, std::unique_ptr<GraphMgr> graph_mgr, 69 std::unique_ptr<DynamicDeviceMgr> remote_device_mgr); 70 71 // In the eager runtime we allow WorkerSession to be updated, where the 72 // worker cache will be recreated. If WorkerSession upate is expected and a 73 // worker in the cache is used in RPCs, the caller should hold a shared 74 // pointer to avoid the workers getting deleted. GetSharedWorkerCache()75 std::shared_ptr<WorkerCacheInterface> GetSharedWorkerCache() { 76 tf_shared_lock l(worker_session_state_mu_); 77 return worker_cache_; 78 } 79 80 // Update an existing worker session with new set of remote workers and 81 // devices. Added devices will be owned by the worker session, and removed 82 // devices will be freed by their names. 83 Status UpdateWorkerCacheAndDevices( 84 std::unique_ptr<WorkerCacheInterface> new_worker_cache, 85 std::vector<std::unique_ptr<Device>> added_remote_devices, 86 const std::vector<Device*>& removed_remote_devices); 87 88 ~WorkerSession(); 89 90 private: 91 WorkerSession(const string& session_name, const string& worker_name, 92 std::unique_ptr<WorkerCacheInterface> worker_cache, 93 const DeviceMgr* borrowed_device_mgr, 94 std::unique_ptr<GraphMgr> graph_mgr, 95 std::unique_ptr<DynamicDeviceMgr> remote_device_mgr); 96 97 // The name of the session. 98 const string session_name_; 99 100 // The name of the worker. E.g., /job:mnist/replica:0/task:1. 101 const string worker_name_; 102 103 mutable mutex worker_session_state_mu_; 104 // Object from which WorkerInterface instances can be obtained. 105 std::shared_ptr<WorkerCacheInterface> worker_cache_ 106 TF_GUARDED_BY(worker_session_state_mu_); 107 108 // graph_mgr keeps track of the registered graphs of this session. 109 // 110 // Note: graph_mgr must be deleted before rendezvous_mgr! 111 // Note: graph_mgr must be deleted before device_mgr! 112 const std::unique_ptr<GraphMgr> graph_mgr_; 113 114 std::unique_ptr<ClusterFunctionLibraryRuntime> cluster_flr_; 115 116 const std::unique_ptr<const DeviceMgr> device_mgr_; 117 const DeviceMgr* const borrowed_device_mgr_; // Not owned. 118 std::unique_ptr<DynamicDeviceMgr> remote_device_mgr_; 119 }; 120 121 } // namespace tensorflow 122 123 #endif // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_WORKER_SESSION_H_ 124