# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
import pytest

import mindspore.dataset as ds
import mindspore.dataset.engine.iterators as it
from mindspore import log as logger

# Shared fixtures: a TFRecord dataset whose rows are 2x2 tensors holding
# sequential integers (row k covers values 4k .. 4k+3).
DATA_DIR = ["../data/dataset/testPyfuncMap/data.data"]
SCHEMA_DIR = "../data/dataset/testPyfuncMap/schema.json"
COLUMNS = ["col0", "col1", "col2"]
GENERATE_GOLDEN = False


def test_case_0():
    """
    Test a 1-to-1 PyFunc map (lambda x: x + x): one input column, one
    renamed output column, each element doubled.
    """
    logger.info("Test 1-1 PyFunc : lambda x : x + x")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x: x + x), input_columns="col0", output_columns="out")

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4


def test_case_1():
    """
    Test a 1-to-n PyFunc map (lambda x: (x, x + x)): one input column
    produces two output columns (identity and doubled).
    """
    logger.info("Test 1-n PyFunc : lambda x : (x , x + x) ")

    col = "col0"

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
    data1 = data1.map(operations=(lambda x: (x, x + x)), input_columns=col, output_columns=["out0", "out1"],
                      column_order=["out0", "out1"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        i = i + 4


def test_case_2():
    """
    Test an n-to-1 PyFunc map (lambda x, y: x + y): two input columns
    combined into a single output column.
    """
    logger.info("Test n-1 PyFunc : lambda x, y : x + y ")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: x + y), input_columns=col, output_columns="out",
                      column_order=["out"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4


def test_case_3():
    """
    Test an n-to-m PyFunc map (lambda x, y: (x, x + y, x + y + 1)): two
    input columns produce three output columns.
    """
    logger.info("Test n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i = i + 4


def test_case_4():
    """
    Test an n-to-m PyFunc map executed with multiple parallel workers
    (num_parallel_workers=4); goldens are identical to the serial case
    because the source is read without shuffling.
    """
    logger.info("Test Parallel n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
                      column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i = i + 4


# The execution of this function will acquire GIL
def func_5(x):
    """Return an all-ones array with the same shape and dtype as `x`."""
    return np.ones(x.shape, dtype=x.dtype)


def test_case_5():
    """
    Test a 1-to-1 PyFunc map using a named (non-lambda) function that
    replaces every tensor with an all-ones tensor of the same shape.
    """
    logger.info("Test 1-1 PyFunc : lambda x: np.ones(x.shape)")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=func_5, input_columns="col0", output_columns="out")

    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[1, 1], [1, 1]])
        np.testing.assert_array_equal(item["out"], golden)


def test_case_6():
    """
    Test implicit composition of PyFuncs: a list of two doubling lambdas
    is applied in sequence, so each element is multiplied by 4.
    """
    logger.info("Test PyFunc Compose : (lambda x : x + x), (lambda x : x + x)")

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + x)], input_columns="col0", output_columns="out")

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 4, (i + 1) * 4], [(i + 2) * 4, (i + 3) * 4]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4


def test_case_7():
    """
    Test a 1-to-1 PyFunc map running under python_multiprocessing with
    shared memory disabled; the config flag is restored afterwards.
    """
    logger.info("Test 1-1 PyFunc Multiprocess: lambda x : x + x")

    # Disable shared memory for the duration of the test, restoring the
    # original setting at the end.
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x: x + x), input_columns="col0", output_columns="out",
                      num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_8():
    """
    Test an n-to-m PyFunc map running under python_multiprocessing with
    shared memory disabled; the config flag is restored afterwards.
    """
    logger.info("Test Multiprocess n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    # Disable shared memory for the duration of the test, restoring the
    # original setting at the end.
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=(lambda x, y: (x, x + y, x + y + 1)), input_columns=col,
                      output_columns=["out0", "out1", "out2"], num_parallel_workers=4,
                      column_order=["out0", "out1", "out2"],
                      python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i = i + 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_9():
    """
    Test a composed list of three PyFuncs under python_multiprocessing:
    x -> 2x -> 2x + 1 -> 2x + 3 applied in order.
    """
    logger.info("Test multiple 1-1 PyFunc Multiprocess: lambda x : x + x")

    # Disable shared memory for the duration of the test, restoring the
    # original setting at the end.
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + 1), (lambda x: x + 2)], input_columns="col0",
                      output_columns="out", num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 2 + 3, (i + 1) * 2 + 3], [(i + 2) * 2 + 3, (i + 3) * 2 + 3]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4

    ds.config.set_enable_shared_mem(mem_original)


def test_case_10():
    """
    Test two chained map operations where only the second uses
    python_multiprocessing: x -> 10x, then 10x -> 20x + 3.
    """
    logger.info("Test multiple map with multiprocess: lambda x : x + x")

    # Disable shared memory for the duration of the test, restoring the
    # original setting at the end.
    mem_original = ds.config.get_enable_shared_mem()
    ds.config.set_enable_shared_mem(False)

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x: x * 10)], input_columns="col0",
                      output_columns="out", num_parallel_workers=4)
    data1 = data1.map(operations=[(lambda x: x + x), (lambda x: x + 1), (lambda x: x + 2)], input_columns="out",
                      output_columns="out", num_parallel_workers=4, python_multiprocessing=True)

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i * 20 + 3, (i + 1) * 20 + 3], [(i + 2) * 20 + 3, (i + 3) * 20 + 3]])
        np.testing.assert_array_equal(item["out"], golden)
        i = i + 4

    ds.config.set_enable_shared_mem(mem_original)


