1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_PS_PS_CACHE_PS_DATA_PS_DATA_PREFETCH_H_ 17 #define MINDSPORE_CCSRC_PS_PS_CACHE_PS_DATA_PS_DATA_PREFETCH_H_ 18 19 #include <map> 20 #include <string> 21 #include <memory> 22 #include <atomic> 23 #include <condition_variable> 24 #include "ps/ps_cache/ps_data/ps_data_channel.h" 25 26 #define EXPORT __attribute__((visibility("default"))) 27 28 namespace mindspore { 29 namespace ps { 30 class EXPORT PsDataPrefetch { 31 public: GetInstance()32 EXPORT static PsDataPrefetch &GetInstance() { 33 static PsDataPrefetch instance; 34 return instance; 35 } 36 cache_enable()37 EXPORT bool cache_enable() const { return cache_enable_; } set_cache_enable(bool cache_enable)38 EXPORT void set_cache_enable(bool cache_enable) { cache_enable_ = cache_enable; } 39 EXPORT void CreateDataChannel(const std::string &channel_name, size_t step_num); 40 EXPORT bool PrefetchData(const std::string &channel_name, void *data, const size_t data_size, 41 const std::string &data_type); 42 EXPORT bool FinalizeData(const std::string &channel_name); 43 EXPORT void NotifyFinalize(); 44 EXPORT bool QueryData(const std::string &channel_name, void **data_ptr) const; 45 EXPORT size_t data_size(const std::string &channel_name) const; 46 EXPORT bool TryWakeChannel(const std::string &channel_name); 47 48 private: PsDataPrefetch()49 PsDataPrefetch() : cache_enable_(false), data_ready_(false) {} 50 virtual ~PsDataPrefetch() = default; 51 PsDataPrefetch(const PsDataPrefetch &) = delete; 52 PsDataPrefetch &operator=(const PsDataPrefetch &) = delete; 53 std::shared_ptr<PsDataChannel> ps_data_channel(const std::string &channel_name) const; 54 void WakeAllChannel(); 55 std::map<std::string, std::shared_ptr<PsDataChannel>> ps_data_channel_map_; 56 bool cache_enable_; 57 bool data_ready_; 58 std::mutex data_mutex_; 59 std::condition_variable data_prefetch_; 60 std::condition_variable data_process_; 61 std::atomic_bool need_wait_{true}; 62 std::atomic_bool invalid_data_type_{false}; 63 }; 64 } // namespace ps 65 } // namespace mindspore 66 #endif // MINDSPORE_CCSRC_PS_PS_CACHE_PS_DATA_PS_DATA_PREFETCH_H_ 67