• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <memory>
18 #include <list>
19 
20 #include "common/common.h"
21 #include "minddata/dataset/callback/ds_callback.h"
22 #include "minddata/dataset/core/client.h"
23 #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
24 #include "minddata/dataset/engine/datasetops/source/random_data_op.h"
25 #include "minddata/dataset/engine/tree_adapter.h"
26 #include "minddata/dataset/include/dataset/datasets.h"
27 #include "minddata/dataset/include/dataset/transforms.h"
28 #include "minddata/dataset/kernels/data/no_op.h"
29 #include "utils/log_adapter.h"
30 
31 using namespace mindspore::dataset;
32 using mindspore::LogStream;
33 using mindspore::MsLogLevel::INFO;
34 
35 namespace mindspore {
36 namespace dataset {
37 namespace test {
38 
39 class TestCallback : public DSCallback {
40  public:
TestCallback(int32_t step_size)41   TestCallback(int32_t step_size)
42       : DSCallback(step_size),
43         begin_(true),
44         epoch_begin_(true),
45         step_begin_(true),
46         end_(false),
47         epoch_end_(true),
48         step_end_(true) {
49     all_names_.reserve(32);
50     all_step_nums_.reserve(32);
51     all_ep_nums_.reserve(32);
52   }
53 
DSBegin(const CallbackParam & cb_param)54   Status DSBegin(const CallbackParam &cb_param) override {
55     all_names_.push_back("BGN");
56     all_step_nums_.push_back(cb_param.cur_step_num_);
57     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
58     return Status::OK();
59   }
DSEpochBegin(const CallbackParam & cb_param)60   Status DSEpochBegin(const CallbackParam &cb_param) override {
61     all_names_.push_back("EPBGN");
62     all_step_nums_.push_back(cb_param.cur_step_num_);
63     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
64     return Status::OK();
65   }
DSNStepBegin(const CallbackParam & cb_param)66   Status DSNStepBegin(const CallbackParam &cb_param) override {
67     all_names_.push_back("SPBGN");
68     all_step_nums_.push_back(cb_param.cur_step_num_);
69     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
70     return Status::OK();
71   }
DSEnd(const CallbackParam & cb_param)72   Status DSEnd(const CallbackParam &cb_param) override {
73     all_names_.push_back("END");
74     all_step_nums_.push_back(cb_param.cur_step_num_);
75     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
76     return Status::OK();
77   }
DSEpochEnd(const CallbackParam & cb_param)78   Status DSEpochEnd(const CallbackParam &cb_param) override {
79     all_names_.push_back("EPEND");
80     all_step_nums_.push_back(cb_param.cur_step_num_);
81     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
82     return Status::OK();
83   }
DSNStepEnd(const CallbackParam & cb_param)84   Status DSNStepEnd(const CallbackParam &cb_param) override {
85     all_names_.push_back("SPEND");
86     all_step_nums_.push_back(cb_param.cur_step_num_);
87     all_ep_nums_.push_back(cb_param.cur_epoch_num_);
88     return Status::OK();
89   }
90 
IsBeginNeeded()91   bool IsBeginNeeded() override { return begin_; }
IsEpochBeginNeeded()92   bool IsEpochBeginNeeded() override { return epoch_begin_; }
IsNStepBeginNeeded()93   bool IsNStepBeginNeeded() override { return step_begin_; }
IsEndNeeded()94   bool IsEndNeeded() override { return end_; }
IsEpochEndNeeded()95   bool IsEpochEndNeeded() override { return epoch_end_; }
IsNStepEndNeeded()96   bool IsNStepEndNeeded() override { return step_end_; }
97 
all_names(size_t len)98   std::vector<std::string> all_names(size_t len) {
99     return std::vector<std::string>(all_names_.begin(), all_names_.begin() + len);
100   }
101 
all_step_nums(size_t len)102   std::vector<int64_t> all_step_nums(size_t len) {
103     return std::vector<int64_t>(all_step_nums_.begin(), all_step_nums_.begin() + len);
104   }
105 
all_ep_nums(size_t len)106   std::vector<int64_t> all_ep_nums(size_t len) {
107     return std::vector<int64_t>(all_ep_nums_.begin(), all_ep_nums_.begin() + len);
108   }
109 
110   // flag for turning callback on and off
111   bool begin_, epoch_begin_, step_begin_, end_, epoch_end_, step_end_;
112   // name of the callback function in sequence, BGN, EPBGN, SPB, END, EPEND, SPEND
113   std::vector<std::string> all_names_;
114   std::vector<int64_t> all_step_nums_, all_ep_nums_;
115 };
116 
117 }  // namespace test
118 }  // namespace dataset
119 }  // namespace mindspore
120 
// Common fixture for the callback tests: runs the base dataset-op test setup
// and the global engine initialization before every TEST_F below.
class MindDataTestCallback : public UT::DatasetOpTesting {
 public:
  void SetUp() override {
    DatasetOpTesting::SetUp();  // base-class setup first
    GlobalInit();               // then global dataset state init
  }
};
128 
TEST_F(MindDataTestCallback,TestBasicCallback)129 TEST_F(MindDataTestCallback, TestBasicCallback) {
130   MS_LOG(INFO) << "Doing: MindDataTestCallback-TestBasicCallback";
131   // config callback
132   Status rc;
133   std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(64);
134   std::shared_ptr<DSCallback> cb1 = tst_cb;
135   // config leaf_op, use random_data to avoid I/O
136   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
137   TensorShape shape({});  // empty shape is a 1-value scalar Tensor
138   ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
139   ASSERT_OK(schema->AddColumn(col));
140 
141   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
142   int32_t op_connector_size = config_manager->op_connector_size();
143   int32_t num_workers = config_manager->num_parallel_workers();
144   std::shared_ptr<RandomDataOp> leaf =
145     std::make_shared<RandomDataOp>(num_workers, op_connector_size, 44, std::move(schema));
146   // config mapOp
147   std::vector<std::string> input_columns = {"label"};
148   std::vector<std::string> output_columns = {};
149   std::vector<std::shared_ptr<TensorOp>> op_list;
150   std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
151   op_list.push_back(my_no_op);
152   std::shared_ptr<MapOp> map_op =
153     std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
154   std::vector<std::shared_ptr<DSCallback>> cbs = {};
155   cbs.push_back(cb1);
156   map_op->AddCallbacks(std::move(cbs));
157   // config RepeatOp
158   std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
159   // start build then launch tree
160   leaf->SetTotalRepeats(2);
161   leaf->SetNumRepeatsPerEpoch(2);
162   map_op->SetTotalRepeats(2);
163   map_op->SetNumRepeatsPerEpoch(2);
164   std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op});
165   rc = tree->Prepare();
166   EXPECT_TRUE(rc.IsOk());
167   rc = tree->Launch();
168   EXPECT_TRUE(rc.IsOk());
169   // Start the loop of reading tensors from our pipeline
170   DatasetIterator di(tree);
171   TensorMap tensor_map;
172   rc = di.GetNextAsMap(&tensor_map);
173   EXPECT_TRUE(rc.IsOk());
174   while (!tensor_map.empty()) {
175     rc = di.GetNextAsMap(&tensor_map);
176     EXPECT_TRUE(rc.IsOk());
177   }
178 
179   std::vector<std::string> callback_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
180   std::vector<int64_t> all_steps = {0, 0, 1, 1, 65, 65, 88};
181   std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 1, 1};
182   // doing resize to make sure no unexpected epoch_end or extra epoch_begin is called
183   size_t len = 7;
184   EXPECT_EQ(tst_cb->all_names(len), callback_names);
185   EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
186   EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
187 }
188 
TEST_F(MindDataTestCallback,TestMultiEpochCallback)189 TEST_F(MindDataTestCallback, TestMultiEpochCallback) {
190   MS_LOG(INFO) << "Doing: MindDataTestCallback-TestMultiEpochCallback";
191   // config callback
192   Status rc;
193   std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(4);
194   std::shared_ptr<DSCallback> cb1 = tst_cb;
195   // config leaf_op, use random_data to avoid I/O
196   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
197   int32_t op_connector_size = config_manager->op_connector_size();
198   int32_t num_workers = config_manager->num_parallel_workers();
199   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
200   TensorShape shape({});  // empty shape is a 1-value scalar Tensor
201   ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
202   ASSERT_OK(schema->AddColumn(col));
203   std::shared_ptr<RandomDataOp> leaf = std::make_shared<RandomDataOp>(4, op_connector_size, 4, std::move(schema));
204   // config mapOp
205   std::vector<std::string> input_columns = {"label"};
206   std::vector<std::string> output_columns = {};
207   std::vector<std::shared_ptr<TensorOp>> op_list;
208   std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
209   op_list.push_back(my_no_op);
210   std::shared_ptr<MapOp> map_op =
211     std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
212   std::vector<std::shared_ptr<DSCallback>> cbs = {};
213   cbs.push_back(cb1);
214 
215   map_op->AddCallbacks(std::move(cbs));
216   EXPECT_TRUE(rc.IsOk());
217   // config RepeatOp
218   std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
219   // config EpochCtrlOp
220   std::shared_ptr<EpochCtrlOp> epoch_ctrl_op = std::make_shared<EpochCtrlOp>(-1);
221   // start build then launch tree
222   leaf->SetTotalRepeats(-2);
223   leaf->SetNumRepeatsPerEpoch(2);
224   map_op->SetTotalRepeats(-2);
225   map_op->SetNumRepeatsPerEpoch(2);
226   std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op, epoch_ctrl_op});
227   rc = tree->Prepare();
228   EXPECT_TRUE(rc.IsOk());
229   rc = tree->Launch();
230   EXPECT_TRUE(rc.IsOk());
231   // Start the loop of reading tensors from our pipeline
232   DatasetIterator di(tree);
233   TensorMap tensor_map;
234   size_t num_epochs = 2;
235   for (int ep_num = 0; ep_num < num_epochs; ++ep_num) {
236     ASSERT_OK(di.GetNextAsMap(&tensor_map));
237     EXPECT_TRUE(rc.IsOk());
238 
239     while (tensor_map.size() != 0) {
240       rc = di.GetNextAsMap(&tensor_map);
241       EXPECT_TRUE(rc.IsOk());
242     }
243   }
244 
245   std::vector<std::string> callback_names = {"BGN",   "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND",
246                                              "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
247 
248   std::vector<int64_t> all_steps = {0, 0, 1, 1, 5, 5, 8, 8, 9, 9, 13, 13, 16};
249   std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2};
250 
251   size_t len = 13;
252   EXPECT_EQ(tst_cb->all_names(len), callback_names);
253   EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
254   EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
255 }
256 
TEST_F(MindDataTestCallback,TestSelectedCallback)257 TEST_F(MindDataTestCallback, TestSelectedCallback) {
258   MS_LOG(INFO) << "Doing: MindDataTestCallback-TestSelectedCallback";
259   // config callback
260   Status rc;
261   std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(4);
262   std::shared_ptr<DSCallback> cb1 = tst_cb;
263   // turn off the epochs
264   tst_cb->epoch_begin_ = false;
265   tst_cb->epoch_end_ = false;
266 
267   // config leaf_op, use random_data to avoid I/O
268   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
269   int32_t op_connector_size = config_manager->op_connector_size();
270   int32_t num_workers = config_manager->num_parallel_workers();
271 
272   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
273   TensorShape shape({});  // empty shape is a 1-value scalar Tensor
274   ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
275   ASSERT_OK(schema->AddColumn(col));
276   std::shared_ptr<RandomDataOp> leaf = std::make_shared<RandomDataOp>(4, op_connector_size, 4, std::move(schema));
277   // config mapOp
278   std::vector<std::string> input_columns = {"label"};
279   std::vector<std::string> output_columns = {};
280   std::vector<std::shared_ptr<TensorOp>> op_list;
281   std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
282   op_list.push_back(my_no_op);
283   std::shared_ptr<MapOp> map_op =
284     std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
285   map_op->AddCallbacks({cb1});
286   // config RepeatOp
287   std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
288   // config EpochCtrlOp
289   std::shared_ptr<EpochCtrlOp> epoch_ctrl_op = std::make_shared<EpochCtrlOp>(-1);
290 
291   // start build then launch tree
292   leaf->SetTotalRepeats(-2);
293   leaf->SetNumRepeatsPerEpoch(2);
294   map_op->SetTotalRepeats(-2);
295   map_op->SetNumRepeatsPerEpoch(2);
296   std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op, epoch_ctrl_op});
297   rc = tree->Prepare();
298   EXPECT_TRUE(rc.IsOk());
299   rc = tree->Launch();
300   EXPECT_TRUE(rc.IsOk());
301   // Start the loop of reading tensors from our pipeline
302   DatasetIterator di(tree);
303   TensorMap tensor_map;
304   size_t num_epochs = 2;
305   for (int ep_num = 0; ep_num < num_epochs; ++ep_num) {
306     ASSERT_OK(di.GetNextAsMap(&tensor_map));
307     EXPECT_TRUE(rc.IsOk());
308 
309     while (tensor_map.size() != 0) {
310       rc = di.GetNextAsMap(&tensor_map);
311       EXPECT_TRUE(rc.IsOk());
312     }
313   }
314 
315   std::vector<std::string> callback_names = {"BGN",   "SPBGN", "SPEND", "SPBGN", "SPEND",
316                                              "SPBGN", "SPEND", "SPBGN", "SPEND"};
317 
318   std::vector<int64_t> all_steps = {0, 1, 1, 5, 5, 9, 9, 13, 13};
319   std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 2, 2, 2, 2};
320 
321   size_t len = 9;
322   EXPECT_EQ(tst_cb->all_names(len), callback_names);
323   EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
324   EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
325 }
326 
// Same scenario as TestBasicCallback, but built through the C++ dataset API
// (Schema/RandomData/Map/Repeat + TreeAdapter) instead of raw ops.
TEST_F(MindDataTestCallback, TestCAPICallback) {
  MS_LOG(INFO) << "Doing: MindDataTestCallback-TestCAPICallback";
  // Recording callback with a step interval of 64.
  std::shared_ptr<test::TestCallback> recorder = std::make_shared<test::TestCallback>(64);
  std::shared_ptr<DSCallback> callback = recorder;
  // RandomDataset of 44 rows with a single scalar uint32 column; no file I/O.
  std::shared_ptr<SchemaObj> schema = Schema();
  ASSERT_OK(schema->add_column("label", mindspore::DataType::kNumberTypeUInt32, {}));
  std::shared_ptr<Dataset> ds = RandomData(44, schema);
  ASSERT_NE(ds, nullptr);
  // Map with a TypeCast transform; the callback is attached to this op.
  ds = ds->Map({std::make_shared<transforms::TypeCast>(mindspore::DataType::kNumberTypeUInt64)}, {"label"}, {}, {},
               nullptr, {callback});
  ASSERT_NE(ds, nullptr);
  ds = ds->Repeat(2);
  ASSERT_NE(ds, nullptr);

  // Compile the IR with num_epochs = 1, then drain the pipeline row by row.
  TreeAdapter tree_adapter;
  ASSERT_OK(tree_adapter.Compile(ds->IRNode(), 1));

  TensorRow row;
  ASSERT_OK(tree_adapter.GetNext(&row));
  while (!row.empty()) {
    ASSERT_OK(tree_adapter.GetNext(&row));
  }

  // 44 rows x 2 repeats = 88 steps; with step_size 64 the step hooks fire at
  // steps 1 and 65, and the epoch ends at step 88.
  std::vector<std::string> expected_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
  std::vector<int64_t> expected_steps = {0, 0, 1, 1, 65, 65, 88};
  std::vector<int64_t> expected_epochs = {0, 1, 1, 1, 1, 1, 1};
  // compare only the first `len` records so unexpected extra epoch callbacks are caught
  size_t len = 7;
  EXPECT_EQ(recorder->all_names(len), expected_names);
  EXPECT_EQ(recorder->all_step_nums(len), expected_steps);
  EXPECT_EQ(recorder->all_ep_nums(len), expected_epochs);
}
361