1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <memory>
18 #include <list>
19
20 #include "common/common.h"
21 #include "minddata/dataset/callback/ds_callback.h"
22 #include "minddata/dataset/core/client.h"
23 #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
24 #include "minddata/dataset/engine/datasetops/source/random_data_op.h"
25 #include "minddata/dataset/engine/tree_adapter.h"
26 #include "minddata/dataset/include/dataset/datasets.h"
27 #include "minddata/dataset/include/dataset/transforms.h"
28 #include "minddata/dataset/kernels/data/no_op.h"
29 #include "utils/log_adapter.h"
30
31 using namespace mindspore::dataset;
32 using mindspore::LogStream;
33 using mindspore::MsLogLevel::INFO;
34
35 namespace mindspore {
36 namespace dataset {
37 namespace test {
38
39 class TestCallback : public DSCallback {
40 public:
TestCallback(int32_t step_size)41 TestCallback(int32_t step_size)
42 : DSCallback(step_size),
43 begin_(true),
44 epoch_begin_(true),
45 step_begin_(true),
46 end_(false),
47 epoch_end_(true),
48 step_end_(true) {
49 all_names_.reserve(32);
50 all_step_nums_.reserve(32);
51 all_ep_nums_.reserve(32);
52 }
53
DSBegin(const CallbackParam & cb_param)54 Status DSBegin(const CallbackParam &cb_param) override {
55 all_names_.push_back("BGN");
56 all_step_nums_.push_back(cb_param.cur_step_num_);
57 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
58 return Status::OK();
59 }
DSEpochBegin(const CallbackParam & cb_param)60 Status DSEpochBegin(const CallbackParam &cb_param) override {
61 all_names_.push_back("EPBGN");
62 all_step_nums_.push_back(cb_param.cur_step_num_);
63 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
64 return Status::OK();
65 }
DSNStepBegin(const CallbackParam & cb_param)66 Status DSNStepBegin(const CallbackParam &cb_param) override {
67 all_names_.push_back("SPBGN");
68 all_step_nums_.push_back(cb_param.cur_step_num_);
69 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
70 return Status::OK();
71 }
DSEnd(const CallbackParam & cb_param)72 Status DSEnd(const CallbackParam &cb_param) override {
73 all_names_.push_back("END");
74 all_step_nums_.push_back(cb_param.cur_step_num_);
75 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
76 return Status::OK();
77 }
DSEpochEnd(const CallbackParam & cb_param)78 Status DSEpochEnd(const CallbackParam &cb_param) override {
79 all_names_.push_back("EPEND");
80 all_step_nums_.push_back(cb_param.cur_step_num_);
81 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
82 return Status::OK();
83 }
DSNStepEnd(const CallbackParam & cb_param)84 Status DSNStepEnd(const CallbackParam &cb_param) override {
85 all_names_.push_back("SPEND");
86 all_step_nums_.push_back(cb_param.cur_step_num_);
87 all_ep_nums_.push_back(cb_param.cur_epoch_num_);
88 return Status::OK();
89 }
90
IsBeginNeeded()91 bool IsBeginNeeded() override { return begin_; }
IsEpochBeginNeeded()92 bool IsEpochBeginNeeded() override { return epoch_begin_; }
IsNStepBeginNeeded()93 bool IsNStepBeginNeeded() override { return step_begin_; }
IsEndNeeded()94 bool IsEndNeeded() override { return end_; }
IsEpochEndNeeded()95 bool IsEpochEndNeeded() override { return epoch_end_; }
IsNStepEndNeeded()96 bool IsNStepEndNeeded() override { return step_end_; }
97
all_names(size_t len)98 std::vector<std::string> all_names(size_t len) {
99 return std::vector<std::string>(all_names_.begin(), all_names_.begin() + len);
100 }
101
all_step_nums(size_t len)102 std::vector<int64_t> all_step_nums(size_t len) {
103 return std::vector<int64_t>(all_step_nums_.begin(), all_step_nums_.begin() + len);
104 }
105
all_ep_nums(size_t len)106 std::vector<int64_t> all_ep_nums(size_t len) {
107 return std::vector<int64_t>(all_ep_nums_.begin(), all_ep_nums_.begin() + len);
108 }
109
110 // flag for turning callback on and off
111 bool begin_, epoch_begin_, step_begin_, end_, epoch_end_, step_end_;
112 // name of the callback function in sequence, BGN, EPBGN, SPB, END, EPEND, SPEND
113 std::vector<std::string> all_names_;
114 std::vector<int64_t> all_step_nums_, all_ep_nums_;
115 };
116
117 } // namespace test
118 } // namespace dataset
119 } // namespace mindspore
120
// Test fixture for the dataset-callback tests below; resets global dataset
// state before each test case runs.
class MindDataTestCallback : public UT::DatasetOpTesting {
 public:
  void SetUp() override {
    DatasetOpTesting::SetUp();
    // Re-initialize global dataset context so tests do not leak state into each other.
    GlobalInit();
  }
};
128
TEST_F(MindDataTestCallback,TestBasicCallback)129 TEST_F(MindDataTestCallback, TestBasicCallback) {
130 MS_LOG(INFO) << "Doing: MindDataTestCallback-TestBasicCallback";
131 // config callback
132 Status rc;
133 std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(64);
134 std::shared_ptr<DSCallback> cb1 = tst_cb;
135 // config leaf_op, use random_data to avoid I/O
136 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
137 TensorShape shape({}); // empty shape is a 1-value scalar Tensor
138 ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
139 ASSERT_OK(schema->AddColumn(col));
140
141 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
142 int32_t op_connector_size = config_manager->op_connector_size();
143 int32_t num_workers = config_manager->num_parallel_workers();
144 std::shared_ptr<RandomDataOp> leaf =
145 std::make_shared<RandomDataOp>(num_workers, op_connector_size, 44, std::move(schema));
146 // config mapOp
147 std::vector<std::string> input_columns = {"label"};
148 std::vector<std::string> output_columns = {};
149 std::vector<std::shared_ptr<TensorOp>> op_list;
150 std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
151 op_list.push_back(my_no_op);
152 std::shared_ptr<MapOp> map_op =
153 std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
154 std::vector<std::shared_ptr<DSCallback>> cbs = {};
155 cbs.push_back(cb1);
156 map_op->AddCallbacks(std::move(cbs));
157 // config RepeatOp
158 std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
159 // start build then launch tree
160 leaf->SetTotalRepeats(2);
161 leaf->SetNumRepeatsPerEpoch(2);
162 map_op->SetTotalRepeats(2);
163 map_op->SetNumRepeatsPerEpoch(2);
164 std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op});
165 rc = tree->Prepare();
166 EXPECT_TRUE(rc.IsOk());
167 rc = tree->Launch();
168 EXPECT_TRUE(rc.IsOk());
169 // Start the loop of reading tensors from our pipeline
170 DatasetIterator di(tree);
171 TensorMap tensor_map;
172 rc = di.GetNextAsMap(&tensor_map);
173 EXPECT_TRUE(rc.IsOk());
174 while (!tensor_map.empty()) {
175 rc = di.GetNextAsMap(&tensor_map);
176 EXPECT_TRUE(rc.IsOk());
177 }
178
179 std::vector<std::string> callback_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
180 std::vector<int64_t> all_steps = {0, 0, 1, 1, 65, 65, 88};
181 std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 1, 1};
182 // doing resize to make sure no unexpected epoch_end or extra epoch_begin is called
183 size_t len = 7;
184 EXPECT_EQ(tst_cb->all_names(len), callback_names);
185 EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
186 EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
187 }
188
TEST_F(MindDataTestCallback,TestMultiEpochCallback)189 TEST_F(MindDataTestCallback, TestMultiEpochCallback) {
190 MS_LOG(INFO) << "Doing: MindDataTestCallback-TestMultiEpochCallback";
191 // config callback
192 Status rc;
193 std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(4);
194 std::shared_ptr<DSCallback> cb1 = tst_cb;
195 // config leaf_op, use random_data to avoid I/O
196 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
197 int32_t op_connector_size = config_manager->op_connector_size();
198 int32_t num_workers = config_manager->num_parallel_workers();
199 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
200 TensorShape shape({}); // empty shape is a 1-value scalar Tensor
201 ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
202 ASSERT_OK(schema->AddColumn(col));
203 std::shared_ptr<RandomDataOp> leaf = std::make_shared<RandomDataOp>(4, op_connector_size, 4, std::move(schema));
204 // config mapOp
205 std::vector<std::string> input_columns = {"label"};
206 std::vector<std::string> output_columns = {};
207 std::vector<std::shared_ptr<TensorOp>> op_list;
208 std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
209 op_list.push_back(my_no_op);
210 std::shared_ptr<MapOp> map_op =
211 std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
212 std::vector<std::shared_ptr<DSCallback>> cbs = {};
213 cbs.push_back(cb1);
214
215 map_op->AddCallbacks(std::move(cbs));
216 EXPECT_TRUE(rc.IsOk());
217 // config RepeatOp
218 std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
219 // config EpochCtrlOp
220 std::shared_ptr<EpochCtrlOp> epoch_ctrl_op = std::make_shared<EpochCtrlOp>(-1);
221 // start build then launch tree
222 leaf->SetTotalRepeats(-2);
223 leaf->SetNumRepeatsPerEpoch(2);
224 map_op->SetTotalRepeats(-2);
225 map_op->SetNumRepeatsPerEpoch(2);
226 std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op, epoch_ctrl_op});
227 rc = tree->Prepare();
228 EXPECT_TRUE(rc.IsOk());
229 rc = tree->Launch();
230 EXPECT_TRUE(rc.IsOk());
231 // Start the loop of reading tensors from our pipeline
232 DatasetIterator di(tree);
233 TensorMap tensor_map;
234 size_t num_epochs = 2;
235 for (int ep_num = 0; ep_num < num_epochs; ++ep_num) {
236 ASSERT_OK(di.GetNextAsMap(&tensor_map));
237 EXPECT_TRUE(rc.IsOk());
238
239 while (tensor_map.size() != 0) {
240 rc = di.GetNextAsMap(&tensor_map);
241 EXPECT_TRUE(rc.IsOk());
242 }
243 }
244
245 std::vector<std::string> callback_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND",
246 "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
247
248 std::vector<int64_t> all_steps = {0, 0, 1, 1, 5, 5, 8, 8, 9, 9, 13, 13, 16};
249 std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2};
250
251 size_t len = 13;
252 EXPECT_EQ(tst_cb->all_names(len), callback_names);
253 EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
254 EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
255 }
256
TEST_F(MindDataTestCallback,TestSelectedCallback)257 TEST_F(MindDataTestCallback, TestSelectedCallback) {
258 MS_LOG(INFO) << "Doing: MindDataTestCallback-TestSelectedCallback";
259 // config callback
260 Status rc;
261 std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(4);
262 std::shared_ptr<DSCallback> cb1 = tst_cb;
263 // turn off the epochs
264 tst_cb->epoch_begin_ = false;
265 tst_cb->epoch_end_ = false;
266
267 // config leaf_op, use random_data to avoid I/O
268 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
269 int32_t op_connector_size = config_manager->op_connector_size();
270 int32_t num_workers = config_manager->num_parallel_workers();
271
272 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
273 TensorShape shape({}); // empty shape is a 1-value scalar Tensor
274 ColDescriptor col("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &shape);
275 ASSERT_OK(schema->AddColumn(col));
276 std::shared_ptr<RandomDataOp> leaf = std::make_shared<RandomDataOp>(4, op_connector_size, 4, std::move(schema));
277 // config mapOp
278 std::vector<std::string> input_columns = {"label"};
279 std::vector<std::string> output_columns = {};
280 std::vector<std::shared_ptr<TensorOp>> op_list;
281 std::shared_ptr<TensorOp> my_no_op = std::make_shared<NoOp>();
282 op_list.push_back(my_no_op);
283 std::shared_ptr<MapOp> map_op =
284 std::make_shared<MapOp>(input_columns, output_columns, std::move(op_list), num_workers, op_connector_size);
285 map_op->AddCallbacks({cb1});
286 // config RepeatOp
287 std::shared_ptr<RepeatOp> repeat_op = std::make_shared<RepeatOp>(2);
288 // config EpochCtrlOp
289 std::shared_ptr<EpochCtrlOp> epoch_ctrl_op = std::make_shared<EpochCtrlOp>(-1);
290
291 // start build then launch tree
292 leaf->SetTotalRepeats(-2);
293 leaf->SetNumRepeatsPerEpoch(2);
294 map_op->SetTotalRepeats(-2);
295 map_op->SetNumRepeatsPerEpoch(2);
296 std::shared_ptr<ExecutionTree> tree = Build({leaf, map_op, repeat_op, epoch_ctrl_op});
297 rc = tree->Prepare();
298 EXPECT_TRUE(rc.IsOk());
299 rc = tree->Launch();
300 EXPECT_TRUE(rc.IsOk());
301 // Start the loop of reading tensors from our pipeline
302 DatasetIterator di(tree);
303 TensorMap tensor_map;
304 size_t num_epochs = 2;
305 for (int ep_num = 0; ep_num < num_epochs; ++ep_num) {
306 ASSERT_OK(di.GetNextAsMap(&tensor_map));
307 EXPECT_TRUE(rc.IsOk());
308
309 while (tensor_map.size() != 0) {
310 rc = di.GetNextAsMap(&tensor_map);
311 EXPECT_TRUE(rc.IsOk());
312 }
313 }
314
315 std::vector<std::string> callback_names = {"BGN", "SPBGN", "SPEND", "SPBGN", "SPEND",
316 "SPBGN", "SPEND", "SPBGN", "SPEND"};
317
318 std::vector<int64_t> all_steps = {0, 1, 1, 5, 5, 9, 9, 13, 13};
319 std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 2, 2, 2, 2};
320
321 size_t len = 9;
322 EXPECT_EQ(tst_cb->all_names(len), callback_names);
323 EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
324 EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
325 }
326
TEST_F(MindDataTestCallback,TestCAPICallback)327 TEST_F(MindDataTestCallback, TestCAPICallback) {
328 MS_LOG(INFO) << "Doing: MindDataTestCallback-TestCAPICallback";
329 // config callback
330 std::shared_ptr<test::TestCallback> tst_cb = std::make_shared<test::TestCallback>(64);
331 std::shared_ptr<DSCallback> cb1 = tst_cb;
332 // Create a RandomDataset. Use random_data to avoid I/O
333 std::shared_ptr<SchemaObj> schema = Schema();
334 ASSERT_OK(schema->add_column("label", mindspore::DataType::kNumberTypeUInt32, {}));
335 std::shared_ptr<Dataset> ds = RandomData(44, schema);
336 ASSERT_NE(ds, nullptr);
337 ds = ds->Map({std::make_shared<transforms::TypeCast>(mindspore::DataType::kNumberTypeUInt64)}, {"label"}, {}, {},
338 nullptr, {cb1});
339 ASSERT_NE(ds, nullptr);
340 ds = ds->Repeat(2);
341 ASSERT_NE(ds, nullptr);
342
343 TreeAdapter tree_adapter;
344 // using tree_adapter to set num_epoch = 1
345 ASSERT_OK(tree_adapter.Compile(ds->IRNode(), 1));
346
347 TensorRow row;
348 ASSERT_OK(tree_adapter.GetNext(&row));
349 while (!row.empty()) {
350 ASSERT_OK(tree_adapter.GetNext(&row));
351 }
352 std::vector<std::string> callback_names = {"BGN", "EPBGN", "SPBGN", "SPEND", "SPBGN", "SPEND", "EPEND"};
353 std::vector<int64_t> all_steps = {0, 0, 1, 1, 65, 65, 88};
354 std::vector<int64_t> all_epochs = {0, 1, 1, 1, 1, 1, 1};
355 // doing resize to make sure no unexpected epoch_end or extra epoch_begin is called
356 size_t len = 7;
357 EXPECT_EQ(tst_cb->all_names(len), callback_names);
358 EXPECT_EQ(tst_cb->all_step_nums(len), all_steps);
359 EXPECT_EQ(tst_cb->all_ep_nums(len), all_epochs);
360 }
361