# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing the mindspore.dataset configuration manager (ds.config).

Covers the get/set round-trips for seed, num_parallel_workers, prefetch_size,
monitor_sampling_interval and auto_num_workers, config-file loading, and the
effect of the global seed on the determinism of random dataset ops (C++ ops,
Python ops, single- and multi-worker execution).

NOTE(review): ds.config is process-global state. Each test saves the values it
touches and restores them at the end, but the restore is NOT in a try/finally,
so a failing assert mid-test leaks modified config into later tests.
"""
import os
import filecmp
import glob
import numpy as np

import mindspore.dataset as ds
import mindspore.dataset.engine.iterators as it
import mindspore.dataset.transforms.py_transforms
import mindspore.dataset.vision.c_transforms as c_vision
import mindspore.dataset.vision.py_transforms as py_vision
from mindspore import log as logger
from util import dataset_equal

# Test fixture: a TFRecord file holding 3 images, plus its JSON schema.
# Paths are relative to the test working directory.
DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json"


def test_basic():
    """
    Test basic configuration functions: load values from a config file, then
    verify that the individual set_*/get_* accessors round-trip new values.
    """
    # Save original configuration values (restored at the end of the test)
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    prefetch_size_original = ds.config.get_prefetch_size()
    seed_original = ds.config.get_seed()
    monitor_sampling_interval_original = ds.config.get_monitor_sampling_interval()

    # Loading the file should overwrite the current config with its contents
    ds.config.load('../data/dataset/declient.cfg')

    # These expected values mirror the contents of declient.cfg
    assert ds.config.get_num_parallel_workers() == 8
    # assert ds.config.get_worker_connector_size() == 16
    assert ds.config.get_prefetch_size() == 16
    assert ds.config.get_seed() == 5489
    assert ds.config.get_monitor_sampling_interval() == 15

    # Now set each value individually and read it back
    ds.config.set_num_parallel_workers(2)
    # ds.config.set_worker_connector_size(3)
    ds.config.set_prefetch_size(4)
    ds.config.set_seed(5)
    ds.config.set_monitor_sampling_interval(45)

    assert ds.config.get_num_parallel_workers() == 2
    # assert ds.config.get_worker_connector_size() == 3
    assert ds.config.get_prefetch_size() == 4
    assert ds.config.get_seed() == 5
    assert ds.config.get_monitor_sampling_interval() == 45

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_prefetch_size(prefetch_size_original)
    ds.config.set_seed(seed_original)
    ds.config.set_monitor_sampling_interval(monitor_sampling_interval_original)


def test_get_seed():
    """
    Read the seed value without explicitly setting one first; expect an int
    (the framework supplies a default).
    """
    assert isinstance(ds.config.get_seed(), int)


def test_pipeline():
    """
    Test that configuration values flow into the dataset pipeline whether they
    come from the global config or are set per-dataset, by comparing the
    serialized JSON of two otherwise-identical pipelines.
    """
    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()

    # Pipeline 1: relies on the global num_parallel_workers
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
    data1 = data1.map(operations=[c_vision.Decode(True)], input_columns=["image"])
    ds.serialize(data1, "testpipeline.json")

    # Pipeline 2: passes num_parallel_workers explicitly to the constructor
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_parallel_workers=num_parallel_workers_original,
                               shuffle=False)
    data2 = data2.map(operations=[c_vision.Decode(True)], input_columns=["image"])
    ds.serialize(data2, "testpipeline2.json")

    # NOTE(review): filecmp.cmp returns True when the files MATCH, so this
    # asserts the two serialized pipelines are identical, even though the
    # original comment said "check that the generated output is different".
    # The test passes today because the explicit num_parallel_workers value
    # does not get reflected in the serialized output -- confirm intent.
    assert filecmp.cmp('testpipeline.json', 'testpipeline2.json')

    # Remove the generated json files; best-effort cleanup, failures are only
    # logged. (glob('*.json') sweeps every json file in the working directory.)
    file_list = glob.glob('*.json')
    for f in file_list:
        try:
            os.remove(f)
        except IOError:
            logger.info("Error while deleting: {}".format(f))

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)


def test_deterministic_run_fail():
    """
    Test sharing one RandomCrop op between two pipelines under a fixed seed.
    The shared op consumes random numbers across both pipelines, so their
    outputs are expected to differ (dataset_equal is expected to raise).
    """
    logger.info("test_deterministic_run_fail")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    # When we set the seed, all operations within our dataset should be
    # deterministic; one worker keeps execution order deterministic too.
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)
    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # Assuming we get the same seed on calling constructor, if this op is re-used then result won't be
    # the same in between the two datasets. For example, the RandomCrop constructor takes seed (0) and
    # outputs a deterministic series of numbers, e.g. "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
    random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode_op = c_vision.Decode()
    data1 = data1.map(operations=decode_op, input_columns=["image"])
    data1 = data1.map(operations=random_crop_op, input_columns=["image"])

    # Second dataset: re-uses the SAME random_crop_op instance as data1
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode_op, input_columns=["image"])
    # If seed is set up on constructor
    data2 = data2.map(operations=random_crop_op, input_columns=["image"])

    # NOTE(review): if the outputs happen to match, no exception is raised and
    # the test passes silently without exercising the assert below.
    try:
        dataset_equal(data1, data2, 0)

    except Exception as e:
        # The two datasets split the numbers out of the shared sequence "a",
        # so the crops differ and dataset_equal raises an array mismatch.
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)


def test_seed_undeterministic():
    """
    Test the seed with multiple C++ parallel workers. With 3 workers the
    per-row execution order is nondeterministic, so this test is expected to
    fail (raise in dataset_equal) some of the time.
    """
    logger.info("test_seed_undeterministic")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(3)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # We get the seed when constructor is called
    random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode_op = c_vision.Decode()
    data1 = data1.map(operations=decode_op, input_columns=["image"])
    data1 = data1.map(operations=random_crop_op, input_columns=["image"])

    # Second dataset: a fresh RandomCrop built under the same global seed
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode_op, input_columns=["image"])
    # Since seed is set up on constructor, the two ops output a deterministic sequence.
    # Assume the generated random sequence "a" = [1, 2, 3, 4, 5, 6] <- pretend these are random
    random_crop_op2 = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    data2 = data2.map(operations=random_crop_op2, input_columns=["image"])
    # NOTE(review): as above, if the outputs happen to match the test passes
    # without hitting the assert.
    try:
        dataset_equal(data1, data2, 0)
    except Exception as e:
        # Both datasets draw from the generated sequence "a", but with 3
        # workers the draws may interleave differently per pipeline.
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)


def test_seed_deterministic():
    """
    Test a deterministic run by setting the seed; this only works reliably
    with num_parallel_workers = 1, where both pipelines must produce
    identical output.
    """
    logger.info("test_seed_deterministic")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # The seed is read in during the constructor call
    random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode_op = c_vision.Decode()
    data1 = data1.map(operations=decode_op, input_columns=["image"])
    data1 = data1.map(operations=random_crop_op, input_columns=["image"])

    # Second dataset: separate RandomCrop instance, same global seed
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode_op, input_columns=["image"])
    # Seed is picked up on constructor, so the two ops output the same deterministic sequence
    random_crop_op2 = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    data2 = data2.map(operations=random_crop_op2, input_columns=["image"])

    # Raises if any row differs between the two pipelines
    dataset_equal(data1, data2, 0)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)


def test_deterministic_run_distribution():
    """
    Test a deterministic run, with the seed driving a random distribution op
    (RandomHorizontalFlip with probability 0.1); the two pipelines must
    produce identical output.
    """
    logger.info("test_deterministic_run_distribution")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    # When we set the seed, all operations within our dataset should be deterministic
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    random_horizontal_flip_op = c_vision.RandomHorizontalFlip(0.1)
    decode_op = c_vision.Decode()
    data1 = data1.map(operations=decode_op, input_columns=["image"])
    data1 = data1.map(operations=random_horizontal_flip_op, input_columns=["image"])

    # Second dataset: separate flip op instance, same global seed
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode_op, input_columns=["image"])
    # Seed is picked up on constructor, so the two ops output the same deterministic sequence
    random_horizontal_flip_op2 = c_vision.RandomHorizontalFlip(0.1)
    data2 = data2.map(operations=random_horizontal_flip_op2, input_columns=["image"])

    # Raises if any row differs between the two pipelines
    dataset_equal(data1, data2, 0)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)


def test_deterministic_python_seed():
    """
    Test deterministic execution of Python transforms under a fixed seed:
    re-seeding before the second iteration must reproduce the first
    iteration's output exactly.
    """
    logger.info("test_deterministic_python_seed")

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)

    transforms = [
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ]
    transform = mindspore.dataset.transforms.py_transforms.Compose(transforms)
    data1 = data1.map(operations=transform, input_columns=["image"])
    data1_output = []
    # config.set_seed() calls random.seed(), so Python-side RNG is seeded here
    for data_one in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
        data1_output.append(data_one["image"])

    # Second dataset: same transform pipeline
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=transform, input_columns=["image"])
    # config.set_seed() calls random.seed(), resetting the seed for the next dataset iterator
    ds.config.set_seed(0)

    data2_output = []
    for data_two in data2.create_dict_iterator(num_epochs=1, output_numpy=True):
        data2_output.append(data_two["image"])

    # With the seed reset, both iterations must be element-wise identical
    np.testing.assert_equal(data1_output, data2_output)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)


def test_deterministic_python_seed_multi_thread():
    """
    Test Python-transform determinism with 3 workers and multiprocessing;
    the outputs are expected NOT to match (the mismatch path is asserted),
    since worker processes do not share the re-seeded Python RNG state.
    """
    logger.info("test_deterministic_python_seed_multi_thread")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when running all
    # UTs together, which causes a core dump and blocking in this UT. Call the
    # private cleanup hook here to fix it.
    it._cleanup()  # pylint: disable=W0212

    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()
    seed_original = ds.config.get_seed()
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_num_parallel_workers(3)
    ds.config.set_seed(0)

    # Disable shared memory to save shm in CI
    ds.config.set_enable_shared_mem(False)

    # When we set the seed all operations within our dataset should be deterministic
    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    transforms = [
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ]
    transform = mindspore.dataset.transforms.py_transforms.Compose(transforms)
    data1 = data1.map(operations=transform, input_columns=["image"], python_multiprocessing=True)
    data1_output = []
    # config.set_seed() calls random.seed()
    for data_one in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
        data1_output.append(data_one["image"])

    # Second dataset: same transform, multiprocessing again
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # If seed is set up on constructor
    data2 = data2.map(operations=transform, input_columns=["image"], python_multiprocessing=True)
    # config.set_seed() calls random.seed()
    ds.config.set_seed(0)

    data2_output = []
    for data_two in data2.create_dict_iterator(num_epochs=1, output_numpy=True):
        data2_output.append(data_two["image"])

    # NOTE(review): if the two runs happen to match, no exception is raised and
    # the test passes without hitting the assert below.
    try:
        np.testing.assert_equal(data1_output, data2_output)
    except Exception as e:
        # Expect output to not match during multi-process execution
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)
    ds.config.set_seed(seed_original)
    ds.config.set_enable_shared_mem(mem_original)


def test_auto_num_workers_error():
    """
    Test that set_auto_num_workers rejects a non-bool argument with a
    TypeError whose message names the expected type.
    """
    err_msg = ""
    try:
        ds.config.set_auto_num_workers([1, 2])
    except TypeError as e:
        err_msg = str(e)

    assert "must be of type bool" in err_msg


def test_auto_num_workers():
    """
    Test that auto_num_workers can be flipped and read back, then restored.
    """

    saved_config = ds.config.get_auto_num_workers()
    assert isinstance(saved_config, bool)
    # Change to a different config
    flipped_config = not saved_config
    ds.config.set_auto_num_workers(flipped_config)
    assert flipped_config == ds.config.get_auto_num_workers()
    # Now flip this back
    ds.config.set_auto_num_workers(saved_config)
    assert saved_config == ds.config.get_auto_num_workers()


if __name__ == '__main__':
    test_basic()
    test_get_seed()
    test_pipeline()
    test_deterministic_run_fail()
    test_seed_undeterministic()
    test_seed_deterministic()
    test_deterministic_run_distribution()
    test_deterministic_python_seed()
    test_deterministic_python_seed_multi_thread()
    test_auto_num_workers_error()
    test_auto_num_workers()