# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing profiling support in DE (Dataset Engine)
"""
import json
import os
import numpy as np
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.vision.c_transforms as vision

FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

PIPELINE_FILE = "./pipeline_profiling_1.json"
CPU_UTIL_FILE = "./minddata_cpu_utilization_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"
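# Note: The "_1" suffix in the profiling file names above is assumed to come
# from the DEVICE_ID/RANK_ID value of '1' set in set_profiling_env_var() below.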


def set_profiling_env_var():
    """
    Set the MindData Profiling environment variables
    """
    os.environ['PROFILING_MODE'] = 'true'
    os.environ['MINDDATA_PROFILING_DIR'] = '.'
    os.environ['DEVICE_ID'] = '1'
    os.environ['RANK_ID'] = '1'


def delete_profiling_files():
    """
    Delete the MindData profiling files generated from the test.
    Also disable the MindData Profiling environment variables.
    """
    # Delete MindData profiling files
    os.remove(PIPELINE_FILE)
    os.remove(CPU_UTIL_FILE)
    os.remove(DATASET_ITERATOR_FILE)

    # Disable MindData Profiling environment variables
    del os.environ['PROFILING_MODE']
    del os.environ['MINDDATA_PROFILING_DIR']
    del os.environ['DEVICE_ID']
    del os.environ['RANK_ID']


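# Note: Every test below repeats the same try/except/else setup-and-teardown
# pattern with the two helpers above. For illustration only (no test here uses
# it), the same pattern could be wrapped in a context manager; a minimal
# sketch, assuming the wrapped body always generates the profiling files that
# delete_profiling_files() removes:
import contextlib


@contextlib.contextmanager
def profiling_env():
    # Hypothetical convenience helper, not part of the original test suite
    set_profiling_env_var()
    try:
        yield
    finally:
        delete_profiling_files()

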
def confirm_cpuutil(num_pipeline_ops):
    """
    Confirm that the CPU utilization JSON file covers <num_pipeline_ops> ops from the pipeline
    """
    with open(CPU_UTIL_FILE) as file1:
        data = json.load(file1)
        op_info = data["op_info"]
        # Confirm <num_pipeline_ops>+1 ops in the CPU util file (including op_id=-1 for the monitor thread)
        assert len(op_info) == num_pipeline_ops + 1


def confirm_ops_in_pipeline(num_ops, op_list):
    """
    Confirm that the pipeline JSON file contains <num_ops> ops and that each op is in the given <op_list>
    """
    with open(PIPELINE_FILE) as file1:
        data = json.load(file1)
        op_info = data["op_info"]
        # Confirm the ops in the pipeline file
        assert len(op_info) == num_ops
        for i in range(num_ops):
            assert op_info[i]["op_type"] in op_list


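# For reference, both helpers above assume the profiling JSON files contain an
# "op_info" list shaped roughly as follows (illustrative excerpt; actual values
# vary per run, and inline ops report "metrics": null):
#
#     {"op_info": [
#         {"op_type": "BatchOp",
#          "metrics": {"output_queue": {"size": [...], "length": ..., "throughput": [...]}}},
#         {"op_type": "RepeatOp", "metrics": null}
#     ]}

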
def test_profiling_simple_pipeline():
    """
    Generator -> Shuffle -> Batch
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.shuffle(64)
    data1 = data1.batch(32)
    # Check output shapes, output types and dataset size; confirm these calls do not generate profiling files
    assert data1.output_shapes() == [[32, 1]]
    assert [str(tp) for tp in data1.output_types()] == ["int64"]
    assert data1.get_dataset_size() == 32

    # Confirm profiling files do not (yet) exist
    assert not os.path.exists(PIPELINE_FILE)
    assert not os.path.exists(CPU_UTIL_FILE)
    assert not os.path.exists(DATASET_ITERATOR_FILE)

    try:
        for _ in data1:
            pass

        # Confirm profiling files now exist
        assert os.path.exists(PIPELINE_FILE)
        assert os.path.exists(CPU_UTIL_FILE)
        assert os.path.exists(DATASET_ITERATOR_FILE)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_complex_pipeline():
    """
    Generator -> Map     ->
                             -> Zip
    TFReader  -> Shuffle ->
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["gen"])
    data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])

    pattern = DATASET_ROOT + "test.data"
    data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
    data2 = data2.shuffle(4)

    data3 = ds.zip((data1, data2))

    try:
        for _ in data3:
            pass

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                if op_info[i]["op_type"] != "ZipOp":
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                    assert "throughput" in op_info[i]["metrics"]["output_queue"]
                else:
                    # Note: Zip is an inline op and hence does not have metrics information
                    assert op_info[i]["metrics"] is None

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


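# Note: Across these tests, the ops treated as "inline" (and hence reporting
# "metrics": null in the pipeline file) include ZipOp, ConcatOp, EpochCtrlOp,
# RenameOp, SkipOp, RepeatOp and TakeOp.

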
def test_profiling_inline_ops_pipeline1():
    """
    Test pipeline with inline ops: Concat and EpochCtrl
    Generator ->
                 Concat -> EpochCtrl
    Generator ->
    """
    set_profiling_env_var()

    # source1 dataset: 3 rows with values 0, 1, 2
    def source1():
        for i in range(3):
            yield (np.array([i]),)

    # source2 dataset: 7 rows with values 3, 4, 5 ... 9
    def source2():
        for i in range(3, 10):
            yield (np.array([i]),)

    data1 = ds.GeneratorDataset(source1, ["col1"])
    data2 = ds.GeneratorDataset(source2, ["col1"])
    data3 = data1.concat(data2)

    try:
        num_iter = 0
        # Note: Do not explicitly set the num_epochs argument in the create_tuple_iterator() call
        # Here i is the row index and d is the row's data
        for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True)):
            num_iter += 1
            assert i == d[0][0]

        assert num_iter == 10

        # Confirm the pipeline is created with the EpochCtrl op
        with open(PIPELINE_FILE) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 4
            for i in range(4):
                # Note: The following ops are inline ops: Concat, EpochCtrl
                if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                    assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
        confirm_cpuutil(4)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_inline_ops_pipeline2():
    """
    Test pipeline with many inline ops
    Generator -> Rename -> Skip -> Repeat -> Take
    """
    set_profiling_env_var()

    # source1 dataset: 10 rows with values 0, 1, 2 ... 9
    def source1():
        for i in range(10):
            yield (np.array([i]),)

    data1 = ds.GeneratorDataset(source1, ["col1"])
    data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
    data1 = data1.skip(2)
    data1 = data1.repeat(2)
    data1 = data1.take(12)

    try:
        for _ in data1:
            pass

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                # Check for these inline ops
                if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                    assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_sampling_interval():
    """
    Test non-default monitor sampling interval
    """
    set_profiling_env_var()

    interval_origin = ds.config.get_monitor_sampling_interval()

    ds.config.set_monitor_sampling_interval(30)
    interval = ds.config.get_monitor_sampling_interval()
    assert interval == 30

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.shuffle(64)
    data1 = data1.batch(32)

    try:
        for _ in data1:
            pass

    except Exception as error:
        ds.config.set_monitor_sampling_interval(interval_origin)
        delete_profiling_files()
        raise error

    else:
        ds.config.set_monitor_sampling_interval(interval_origin)
        delete_profiling_files()


def test_profiling_basic_pipeline():
    """
    Test with this basic pipeline
    Generator -> Map -> Batch -> Repeat -> EpochCtrl
    """
    set_profiling_env_var()

    def source1():
        for i in range(8000):
            yield (np.array([i]),)

    # Create this basic and common pipeline
    # Leaf/Source-Op -> Map -> Batch -> Repeat
    data1 = ds.GeneratorDataset(source1, ["col1"])

    type_cast_op = C.TypeCast(mstype.int32)
    data1 = data1.map(operations=type_cast_op, input_columns="col1")
    data1 = data1.batch(16)
    data1 = data1.repeat(2)

    try:
        num_iter = 0
        # Note: If an iterator is created with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1

        assert num_iter == 1000

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                # Check for inline ops
                if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                    assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_cifar10_pipeline():
    """
    Test with this common pipeline with Cifar10
    Cifar10 -> Map -> Map -> Batch -> Repeat
    """
    set_profiling_env_var()

    # Create this common pipeline
    # Cifar10 -> Map -> Map -> Batch -> Repeat
    DATA_DIR_10 = "../data/dataset/testCifar10Data"
    data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)

    type_cast_op = C.TypeCast(mstype.int32)
    data1 = data1.map(operations=type_cast_op, input_columns="label")
    random_horizontal_op = vision.RandomHorizontalFlip()
    data1 = data1.map(operations=random_horizontal_op, input_columns="image")

    data1 = data1.batch(32)
    data1 = data1.repeat(3)

    try:
        num_iter = 0
        # Note: If an iterator is created with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1

        assert num_iter == 750

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                # Check for the inline op
                if op_info[i]["op_type"] == "RepeatOp":
                    # Confirm this inline op does not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                    assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_seq_pipelines_epochctrl3():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch -> EpochCtrl
    2) Generator -> Batch
    Note: This simplifies the user scenario in which the same pipeline is used for training and then for evaluation.
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.batch(32)

    try:
        # Test A - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If an iterator is created with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 2

        # Confirm the pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
        confirm_cpuutil(3)

        # Test B - Call create_dict_iterator with num_epochs=1
        num_iter = 0
        # Note: If an iterator is created with num_epochs=1,
        # then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 2

        # Confirm the pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_seq_pipelines_epochctrl2():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch
    2) Generator -> Batch -> EpochCtrl
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data2 = ds.GeneratorDataset(source, ["data"])
    data2 = data2.batch(16)

    try:
        # Test A - Call create_dict_iterator with num_epochs=1
        num_iter = 0
        # Note: If an iterator is created with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Confirm the pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

        # Test B - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If an iterator is created with num_epochs>1,
        # then EpochCtrlOp is added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 4

        # Confirm the pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
        confirm_cpuutil(3)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_seq_pipelines_repeat():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch
    2) Generator -> Batch -> Repeat
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data2 = ds.GeneratorDataset(source, ["data"])
    data2 = data2.batch(16)

    try:
        # Test A - Call create_dict_iterator with 2 ops in the pipeline
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Confirm the pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

        # Test B - Add a repeat op to the pipeline. Call create_dict_iterator with 3 ops in the pipeline
        data2 = data2.repeat(5)
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 20

        # Confirm the pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
        confirm_cpuutil(3)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


if __name__ == "__main__":
    test_profiling_simple_pipeline()
    test_profiling_complex_pipeline()
    test_profiling_inline_ops_pipeline1()
    test_profiling_inline_ops_pipeline2()
    test_profiling_sampling_interval()
    test_profiling_basic_pipeline()
    test_profiling_cifar10_pipeline()
    test_profiling_seq_pipelines_epochctrl3()
    test_profiling_seq_pipelines_epochctrl2()
    test_profiling_seq_pipelines_repeat()