/**
 * Copyright 2019-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h"

#include <algorithm>
#include <limits>
#include <memory>

#include "minddata/dataset/util/random.h"

namespace mindspore {
namespace dataset {
DistributedSamplerRT::DistributedSamplerRT(int64_t num_shards, int64_t shard_id, bool shuffle, int64_t num_samples,
                                           uint32_t seed, int64_t offset, bool even_dist)
    : SamplerRT(num_samples, std::numeric_limits<int64_t>::max()),
      cnt_(0),
      seed_(seed == std::numeric_limits<uint32_t>::max() ? GetSeed() : seed),
      device_id_(shard_id),
      num_devices_(num_shards),
      shuffle_(shuffle),
      even_dist_(even_dist),
      offset_(offset),
      non_empty_(true) {
  // Update num_shards_ in the global context. For now this number is only used by auto_num_worker_pass; user
  // discretion is advised. Auto_num_worker_pass is currently an experimental feature which can still work even if
  // num_shards_ isn't 100% correct. The reason is that, for now, PreBuildSampler doesn't offer a way to return
  // num_shards. Once PreBuildSampler is phased out, this can be cleaned up.
  GlobalContext::config_manager()->set_num_shards_for_auto_num_workers(num_devices_);
}

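// Computes this shard's sample quota (samples_per_tensor_) and prepares the shuffle order; runs once per sampler.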
Status DistributedSamplerRT::InitSampler() {
  if (is_initialized) {
    return Status::OK();
  }
  // A num_samples of 0 is a special value meaning that the user wants to sample the entire dataset.
  // If the user asked to sample more rows than exist in the dataset, adjust num_samples accordingly.
  if (num_samples_ == 0 || num_samples_ > num_rows_) {
    num_samples_ = num_rows_;
  }
  CHECK_FAIL_RETURN_UNEXPECTED(num_samples_ > 0, "Invalid parameter, num_samples must be greater than 0, but got " +
                                                   std::to_string(num_samples_) + ".\n");
  CHECK_FAIL_RETURN_UNEXPECTED(
    num_rows_ > 0, "[Internal ERROR] num_rows must be greater than 0, but got " + std::to_string(num_rows_) + ".\n");
  CHECK_FAIL_RETURN_UNEXPECTED(
    device_id_ < num_devices_ && device_id_ >= 0 && num_rows_ > 0 && num_samples_ > 0,
    "Invalid parameter, num_shards must be greater than shard_id and greater than 0, got num_shards: " +
      std::to_string(num_devices_) + ", shard_id: " + std::to_string(device_id_) + ".\n");
  rnd_.seed(seed_++);

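  // Compute how many ids this shard will produce. When an explicit offset is given or even distribution is disabled,
  // the rows (plus offset padding) are split across shards and the remainder is spread over the leading shards;
  // otherwise every shard gets ceil(num_rows / num_devices) ids.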
  if (offset_ != -1 || !even_dist_) {
    if (offset_ == -1) {
      offset_ = 0;
    }
    samples_per_tensor_ = (num_rows_ + offset_) / num_devices_;
    int64_t remainder = (num_rows_ + offset_) % num_devices_;
    if (device_id_ < remainder) {
      samples_per_tensor_++;
    }
    if (device_id_ < offset_) {
      samples_per_tensor_--;
    }
  } else {
    offset_ = 0;
    samples_per_tensor_ = (num_rows_ + num_devices_ - 1) / num_devices_;  // equal to ceil(num_rows / num_devices)
  }
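  // Never produce more ids for this shard than the requested num_samples.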
  samples_per_tensor_ = num_samples_ < samples_per_tensor_ ? num_samples_ : samples_per_tensor_;
  if (shuffle_) {
    shuffle_vec_.reserve(num_rows_);
    for (int64_t i = 0; i < num_rows_; i++) {
      shuffle_vec_.push_back(i);
    }
    std::shuffle(shuffle_vec_.begin(), shuffle_vec_.end(), rnd_);
  }
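  // A shard that was assigned zero samples is marked empty; GetNextSample() pads it with a single id 0, which is
  // removed again in the concat phase.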
  if (!samples_per_tensor_) {
    non_empty_ = false;
  }

  is_initialized = true;
  return Status::OK();
}

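// Produces the next TensorRow of sample ids for this shard, emitting an EOE row once the shard's quota
// (samples_per_tensor_) has been served and padding empty shards with a single id 0.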
Status DistributedSamplerRT::GetNextSample(TensorRow *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  if (cnt_ > samples_per_tensor_) {
    RETURN_STATUS_UNEXPECTED(
      "[Internal ERROR] Sampler index must be less than or equal to num_samples(total rows in dataset), but got: " +
      std::to_string(cnt_) + ", samples_per_tensor(num_samples): " + std::to_string(samples_per_tensor_));
  } else if (cnt_ == samples_per_tensor_ && (non_empty_ || !even_dist_)) {
    (*out) = TensorRow(TensorRow::kFlagEOE);
    if (samples_per_tensor_ == 0) {
      non_empty_ = false;
    }
  } else if (samples_per_tensor_ == 0 && !non_empty_) {
    // If this shard is empty, pad it with a single sample of index 0 from the current dataset.
    // This compensates for code elsewhere that assumes a sampler tensor is never empty.
    // The padded value is removed again in the concat phase.
    non_empty_ = true;
    std::shared_ptr<Tensor> sample_ids;
    RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, 1));
    auto id_ptr = sample_ids->begin<int64_t>();
    // add index 0
    *id_ptr = 0;
    (*out) = {sample_ids};
  } else {
    if (HasChildSampler()) {
      RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_));
    }

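    // Interleaved sharding: the k-th id of this shard is (num_devices_ * k + device_id_ - offset_) % num_rows_,
    // optionally remapped through the shuffle vector and a child sampler.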
    std::shared_ptr<Tensor> sample_ids;
    RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, samples_per_tensor_));
    auto id_ptr = sample_ids->begin<int64_t>();
    bool flag_add_1 = false;
    while (cnt_ < samples_per_tensor_ && id_ptr != sample_ids->end<int64_t>()) {
      int64_t middle_value = num_devices_ * cnt_ + device_id_ - offset_;
      // If the computed index is negative (possible for the first position when device_id_ < offset_),
      // skip it and move one place forward.
      if (middle_value < 0) {
        samples_per_tensor_++;
        cnt_++;
        flag_add_1 = true;
        middle_value = num_devices_ * cnt_ + device_id_ - offset_;
      }
      int64_t sampled_id = middle_value % num_rows_;

      if (shuffle_) {
        sampled_id = shuffle_vec_[static_cast<size_t>(sampled_id)];
      }

      if (HasChildSampler()) {
        RETURN_IF_NOT_OK(GetAssociatedChildId(&sampled_id, sampled_id));
      }

      *id_ptr = sampled_id;
      ++id_ptr;
      cnt_++;
    }

    // If 1 was added above to skip a negative index, undo that adjustment here.
    if (flag_add_1) {
      samples_per_tensor_--;
      cnt_--;
    }
    (*out) = {sample_ids};
  }
  return Status::OK();
}

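// Resets the sampler for a new epoch: the id counter is cleared and, when shuffling, a fresh permutation is drawn
// from the next seed so that every epoch sees a different order.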
Status DistributedSamplerRT::ResetSampler(const bool failover_reset) {
  CHECK_FAIL_RETURN_UNEXPECTED(failover_reset || cnt_ == samples_per_tensor_,
                               "[Internal ERROR] ResetSampler() called early or late.");
  cnt_ = 0;

  if (shuffle_) {
    rnd_.seed(seed_);
    seed_++;
    std::shuffle(shuffle_vec_.begin(), shuffle_vec_.end(), rnd_);
  }

  if (HasChildSampler()) {
    RETURN_IF_NOT_OK(child_[0]->ResetSampler(failover_reset));
  }

  return Status::OK();
}

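// Returns the number of samples this shard will actually produce, given the row count reported by the child sampler
// (if any), the requested num_samples, and the shard size computed with the same offset/even_dist rules as
// InitSampler().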
int64_t DistributedSamplerRT::CalculateNumSamples(int64_t num_rows) {
  int64_t child_num_rows = num_rows;
  if (!child_.empty()) {
    child_num_rows = child_[0]->CalculateNumSamples(num_rows);
  }
  int64_t num_samples = (num_samples_ > 0) ? std::min(child_num_rows, num_samples_) : child_num_rows;
  int64_t remainder = (child_num_rows + offset_) % num_devices_;
  int64_t shard_size = (child_num_rows + offset_) / num_devices_;
  if (offset_ != -1 || !even_dist_) {
    if (offset_ == -1) {
      offset_ = 0;
    }
    if (device_id_ < remainder) {
      shard_size++;
    }
    if (device_id_ < offset_) {
      shard_size--;
    }
  } else {
    shard_size = (child_num_rows + num_devices_ - 1) / num_devices_;
  }
  // Add 1 to an empty shard.
  // This mirrors the padding logic in InitSampler() that was written for ConcatDataset.
  if (shard_size == 0) {
    shard_size++;
  }

  return std::min(num_samples, shard_size);
}

void DistributedSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
  out << "\nSampler: DistributedSampler";
  if (show_all) {
    SamplerRT::SamplerPrint(out, show_all);
    out << "\nseed: " << seed_ << "\ndevice_id: " << device_id_ << "\nnum_devices: " << num_devices_
        << "\nshuffle: " << shuffle_;
  }
}

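// Serializes the sampler configuration (shard layout, shuffle flag and offset) to JSON on top of the base
// SamplerRT fields.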
Status DistributedSamplerRT::to_json(nlohmann::json *out_json) {
  RETURN_UNEXPECTED_IF_NULL(out_json);
  nlohmann::json args;
  RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
  args["sampler_name"] = "DistributedSampler";
  args["num_shards"] = num_devices_;
  args["shard_id"] = device_id_;
  args["shuffle"] = shuffle_;
  args["offset"] = offset_;
  *out_json = args;
  return Status::OK();
}

}  // namespace dataset
}  // namespace mindspore