• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_
17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_
18 
19 #include <functional>
20 
21 #include "tensorflow/core/distributed_runtime/worker_session.h"
22 #include "tensorflow/core/lib/core/status.h"
23 #include "tensorflow/core/platform/mutex.h"
24 #include "tensorflow/core/platform/thread_annotations.h"
25 #include "tensorflow/core/protobuf/tensorflow_server.pb.h"
26 #include "tensorflow/core/protobuf/worker.pb.h"
27 
28 namespace tensorflow {
29 
30 class WorkerCacheInterface;
31 struct WorkerEnv;
32 
33 // SessionMgr keeps track of information related to a given session.
34 //
35 // SessionMgr runs on the workers.
36 //
37 // SessionMgr is threadsafe.
38 class SessionMgr {
39  public:
40   typedef std::function<Status(const ServerDef&, WorkerCacheInterface**)>
41       WorkerCacheFactory;
42 
43   explicit SessionMgr(
44       WorkerEnv* worker_env, const string& default_worker_name,
45       std::unique_ptr<WorkerCacheInterface> default_worker_cache,
46       WorkerCacheFactory worker_cache_factory);
~SessionMgr()47   ~SessionMgr() {}
48 
49   // Allocates state for a new session.
50   Status CreateSession(const string& session, const ServerDef& server_def,
51                        bool isolate_session_state);
52   Status CreateSession(
53       const string& session, const ServerDef& server_def,
54       const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes,
55       bool isolate_session_state);
56 
57   // Create WorkerSession from the master with the given `master_task` and
58   // `master_incarnation`. We first look for existing WorkerSessions associated
59   // with the specified master task. If there are sessions created by the same
60   // master but with a different incarnation, it indicates that the remote
61   // master has restarted before deleting the sessions on worker. When it
62   // happens, old sessions associated with the master will be automatically
63   // removed before the new session is created.
64   Status CreateSession(
65       const string& session, const ServerDef& server_def,
66       const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes,
67       bool isolate_session_state, string master_task,
68       int64_t master_incarnation);
69 
70   void ResetDefaultWorkerCache(WorkerCacheInterface* worker_cache);
71 
72   // Updates state (worker cache, devices) of worker session identified by
73   // session name (`session`) based on a new server_def and set of devices.
74   Status UpdateSession(const string& session, const ServerDef& server_def,
75                        const protobuf::RepeatedPtrField<DeviceAttributes>&
76                            cluster_device_attributes,
77                        bool isolate_session_state);
78 
79   // Locates the worker session for a given session handle
80   Status WorkerSessionForSession(const string& session_handle,
81                                  std::shared_ptr<WorkerSession>* out_session);
82   std::shared_ptr<WorkerSession> LegacySession();
83 
84   Status DeleteSession(const string& session);
85 
86   static string WorkerNameFromServerDef(const ServerDef& server_def);
87 
88   void SetLogging(bool active);
89 
90   void RetrieveLogs(int64_t step_id, LoggingResponse* response);
91 
92   void ClearLogs();
93 
94  private:
95   WorkerEnv* const worker_env_;  // Not owned.
96 
97   // A note about destruction:
98   // We must delete graph_mgr before device_mgr, due to shared
99   // ownership of OpKernels in the executors. (The graph_mgr will
100   // free all stateless OpKernels, and pass over borrowed stateful
101   // OpKernels, which are also held in their respective devices'
102   // OpSegments.)
103   //
104   // legacy_session_ owns the worker_env_.device_mgr, and so we must ensure
105   // that sessions_'s WorkerSessions are deleted (which do not own the
106   // underlying devices, but instead own RenamedDevices) before
107   // legacy_session_ is deleted. Further, we must ensure that WorkerSession's
108   // device_mgr is deleted after WorkerSession's graph_mgr.
109 
110   std::unique_ptr<WorkerCacheInterface> default_worker_cache_;
111   std::shared_ptr<WorkerSession> legacy_session_;
112 
113   bool is_logging_active_ = false;
114 
115   const WorkerCacheFactory worker_cache_factory_;
116 
117   Status WorkerSessionForSessionLocked(
118       const string& session_handle, std::shared_ptr<WorkerSession>* out_session)
119       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
120 
121   mutex mu_;
122   // A map from session identifier to internal session structure.
123   std::map<string, std::shared_ptr<WorkerSession>> sessions_ TF_GUARDED_BY(mu_);
124 
125   // Incarnation and WorkerSession handle associated with a master task.
126   struct MasterAssociatedSession {
127     const int64 master_incarnation;
128     const string session_handle;
129   };
130   // A map from master task name to its associated worker sessions.
131   std::unordered_multimap<string, MasterAssociatedSession>
132       master_to_associated_sessions_ TF_GUARDED_BY(mu_);
133 };
134 
135 }  // namespace tensorflow
136 
137 #endif  // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_SESSION_MGR_H_
138