• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2019-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h"

#include <algorithm>
#include <limits>
#include <memory>

#include "minddata/dataset/util/random.h"

namespace mindspore {
namespace dataset {
// Construct a distributed sampler that partitions rows across num_shards devices.
// @param num_shards - total number of shards (devices) the dataset is split over.
// @param shard_id - which shard this sampler serves (0-based).
// @param shuffle - whether to shuffle row indices before sharding.
// @param num_samples - cap on rows to sample; 0 means the entire dataset (resolved in InitSampler).
// @param seed - shuffle seed; the uint32 max sentinel means "use the global seed" via GetSeed().
// @param offset - starting row offset for uneven (ConcatDataset-style) distribution; -1 means unset.
// @param even_dist - if true (and offset unset), every shard gets ceil(num_rows/num_shards) rows.
DistributedSamplerRT::DistributedSamplerRT(int64_t num_shards, int64_t shard_id, bool shuffle, int64_t num_samples,
                                           uint32_t seed, int64_t offset, bool even_dist)
    : SamplerRT(num_samples, std::numeric_limits<int64_t>::max()),
      cnt_(0),
      seed_(seed == std::numeric_limits<uint32_t>::max() ? GetSeed() : seed),
      device_id_(shard_id),
      num_devices_(num_shards),
      shuffle_(shuffle),
      even_dist_(even_dist),
      offset_(offset),
      non_empty_(true) {
  // Update the num_shards_ in global context. this number is only used for now by auto_num_worker_pass. User discretion
  // is advised. Auto_num_worker_pass is currently an experimental feature which can still work if the num_shards_ isn't
  // 100% correct. The reason behind is for now, PreBuildSampler doesn't offer a way to return num_shards. Once
  // PreBuildSampler is phased out, this can be cleaned up.
  GlobalContext::config_manager()->set_num_shards_for_auto_num_workers(num_devices_);
}
43 
InitSampler()44 Status DistributedSamplerRT::InitSampler() {
45   if (is_initialized) {
46     return Status::OK();
47   }
48   // Special value of 0 for num_samples means that the user wants to sample the entire set of data.
49   // If the user asked to sample more rows than exists in the dataset, adjust the num_samples accordingly.
50   if (num_samples_ == 0 || num_samples_ > num_rows_) {
51     num_samples_ = num_rows_;
52   }
53   CHECK_FAIL_RETURN_UNEXPECTED(num_samples_ > 0, "Invalid parameter, num_samples must be greater than 0, but got " +
54                                                    std::to_string(num_samples_) + ".\n");
55   CHECK_FAIL_RETURN_UNEXPECTED(
56     num_rows_ > 0, "[Internal ERROR] num_rows must be greater than 0, but got " + std::to_string(num_rows_) + ".\n");
57   CHECK_FAIL_RETURN_UNEXPECTED(
58     device_id_ < num_devices_ && device_id_ >= 0 && num_rows_ > 0 && num_samples_ > 0,
59     "Invalid parameter, num_shard must be greater than shard_id and greater than 0, got num_shard: " +
60       std::to_string(num_devices_) + ", shard_id: " + std::to_string(device_id_) + ".\n");
61   rnd_.seed(seed_++);
62 
63   if (offset_ != -1 || !even_dist_) {
64     if (offset_ == -1) {
65       offset_ = 0;
66     }
67     samples_per_tensor_ = (num_rows_ + offset_) / num_devices_;
68     int64_t remainder = (num_rows_ + offset_) % num_devices_;
69     if (device_id_ < remainder) {
70       samples_per_tensor_++;
71     }
72     if (device_id_ < offset_) {
73       samples_per_tensor_--;
74     }
75   } else {
76     offset_ = 0;
77     samples_per_tensor_ = (num_rows_ + num_devices_ - 1) / num_devices_;  // equals to ceil(num_rows/num_devices)
78   }
79   samples_per_tensor_ = num_samples_ < samples_per_tensor_ ? num_samples_ : samples_per_tensor_;
80   if (shuffle_) {
81     shuffle_vec_.reserve(num_rows_);
82     for (int64_t i = 0; i < num_rows_; i++) {
83       shuffle_vec_.push_back(i);
84     }
85     std::shuffle(shuffle_vec_.begin(), shuffle_vec_.end(), rnd_);
86   }
87   if (!samples_per_tensor_) {
88     non_empty_ = false;
89   }
90 
91   is_initialized = true;
92   return Status::OK();
93 }
94 
// Produce the next batch of sample ids for this shard, or an EOE row once the
// shard's quota (samples_per_tensor_) is consumed.
// @param out - output TensorRow: a 1-D int64 tensor of sample indices, or an EOE flag row.
// @return Status - internal error if cnt_ has run past the quota.
Status DistributedSamplerRT::GetNextSample(TensorRow *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  if (cnt_ > samples_per_tensor_) {
    RETURN_STATUS_UNEXPECTED(
      "[Internal ERROR] Sampler index must be less than or equal to num_samples(total rows in dataset), but got:" +
      std::to_string(cnt_) + ", samples_per_tensor(num_samples): " + std::to_string(samples_per_tensor_));
  } else if (cnt_ == samples_per_tensor_ && (non_empty_ || !even_dist_)) {
    // Quota consumed for this epoch: emit end-of-epoch marker.
    (*out) = TensorRow(TensorRow::kFlagEOE);
    if (samples_per_tensor_ == 0) {
      non_empty_ = false;
    }
  } else if (samples_per_tensor_ == 0 && !non_empty_) {
    // If the Tensor is empty, we add samples with subscript 0 in the current dataset.
    // This step is to make up for the solution that the code default Tensor is not empty before.
    // We will remove this value in the concat phase
    non_empty_ = true;
    std::shared_ptr<Tensor> sample_ids;
    RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, 1));
    auto id_ptr = sample_ids->begin<int64_t>();
    // add index 0
    *id_ptr = 0;
    (*out) = {sample_ids};
  } else {
    if (HasChildSampler()) {
      // Pull the child's ids first so GetAssociatedChildId can map through them below.
      RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_));
    }

    std::shared_ptr<Tensor> sample_ids;
    RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, samples_per_tensor_));
    auto id_ptr = sample_ids->begin<int64_t>();
    bool flag_add_1 = false;
    while (cnt_ < samples_per_tensor_ && id_ptr != sample_ids->end<int64_t>()) {
      // Row assigned to this shard: stride num_devices_ over the offset-shifted row space.
      int64_t middle_value = num_devices_ * cnt_ + device_id_ - offset_;
      // if index < 0, we move back one place
      if (middle_value < 0) {
        samples_per_tensor_++;
        cnt_++;
        flag_add_1 = true;
        middle_value = num_devices_ * cnt_ + device_id_ - offset_;
      }
      // Wrap around num_rows_ so every shard yields a full quota even when
      // num_rows_ is not divisible by num_devices_.
      int64_t sampled_id = middle_value % num_rows_;

      if (shuffle_) {
        sampled_id = shuffle_vec_[static_cast<size_t>(sampled_id)];
      }

      if (HasChildSampler()) {
        RETURN_IF_NOT_OK(GetAssociatedChildId(&sampled_id, sampled_id));
      }

      *id_ptr = sampled_id;
      ++id_ptr;
      cnt_++;
    }

    // If 1 was added before, we will cut off 1 here
    if (flag_add_1) {
      samples_per_tensor_--;
      cnt_--;
    }
    (*out) = {sample_ids};
  }
  return Status::OK();
}
159 
ResetSampler(const bool failover_reset)160 Status DistributedSamplerRT::ResetSampler(const bool failover_reset) {
161   CHECK_FAIL_RETURN_UNEXPECTED(failover_reset || cnt_ == samples_per_tensor_,
162                                "[Internal ERROR] ResetSampler() called early or late.");
163   cnt_ = 0;
164 
165   if (shuffle_ == true) {
166     rnd_.seed(seed_);
167     seed_++;
168     std::shuffle(shuffle_vec_.begin(), shuffle_vec_.end(), rnd_);
169   }
170 
171   if (HasChildSampler()) {
172     RETURN_IF_NOT_OK(child_[0]->ResetSampler(failover_reset));
173   }
174 
175   return Status::OK();
176 }
177 
CalculateNumSamples(int64_t num_rows)178 int64_t DistributedSamplerRT::CalculateNumSamples(int64_t num_rows) {
179   int64_t child_num_rows = num_rows;
180   if (!child_.empty()) {
181     child_num_rows = child_[0]->CalculateNumSamples(num_rows);
182   }
183   int64_t num_samples = (num_samples_ > 0) ? std::min(child_num_rows, num_samples_) : child_num_rows;
184   int64_t remainder = (child_num_rows + offset_) % num_devices_;
185   int64_t shard_size = (child_num_rows + offset_) / num_devices_;
186   if (offset_ != -1 || !even_dist_) {
187     if (offset_ == -1) {
188       offset_ = 0;
189     }
190     if (device_id_ < remainder) {
191       shard_size++;
192     }
193     if (device_id_ < offset_) {
194       shard_size--;
195     }
196   } else {
197     shard_size = (child_num_rows + num_devices_ - 1) / num_devices_;
198   }
199   // add 1 to an empty shard
200   // this logic is needed to follow the logic in initSampler that is written for ConcatDataset
201   if (shard_size == 0) {
202     shard_size++;
203   }
204 
205   return std::min(num_samples, shard_size);
206 }
207 
SamplerPrint(std::ostream & out,bool show_all) const208 void DistributedSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
209   out << "\nSampler: DistributedSampler";
210   if (show_all) {
211     SamplerRT::SamplerPrint(out, show_all);
212     out << "\nseed: " << seed_ << "\ndevice_id: " << device_id_ << "\nnum_devices: " << num_devices_
213         << "\nshuffle: " << shuffle_;
214   }
215 }
216 
to_json(nlohmann::json * out_json)217 Status DistributedSamplerRT::to_json(nlohmann::json *out_json) {
218   RETURN_UNEXPECTED_IF_NULL(out_json);
219   nlohmann::json args;
220   RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
221   args["sampler_name"] = "DistributedSampler";
222   args["num_shards"] = num_devices_;
223   args["shard_id"] = device_id_;
224   args["shuffle"] = shuffle_;
225   args["offset"] = offset_;
226   *out_json = args;
227   return Status::OK();
228 }
229 
}  // namespace dataset
}  // namespace mindspore