1 /**
2 * Copyright 2020-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
18 #define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
19
#include <algorithm>
#include <cmath>
#include <functional>
#include <map>
#include <memory>
#include <thread>
#include <unordered_map>
#include <vector>
#include "plugin/device/cpu/kernel/cpu_kernel.h"
#include "plugin/factory/ms_factory.h"
#include "include/common/thread_pool.h"
#include "ops/op_utils.h"
31
32 namespace mindspore {
33 namespace kernel {
// Parameter bundle describing one Unique computation, or one segment/bucket
// slice of it in the multi-threaded path. All pointers are non-owning views
// into buffers supplied by the kernel (inputs/workspaces/outputs).
template <typename DataType, typename IndexType>
struct UniqueParam {
  DataType *input_{nullptr};           // values to de-duplicate
  IndexType *input_idx_{nullptr};      // index scratch (used for sorting in Unique())
  DataType *output_{nullptr};          // unique values written here
  IndexType *inverse_idx_{nullptr};    // per-input-element index into output_
  DataType *workspace_{nullptr};       // value scratch used while merging buckets
  IndexType *workspace_idx_{nullptr};  // original input positions carried through bucketing
  size_t input_size_{0};               // element count of input_
  size_t output_size_{0};              // number of unique values produced (set by Unique())
  size_t thread_num_{0};               // worker count == bucket count in the parallel path
  bool need_sort_{true};               // true: sort-based unique; false: hash-map based
};
47
48 template <typename FromType>
ToSize(FromType input)49 size_t ToSize(FromType input) {
50 return static_cast<size_t>(input);
51 }
52
// int -> size_t via the MindSpore range-checked helper.
template <>
inline size_t ToSize(int input) {
  return IntToSize(input);
}
57
// int64_t -> size_t via the MindSpore range-checked helper.
template <>
inline size_t ToSize(int64_t input) {
  return LongToSize(input);
}
62
// float16 -> size_t: widen to float first, then use the checked float helper.
template <>
inline size_t ToSize(float16 input) {
  return FloatToSize(static_cast<float>(input));
}
67
// float -> size_t via the MindSpore range-checked helper.
template <>
inline size_t ToSize(float input) {
  return FloatToSize(input);
}
72
73 template <typename ToType>
SizeTo(size_t input)74 ToType SizeTo(size_t input) {
75 return static_cast<ToType>(input);
76 }
77
// size_t -> int via the MindSpore range-checked helper.
template <>
inline int SizeTo(size_t input) {
  return SizeToInt(input);
}
82
// size_t -> int64_t via the MindSpore range-checked helper.
template <>
inline int64_t SizeTo(size_t input) {
  return SizeToLong(input);
}
87
88 template <typename DataType>
NotEqual(DataType left,DataType right)89 bool NotEqual(DataType left, DataType right) {
90 return left != right;
91 }
92
93 template <>
NotEqual(float left,float right)94 inline bool NotEqual(float left, float right) {
95 const float kEps = 1e-30;
96 return abs(left - right) > kEps;
97 }
98
99 template <>
NotEqual(double left,double right)100 inline bool NotEqual(double left, double right) {
101 const double kEps = 1e-30;
102 return abs(left - right) > kEps;
103 }
104
105 template <typename DataType>
BucketId(DataType input,size_t bucket_num)106 size_t BucketId(DataType input, size_t bucket_num) {
107 if (static_cast<double>(input) < 0.0) {
108 input = -input;
109 }
110 size_t data = ToSize<DataType>(input);
111 if (static_cast<double>(bucket_num) < 1.0) {
112 return data;
113 }
114 return data % bucket_num;
115 }
116
// CPU kernel for the Unique op: emits the de-duplicated values of a 1-D input
// (optionally with leading vmap batch dims) plus an inverse-index tensor that
// maps every input element to its position in the unique output. Provides a
// single-threaded Unique() and a multi-threaded bucket-partitioned variant
// (BucketUnique) built from the static helpers below.
class UniqueCpuKernelMod : public NativeCpuKernelMod {
 public:
  UniqueCpuKernelMod() = default;
  ~UniqueCpuKernelMod() override = default;

  // Caches the input dtype and the vmap batch rank; fails (returns false)
  // when the batch rank attribute is negative.
  bool Init(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &outputs) override {
    dtype_ = inputs[0]->dtype_id();
    auto batch_rank = ops::get_batch_rank(primitive_);
    if (batch_rank < 0) {
      return false;
    }
    batch_rank_ = static_cast<size_t>(batch_rank);
    return true;
  }
  // Validates the input shape against batch_rank_, derives batch_size_ and
  // per-batch input_size_, reads the optional 'sorted' attribute, and books
  // three int64 workspaces of input_size_ elements each.
  int Resize(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &outputs) override {
    auto ret = KernelMod::Resize(inputs, outputs);
    // KRET_UNKNOWN_OUT_SHAPE is tolerated because Unique's output size is
    // data-dependent and fixed up later in UpdateOutputShapeAndSize().
    if (ret != KRET_UNKNOWN_OUT_SHAPE && ret != KRET_OK) {
      return ret;
    }
    if (inputs.size() < 1) {
      MS_LOG(EXCEPTION) << kernel_name_ << " requires not less than 1 inputs, but got " << inputs.size() << ".";
    }
    auto input_shape = inputs[0]->GetShapeVector();
    if (batch_rank_ == 0) {
      // No vmap: Unique only accepts a 1-D tensor.
      if (input_shape.size() != 1) {
        MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', the dimension of input must be 1D, but got "
                          << input_shape.size() << "D";
      }
      batch_size_ = 1;
      input_size_ = static_cast<size_t>(input_shape[0]);
    } else {
      // vmap: leading batch_rank_ dims are batches, last dim is the data.
      if (input_shape.size() != static_cast<size_t>(batch_rank_ + 1)) {
        MS_LOG(EXCEPTION) << "For '" << kernel_name_
                          << "', the shape size of 'input' must be equal to 'batch_rank + 1', "
                             "but got the shape of 'input': "
                          << input_shape << " and 'batch_rank': " << batch_rank_;
      }
      batch_size_ =
        std::accumulate(input_shape.begin(), input_shape.begin() + batch_rank_, 1, std::multiplies<int64_t>());
      input_size_ = static_cast<size_t>(input_shape[batch_rank_]);
    }
    if (primitive_->HasAttr(SORTED)) {
      auto value_ptr = primitive_->GetAttr(SORTED);
      sorted_ = GetValue<bool>(value_ptr);
    }
    // Three scratch buffers of input_size_ int64 slots each (values + two
    // index buffers used by the bucket algorithm).
    workspace_size_list_.clear();
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    return ret;
  }

  // Dispatches to LaunchKernel<DataType, IndexType>; defined in the .cc file.
  bool Launch(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &workspace,
              const std::vector<KernelTensor *> &outputs) override;

  // Supported dtype combinations: value dtype -> (value output, index output).
  std::vector<KernelAttr> GetOpSupport() override {
    static const std::vector<KernelAttr> support_list = {
      KernelAttr().AddInputAttr(kNumberTypeInt8).AddOutputAttr(kNumberTypeInt8).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt16).AddOutputAttr(kNumberTypeInt16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt64).AddOutputAttr(kNumberTypeInt64).AddOutputAttr(kNumberTypeInt64),
      KernelAttr().AddInputAttr(kNumberTypeUInt8).AddOutputAttr(kNumberTypeUInt8).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeUInt16).AddOutputAttr(kNumberTypeUInt16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat16).AddOutputAttr(kNumberTypeFloat16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat64).AddOutputAttr(kNumberTypeFloat64).AddOutputAttr(kNumberTypeInt64),
    };
    return support_list;
  }
  // Output shape is data-dependent, so the framework must re-query it.
  bool IsNeedUpdateOutputShapeAndSize() override { return true; }
  // Publishes the actual unique-count (recorded in output_sizes_ by the
  // launch) as the first output's shape and byte size.
  void UpdateOutputShapeAndSize(const std::vector<KernelTensor *> &inputs,
                                const std::vector<KernelTensor *> &outputs) override {
    ShapeVector out_shape;
    if (output_sizes_.empty()) {
      (void)out_shape.emplace_back(SizeToLong(0));
    } else {
      (void)out_shape.emplace_back(SizeToLong(output_sizes_[0]));
    }
    outputs[0]->SetShapeVector(out_shape);
    outputs[0]->set_size(LongToSize(out_shape[0]) * UnitSizeInBytes(outputs[0]->dtype_id()));
  }

 protected:
  template <typename DataType, typename IndexType>
  void LaunchKernel(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &workspace,
                    const std::vector<KernelTensor *> &outputs);
  size_t input_size_{0};             // elements per batch slice
  TypeId dtype_{kTypeUnknown};       // value dtype of input 0
  size_t batch_size_{1};             // product of leading vmap dims (1 if none)
  size_t batch_rank_{0};             // number of leading vmap dims
  std::vector<size_t> output_sizes_; // actual unique counts, filled at launch
  bool sorted_{false};               // 'sorted' attribute of the primitive

  // Counts, for one input segment, how many elements hash into each bucket.
  // each_bucket_size->size() defines the bucket count.
  template <typename DataType, typename IndexType>
  static void CalculateEachBucketSize(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                      std::vector<size_t> *each_bucket_size) {
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->input_);
    MS_EXCEPTION_IF_NULL(each_bucket_size);
    size_t bucket_num = each_bucket_size->size();
    if (params->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < params->input_size_; ++i) {
      auto bucket_id = BucketId(params->input_[i], bucket_num);
      each_bucket_size->at(bucket_id)++;
    }
  }

  // Splits the input into thread_num_ contiguous segments (remainder spread
  // one element each over the first segments) and, in parallel, computes the
  // per-segment bucket histograms used later to place elements.
  template <typename DataType, typename IndexType>
  static void SplitAndCalculateBucketSize(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                          std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *segments_ptr,
                                          std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr) {
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->input_);
    MS_EXCEPTION_IF_NULL(segments_ptr);
    MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
    auto &segments = *segments_ptr;
    auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;

    size_t input_size = params->input_size_;
    size_t thread_num = params->thread_num_;
    if (thread_num < 1) {
      MS_LOG(EXCEPTION) << "For 'Unique', thread num must be greater than 0, but got " << thread_num;
    }
    size_t thread_data_size = input_size / thread_num;
    size_t left_data_size = input_size % thread_num;
    segments.reserve(thread_num);
    segment_bucket_sizes.reserve(thread_num);
    size_t current_offset = 0;
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      // Bucket count equals thread_num; one histogram per segment.
      (void)segment_bucket_sizes.emplace_back(std::make_shared<std::vector<size_t>>(thread_num, 0));
      size_t data_size = thread_data_size;
      if (i < left_data_size) {
        data_size += 1;
      }
      (void)segments.emplace_back(std::make_shared<UniqueParam<DataType, IndexType>>());
      segments[i]->input_ = params->input_ + current_offset;
      segments[i]->input_size_ = data_size;
      segments[i]->thread_num_ = thread_num;
      // Capturing segments/segment_bucket_sizes by reference is safe: the
      // vectors are fully populated before ParallelLaunch runs the tasks.
      auto task = [&segments, &segment_bucket_sizes, i]() {
        CalculateEachBucketSize<DataType, IndexType>(segments[i], segment_bucket_sizes[i].get());
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
      current_offset += data_size;
    }
    ParallelLaunch(tasks);
  }

  // Scatters one segment's elements into its pre-sized slots of the buckets,
  // recording each element's original position (segment_offset + i) in
  // workspace_idx_ so the inverse index can be rebuilt after de-duplication.
  template <typename DataType, typename IndexType>
  static void SegmentToBuckets(const std::shared_ptr<UniqueParam<DataType, IndexType>> &segment, size_t segment_offset,
                               const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(segment);
    MS_EXCEPTION_IF_NULL(segment->input_);
    std::vector<size_t> bucket_data_num(segment->thread_num_, 0);
    auto bucket_size = buckets.size();
    if (segment->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < segment->input_size_; ++i) {
      DataType data = segment->input_[i];
      auto bucket_id = BucketId(data, segment->thread_num_);
      // NOTE(review): bucket_data_num is indexed before the bucket_id bound
      // check below; safe only while thread_num_ == buckets.size() — confirm.
      auto bucket_index = bucket_data_num[bucket_id];
      if (bucket_id >= bucket_size) {
        MS_LOG(ERROR) << "For 'Unique', bucket id must be less than bucket size, but got 'bucket_id': " << bucket_id
                      << "and 'bucket_size': " << bucket_size;
        continue;
      }
      auto &bucket = buckets[bucket_id];
      MS_EXCEPTION_IF_NULL(bucket);
      if (bucket_index >= bucket->input_size_) {
        MS_LOG(ERROR) << "For 'Unique', bucket index must be less than input size, but got bucket index: "
                      << bucket_index << "and input size: " << bucket->input_size_;
        continue;
      }
      bucket->input_[bucket_index] = data;
      bucket->workspace_idx_[bucket_index] = SizeTo<IndexType>(segment_offset + i);
      bucket_data_num[bucket_id]++;
    }
    MS_LOG(DEBUG) << "End";
  }

  // Builds the global buckets (carved out of params' buffers with roles
  // swapped: bucket input_ aliases params->output_, bucket output_ aliases
  // params->workspace_, etc.), then in parallel copies every segment's
  // elements into per-segment, per-bucket sub-slices so writers never overlap.
  template <typename DataType, typename IndexType>
  static void GatherSegmentsToBuckets(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *segments_ptr,
                                      std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr,
                                      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *buckets_ptr) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->workspace_);
    MS_EXCEPTION_IF_NULL(params->inverse_idx_);
    MS_EXCEPTION_IF_NULL(params->workspace_idx_);
    MS_EXCEPTION_IF_NULL(params->output_);
    MS_EXCEPTION_IF_NULL(params->input_idx_);
    MS_EXCEPTION_IF_NULL(segments_ptr);
    MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
    MS_EXCEPTION_IF_NULL(buckets_ptr);
    auto &segments = *segments_ptr;
    auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;
    auto &buckets = *buckets_ptr;
    auto thread_num = segments.size();
    buckets.reserve(thread_num);
    // Total size of each bucket = sum of that bucket's count over segments.
    std::vector<size_t> bucket_data_size(thread_num, 0);
    for (size_t i = 0; i < thread_num; ++i) {
      for (size_t j = 0; j < thread_num; ++j) {
        bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
      }
    }

    // Carve contiguous, non-overlapping slices of params' buffers per bucket.
    size_t current_offset = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      auto bucket = std::make_shared<UniqueParam<DataType, IndexType>>();
      bucket->input_ = params->output_ + current_offset;
      bucket->input_idx_ = params->inverse_idx_ + current_offset;
      bucket->workspace_idx_ = params->workspace_idx_ + current_offset;
      bucket->output_ = params->workspace_ + current_offset;
      bucket->inverse_idx_ = params->input_idx_ + current_offset;
      bucket->input_size_ = bucket_data_size[i];
      current_offset += bucket_data_size[i];
      (void)buckets.emplace_back(bucket);
    }
    // For segment i, thread_buckets[i][j] is its private write window inside
    // bucket j, offset by the counts contributed by segments 0..i-1.
    std::vector<size_t> tmp_bucket_data_size(thread_num, 0);
    std::vector<std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>>> thread_buckets;
    for (size_t i = 0; i < thread_num; ++i) {
      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> local_buckets;
      for (size_t j = 0; j < thread_num; ++j) {
        auto bucket = std::make_shared<UniqueParam<DataType, IndexType>>();
        bucket->input_ = buckets[j]->input_ + tmp_bucket_data_size[j];
        bucket->input_size_ = buckets[j]->input_size_ - tmp_bucket_data_size[j];
        bucket->workspace_idx_ = buckets[j]->workspace_idx_ + tmp_bucket_data_size[j];
        (void)local_buckets.emplace_back(bucket);
        tmp_bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
      }
      (void)thread_buckets.emplace_back(local_buckets);
    }
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    current_offset = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      MS_EXCEPTION_IF_NULL(segments[i]);
      // current_offset is captured by value: each task gets the global start
      // position of its own segment.
      auto task = [&segments, &thread_buckets, current_offset, i]() {
        SegmentToBuckets<DataType, IndexType>(segments[i], current_offset, thread_buckets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
      current_offset += segments[i]->input_size_;
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // De-duplicates params->input_ into params->output_ and fills inverse_idx_.
  // need_sort_ == true: argsort via input_idx_, then a linear scan emitting a
  // sorted unique list. need_sort_ == false: hash map, unique values appear
  // in first-occurrence index order. Sets output_size_ to the unique count.
  template <typename DataType, typename IndexType>
  static void Unique(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(params);
    DataType *input = params->input_;
    IndexType *input_idx = params->input_idx_;
    DataType *output = params->output_;
    IndexType *inverse_idx = params->inverse_idx_;
    MS_EXCEPTION_IF_NULL(input);
    MS_EXCEPTION_IF_NULL(input_idx);
    MS_EXCEPTION_IF_NULL(output);
    MS_EXCEPTION_IF_NULL(inverse_idx);
    IndexType j = 0;
    if (params->input_size_ < 1) {
      return;
    }
    if (params->need_sort_) {
      // Argsort: input_idx becomes the permutation that sorts input by value.
      for (size_t i = 0; i < params->input_size_; ++i) {
        input_idx[i] = SizeTo<IndexType>(i);
      }
      std::sort(input_idx, input_idx + params->input_size_,
                [&](size_t left, size_t right) { return input[left] < input[right]; });
      DataType last = input[0];
      // Walk in sorted order; bump j on each new value, and map every
      // original position back through input_idx[i].
      for (size_t i = 0; i < params->input_size_; ++i) {
        auto curr = input[input_idx[i]];
        if (i == 0 || NotEqual(curr, last)) {
          if (i != 0) {
            j++;
          }
          output[j] = curr;
          inverse_idx[input_idx[i]] = j;
          last = curr;
        } else {
          inverse_idx[input_idx[i]] = j;
        }
      }
      params->output_size_ = ToSize<IndexType>(j + 1);
    } else {
      std::unordered_map<DataType, IndexType> uniq;
      uniq.reserve(params->input_size_);
      for (size_t i = 0; i < params->input_size_; ++i) {
        // emplace only inserts on first occurrence; j is that value's slot.
        auto it = uniq.emplace(input[i], j);
        inverse_idx[i] = it.first->second;
        if (it.second) {
          ++j;
        }
      }
      for (const auto &it : uniq) {
        output[it.second] = it.first;
      }
      params->output_size_ = ToSize<IndexType>(j);
    }
    MS_LOG(DEBUG) << "End";
  }

  // Runs Unique() on every bucket in parallel (one task per bucket).
  template <typename DataType, typename IndexType>
  static void UniqueEachBucket(const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets) {
    MS_LOG(DEBUG) << "Start";
    size_t thread_num = buckets.size();
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      auto task = [&buckets, i]() {
        Unique<DataType, IndexType>(buckets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // Rewrites one bucket's local inverse indices into the global result:
  // workspace_idx_[i] holds the element's original input position, and
  // `offset` is where this bucket's unique values start in the merged output.
  template <typename DataType, typename IndexType>
  static void TransformBucketReverseIndices(const std::shared_ptr<UniqueParam<DataType, IndexType>> &bucket,
                                            const std::shared_ptr<UniqueParam<DataType, IndexType>> &result,
                                            size_t offset) {
    MS_EXCEPTION_IF_NULL(bucket);
    MS_EXCEPTION_IF_NULL(bucket->inverse_idx_);
    MS_EXCEPTION_IF_NULL(bucket->workspace_idx_);
    MS_EXCEPTION_IF_NULL(result);
    MS_EXCEPTION_IF_NULL(result->inverse_idx_);
    if (bucket->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < bucket->input_size_; ++i) {
      auto origin_idx = bucket->workspace_idx_[i];
      // Skip negative (never-written) slots; IndexType may be signed.
      if (origin_idx < 0) {
        continue;
      }
      size_t index = ToSize<IndexType>(origin_idx);
      if (index < result->input_size_) {
        result->inverse_idx_[index] = bucket->inverse_idx_[i] + SizeTo<IndexType>(offset);
      }
    }
  }

  // Concatenates each bucket's unique values into result->output_ (memcpy_s
  // bound by the remaining capacity, result->input_size_ - current_size) and
  // then, in parallel, folds every bucket's inverse indices into the global
  // inverse index using the buckets' output offsets.
  template <typename DataType, typename IndexType>
  static void MergeBuckets(const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets,
                           const std::shared_ptr<UniqueParam<DataType, IndexType>> &result) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(result);
    MS_EXCEPTION_IF_NULL(result->output_);
    size_t thread_num = buckets.size();
    std::vector<size_t> bucket_offsets(thread_num);
    size_t current_size = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      auto bucket = buckets[i];
      MS_EXCEPTION_IF_NULL(bucket);
      MS_EXCEPTION_IF_NULL(bucket->output_);
      bucket_offsets[i] = current_size;
      auto ret_code = memcpy_s(result->output_ + current_size, (result->input_size_ - current_size) * sizeof(DataType),
                               bucket->output_, bucket->output_size_ * sizeof(DataType));
      if (ret_code != EOK) {
        MS_LOG(EXCEPTION) << "For 'Unique', copy data failed, error no: " << ret_code;
      }
      current_size += bucket->output_size_;
    }
    result->output_size_ = current_size;

    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      auto task = [&buckets, i, result, &bucket_offsets]() {
        TransformBucketReverseIndices<DataType, IndexType>(buckets[i], result, bucket_offsets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // Full multi-threaded pipeline: segment + histogram, scatter into buckets,
  // unique each bucket independently, then merge values and inverse indices.
  template <typename DataType, typename IndexType>
  static void BucketUnique(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params) {
    MS_EXCEPTION_IF_NULL(params);
    std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> segments;
    std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> buckets;
    std::vector<std::shared_ptr<std::vector<size_t>>> segment_bucket_sizes;
    SplitAndCalculateBucketSize(params, &segments, &segment_bucket_sizes);
    GatherSegmentsToBuckets(params, &segments, &segment_bucket_sizes, &buckets);
    UniqueEachBucket(buckets);
    MergeBuckets(buckets, params);
  }
};
516 } // namespace kernel
517 } // namespace mindspore
518 #endif // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
519