1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_CACHE_DATASET_CACHE_IMPL_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_CACHE_DATASET_CACHE_IMPL_H_ 18 19 #include <memory> 20 #include <string> 21 #include <optional> 22 #include <utility> 23 #include <vector> 24 #include "include/api/dual_abi_helper.h" 25 #include "minddata/dataset/engine/cache/cache_client.h" 26 #include "minddata/dataset/engine/datasetops/cache_op.h" 27 #include "minddata/dataset/engine/ir/cache/dataset_cache.h" 28 #include "minddata/dataset/engine/ir/datasetops/source/samplers/samplers_ir.h" 29 30 namespace mindspore { 31 namespace dataset { 32 /// DatasetCache is the IR of CacheClient 33 class DatasetCacheImpl : public DatasetCache { 34 public: 35 friend class PreBuiltDatasetCache; 36 /// 37 /// \brief Constructor 38 /// \param id A user assigned session id for the current pipeline. 39 /// \param mem_sz Size of the memory set aside for the row caching (default=0 which means unlimited, 40 /// note that it might bring in the risk of running out of memory on the machine). 41 /// \param spill Spill to disk if out of memory (default=False). 42 /// \param hostname optional host name (default="127.0.0.1"). 43 /// \param port optional port (default=50052). 44 /// \param num_connections optional number of connections (default=12). 45 /// \param prefetch_sz optional prefetch size (default=20). DatasetCacheImpl(session_id_type id,uint64_t mem_sz,bool spill,std::optional<std::vector<char>> hostname,std::optional<int32_t> port,std::optional<int32_t> num_connections,std::optional<int32_t> prefetch_sz)46 DatasetCacheImpl(session_id_type id, uint64_t mem_sz, bool spill, std::optional<std::vector<char>> hostname, 47 std::optional<int32_t> port, std::optional<int32_t> num_connections, 48 std::optional<int32_t> prefetch_sz) 49 : session_id_(id), 50 cache_mem_sz_(mem_sz), 51 spill_(spill), 52 port_(std::move(port)), 53 num_connections_(std::move(num_connections)), 54 prefetch_sz_(std::move(prefetch_sz)) { 55 if (hostname == std::nullopt) { 56 hostname_ = std::nullopt; 57 } else { 58 hostname_ = std::string(hostname->begin(), hostname->end()); 59 } 60 } 61 62 /// Method to initialize the DatasetCache by creating an instance of a CacheClient 63 /// \return Status Error code 64 Status Build() override; 65 66 Status CreateCacheOp(int32_t num_workers, int32_t connector_queue_size, std::shared_ptr<SamplerObj> sampler, 67 std::shared_ptr<DatasetOp> *ds) override; 68 69 Status CreateCacheLookupOp(int32_t num_workers, int32_t connector_queue_size, std::shared_ptr<SamplerObj> sampler, 70 std::shared_ptr<DatasetOp> *ds) override; 71 72 Status CreateCacheMergeOp(int32_t num_workers, int32_t connector_queue_size, std::shared_ptr<DatasetOp> *ds) override; 73 ValidateParams()74 Status ValidateParams() override { return Status::OK(); } 75 76 Status to_json(nlohmann::json *out_json) override; 77 78 ~DatasetCacheImpl() override = default; 79 80 private: 81 std::shared_ptr<CacheClient> cache_client_; 82 session_id_type session_id_; 83 uint64_t cache_mem_sz_; 84 bool spill_; 85 std::optional<std::string> hostname_; 86 std::optional<int32_t> port_; 87 std::optional<int32_t> num_connections_; 88 std::optional<int32_t> prefetch_sz_; 89 }; 90 } // namespace dataset 91 } // namespace mindspore 92 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_IR_CACHE_DATASET_CACHE_IMPL_H_ 93