# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing profiling support in DE
"""
import json
import os
import numpy as np
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.vision.c_transforms as vision

FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

PIPELINE_FILE = "./pipeline_profiling_1.json"
CPU_UTIL_FILE = "./minddata_cpu_utilization_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"


def set_profiling_env_var():
    """
    Set the MindData Profiling environment variables
    """
    os.environ['PROFILING_MODE'] = 'true'
    os.environ['MINDDATA_PROFILING_DIR'] = '.'
    os.environ['DEVICE_ID'] = '1'
    os.environ['RANK_ID'] = '1'


def delete_profiling_files():
    """
    Delete the MindData profiling files generated from the test.
    Also disable the MindData Profiling environment variables.
    """
    # Delete MindData profiling files
    os.remove(PIPELINE_FILE)
    os.remove(CPU_UTIL_FILE)
    os.remove(DATASET_ITERATOR_FILE)

    # Disable MindData Profiling environment variables
    del os.environ['PROFILING_MODE']
    del os.environ['MINDDATA_PROFILING_DIR']
    del os.environ['DEVICE_ID']
    del os.environ['RANK_ID']


def confirm_cpuutil(num_pipeline_ops):
    """
    Confirm that the CPU utilization JSON file contains <num_pipeline_ops> pipeline ops,
    plus one entry for the monitor thread.
    """
    with open(CPU_UTIL_FILE) as file1:
        data = json.load(file1)
    op_info = data["op_info"]
    # Confirm <num_pipeline_ops>+1 ops in CPU util file (including op_id=-1 for monitor thread)
    assert len(op_info) == num_pipeline_ops + 1


def confirm_ops_in_pipeline(num_ops, op_list):
    """
    Confirm that the pipeline JSON file contains exactly <num_ops> ops and
    that every op's type appears in the given <op_list>.
    """
    with open(PIPELINE_FILE) as file1:
        data = json.load(file1)
    op_info = data["op_info"]
    # Confirm ops in pipeline file
    assert len(op_info) == num_ops
    for i in range(num_ops):
        assert op_info[i]["op_type"] in op_list

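# For reference, a minimal sketch of the pipeline profiling JSON that the
# helpers above parse. The field values here are illustrative assumptions,
# not actual profiler output; only the keys asserted on in this file are
# guaranteed:
#
#   {
#       "op_info": [
#           {"op_type": "BatchOp",
#            "metrics": {"output_queue": {"size": ..., "length": ..., "throughput": ...}}},
#           {"op_type": "EpochCtrlOp",
#            "metrics": null}        # inline ops report no metrics
#       ]
#   }
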
def test_profiling_simple_pipeline():
    """
    Generator -> Shuffle -> Batch
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.shuffle(64)
    data1 = data1.batch(32)
    # Check output shapes, output types and dataset size; confirm that
    # these calls do not generate any profiling files
    assert data1.output_shapes() == [[32, 1]]
    assert [str(tp) for tp in data1.output_types()] == ["int64"]
    assert data1.get_dataset_size() == 32

    # Confirm profiling files do not (yet) exist
    assert os.path.exists(PIPELINE_FILE) is False
    assert os.path.exists(CPU_UTIL_FILE) is False
    assert os.path.exists(DATASET_ITERATOR_FILE) is False

    try:
        for _ in data1:
            pass

        # Confirm profiling files now exist
        assert os.path.exists(PIPELINE_FILE) is True
        assert os.path.exists(CPU_UTIL_FILE) is True
        assert os.path.exists(DATASET_ITERATOR_FILE) is True

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_complex_pipeline():
    """
    Generator -> Map ->
                        -> Zip
    TFReader -> Shuffle ->
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["gen"])
    data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])

    pattern = DATASET_ROOT + "/test.data"
    data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
    data2 = data2.shuffle(4)

    data3 = ds.zip((data1, data2))

    try:
        for _ in data3:
            pass

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
        op_info = data["op_info"]
        assert len(op_info) == 5
        for i in range(5):
            if op_info[i]["op_type"] != "ZipOp":
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
                assert "throughput" in op_info[i]["metrics"]["output_queue"]
            else:
                # Note: Zip is an inline op and hence does not have metrics information
                assert op_info[i]["metrics"] is None

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()

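# A minimal helper sketch (not called by the tests): list the inline ops in a
# pipeline profiling file, i.e. the ops whose "metrics" field is null. It
# assumes only the JSON layout that the assertions in this file rely on.
def _inline_op_types(pipeline_file=PIPELINE_FILE):
    with open(pipeline_file) as f:
        op_info = json.load(f)["op_info"]
    # Inline ops (e.g. ZipOp, ConcatOp, EpochCtrlOp) carry no metrics
    return [op["op_type"] for op in op_info if op["metrics"] is None]
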
def test_profiling_inline_ops_pipeline1():
    """
    Test pipeline with inline ops: Concat and EpochCtrl
    Generator ->
                 Concat -> EpochCtrl
    Generator ->
    """
    set_profiling_env_var()

    # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
    def source1():
        for i in range(3):
            yield (np.array([i]),)

    # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
    def source2():
        for i in range(3, 10):
            yield (np.array([i]),)

    data1 = ds.GeneratorDataset(source1, ["col1"])
    data2 = ds.GeneratorDataset(source2, ["col1"])
    data3 = data1.concat(data2)

    try:
        num_iter = 0
        # Note: Do not explicitly set num_epochs argument in create_tuple_iterator() call
        # Here i refers to the row index and d to the data row
        for i, d in enumerate(data3.create_tuple_iterator(output_numpy=True)):
            num_iter += 1
            assert i == d[0][0]

        # 3 rows from source1 + 7 rows from source2
        assert num_iter == 10

        # Confirm pipeline is created with EpochCtrl op
        with open(PIPELINE_FILE) as f:
            data = json.load(f)
        op_info = data["op_info"]
        assert len(op_info) == 4
        for i in range(4):
            # Note: The following ops are inline ops: Concat, EpochCtrl
            if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
                assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
        confirm_cpuutil(4)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_inline_ops_pipeline2():
    """
    Test pipeline with many inline ops
    Generator -> Rename -> Skip -> Repeat -> Take
    """
    set_profiling_env_var()

    # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
    def source1():
        for i in range(10):
            yield (np.array([i]),)

    data1 = ds.GeneratorDataset(source1, ["col1"])
    data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
    data1 = data1.skip(2)
    data1 = data1.repeat(2)
    data1 = data1.take(12)

    try:
        for _ in data1:
            pass

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
        op_info = data["op_info"]
        assert len(op_info) == 5
        for i in range(5):
            # Check for these inline ops
            if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
                assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_sampling_interval():
    """
    Test non-default monitor sampling interval
    """
    set_profiling_env_var()

    interval_origin = ds.config.get_monitor_sampling_interval()

    ds.config.set_monitor_sampling_interval(30)
    interval = ds.config.get_monitor_sampling_interval()
    assert interval == 30

    source = [(np.array([x]),) for x in range(1024)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.shuffle(64)
    data1 = data1.batch(32)

    try:
        for _ in data1:
            pass

    except Exception as error:
        ds.config.set_monitor_sampling_interval(interval_origin)
        delete_profiling_files()
        raise error

    else:
        ds.config.set_monitor_sampling_interval(interval_origin)
        delete_profiling_files()

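# The test above follows a save/restore pattern for the global monitor
# sampling interval. A minimal usage sketch (the value 30 is illustrative;
# the MindSpore config API expresses the interval in milliseconds):
#
#   saved = ds.config.get_monitor_sampling_interval()
#   ds.config.set_monitor_sampling_interval(30)
#   ...  # run the pipeline while the monitor samples at the new rate
#   ds.config.set_monitor_sampling_interval(saved)  # restore on every code path
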
def test_profiling_basic_pipeline():
    """
    Test with this basic pipeline
    Generator -> Map -> Batch -> Repeat -> EpochCtrl
    """
    set_profiling_env_var()

    def source1():
        for i in range(8000):
            yield (np.array([i]),)

    # Create this basic and common pipeline
    # Leaf/Source-Op -> Map -> Batch -> Repeat
    data1 = ds.GeneratorDataset(source1, ["col1"])

    type_cast_op = C.TypeCast(mstype.int32)
    data1 = data1.map(operations=type_cast_op, input_columns="col1")
    data1 = data1.batch(16)
    data1 = data1.repeat(2)

    try:
        num_iter = 0
        # Note: If the iterator is created with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1

        assert num_iter == 1000

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
        op_info = data["op_info"]
        assert len(op_info) == 5
        for i in range(5):
            # Check for inline ops
            if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
                assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_cifar10_pipeline():
    """
    Test with this common pipeline with Cifar10
    Cifar10 -> Map -> Map -> Batch -> Repeat
    """
    set_profiling_env_var()

    # Create this common pipeline
    # Cifar10 -> Map -> Map -> Batch -> Repeat
    DATA_DIR_10 = "../data/dataset/testCifar10Data"
    data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)

    type_cast_op = C.TypeCast(mstype.int32)
    data1 = data1.map(operations=type_cast_op, input_columns="label")
    random_horizontal_op = vision.RandomHorizontalFlip()
    data1 = data1.map(operations=random_horizontal_op, input_columns="image")

    data1 = data1.batch(32)
    data1 = data1.repeat(3)

    try:
        num_iter = 0
        # Note: If the iterator is created with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1

        assert num_iter == 750

        with open(PIPELINE_FILE) as f:
            data = json.load(f)
        op_info = data["op_info"]
        assert len(op_info) == 5
        for i in range(5):
            # Check for inline ops
            if op_info[i]["op_type"] == "RepeatOp":
                # Confirm this inline op does not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
                assert "throughput" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        confirm_cpuutil(5)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()

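# For reference, the iteration counts asserted in the two tests above are
# plain pipeline arithmetic (a derivation added here, not profiler output):
#   basic pipeline  : 8000 rows / batch(16) = 500 batches, x repeat(2) = 1000
#   cifar10 pipeline: 8000 rows / batch(32) = 250 batches, x repeat(3) =  750
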
def test_profiling_seq_pipelines_epochctrl3():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch -> EpochCtrl
    2) Generator -> Batch
    Note: This is a simplification of the user scenario to use the same pipeline
    for training and then evaluation.
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data1 = ds.GeneratorDataset(source, ["data"])
    data1 = data1.batch(32)

    try:
        # Test A - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If the iterator is created with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 2

        # Confirm pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
        confirm_cpuutil(3)

        # Test B - Call create_dict_iterator with num_epochs=1
        num_iter = 0
        # Note: If the iterator is created with num_epochs=1,
        # then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 2

        # Confirm pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


def test_profiling_seq_pipelines_epochctrl2():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch
    2) Generator -> Batch -> EpochCtrl
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data2 = ds.GeneratorDataset(source, ["data"])
    data2 = data2.batch(16)

    try:
        # Test A - Call create_dict_iterator with num_epochs=1
        num_iter = 0
        # Note: If the iterator is created with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Confirm pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

        # Test B - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If the iterator is created with num_epochs>1,
        # then EpochCtrlOp is added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 4

        # Confirm pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
        confirm_cpuutil(3)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()

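# The two tests above pivot on one rule that their assertions encode: creating
# an iterator with num_epochs=1 builds the execution tree without an
# EpochCtrlOp, while num_epochs>1 appends one. A minimal sketch (variable
# names illustrative):
#
#   it = data.create_dict_iterator(num_epochs=1)  # GeneratorOp -> BatchOp
#   it = data.create_dict_iterator(num_epochs=2)  # GeneratorOp -> BatchOp -> EpochCtrlOp
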
def test_profiling_seq_pipelines_repeat():
    """
    Test with these 2 sequential pipelines:
    1) Generator -> Batch
    2) Generator -> Batch -> Repeat
    """
    set_profiling_env_var()

    source = [(np.array([x]),) for x in range(64)]
    data2 = ds.GeneratorDataset(source, ["data"])
    data2 = data2.batch(16)

    try:
        # Test A - Call create_dict_iterator with 2 ops in pipeline
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Confirm pipeline file and CPU util file each have 2 ops
        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
        confirm_cpuutil(2)

        # Test B - Add repeat op to pipeline; call create_dict_iterator with 3 ops in pipeline
        data2 = data2.repeat(5)
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 20

        # Confirm pipeline file and CPU util file each have 3 ops
        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
        confirm_cpuutil(3)

    except Exception as error:
        delete_profiling_files()
        raise error

    else:
        delete_profiling_files()


if __name__ == "__main__":
    test_profiling_simple_pipeline()
    test_profiling_complex_pipeline()
    test_profiling_inline_ops_pipeline1()
    test_profiling_inline_ops_pipeline2()
    test_profiling_sampling_interval()
    test_profiling_basic_pipeline()
    test_profiling_cifar10_pipeline()
    test_profiling_seq_pipelines_epochctrl3()
    test_profiling_seq_pipelines_epochctrl2()
    test_profiling_seq_pipelines_repeat()