1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_ 17 #define TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_ 18 19 #include <string> 20 #include <unordered_map> 21 22 #include "tensorflow/core/common_runtime/device_mgr.h" 23 #include "tensorflow/core/framework/rendezvous.h" 24 #include "tensorflow/core/framework/tensor.h" 25 #include "tensorflow/core/lib/core/status.h" 26 #include "tensorflow/core/platform/macros.h" 27 #include "tensorflow/core/platform/mutex.h" 28 #include "tensorflow/core/platform/types.h" 29 30 namespace tensorflow { 31 32 // IntraProcessRendezvous is a Rendezvous which expects all producers 33 // and consumers to be devices immediately accessible within the 34 // process. That is, it will never be necessary to perform an RPC to 35 // communicate with either. 36 // 37 // Buffering of Tensor values is delegated to a "local" Rendezvous 38 // obtained from NewLocalRendezvous(). This class just adds 39 // functionality to coordinate multiple process-local devices. 40 class IntraProcessRendezvous : public Rendezvous { 41 public: 42 explicit IntraProcessRendezvous(const DeviceMgr* device_mgr); 43 44 // Forwards to local_, where the Tensor "val" will be buffered and 45 // any waiting callback stored. 46 Status Send(const ParsedKey& key, const Rendezvous::Args& args, 47 const Tensor& val, const bool is_dead) override; 48 49 // This method is called only by the RecvOp. It tests to see 50 // whether the value will be produced by a local or remote device 51 // and handles accordingly. In the local case it forwards to 52 // local_, in the remote case it initiates an RPC request. 53 void RecvAsync(const ParsedKey& key, const Rendezvous::Args& args, 54 DoneCallback done) override; 55 56 void StartAbort(const Status& status) override; 57 58 private: 59 const DeviceMgr* device_mgr_; 60 Rendezvous* local_; // Owns a Ref on this object. 61 62 mutable mutex mu_; 63 64 // Status given by StartAbort() if any. 65 Status status_ GUARDED_BY(mu_); 66 67 ~IntraProcessRendezvous() override; 68 69 // Parses "key" into "parsed". If "is_src" is true, checks that the 70 // rendezvous key's source is in this process. If "is_src" is false, 71 // checks that the rendezvous key's destination is in this process. 72 Status ParseKey(const string& key, bool is_src, 73 Rendezvous::ParsedKey* parsed); 74 75 // Callback handling the case when a rendezvous has been 76 // accomplished in local_ and the consumer is local to this process. 77 // Tensor "in" will be copied into "out". The key "parsed" encodes 78 // the src and dst devices. 79 typedef std::function<void(const Status&)> StatusCallback; 80 void SameWorkerRecvDone(const Rendezvous::ParsedKey& parsed, 81 const Rendezvous::Args& send_args, 82 const Rendezvous::Args& recv_args, const Tensor& in, 83 Tensor* out, StatusCallback done); 84 85 TF_DISALLOW_COPY_AND_ASSIGN(IntraProcessRendezvous); 86 }; 87 88 } // end namespace tensorflow 89 90 #endif // TENSORFLOW_CORE_COMMON_RUNTIME_RENDEZVOUS_MGR_H_ 91