1# Copyright 2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15""" 16Test MindData Profiling Analyzer Support 17""" 18import csv 19import json 20import os 21import numpy as np 22import mindspore.common.dtype as mstype 23import mindspore.dataset as ds 24import mindspore.dataset.transforms.c_transforms as C 25from mindspore.profiler.parser.minddata_analyzer import MinddataProfilingAnalyzer 26 27 28class TestMinddataProfilingAnalyzer(): 29 """ 30 Test the MinddataProfilingAnalyzer class 31 """ 32 33 def setup_class(self): 34 """ 35 Run once for the class 36 """ 37 # Define filenames and path used for the MinddataProfilingAnalyzer tests. Use device_id=7. 38 self._PIPELINE_FILE = "./pipeline_profiling_7.json" 39 self._CPU_UTIL_FILE = "./minddata_cpu_utilization_7.json" 40 self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_7.txt" 41 self._SUMMARY_JSON_FILE = "./minddata_pipeline_summary_7.json" 42 self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv" 43 self._ANALYZE_FILE_PATH = "./" 44 45 # This is the set of keys for success case 46 self._EXPECTED_SUMMARY_KEYS_SUCCESS = \ 47 ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names', 48 'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops', 49 'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct'] 50 51 52 def setup_method(self): 53 """ 54 Run before each test function. 55 """ 56 # Confirm MindData Profiling files do not yet exist 57 assert os.path.exists(self._PIPELINE_FILE) is False 58 assert os.path.exists(self._CPU_UTIL_FILE) is False 59 assert os.path.exists(self._DATASET_ITERATOR_FILE) is False 60 # Confirm MindData Profiling analyze summary files do not yet exist 61 assert os.path.exists(self._SUMMARY_JSON_FILE) is False 62 assert os.path.exists(self._SUMMARY_CSV_FILE) is False 63 64 # Set the MindData Profiling environment variables 65 os.environ['PROFILING_MODE'] = 'true' 66 os.environ['MINDDATA_PROFILING_DIR'] = '.' 67 os.environ['RANK_ID'] = '7' 68 69 70 def teardown_method(self): 71 """ 72 Run after each test function. 73 """ 74 # Delete MindData profiling files generated from the test. 75 os.remove(self._PIPELINE_FILE) 76 os.remove(self._CPU_UTIL_FILE) 77 os.remove(self._DATASET_ITERATOR_FILE) 78 79 # Delete MindData profiling analyze summary files generated from the test. 80 os.remove(self._SUMMARY_JSON_FILE) 81 os.remove(self._SUMMARY_CSV_FILE) 82 83 # Disable MindData Profiling environment variables 84 del os.environ['PROFILING_MODE'] 85 del os.environ['MINDDATA_PROFILING_DIR'] 86 del os.environ['RANK_ID'] 87 88 89 def get_csv_result(self, file_pathname): 90 """ 91 Get result from the CSV file. 92 93 Args: 94 file_pathname (str): The CSV file pathname. 95 96 Returns: 97 list[list], the parsed CSV information. 98 """ 99 result = [] 100 with open(file_pathname, 'r') as csvfile: 101 csv_reader = csv.reader(csvfile) 102 for row in csv_reader: 103 result.append(row) 104 return result 105 106 107 def verify_md_summary(self, md_summary_dict, EXPECTED_SUMMARY_KEYS): 108 """ 109 Verify the content of the 3 variations of the MindData Profiling analyze summary output. 110 """ 111 112 # Confirm MindData Profiling analyze summary files are created 113 assert os.path.exists(self._SUMMARY_JSON_FILE) is True 114 assert os.path.exists(self._SUMMARY_CSV_FILE) is True 115 116 # Build a list of the sorted returned keys 117 summary_returned_keys = list(md_summary_dict.keys()) 118 summary_returned_keys.sort() 119 120 # 1. Confirm expected keys are in returned keys 121 for k in EXPECTED_SUMMARY_KEYS: 122 assert k in summary_returned_keys 123 124 # Read summary JSON file 125 with open(self._SUMMARY_JSON_FILE) as f: 126 summary_json_data = json.load(f) 127 # Build a list of the sorted JSON keys 128 summary_json_keys = list(summary_json_data.keys()) 129 summary_json_keys.sort() 130 131 # 2a. Confirm expected keys are in JSON file keys 132 for k in EXPECTED_SUMMARY_KEYS: 133 assert k in summary_json_keys 134 135 # 2b. Confirm returned dictionary keys are identical to JSON file keys 136 np.testing.assert_array_equal(summary_returned_keys, summary_json_keys) 137 138 # Read summary CSV file 139 summary_csv_data = self.get_csv_result(self._SUMMARY_CSV_FILE) 140 # Build a list of the sorted CSV keys from the first column in the CSV file 141 summary_csv_keys = [] 142 for x in summary_csv_data: 143 summary_csv_keys.append(x[0]) 144 summary_csv_keys.sort() 145 146 # 3a. Confirm expected keys are in the first column of the CSV file 147 for k in EXPECTED_SUMMARY_KEYS: 148 assert k in summary_csv_keys 149 150 # 3b. Confirm returned dictionary keys are identical to CSV file first column keys 151 np.testing.assert_array_equal(summary_returned_keys, summary_csv_keys) 152 153 154 def mysource(self): 155 """Source for data values""" 156 for i in range(8000): 157 yield (np.array([i]),) 158 159 160 def test_analyze_basic(self): 161 """ 162 Test MindData profiling analyze summary files exist with basic pipeline. 163 Also test basic content (subset of keys and values) from the returned summary result. 164 """ 165 # Create this basic and common linear pipeline 166 # Generator -> Map -> Batch -> Repeat -> EpochCtrl 167 data1 = ds.GeneratorDataset(self.mysource, ["col1"]) 168 type_cast_op = C.TypeCast(mstype.int32) 169 data1 = data1.map(operations=type_cast_op, input_columns="col1") 170 data1 = data1.batch(16) 171 data1 = data1.repeat(2) 172 173 num_iter = 0 174 # Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline 175 for _ in data1.create_dict_iterator(num_epochs=2): 176 num_iter = num_iter + 1 177 178 # Confirm number of rows returned 179 assert num_iter == 1000 180 181 # Confirm MindData Profiling files are created 182 assert os.path.exists(self._PIPELINE_FILE) is True 183 assert os.path.exists(self._CPU_UTIL_FILE) is True 184 assert os.path.exists(self._DATASET_ITERATOR_FILE) is True 185 186 # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result 187 md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH) 188 md_summary_dict = md_analyzer.analyze() 189 190 # Verify MindData Profiling Analyze Summary output 191 # Note: MindData Analyzer returns the result in 3 formats: 192 # 1. returned dictionary 193 # 2. JSON file 194 # 3. CSV file 195 self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS) 196 197 # 4. Verify non-variant values or number of values in the tested pipeline for certain keys 198 # of the returned dictionary 199 # Note: Values of num_workers are not tested since default may change in the future 200 # Note: Values related to queue metrics are not tested since they may vary on different execution environments 201 assert md_summary_dict["pipeline_ops"] == ["EpochCtrl(id=0)", "Repeat(id=1)", "Batch(id=2)", "Map(id=3)", 202 "Generator(id=4)"] 203 assert md_summary_dict["op_names"] == ["EpochCtrl", "Repeat", "Batch", "Map", "Generator"] 204 assert md_summary_dict["op_ids"] == [0, 1, 2, 3, 4] 205 assert len(md_summary_dict["num_workers"]) == 5 206 assert len(md_summary_dict["queue_average_size"]) == 5 207 assert len(md_summary_dict["queue_utilization_pct"]) == 5 208 assert len(md_summary_dict["queue_empty_freq_pct"]) == 5 209 assert md_summary_dict["children_ids"] == [[1], [2], [3], [4], []] 210 assert md_summary_dict["parent_id"] == [-1, 0, 1, 2, 3] 211 assert len(md_summary_dict["avg_cpu_pct"]) == 5 212 213 214 def test_analyze_sequential_pipelines_invalid(self): 215 """ 216 Test invalid scenario in which MinddataProfilingAnalyzer is called for two sequential pipelines. 217 """ 218 # Create the pipeline 219 # Generator -> Map -> Batch -> EpochCtrl 220 data1 = ds.GeneratorDataset(self.mysource, ["col1"]) 221 type_cast_op = C.TypeCast(mstype.int32) 222 data1 = data1.map(operations=type_cast_op, input_columns="col1") 223 data1 = data1.batch(64) 224 225 # Phase 1 - For the pipeline, call create_tuple_iterator with num_epochs>1 226 # Note: This pipeline has 4 ops: Generator -> Map -> Batch -> EpochCtrl 227 num_iter = 0 228 # Note: If create_tuple_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline 229 for _ in data1.create_dict_iterator(num_epochs=2): 230 num_iter = num_iter + 1 231 232 # Confirm number of rows returned 233 assert num_iter == 125 234 235 # Confirm MindData Profiling files are created 236 assert os.path.exists(self._PIPELINE_FILE) is True 237 assert os.path.exists(self._CPU_UTIL_FILE) is True 238 assert os.path.exists(self._DATASET_ITERATOR_FILE) is True 239 240 # Phase 2 - For the pipeline, call create_tuple_iterator with num_epochs=1 241 # Note: This pipeline has 3 ops: Generator -> Map -> Batch 242 num_iter = 0 243 # Note: If create_tuple_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline 244 for _ in data1.create_dict_iterator(num_epochs=1): 245 num_iter = num_iter + 1 246 247 # Confirm number of rows returned 248 assert num_iter == 125 249 250 # Confirm MindData Profiling files are created 251 # Note: There is an MD bug in which which the pipeline file is not recreated; 252 # it still has 4 ops instead of 3 ops 253 assert os.path.exists(self._PIPELINE_FILE) is True 254 assert os.path.exists(self._CPU_UTIL_FILE) is True 255 assert os.path.exists(self._DATASET_ITERATOR_FILE) is True 256 257 # Call MindData Analyzer for generated MindData profiling files to generate MindData pipeline summary result 258 md_analyzer = MinddataProfilingAnalyzer(self._ANALYZE_FILE_PATH, 7, self._ANALYZE_FILE_PATH) 259 md_summary_dict = md_analyzer.analyze() 260 261 # Verify MindData Profiling Analyze Summary output 262 self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS) 263 264 # Confirm pipeline data contains info for 3 ops 265 assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"] 266 267 # Verify CPU util data contains info for 3 ops 268 assert len(md_summary_dict["avg_cpu_pct"]) == 3 269