# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for Python-function (PyFunc) operations applied via dataset.map()."""
import numpy as np
import pytest

import mindspore.dataset as ds
import mindspore.dataset.engine.iterators as it
from mindspore import log as logger

DATA_DIR = ["../data/dataset/testPyfuncMap/data.data"]
SCHEMA_DIR = "../data/dataset/testPyfuncMap/schema.json"
COLUMNS = ["col0", "col1", "col2"]
GENERATE_GOLDEN = False


def test_case_0():
    """
    Test 1-1 PyFunc: one input column mapped to one output column.
    """
    logger.info("Test 1-1 PyFunc : lambda x : x + x")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x: x + x), input_columns="col0", output_columns="out")

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4


def test_case_1():
    """
    Test 1-n PyFunc: one input column expanded into two output columns.
    """
    logger.info("Test 1-n PyFunc : lambda x : (x , x + x) ")

    col = "col0"

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
    data1 = data1.map(operations=(lambda x: (x, x + x)), input_columns=col, output_columns=["out0", "out1"],
                      column_order=["out0", "out1"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        i += 4


def test_case_2():
    """
    Test n-1 PyFunc: two input columns reduced to one output column.
    """
    logger.info("Test n-1 PyFunc : lambda x, y : x + y ")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: x + y), input_columns=col, output_columns="out",
                      column_order=["out"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4


def test_case_3():
    """
    Test n-m PyFunc: two input columns mapped to three output columns.
    """
    logger.info("Test n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i += 4


def test_case_4():
    """
    Test parallel n-m PyFunc: same as test_case_3 but with 4 parallel workers.
    """
    logger.info("Test Parallel n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
                      column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i += 4


# The execution of this function will acquire GIL
def func_5(x):
    return np.ones(x.shape, dtype=x.dtype)


def test_case_5():
    """
    Test 1-1 PyFunc using a named function instead of a lambda.
    """
    logger.info("Test 1-1 PyFunc : lambda x: np.ones(x.shape)")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=func_5, input_columns="col0", output_columns="out")

    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[1, 1], [1, 1]])
        np.testing.assert_array_equal(item["out"], golden)


def test_case_6():
    """
    Test PyFunc compose: a list of operations applied in sequence.
    """
    logger.info("Test PyFunc Compose : (lambda x : x + x), (lambda x : x + x)")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + x)], input_columns="col0", output_columns="out")

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 4, (i + 1) * 4], [(i + 2) * 4, (i + 3) * 4]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4


def test_case_7():
    """
    Test 1-1 PyFunc with python_multiprocessing enabled.
    """
    logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x")

    # disable shared memory for the duration of the test, restore afterwards
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x: x + x), input_columns="col0", output_columns="out",
                      num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_8():
    """
    Test multiprocess n-m PyFunc: same as test_case_4 but with python_multiprocessing.
    """
    logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    # disable shared memory for the duration of the test, restore afterwards
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
                      column_order=["out0", "out1", "out2"],
                      python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i += 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_9():
    """
    Test composed 1-1 PyFuncs with python_multiprocessing enabled.
    """
    logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x")

    # disable shared memory for the duration of the test, restore afterwards
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + 1), (lambda x: x + 2)], input_columns="col0",
                      output_columns="out", num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2 + 3, (i + 1) * 2 + 3], [(i + 2) * 2 + 3, (i + 3) * 2 + 3]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_10():
    """
    Test chained map operations where the second uses python_multiprocessing.
    """
    logger.info("Test multiple map with multiprocess: lambda x : x + x")

    # disable shared memory for the duration of the test, restore afterwards
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x * 10)], input_columns="col0",
                      output_columns="out", num_parallel_workers=4)
    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + 1), (lambda x: x + 2)], input_columns="out",
                      output_columns="out", num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 20 + 3, (i + 1) * 20 + 3], [(i + 2) * 20 + 3, (i + 3) * 20 + 3]])
        np.testing.assert_array_equal(item["out"], golden)
        i += 4

    ds.config.set_enable_shared_mem(mem_original)


def test_pyfunc_implicit_compose():
    """
    Test implicit compose of PyFuncs: a list of multi-column operations.
    """
    logger.info("Test n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x, y: (x, x + y, x + y + 1)), (lambda x, y, z: (x, y, z))], input_columns=col,
                      output_columns=["out0", "out1", "out2"], column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i += 4


def test_pyfunc_exception():
    """
    Test that an exception raised inside a PyFunc surfaces as a RuntimeError.
    """
    logger.info("Test PyFunc Exception Throw: lambda x : raise Exception()")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    def pyfunc(x):
        raise Exception("Pyfunc Throw")

    with pytest.raises(RuntimeError) as info:
        # apply dataset operations
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
        data1 = data1.map(operations=pyfunc, input_columns="col0", output_columns="out",
                          num_parallel_workers=4)
        for _ in data1:
            pass
    assert "Pyfunc Throw" in str(info.value)


# NOTE: renamed from skip_test_pyfunc_Exception_multiprocess (non-PEP8 capital E);
# the __main__ guard below previously called the snake_case name, which raised
# NameError when this file was run as a script.
def skip_test_pyfunc_exception_multiprocess():
    """
    Test that a PyFunc exception surfaces as RuntimeError under multiprocessing.
    (Prefixed with skip_ so pytest does not collect it.)
    """
    logger.info("Test Multiprocess PyFunc Exception Throw: lambda x : raise Exception()")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    def pyfunc(x):
        raise Exception("MP Pyfunc Throw")

    with pytest.raises(RuntimeError) as info:
        # apply dataset operations
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
        data1 = data1.map(operations=pyfunc, input_columns="col0", output_columns="out",
                          num_parallel_workers=4, python_multiprocessing=True)
        for _ in data1:
            pass
    assert "MP Pyfunc Throw" in str(info.value)


def test_func_with_yield_manifest_dataset_01():
    """
    Test that a generator function passed as a PyFunc fails to pickle
    under python_multiprocessing and reports a clear error.
    """
    def pass_func(_):
        for i in range(10):
            yield (np.array([i]),)

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    data_file = "../data/dataset/testManifestData/test.manifest"
    data = ds.ManifestDataset(data_file)
    data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True,
                    max_rowsize=1)
    num_iter = 0
    try:
        for _ in data.create_dict_iterator(output_numpy=True):
            num_iter += 1
    except RuntimeError as e:
        assert "Can not pickle <class 'generator'> object, " in str(e)


if __name__ == "__main__":
    test_case_0()
    test_case_1()
    test_case_2()
    test_case_3()
    test_case_4()
    test_case_5()
    test_case_6()
    test_case_7()
    test_case_8()
    test_case_9()
    test_case_10()
    test_pyfunc_implicit_compose()
    test_pyfunc_exception()
    skip_test_pyfunc_exception_multiprocess()
    test_func_with_yield_manifest_dataset_01()