# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""
16Test MindData Profiling Analyzer Support
17"""
18import csv
19import json
20import os
21import numpy as np
22import mindspore.common.dtype as mstype
23import mindspore.dataset as ds
24import mindspore.dataset.transforms.c_transforms as C
25from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer
26
27
28class TestMinddataProfilingAnalyzer():
29    """
30    Test the MinddataProfilingAnalyzer class
31    """
32
33    def setup_class(self):
34        """
35        Run once for the class
36        """
37        # Define filenames and path used for the MinddataProfilingAnalyzer tests. Use device_id=7.
38        self._PIPELINE_FILE = "./pipeline_profiling_7.json"
39        self._CPU_UTIL_FILE = "./minddata_cpu_utilization_7.json"
40        self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_7.txt"
41        self._SUMMARY_JSON_FILE = "./minddata_pipeline_summary_7.json"
42        self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
43        self._ANALYZE_FILE_PATH = "./"
44
45        # This is the set of keys for success case
46        self._EXPECTED_SUMMARY_KEYS_SUCCESS = \
47            ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
48             'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
49             'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']
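        # Note: these keys are assumed (judging by the input filenames above) to aggregate the three
        # profiling inputs: pipeline structure keys (pipeline_ops, op_ids, children_ids, parent_id,
        # num_workers) from the pipeline file, the avg_cpu_pct* keys from the CPU utilization file,
        # and the per_*_time keys from the dataset iterator file.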


    def setup_method(self):
        """
        Run before each test function.
        """
        # Confirm MindData Profiling files do not yet exist
        assert not os.path.exists(self._PIPELINE_FILE)
        assert not os.path.exists(self._CPU_UTIL_FILE)
        assert not os.path.exists(self._DATASET_ITERATOR_FILE)
        # Confirm MindData Profiling analyze summary files do not yet exist
        assert not os.path.exists(self._SUMMARY_JSON_FILE)
        assert not os.path.exists(self._SUMMARY_CSV_FILE)

        # Set the MindData Profiling environment variables
        os.environ['PROFILING_MODE'] = 'true'
        os.environ['MINDDATA_PROFILING_DIR'] = '.'
        os.environ['RANK_ID'] = '7'
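        # Note: with these variables set, executing a MindData pipeline is expected to write the
        # pipeline, CPU utilization, and dataset iterator profiling files (suffixed with rank id 7,
        # matching the filenames defined in setup_class) into MINDDATA_PROFILING_DIR.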


    def teardown_method(self):
        """
        Run after each test function.
        """
        # Delete MindData profiling files generated from the test.
        os.remove(self._PIPELINE_FILE)
        os.remove(self._CPU_UTIL_FILE)
        os.remove(self._DATASET_ITERATOR_FILE)

        # Delete MindData profiling analyze summary files generated from the test.
        os.remove(self._SUMMARY_JSON_FILE)
        os.remove(self._SUMMARY_CSV_FILE)

        # Disable MindData Profiling environment variables
        del os.environ['PROFILING_MODE']
        del os.environ['MINDDATA_PROFILING_DIR']
        del os.environ['RANK_ID']


    def get_csv_result(self, file_pathname):
        """
        Get result from the CSV file.

        Args:
            file_pathname (str): The CSV file pathname.

        Returns:
            list[list], the parsed CSV information.
        """
        result = []
        with open(file_pathname, 'r') as csvfile:
            csv_reader = csv.reader(csvfile)
            for row in csv_reader:
                result.append(row)
        return result


    def verify_md_summary(self, md_summary_dict, expected_summary_keys):
        """
        Verify the content of the 3 variations of the MindData Profiling analyze summary output:
        the returned dictionary, the JSON file, and the CSV file.
        """

        # Confirm MindData Profiling analyze summary files are created
        assert os.path.exists(self._SUMMARY_JSON_FILE)
        assert os.path.exists(self._SUMMARY_CSV_FILE)

        # Build a list of the sorted returned keys
        summary_returned_keys = sorted(md_summary_dict.keys())

        # 1. Confirm expected keys are in returned keys
        for k in expected_summary_keys:
            assert k in summary_returned_keys

        # Read summary JSON file
        with open(self._SUMMARY_JSON_FILE) as f:
            summary_json_data = json.load(f)
        # Build a list of the sorted JSON keys
        summary_json_keys = sorted(summary_json_data.keys())

        # 2a. Confirm expected keys are in JSON file keys
        for k in expected_summary_keys:
            assert k in summary_json_keys

        # 2b. Confirm returned dictionary keys are identical to JSON file keys
        np.testing.assert_array_equal(summary_returned_keys, summary_json_keys)

        # Read summary CSV file
        summary_csv_data = self.get_csv_result(self._SUMMARY_CSV_FILE)
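        # Each row of the summary CSV is assumed to be laid out as [key, value1, value2, ...],
        # so the first column of every row holds a summary key.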
        # Build a list of the sorted CSV keys from the first column in the CSV file
        summary_csv_keys = []
        for x in summary_csv_data:
            summary_csv_keys.append(x[0])
        summary_csv_keys.sort()

        # 3a. Confirm expected keys are in the first column of the CSV file
        for k in expected_summary_keys:
            assert k in summary_csv_keys

        # 3b. Confirm returned dictionary keys are identical to CSV file first column keys
        np.testing.assert_array_equal(summary_returned_keys, summary_csv_keys)


    def mysource(self):
        """Source for data values"""
        for i in range(8000):
            yield (np.array([i]),)


    def test_analyze_basic(self):
        """
        Test that the MindData profiling analyze summary files exist for a basic pipeline.
        Also test basic content (a subset of keys and values) from the returned summary result.
        """
        # Create this basic and common linear pipeline
        # Generator -> Map -> Batch -> Repeat -> EpochCtrl
        data1 = ds.GeneratorDataset(self.mysource, ["col1"])
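        # Note: mysource yields 1-tuples of NumPy arrays, one array per declared column ("col1"),
        # for 8000 rows in total.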
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(16)
        data1 = data1.repeat(2)

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1

        # Confirm number of rows returned
        assert num_iter == 1000
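        # (8000 source rows) / (batch size 16) = 500 batches per epoch; repeat(2) doubles this to 1000.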

        # Confirm MindData Profiling files are created
        assert os.path.exists(self._PIPELINE_FILE)
        assert os.path.exists(self._CPU_UTIL_FILE)
        assert os.path.exists(self._DATASET_ITERATOR_FILE)

        # Call MindData Analyzer on the generated MindData profiling files to produce the MindData pipeline summary result
        md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
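        # Note: the positional arguments are assumed to be (source profiling directory, device_id,
        # output directory); device_id=7 matches the _7 suffix of the filenames defined in setup_class.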
        md_summary_dict = md_analyzer.analyze()

        # Verify MindData Profiling Analyze Summary output
        # Note: MindData Analyzer returns the result in 3 formats:
        # 1. returned dictionary
        # 2. JSON file
        # 3. CSV file
        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS)

        # 4. Verify invariant values, or the number of values, in the tested pipeline for certain keys
        # of the returned dictionary
        # Note: Values of num_workers are not tested since the default may change in the future
        # Note: Values related to queue metrics are not tested since they may vary across execution environments
        assert md_summary_dict["pipeline_ops"] == ["EpochCtrl(id=0)", "Repeat(id=1)", "Batch(id=2)", "Map(id=3)",
                                                   "Generator(id=4)"]
        assert md_summary_dict["op_names"] == ["EpochCtrl", "Repeat", "Batch", "Map", "Generator"]
        assert md_summary_dict["op_ids"] == [0, 1, 2, 3, 4]
        assert len(md_summary_dict["num_workers"]) == 5
        assert len(md_summary_dict["queue_average_size"]) == 5
        assert len(md_summary_dict["queue_utilization_pct"]) == 5
        assert len(md_summary_dict["queue_empty_freq_pct"]) == 5
        assert md_summary_dict["children_ids"] == [[1], [2], [3], [4], []]
        assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3]
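        # The five ops form a linear chain rooted at EpochCtrl(id=0): parent_id == -1 marks the root,
        # each op's children_ids lists the next op id, and the leaf (Generator) has no children.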
        assert len(md_summary_dict["avg_cpu_pct"]) == 5


    def test_analyze_sequential_pipelines_invalid(self):
        """
        Test the invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines.
        """
        # Create the pipeline
        # Generator -> Map -> Batch
        data1 = ds.GeneratorDataset(self.mysource, ["col1"])
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(64)

        # Phase 1 - For the pipeline, call create_dict_iterator with num_epochs>1
        # Note: This pipeline has 4 ops: Generator -> Map -> Batch -> EpochCtrl
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1

        # Confirm number of rows returned
        assert num_iter == 125
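        # (8000 source rows) / (batch size 64) = 125 batches per epoch.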

        # Confirm MindData Profiling files are created
        assert os.path.exists(self._PIPELINE_FILE)
        assert os.path.exists(self._CPU_UTIL_FILE)
        assert os.path.exists(self._DATASET_ITERATOR_FILE)

        # Phase 2 - For the pipeline, call create_dict_iterator with num_epochs=1
        # Note: This pipeline has 3 ops: Generator -> Map -> Batch
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1

        # Confirm number of rows returned
        assert num_iter == 125

        # Confirm MindData Profiling files are created
        # Note: There is an MD bug in which the pipeline file is not recreated;
        #       it still has 4 ops instead of 3 ops
        assert os.path.exists(self._PIPELINE_FILE)
        assert os.path.exists(self._CPU_UTIL_FILE)
        assert os.path.exists(self._DATASET_ITERATOR_FILE)

        # Call MindData Analyzer on the generated MindData profiling files to produce the MindData pipeline summary result
        md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH)
        md_summary_dict = md_analyzer.analyze()

        # Verify MindData Profiling Analyze Summary output
        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS)

        # Confirm pipeline data contains info for 3 ops
        assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]

        # Verify CPU util data contains info for 3 ops
        assert len(md_summary_dict["avg_cpu_pct"]) == 3
264        # Confirm pipeline data contains info for 3 ops
265        assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]
266
267        # Verify CPU util data contains info for 3 ops
268        assert len(md_summary_dict["avg_cpu_pct"]) == 3
269