def test_pyfunc_implicit_compose():
    """
    Test implicit composition of n-to-m PyFuncs: the first lambda fans
    two columns out to three, the second is a three-column identity.
    """
    logger.info("Test n-m PyFunc : lambda x, y : (x , x + 1, x + y)")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    col = ["col0", "col1"]

    # apply dataset operations
    data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)

    data1 = data1.map(operations=[(lambda x, y: (x, x + y, x + y + 1)), (lambda x, y, z: (x, y, z))], input_columns=col,
                      output_columns=["out0", "out1", "out2"], column_order=["out0", "out1", "out2"])

    i = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):  # each data is a dictionary
        # In this test, the dataset is 2x2 sequential tensors
        golden = np.array([[i, i + 1], [i + 2, i + 3]])
        np.testing.assert_array_equal(item["out0"], golden)
        golden = np.array([[i * 2, (i + 1) * 2], [(i + 2) * 2, (i + 3) * 2]])
        np.testing.assert_array_equal(item["out1"], golden)
        golden = np.array([[i * 2 + 1, (i + 1) * 2 + 1], [(i + 2) * 2 + 1, (i + 3) * 2 + 1]])
        np.testing.assert_array_equal(item["out2"], golden)
        i = i + 4


def test_pyfunc_exception():
    """
    Test that an exception raised inside a PyFunc surfaces to the caller
    as a RuntimeError carrying the original message.
    """
    logger.info("Test PyFunc Exception Throw: lambda x : raise Exception()")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    def pyfunc(x):
        raise Exception("Pyfunc Throw")

    with pytest.raises(RuntimeError) as info:
        # apply dataset operations
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
        data1 = data1.map(operations=pyfunc, input_columns="col0", output_columns="out",
                          num_parallel_workers=4)
        for _ in data1:
            pass
    # Bug fix: the assert must sit OUTSIDE the `with pytest.raises(...)` block.
    # Inside it, the statement is dead code — once the RuntimeError is raised,
    # nothing after the raising line in the block executes.
    assert "Pyfunc Throw" in str(info.value)


def skip_test_pyfunc_exception_multiprocess():
    """
    (Currently skipped — `skip_` prefix keeps pytest from collecting it.)
    Test that an exception raised inside a multiprocessing PyFunc surfaces
    to the caller as a RuntimeError carrying the original message.
    """
    logger.info("Test Multiprocess PyFunc Exception Throw: lambda x : raise Exception()")

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    def pyfunc(x):
        raise Exception("MP Pyfunc Throw")

    with pytest.raises(RuntimeError) as info:
        # apply dataset operations
        data1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, shuffle=False)
        data1 = data1.map(operations=pyfunc, input_columns="col0", output_columns="out",
                          num_parallel_workers=4, python_multiprocessing=True)
        for _ in data1:
            pass
    # Bug fix: moved outside the `with` block — inside it the assert was dead
    # code, never executed once the RuntimeError fired.
    assert "MP Pyfunc Throw" in str(info.value)


# Backward-compatible alias for the old, non-PEP8 capitalized name.
skip_test_pyfunc_Exception_multiprocess = skip_test_pyfunc_exception_multiprocess


def test_func_with_yield_manifest_dataset_01():
    """
    Test that passing a generator function as a multiprocessing PyFunc
    fails with the expected "can not pickle generator" RuntimeError.
    """
    def pass_func(_):
        for i in range(10):
            yield (np.array([i]),)

    # Sometimes there are some ITERATORS left in ITERATORS_LIST when run all UTs together,
    # and cause core dump and blocking in this UT. Add cleanup() here to fix it.
    it._cleanup()  # pylint: disable=W0212

    DATA_FILE = "../data/dataset/testManifestData/test.manifest"
    data = ds.ManifestDataset(DATA_FILE)
    data = data.map(operations=pass_func, input_columns=["image"], num_parallel_workers=1, python_multiprocessing=True,
                    max_rowsize=1)
    num_iter = 0
    try:
        for _ in data.create_dict_iterator(output_numpy=True):
            num_iter += 1
    except RuntimeError as e:
        assert "Can not pickle <class 'generator'> object, " in str(e)


if __name__ == "__main__":
    test_case_0()
    test_case_1()
    test_case_2()
    test_case_3()
    test_case_4()
    test_case_5()
    test_case_6()
    test_case_7()
    test_case_8()
    test_case_9()
    test_case_10()
    test_pyfunc_implicit_compose()
    test_pyfunc_exception()
    # Call by the name actually defined above (the original script called a
    # lowercase variant that did not exist, raising NameError when run directly).
    skip_test_pyfunc_Exception_multiprocess()
    test_func_with_yield_manifest_dataset_01()