• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
18 #define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
19 
#include <algorithm>
#include <cmath>
#include <functional>
#include <map>
#include <memory>
#include <numeric>
#include <thread>
#include <unordered_map>
#include <vector>
#include "plugin/device/cpu/kernel/cpu_kernel.h"
#include "plugin/factory/ms_factory.h"
#include "include/common/thread_pool.h"
#include "ops/op_utils.h"
31 
32 namespace mindspore {
33 namespace kernel {
// Working set for one Unique computation. The same struct is reused at three
// granularities by the bucketed pipeline below: the whole input, one thread's
// contiguous segment of it, and one hash bucket. Pointers are non-owning views
// into kernel-provided buffers.
template <typename DataType, typename IndexType>
struct UniqueParam {
  DataType *input_{nullptr};           // values to deduplicate
  IndexType *input_idx_{nullptr};      // scratch index buffer (sort permutation)
  DataType *output_{nullptr};          // unique values, densely packed
  IndexType *inverse_idx_{nullptr};    // per input element: index of its value in output_
  DataType *workspace_{nullptr};       // scratch value buffer used while merging buckets
  IndexType *workspace_idx_{nullptr};  // original positions of elements routed into buckets
  size_t input_size_{0};               // number of valid elements behind input_
  size_t output_size_{0};              // number of unique values produced
  size_t thread_num_{0};               // parallelism degree; also the bucket count
  bool need_sort_{true};               // true: sorted unique; false: hash-based unique
};
47 
// Converts an arithmetic value to size_t. The primary template performs a
// plain cast; the specializations below route signed and floating types
// through the project's conversion helpers (presumably range-checked —
// see the utils that define IntToSize/LongToSize/FloatToSize).
template <typename FromType>
size_t ToSize(FromType input) {
  return static_cast<size_t>(input);
}

template <>
inline size_t ToSize(int input) {
  return IntToSize(input);
}

template <>
inline size_t ToSize(int64_t input) {
  return LongToSize(input);
}

template <>
inline size_t ToSize(float16 input) {
  // Widen half precision to float first, then reuse the float converter.
  return FloatToSize(static_cast<float>(input));
}

template <>
inline size_t ToSize(float input) {
  return FloatToSize(input);
}
72 
// Converts a size_t to the requested index type. The primary template is a
// plain cast; the int/int64_t specializations delegate to the project's
// SizeToInt/SizeToLong helpers.
template <typename ToType>
ToType SizeTo(size_t input) {
  return static_cast<ToType>(input);
}

template <>
inline int SizeTo(size_t input) {
  return SizeToInt(input);
}

template <>
inline int64_t SizeTo(size_t input) {
  return SizeToLong(input);
}
87 
// Inequality predicate used by Unique() to decide whether two (adjacent, when
// sorted) values belong to the same "unique" group. The primary template uses
// exact comparison; the floating-point specializations compare the absolute
// difference against a tiny epsilon.
template <typename DataType>
bool NotEqual(DataType left, DataType right) {
  return left != right;
}

template <>
inline bool NotEqual(float left, float right) {
  // Use std::fabs explicitly: the previous unqualified abs() could bind to the
  // integer ::abs overload (depending on which headers were transitively
  // included), truncating the difference toward zero and treating any two
  // values whose difference is below 1.0 as equal.
  constexpr float kEps = 1e-30;
  return std::fabs(left - right) > kEps;
}

template <>
inline bool NotEqual(double left, double right) {
  constexpr double kEps = 1e-30;
  return std::fabs(left - right) > kEps;
}
104 
105 template <typename DataType>
BucketId(DataType input,size_t bucket_num)106 size_t BucketId(DataType input, size_t bucket_num) {
107   if (static_cast<double>(input) < 0.0) {
108     input = -input;
109   }
110   size_t data = ToSize<DataType>(input);
111   if (static_cast<double>(bucket_num) < 1.0) {
112     return data;
113   }
114   return data % bucket_num;
115 }
116 
// CPU kernel for the Unique op: produces the de-duplicated values of a 1-D
// input plus, for every input element, the index of its value in that output.
// Large inputs are processed by a multi-threaded bucket pipeline
// (BucketUnique): split into per-thread segments, scatter into magnitude-hash
// buckets, unique each bucket in parallel, then merge.
class UniqueCpuKernelMod : public NativeCpuKernelMod {
 public:
  UniqueCpuKernelMod() = default;
  ~UniqueCpuKernelMod() override = default;

  // Caches the input dtype and the op's batch rank.
  // Returns false when the primitive reports a negative batch rank.
  bool Init(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &outputs) override {
    dtype_ = inputs[0]->dtype_id();
    auto batch_rank = ops::get_batch_rank(primitive_);
    if (batch_rank < 0) {
      return false;
    }
    batch_rank_ = static_cast<size_t>(batch_rank);
    return true;
  }

  // Validates the input shape against batch_rank_, derives batch_size_ and
  // input_size_, reads the optional SORTED attribute, and sizes the three
  // int64 workspaces the bucketed algorithm needs.
  int Resize(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &outputs) override {
    auto ret = KernelMod::Resize(inputs, outputs);
    // KRET_UNKNOWN_OUT_SHAPE is tolerated: Unique's output shape is only known
    // after Launch, and is published via UpdateOutputShapeAndSize below.
    if (ret != KRET_UNKNOWN_OUT_SHAPE && ret != KRET_OK) {
      return ret;
    }
    if (inputs.size() < 1) {
      MS_LOG(EXCEPTION) << kernel_name_ << " requires not less than 1 inputs, but got " << inputs.size() << ".";
    }
    auto input_shape = inputs[0]->GetShapeVector();
    if (batch_rank_ == 0) {
      // Non-batched case: input must be exactly 1-D.
      if (input_shape.size() != 1) {
        MS_LOG(EXCEPTION) << "For '" << kernel_name_ << "', the dimension of input must be 1D, but got "
                          << input_shape.size() << "D";
      }
      batch_size_ = 1;
      input_size_ = static_cast<size_t>(input_shape[0]);
    } else {
      // Batched (vmap) case: leading batch_rank_ dims are batch dims, the last
      // dim is the actual 1-D unique axis.
      if (input_shape.size() != static_cast<size_t>(batch_rank_ + 1)) {
        MS_LOG(EXCEPTION) << "For '" << kernel_name_
                          << "', the shape size of 'input' must be equal to 'batch_rank + 1', "
                             "but got the shape of 'input': "
                          << input_shape << " and 'batch_rank': " << batch_rank_;
      }
      batch_size_ =
        std::accumulate(input_shape.begin(), input_shape.begin() + batch_rank_, 1, std::multiplies<int64_t>());
      input_size_ = static_cast<size_t>(input_shape[batch_rank_]);
    }
    if (primitive_->HasAttr(SORTED)) {
      auto value_ptr = primitive_->GetAttr(SORTED);
      sorted_ = GetValue<bool>(value_ptr);
    }
    // Three equally-sized scratch buffers (values + two index arrays); always
    // sized in int64 so they fit the widest supported data/index types.
    workspace_size_list_.clear();
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    (void)workspace_size_list_.emplace_back(input_size_ * sizeof(int64_t));
    return ret;
  }

  // Defined in the accompanying .cc; dispatches to LaunchKernel by dtype_.
  bool Launch(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &workspace,
              const std::vector<KernelTensor *> &outputs) override;

  // Supported (input dtype, output dtype, index dtype) combinations.
  std::vector<KernelAttr> GetOpSupport() override {
    static const std::vector<KernelAttr> support_list = {
      KernelAttr().AddInputAttr(kNumberTypeInt8).AddOutputAttr(kNumberTypeInt8).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt16).AddOutputAttr(kNumberTypeInt16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeInt64).AddOutputAttr(kNumberTypeInt64).AddOutputAttr(kNumberTypeInt64),
      KernelAttr().AddInputAttr(kNumberTypeUInt8).AddOutputAttr(kNumberTypeUInt8).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeUInt16).AddOutputAttr(kNumberTypeUInt16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat16).AddOutputAttr(kNumberTypeFloat16).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeInt32),
      KernelAttr().AddInputAttr(kNumberTypeFloat64).AddOutputAttr(kNumberTypeFloat64).AddOutputAttr(kNumberTypeInt64),
    };
    return support_list;
  }

  // Output shape depends on the data, so the framework must re-query it after
  // every launch.
  bool IsNeedUpdateOutputShapeAndSize() override { return true; }

  // Publishes the actual number of unique values (recorded by LaunchKernel in
  // output_sizes_) as the first output's shape and byte size.
  void UpdateOutputShapeAndSize(const std::vector<KernelTensor *> &inputs,
                                const std::vector<KernelTensor *> &outputs) override {
    ShapeVector out_shape;
    if (output_sizes_.empty()) {
      (void)out_shape.emplace_back(SizeToLong(0));
    } else {
      // NOTE(review): only output_sizes_[0] is used — presumably the batched
      // (vmap) path pads every batch to a common unique count; confirm in the
      // .cc implementation.
      (void)out_shape.emplace_back(SizeToLong(output_sizes_[0]));
    }
    outputs[0]->SetShapeVector(out_shape);
    outputs[0]->set_size(LongToSize(out_shape[0]) * UnitSizeInBytes(outputs[0]->dtype_id()));
  }

 protected:
  // Typed implementation invoked by Launch; defined in the .cc.
  template <typename DataType, typename IndexType>
  void LaunchKernel(const std::vector<KernelTensor *> &inputs, const std::vector<KernelTensor *> &workspace,
                    const std::vector<KernelTensor *> &outputs);
  size_t input_size_{0};             // element count along the unique axis
  TypeId dtype_{kTypeUnknown};       // cached input dtype for dispatch
  size_t batch_size_{1};             // product of leading batch dims (vmap)
  size_t batch_rank_{0};             // number of leading batch dims
  std::vector<size_t> output_sizes_; // per-batch unique counts set at launch
  bool sorted_{false};               // value of the SORTED attribute

  // Counts, for one segment, how many of its elements hash into each bucket.
  // each_bucket_size must already be sized to the bucket count.
  template <typename DataType, typename IndexType>
  static void CalculateEachBucketSize(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                      std::vector<size_t> *each_bucket_size) {
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->input_);
    MS_EXCEPTION_IF_NULL(each_bucket_size);
    size_t bucket_num = each_bucket_size->size();
    if (params->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < params->input_size_; ++i) {
      auto bucket_id = BucketId(params->input_[i], bucket_num);
      each_bucket_size->at(bucket_id)++;
    }
  }

  // Stage 1: splits the input into thread_num_ contiguous segments (the first
  // input_size % thread_num segments get one extra element) and, in parallel,
  // computes each segment's per-bucket histogram.
  template <typename DataType, typename IndexType>
  static void SplitAndCalculateBucketSize(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                          std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *segments_ptr,
                                          std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr) {
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->input_);
    MS_EXCEPTION_IF_NULL(segments_ptr);
    MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
    auto &segments = *segments_ptr;
    auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;

    size_t input_size = params->input_size_;
    size_t thread_num = params->thread_num_;
    if (thread_num < 1) {
      MS_LOG(EXCEPTION) << "For 'Unique', thread num must be greater than 0, but got " << thread_num;
    }
    size_t thread_data_size = input_size / thread_num;
    size_t left_data_size = input_size % thread_num;
    segments.reserve(thread_num);
    segment_bucket_sizes.reserve(thread_num);
    size_t current_offset = 0;
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      // Bucket count equals thread count, so each histogram has thread_num slots.
      (void)segment_bucket_sizes.emplace_back(std::make_shared<std::vector<size_t>>(thread_num, 0));
      size_t data_size = thread_data_size;
      if (i < left_data_size) {
        data_size += 1;
      }
      (void)segments.emplace_back(std::make_shared<UniqueParam<DataType, IndexType>>());
      segments[i]->input_ = params->input_ + current_offset;
      segments[i]->input_size_ = data_size;
      segments[i]->thread_num_ = thread_num;
      // Capture i by value; segments/segment_bucket_sizes outlive ParallelLaunch.
      auto task = [&segments, &segment_bucket_sizes, i]() {
        CalculateEachBucketSize<DataType, IndexType>(segments[i], segment_bucket_sizes[i].get());
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
      current_offset += data_size;
    }
    ParallelLaunch(tasks);
  }

  // Scatters one segment's elements into the bucket slices reserved for it.
  // segment_offset is the segment's start position in the full input, so
  // workspace_idx_ records each element's original global index.
  template <typename DataType, typename IndexType>
  static void SegmentToBuckets(const std::shared_ptr<UniqueParam<DataType, IndexType>> &segment, size_t segment_offset,
                               const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(segment);
    MS_EXCEPTION_IF_NULL(segment->input_);
    std::vector<size_t> bucket_data_num(segment->thread_num_, 0);
    auto bucket_size = buckets.size();
    if (segment->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < segment->input_size_; ++i) {
      DataType data = segment->input_[i];
      auto bucket_id = BucketId(data, segment->thread_num_);
      auto bucket_index = bucket_data_num[bucket_id];
      // Defensive bounds checks: skip (and log) rather than write out of range.
      if (bucket_id >= bucket_size) {
        MS_LOG(ERROR) << "For 'Unique', bucket id must be less than bucket size, but got 'bucket_id': " << bucket_id
                      << "and 'bucket_size': " << bucket_size;
        continue;
      }
      auto &bucket = buckets[bucket_id];
      MS_EXCEPTION_IF_NULL(bucket);
      if (bucket_index >= bucket->input_size_) {
        MS_LOG(ERROR) << "For 'Unique', bucket index must be less than input size, but got bucket index: "
                      << bucket_index << "and input size: " << bucket->input_size_;
        continue;
      }
      bucket->input_[bucket_index] = data;
      bucket->workspace_idx_[bucket_index] = SizeTo<IndexType>(segment_offset + i);
      bucket_data_num[bucket_id]++;
    }
    MS_LOG(DEBUG) << "End";
  }

  // Stage 2: lays out the buckets and moves every element into its bucket.
  // Note the deliberate buffer swap: each bucket's *input* lives in
  // params->output_ and its *output* in params->workspace_, so that stage 4
  // (MergeBuckets) can copy unique values back into params->output_.
  template <typename DataType, typename IndexType>
  static void GatherSegmentsToBuckets(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params,
                                      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *segments_ptr,
                                      std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr,
                                      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> *buckets_ptr) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(params);
    MS_EXCEPTION_IF_NULL(params->workspace_);
    MS_EXCEPTION_IF_NULL(params->inverse_idx_);
    MS_EXCEPTION_IF_NULL(params->workspace_idx_);
    MS_EXCEPTION_IF_NULL(params->output_);
    MS_EXCEPTION_IF_NULL(params->input_idx_);
    MS_EXCEPTION_IF_NULL(segments_ptr);
    MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
    MS_EXCEPTION_IF_NULL(buckets_ptr);
    auto &segments = *segments_ptr;
    auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;
    auto &buckets = *buckets_ptr;
    auto thread_num = segments.size();
    buckets.reserve(thread_num);
    // Total size of each bucket = column sums of the per-segment histograms.
    std::vector<size_t> bucket_data_size(thread_num, 0);
    for (size_t i = 0; i < thread_num; ++i) {
      for (size_t j = 0; j < thread_num; ++j) {
        bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
      }
    }

    // Carve the shared buffers into one contiguous slice per bucket.
    size_t current_offset = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      auto bucket = std::make_shared<UniqueParam<DataType, IndexType>>();
      bucket->input_ = params->output_ + current_offset;
      bucket->input_idx_ = params->inverse_idx_ + current_offset;
      bucket->workspace_idx_ = params->workspace_idx_ + current_offset;
      bucket->output_ = params->workspace_ + current_offset;
      bucket->inverse_idx_ = params->input_idx_ + current_offset;
      bucket->input_size_ = bucket_data_size[i];
      current_offset += bucket_data_size[i];
      (void)buckets.emplace_back(bucket);
    }
    // thread_buckets[i][j] is segment i's private sub-slice of bucket j, so
    // the parallel scatter below needs no synchronization.
    std::vector<size_t> tmp_bucket_data_size(thread_num, 0);
    std::vector<std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>>> thread_buckets;
    for (size_t i = 0; i < thread_num; ++i) {
      std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> local_buckets;
      for (size_t j = 0; j < thread_num; ++j) {
        auto bucket = std::make_shared<UniqueParam<DataType, IndexType>>();
        bucket->input_ = buckets[j]->input_ + tmp_bucket_data_size[j];
        bucket->input_size_ = buckets[j]->input_size_ - tmp_bucket_data_size[j];
        bucket->workspace_idx_ = buckets[j]->workspace_idx_ + tmp_bucket_data_size[j];
        (void)local_buckets.emplace_back(bucket);
        tmp_bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
      }
      (void)thread_buckets.emplace_back(local_buckets);
    }
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    current_offset = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      MS_EXCEPTION_IF_NULL(segments[i]);
      // current_offset (the segment's global start index) is captured by value
      // before it is advanced for the next segment.
      auto task = [&segments, &thread_buckets, current_offset, i]() {
        SegmentToBuckets<DataType, IndexType>(segments[i], current_offset, thread_buckets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
      current_offset += segments[i]->input_size_;
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // Single-threaded unique of one parameter block. Sorting path: argsort via
  // input_idx_, then one pass emitting a new output value whenever adjacent
  // sorted values differ (per NotEqual). Hash path: unordered_map from value
  // to its first-seen output slot. Both fill inverse_idx_ and output_size_.
  template <typename DataType, typename IndexType>
  static void Unique(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(params);
    DataType *input = params->input_;
    IndexType *input_idx = params->input_idx_;
    DataType *output = params->output_;
    IndexType *inverse_idx = params->inverse_idx_;
    MS_EXCEPTION_IF_NULL(input);
    MS_EXCEPTION_IF_NULL(input_idx);
    MS_EXCEPTION_IF_NULL(output);
    MS_EXCEPTION_IF_NULL(inverse_idx);
    IndexType j = 0;
    if (params->input_size_ < 1) {
      return;
    }
    if (params->need_sort_) {
      for (size_t i = 0; i < params->input_size_; ++i) {
        input_idx[i] = SizeTo<IndexType>(i);
      }
      // Indirect sort: input itself is left untouched.
      std::sort(input_idx, input_idx + params->input_size_,
                [&](size_t left, size_t right) { return input[left] < input[right]; });
      DataType last = input[0];
      for (size_t i = 0; i < params->input_size_; ++i) {
        auto curr = input[input_idx[i]];
        if (i == 0 || NotEqual(curr, last)) {
          if (i != 0) {
            j++;
          }
          output[j] = curr;
          inverse_idx[input_idx[i]] = j;
          last = curr;
        } else {
          inverse_idx[input_idx[i]] = j;
        }
      }
      params->output_size_ = ToSize<IndexType>(j + 1);
    } else {
      std::unordered_map<DataType, IndexType> uniq;
      uniq.reserve(params->input_size_);
      for (size_t i = 0; i < params->input_size_; ++i) {
        // emplace is a no-op when the key exists; it.second tells us whether
        // a new slot j was claimed.
        auto it = uniq.emplace(input[i], j);
        inverse_idx[i] = it.first->second;
        if (it.second) {
          ++j;
        }
      }
      for (const auto &it : uniq) {
        output[it.second] = it.first;
      }
      params->output_size_ = ToSize<IndexType>(j);
    }
    MS_LOG(DEBUG) << "End";
  }

  // Stage 3: runs Unique() on every bucket in parallel. Buckets are disjoint
  // slices, so no synchronization is required.
  template <typename DataType, typename IndexType>
  static void UniqueEachBucket(const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets) {
    MS_LOG(DEBUG) << "Start";
    size_t thread_num = buckets.size();
    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      auto task = [&buckets, i]() {
        Unique<DataType, IndexType>(buckets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // Rewrites one bucket's local inverse indices into global ones: element i of
  // the bucket originally sat at workspace_idx_[i] in the full input, and its
  // unique value ends up at (local index + offset) in the merged output.
  template <typename DataType, typename IndexType>
  static void TransformBucketReverseIndices(const std::shared_ptr<UniqueParam<DataType, IndexType>> &bucket,
                                            const std::shared_ptr<UniqueParam<DataType, IndexType>> &result,
                                            size_t offset) {
    MS_EXCEPTION_IF_NULL(bucket);
    MS_EXCEPTION_IF_NULL(bucket->inverse_idx_);
    MS_EXCEPTION_IF_NULL(bucket->workspace_idx_);
    MS_EXCEPTION_IF_NULL(result);
    MS_EXCEPTION_IF_NULL(result->inverse_idx_);
    if (bucket->input_size_ < 1) {
      return;
    }
    for (size_t i = 0; i < bucket->input_size_; ++i) {
      auto origin_idx = bucket->workspace_idx_[i];
      if (origin_idx < 0) {
        continue;
      }
      size_t index = ToSize<IndexType>(origin_idx);
      if (index < result->input_size_) {
        result->inverse_idx_[index] = bucket->inverse_idx_[i] + SizeTo<IndexType>(offset);
      }
    }
  }

  // Stage 4: concatenates each bucket's unique values into result->output_
  // (values of distinct buckets are distinct by construction, so no further
  // dedup is needed), then remaps all inverse indices in parallel.
  template <typename DataType, typename IndexType>
  static void MergeBuckets(const std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> &buckets,
                           const std::shared_ptr<UniqueParam<DataType, IndexType>> &result) {
    MS_LOG(DEBUG) << "Start";
    MS_EXCEPTION_IF_NULL(result);
    MS_EXCEPTION_IF_NULL(result->output_);
    size_t thread_num = buckets.size();
    std::vector<size_t> bucket_offsets(thread_num);
    size_t current_size = 0;
    for (size_t i = 0; i < thread_num; ++i) {
      auto bucket = buckets[i];
      MS_EXCEPTION_IF_NULL(bucket);
      MS_EXCEPTION_IF_NULL(bucket->output_);
      bucket_offsets[i] = current_size;
      // Bounded copy: destination capacity is whatever remains of the full
      // output buffer (result->input_size_ elements in total).
      auto ret_code = memcpy_s(result->output_ + current_size, (result->input_size_ - current_size) * sizeof(DataType),
                               bucket->output_, bucket->output_size_ * sizeof(DataType));
      if (ret_code != EOK) {
        MS_LOG(EXCEPTION) << "For 'Unique', copy data failed, error no: " << ret_code;
      }
      current_size += bucket->output_size_;
    }
    result->output_size_ = current_size;

    std::vector<common::Task> tasks;
    tasks.reserve(thread_num);
    for (size_t i = 0; i < thread_num; ++i) {
      auto task = [&buckets, i, result, &bucket_offsets]() {
        TransformBucketReverseIndices<DataType, IndexType>(buckets[i], result, bucket_offsets[i]);
        return common::SUCCESS;
      };
      (void)tasks.emplace_back(task);
    }
    ParallelLaunch(tasks);
    MS_LOG(DEBUG) << "End";
  }

  // Entry point of the parallel pipeline: split -> scatter -> per-bucket
  // unique -> merge. params supplies all buffers and thread_num_.
  template <typename DataType, typename IndexType>
  static void BucketUnique(const std::shared_ptr<UniqueParam<DataType, IndexType>> &params) {
    MS_EXCEPTION_IF_NULL(params);
    std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> segments;
    std::vector<std::shared_ptr<UniqueParam<DataType, IndexType>>> buckets;
    std::vector<std::shared_ptr<std::vector<size_t>>> segment_bucket_sizes;
    SplitAndCalculateBucketSize(params, &segments, &segment_bucket_sizes);
    GatherSegmentsToBuckets(params, &segments, &segment_bucket_sizes, &buckets);
    UniqueEachBucket(buckets);
    MergeBuckets(buckets, params);
  }
};
516 }  // namespace kernel
517 }  // namespace mindspore
518 #endif  // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_UNIQUE_CPU_KERNEL_H_
519