1 /** 2 * Copyright 2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_SYNC_STREAM_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_DEVICE_SYNC_STREAM_H_ 19 20 #include <memory> 21 #include <string> 22 #include <vector> 23 #include <thread> 24 #include <mutex> 25 #include <condition_variable> 26 27 #include "runtime/hardware/device_context.h" 28 #include "include/backend/visible.h" 29 30 namespace mindspore { 31 namespace device { 32 constexpr uint32_t kTimeoutInSeconds = 30; 33 34 // Execute synchronization stream with timeout mechanism. Typical application scenarios: it is used to monitor 35 // distributed data parallel training scenarios and whether a process exits unexpectedly. 36 class BACKEND_EXPORT StreamSynchronizer { 37 public: GetInstance()38 static std::shared_ptr<StreamSynchronizer> &GetInstance() { 39 std::lock_guard<std::mutex> lock(instance_lock_); 40 if (instance_ == nullptr) { 41 instance_.reset(new (std::nothrow) StreamSynchronizer()); 42 MS_EXCEPTION_IF_NULL(instance_); 43 instance_->Initialize(); 44 } 45 return instance_; 46 } 47 48 ~StreamSynchronizer() = default; 49 50 // Execute synchronization stream with timeout mechanism. 51 bool SyncStream(const std::string &device_name, uint32_t timeout = kTimeoutInSeconds); 52 53 // Initialize stream synchronizer, Create a thread to actually execute the synchronization stream task. 54 void Initialize(); 55 56 // Finalize stream synchronizer, wait worker_thread_ finish. 57 void Finalize(); 58 59 private: 60 StreamSynchronizer() = default; 61 62 DISABLE_COPY_AND_ASSIGN(StreamSynchronizer); 63 64 // Monitor whether there are synchronization stream tasks, and actually execute the synchronization stream 65 // tasks. 66 void DoSyncStreamTask(); 67 68 // Used for multi-thread safety of singleton creation. 69 static std::mutex instance_lock_; 70 71 // The singleton pointer. 72 static std::shared_ptr<StreamSynchronizer> instance_; 73 74 // Return value of synchronization stream. 75 bool sync_stream_ret_{false}; 76 77 // Whether synchronization stream thread need to stop. 78 bool stop_{false}; 79 80 DeviceContext *device_context_{nullptr}; 81 82 // The method SyncStream is not multiple threads safe, so use this lock to prevent simultaneous access by 83 // multiple threads. 84 std::mutex reentrant_mutex_; 85 86 // Use this lock to ensure the safety of external calls to SyncStream and the execution of DoSyncStreamTask 87 // in worker_thread_; 88 std::mutex task_mutex_; 89 90 // The thread to actually execute the synchronization stream task. 91 std::thread worker_thread_; 92 93 std::condition_variable time_out_cv_; 94 std::condition_variable do_sync_stream_cv_; 95 }; 96 } // namespace device 97 } // namespace mindspore 98 #endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_SYNC_STREAM_H_ 99