# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing configuration manager
"""
import os
import filecmp
import glob
import numpy as np

import mindspore.dataset as ds
import mindspore.dataset.engine.iterators as it
import mindspore.dataset.transforms.py_transforms
import mindspore.dataset.vision.c_transforms as c_vision
import mindspore.dataset.vision.py_transforms as py_vision
from mindspore import log as logger
from util import dataset_equal

DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json"


def test_basic():
    """
    Test basic configuration functions
    """
    # Snapshot the current configuration so it can be restored afterwards
    saved = (
        ds.config.get_num_parallel_workers(),
        ds.config.get_prefetch_size(),
        ds.config.get_seed(),
        ds.config.get_monitor_sampling_interval(),
    )

    ds.config.load('../data/dataset/declient.cfg')

    # Values expected to come from the loaded config file
    assert ds.config.get_num_parallel_workers() == 8
    # assert ds.config.get_worker_connector_size() == 16
    assert ds.config.get_prefetch_size() == 16
    assert ds.config.get_seed() == 5489
    assert ds.config.get_monitor_sampling_interval() == 15

    # Override every setting and verify the getters reflect the new values
    ds.config.set_num_parallel_workers(2)
    # ds.config.set_worker_connector_size(3)
    ds.config.set_prefetch_size(4)
    ds.config.set_seed(5)
    ds.config.set_monitor_sampling_interval(45)

    assert ds.config.get_num_parallel_workers() == 2
    # assert ds.config.get_worker_connector_size() == 3
    assert ds.config.get_prefetch_size() == 4
    assert ds.config.get_seed() == 5
    assert ds.config.get_monitor_sampling_interval() == 45

    # Restore the configuration captured at the start of the test
    ds.config.set_num_parallel_workers(saved[0])
    ds.config.set_prefetch_size(saved[1])
    ds.config.set_seed(saved[2])
    ds.config.set_monitor_sampling_interval(saved[3])


def test_get_seed():
    """
    This gets the seed value without explicitly setting a default, expect int.
    """
    seed = ds.config.get_seed()
    assert isinstance(seed, int)


def test_pipeline():
    """
    Test that our configuration pipeline works when we set parameters at different locations in dataset code
    """
    # Save original configuration values
    num_parallel_workers_original = ds.config.get_num_parallel_workers()

    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
    data1 = data1.map(operations=[c_vision.Decode(True)], input_columns=["image"])
    ds.serialize(data1, "testpipeline.json")

    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_parallel_workers=num_parallel_workers_original,
                               shuffle=False)
    data2 = data2.map(operations=[c_vision.Decode(True)], input_columns=["image"])
    ds.serialize(data2, "testpipeline2.json")

    # filecmp.cmp returns True when the two files are identical, so this asserts
    # the serialized pipelines MATCH. It passes because num_parallel_workers does
    # not get updated in the serialized output (the earlier comment claiming the
    # outputs should differ was wrong).
    assert filecmp.cmp('testpipeline.json', 'testpipeline2.json')

    # remove generated json files
    file_list = glob.glob('*.json')
    for f in file_list:
        try:
            os.remove(f)
        except OSError:
            # best-effort cleanup; just log and continue
            logger.info("Error while deleting: {}".format(f))

    # Restore original configuration values
    ds.config.set_num_parallel_workers(num_parallel_workers_original)


def test_deterministic_run_fail():
    """
    Test RandomCrop with seed, expected to fail
    """
    logger.info("test_deterministic_run_fail")

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()

    # With a fixed seed, every op within the dataset should be deterministic
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # The seed is consumed when the op constructor is called; because the SAME
    # crop instance is shared by both pipelines below, each pipeline draws
    # different numbers out of the op's single random sequence, e.g. if the
    # sequence is [1, 2, 3, 4, 5, 6] the two pipelines split it between them.
    crop = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode = c_vision.Decode()
    data1 = data1.map(operations=decode, input_columns=["image"])
    data1 = data1.map(operations=crop, input_columns=["image"])

    # Second dataset re-uses the same op instances
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode, input_columns=["image"])
    data2 = data2.map(operations=crop, input_columns=["image"])

    try:
        dataset_equal(data1, data2, 0)
    except Exception as e:
        # the two pipelines split the numbers out of the shared sequence
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)


def test_seed_undeterministic():
    """
    Test seed with num parallel workers in c, this test is expected to fail some of the time
    """
    logger.info("test_seed_undeterministic")

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(3)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # The seed is picked up when the op constructor runs
    crop_one = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode = c_vision.Decode()
    data1 = data1.map(operations=decode, input_columns=["image"])
    data1 = data1.map(operations=crop_one, input_columns=["image"])

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode, input_columns=["image"])
    # Both crop ops are constructed under the same seed, so each emits the same
    # deterministic sequence, e.g. [1, 2, 3, 4, 5, 6]; with 3 parallel workers
    # the order of consumption is not guaranteed, so results may still diverge.
    crop_two = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    data2 = data2.map(operations=crop_two, input_columns=["image"])

    try:
        dataset_equal(data1, data2, 0)
    except Exception as e:
        # both datasets consume numbers from the same generated sequence
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)


def test_seed_deterministic():
    """
    Test deterministic run with setting the seed, only works with num_parallel worker = 1
    """
    logger.info("test_seed_deterministic")

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    # The seed is read in during the op constructor call
    crop_one = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    decode = c_vision.Decode()
    data1 = data1.map(operations=decode, input_columns=["image"])
    data1 = data1.map(operations=crop_one, input_columns=["image"])

    # Second dataset with a separately constructed crop op; both ops were built
    # under the same seed, so with a single worker they output identical sequences
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode, input_columns=["image"])
    crop_two = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
    data2 = data2.map(operations=crop_two, input_columns=["image"])

    dataset_equal(data1, data2, 0)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)


def test_deterministic_run_distribution():
    """
    Test deterministic run with with setting the seed being used in a distribution
    """
    logger.info("test_deterministic_run_distribution")

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()

    # With a fixed seed, every op within the dataset should be deterministic
    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    flip_one = c_vision.RandomHorizontalFlip(0.1)
    decode = c_vision.Decode()
    data1 = data1.map(operations=decode, input_columns=["image"])
    data1 = data1.map(operations=flip_one, input_columns=["image"])

    # Second dataset with an identically constructed flip op; both ops were
    # built under the same seed, so they output the same deterministic sequence
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=decode, input_columns=["image"])
    flip_two = c_vision.RandomHorizontalFlip(0.1)
    data2 = data2.map(operations=flip_two, input_columns=["image"])

    dataset_equal(data1, data2, 0)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)


def test_deterministic_python_seed():
    """
    Test deterministic execution with seed in python
    """
    logger.info("test_deterministic_python_seed")

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()

    ds.config.set_seed(0)
    ds.config.set_num_parallel_workers(1)

    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)

    compose = mindspore.dataset.transforms.py_transforms.Compose([
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ])
    data1 = data1.map(operations=compose, input_columns=["image"])
    # config.set_seed() calls random.seed()
    data1_output = [row["image"]
                    for row in data1.create_dict_iterator(num_epochs=1, output_numpy=True)]

    # Second dataset
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=compose, input_columns=["image"])
    # config.set_seed() calls random.seed(), resetting the seed so the second
    # iterator replays the same python random sequence
    ds.config.set_seed(0)

    data2_output = [row["image"]
                    for row in data2.create_dict_iterator(num_epochs=1, output_numpy=True)]

    np.testing.assert_equal(data1_output, data2_output)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)


def test_deterministic_python_seed_multi_thread():
    """
    Test deterministic execution with seed in python, this fails with multi-thread pyfunc run
    """
    logger.info("test_deterministic_python_seed_multi_thread")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    # Snapshot configuration so it can be restored at the end
    workers_saved = ds.config.get_num_parallel_workers()
    seed_saved = ds.config.get_seed()
    shared_mem_saved = ds.config.get_enable_shared_mem()
    ds.config.set_num_parallel_workers(3)
    ds.config.set_seed(0)

    # disable shared memory to save shm in CI
    ds.config.set_enable_shared_mem(False)

    # With a fixed seed, every op within the dataset should be deterministic
    # First dataset
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    compose = mindspore.dataset.transforms.py_transforms.Compose([
        py_vision.Decode(),
        py_vision.RandomCrop([512, 512], [200, 200, 200, 200]),
        py_vision.ToTensor(),
    ])
    data1 = data1.map(operations=compose, input_columns=["image"], python_multiprocessing=True)
    # config.set_seed() calls random.seed()
    data1_output = [row["image"]
                    for row in data1.create_dict_iterator(num_epochs=1, output_numpy=True)]

    # Second dataset, re-using the same composed transform
    data2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
    data2 = data2.map(operations=compose, input_columns=["image"], python_multiprocessing=True)
    # config.set_seed() calls random.seed()
    ds.config.set_seed(0)

    data2_output = [row["image"]
                    for row in data2.create_dict_iterator(num_epochs=1, output_numpy=True)]

    try:
        np.testing.assert_equal(data1_output, data2_output)
    except Exception as e:
        # outputs are expected to diverge during multi-threaded execution
        logger.info("Got an exception in DE: {}".format(str(e)))
        assert "Array" in str(e)

    # Restore configuration
    ds.config.set_num_parallel_workers(workers_saved)
    ds.config.set_seed(seed_saved)
    ds.config.set_enable_shared_mem(shared_mem_saved)


def test_auto_num_workers_error():
    """
    Test auto_num_workers error
    """
    message = ""
    try:
        # a list is not a valid argument; only bool is accepted
        ds.config.set_auto_num_workers([1, 2])
    except TypeError as type_error:
        message = str(type_error)

    assert "must be of type bool" in message


def test_auto_num_workers():
    """
    Test auto_num_workers can be set.
    """

    original = ds.config.get_auto_num_workers()
    assert isinstance(original, bool)
    # toggle to the opposite value and confirm the setter took effect
    ds.config.set_auto_num_workers(not original)
    assert ds.config.get_auto_num_workers() == (not original)
    # toggle back and confirm the original value is restored
    ds.config.set_auto_num_workers(original)
    assert ds.config.get_auto_num_workers() == original


if __name__ == '__main__':
    # Run every test in this module, in definition order, when executed as a script
    for test_case in (test_basic,
                      test_get_seed,
                      test_pipeline,
                      test_deterministic_run_fail,
                      test_seed_undeterministic,
                      test_seed_deterministic,
                      test_deterministic_run_distribution,
                      test_deterministic_python_seed,
                      test_deterministic_python_seed_multi_thread,
                      test_auto_num_workers_error,
                      test_auto_num_workers):
        test_case()