1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_ 17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_ 18 19 #include <functional> 20 21 #include "tensorflow/core/distributed_runtime/worker_session.h" 22 #include "tensorflow/core/lib/core/status.h" 23 #include "tensorflow/core/platform/mutex.h" 24 #include "tensorflow/core/protobuf/tensorflow_server.pb.h" 25 #include "tensorflow/core/protobuf/worker.pb.h" 26 27 namespace tensorflow { 28 29 class WorkerCacheInterface; 30 struct WorkerEnv; 31 32 // SessionMgr keeps track of information related to a given session. 33 // 34 // SessionMgr runs on the workers. 35 // 36 // SessionMgr is threadsafe. 37 class SessionMgr { 38 public: 39 typedef std::function<Status(const ServerDef&, WorkerCacheInterface**)> 40 WorkerCacheFactory; 41 42 explicit SessionMgr( 43 WorkerEnv* worker_env, const string& default_worker_name, 44 std::unique_ptr<WorkerCacheInterface> default_worker_cache, 45 WorkerCacheFactory worker_cache_factory); ~SessionMgr()46 ~SessionMgr() {} 47 48 // Allocates state for a new session. 49 Status CreateSession(const string& session, const ServerDef& server_def, 50 bool isolate_session_state); 51 Status CreateSession( 52 const string& session, const ServerDef& server_def, 53 const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes, 54 bool isolate_session_state); 55 56 // Updates state (worker cache, devices) of worker session identified by 57 // session name (`session`) based on a new server_def and set of devices. 58 Status UpdateSession(const string& session, const ServerDef& server_def, 59 const protobuf::RepeatedPtrField<DeviceAttributes>& 60 cluster_device_attributes, 61 bool isolate_session_state); 62 63 // Locates the worker session for a given session handle 64 Status WorkerSessionForSession(const string& session_handle, 65 std::shared_ptr<WorkerSession>* out_session); 66 std::shared_ptr<WorkerSession> LegacySession(); 67 68 Status DeleteSession(const string& session); 69 70 static string WorkerNameFromServerDef(const ServerDef& server_def); 71 72 void SetLogging(bool active); 73 74 void RetrieveLogs(tensorflow::int64 step_id, LoggingResponse* response); 75 76 void ClearLogs(); 77 78 private: 79 WorkerEnv* const worker_env_; // Not owned. 80 81 // A note about destruction: 82 // We must delete graph_mgr before device_mgr, due to shared 83 // ownership of OpKernels in the executors. (The graph_mgr will 84 // free all stateless OpKernels, and pass over borrowed stateful 85 // OpKernels, which are also held in their respective devices' 86 // OpSegments.) 87 // 88 // legacy_session_ owns the worker_env_.device_mgr, and so we must ensure 89 // that sessions_'s WorkerSessions are deleted (which do not own the 90 // underlying devices, but instead own RenamedDevices) before 91 // legacy_session_ is deleted. Further, we must ensure that WorkerSession's 92 // device_mgr is deleted after WorkerSession's graph_mgr. 93 94 std::unique_ptr<WorkerCacheInterface> default_worker_cache_; 95 std::shared_ptr<WorkerSession> legacy_session_; 96 97 bool is_logging_active_ = false; 98 99 const WorkerCacheFactory worker_cache_factory_; 100 101 Status WorkerSessionForSessionLocked( 102 const string& session_handle, std::shared_ptr<WorkerSession>* out_session) 103 EXCLUSIVE_LOCKS_REQUIRED(mu_); 104 105 mutex mu_; 106 // A map from session identifier to internal session structure. 107 std::map<string, std::shared_ptr<WorkerSession>> sessions_ GUARDED_BY(mu_); 108 }; 109 110 } // namespace tensorflow 111 112 #endif // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_ 113