1# Copyright 2020-2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15""" 16Testing cache operator with non-mappable datasets 17""" 18import os 19import itertools 20import numpy as np 21import pytest 22import mindspore.common.dtype as mstype 23import mindspore.dataset as ds 24import mindspore.dataset.text as text 25import mindspore.dataset.vision.c_transforms as c_vision 26import mindspore.dataset.vision.py_transforms as py_vision 27from mindspore import log as logger 28 29DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"] 30SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json" 31 32TEXT_TF_DATA_DIR = ["../data/dataset/testTextTFRecord/text.tfrecord"] 33SCHEMA_DIR2 = "../data/dataset/testTextTFRecord/datasetSchema.json" 34 35TRAIN_DATA_DIR = ["../data/dataset/test_tf_file_3_images2/train-0000-of-0001.data", 36 "../data/dataset/test_tf_file_3_images2/train-0000-of-0002.data", 37 "../data/dataset/test_tf_file_3_images2/train-0000-of-0003.data", 38 "../data/dataset/test_tf_file_3_images2/train-0000-of-0004.data"] 39TRAIN_SCHEMA_DIR = "../data/dataset/test_tf_file_3_images2/datasetSchema.json" 40 41IMAGE_FOLDER_DATA_DIR = "../data/dataset/testImageNetData/train/" 42CLUE_DATA_DIR = '../data/dataset/testCLUE/afqmc/train.json' 43CSV_DATA_DIR = '../data/dataset/testCSV/1.csv' 44TEXT_FILE_DATA_DIR = "../data/dataset/testTextFileDataset/1.txt" 45 46PYFUNC_DATA_DIR = ["../data/dataset/testPyfuncMap/data.data"] 47PYFUNC_SCHEMA_DIR = "../data/dataset/testPyfuncMap/schema.json" 48 49GENERATE_GOLDEN = False 50 51 52@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 53def test_cache_nomap_basic1(): 54 """ 55 A random dataset (a non mappable dataset) with a cache over it just after the leaf 56 """ 57 58 logger.info("Test cache nomap basic 1") 59 if "SESSION_ID" in os.environ: 60 session_id = int(os.environ['SESSION_ID']) 61 else: 62 raise RuntimeError("Testcase requires SESSION_ID environment variable") 63 64 schema = ds.Schema() 65 schema.add_column('image', de_type=mstype.uint8, 66 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 67 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 68 69 # create a cache. arbitrary session_id for now 70 some_cache = ds.DatasetCache(session_id=session_id, size=0) 71 72 # User-created sampler here 73 ds1 = ds.RandomDataset(schema=schema, total_rows=10, num_parallel_workers=4, cache=some_cache) 74 ds1 = ds1.repeat(4) 75 76 num_iter = 0 77 for data in ds1.create_dict_iterator(num_epochs=1): 78 logger.info("printing the label: {}".format(data["label"])) 79 num_iter += 1 80 81 logger.info("Number of data in ds1: {} ".format(num_iter)) 82 assert num_iter == 40 83 logger.info("test_cache_nomap_basic1 Ended.\n") 84 85 86@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 87def test_cache_nomap_basic2(): 88 """ 89 A random dataset (a non mappable dataset) with a cache over it just after the leaf 90 """ 91 92 logger.info("Test cache nomap basic 2") 93 if "SESSION_ID" in os.environ: 94 session_id = int(os.environ['SESSION_ID']) 95 else: 96 raise RuntimeError("Testcase requires SESSION_ID environment variable") 97 98 schema = ds.Schema() 99 schema.add_column('image', de_type=mstype.uint8, 100 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 101 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 102 103 # create a cache. arbitrary session_id for now 104 some_cache = ds.DatasetCache(session_id=session_id, size=0) 105 106 # sampler arg not given directly, however any of these args will auto-generate an appropriate sampler: 107 # num_samples, shuffle, num_shards, shard_id 108 # In this case, the presence of num_samples chooses a sampler. 109 ds1 = ds.RandomDataset(schema=schema, total_rows=20, num_samples=20, num_parallel_workers=4, cache=some_cache) 110 ds1 = ds1.repeat(2) 111 112 num_iter = 0 113 for data in ds1.create_dict_iterator(num_epochs=1): 114 logger.info("printing the label: {}".format(data["label"])) 115 num_iter += 1 116 117 logger.info("Number of data in ds1: {} ".format(num_iter)) 118 assert num_iter == 40 119 logger.info("test_cache_nomap_basic2 Ended.\n") 120 121 122@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 123def test_cache_nomap_basic3(): 124 """ 125 A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf 126 127 Repeat 128 | 129 Map(decode) 130 | 131 Cache 132 | 133 TFReader 134 """ 135 136 logger.info("Test cache nomap basic 3") 137 if "SESSION_ID" in os.environ: 138 session_id = int(os.environ['SESSION_ID']) 139 else: 140 raise RuntimeError("Testcase requires SESSION_ID environment variable") 141 142 some_cache = ds.DatasetCache(session_id=session_id, size=0) 143 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache) 144 decode_op = c_vision.Decode() 145 ds1 = ds1.map(operations=decode_op, input_columns=["image"]) 146 ds1 = ds1.repeat(4) 147 148 num_iter = 0 149 for _ in ds1.create_dict_iterator(num_epochs=1): 150 num_iter += 1 151 152 logger.info("Number of data in ds1: {} ".format(num_iter)) 153 assert num_iter == 12 154 155 # Contact the server to get the statistics 156 stat = some_cache.get_stat() 157 cache_sz = stat.avg_cache_sz 158 num_mem_cached = stat.num_mem_cached 159 num_disk_cached = stat.num_disk_cached 160 161 logger.info("Number of rows cached in memory: {}".format(num_mem_cached)) 162 logger.info("Number of rows spilled to disk: {}".format(num_disk_cached)) 163 logger.info("Average row cache size: {}".format(cache_sz)) 164 165 logger.info("test_cache_nomap_basic3 Ended.\n") 166 167 168@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 169def test_cache_nomap_basic4(): 170 """ 171 A TF reader dataset (a non mappable dataset) with a map decode and cache after it 172 Since a global shuffle is used for the tf reader, it will inject a shuffle op over the tf. 173 But, if there's a cache later, that shuffle becomes invalid and should be removed. 174 175 Repeat 176 | 177 Cache 178 | 179 Map(decode) 180 | 181 TFReader 182 """ 183 184 logger.info("Test cache nomap basic 4") 185 if "SESSION_ID" in os.environ: 186 session_id = int(os.environ['SESSION_ID']) 187 else: 188 raise RuntimeError("Testcase requires SESSION_ID environment variable") 189 190 # This dataset has 3 records in it only 191 some_cache = ds.DatasetCache(session_id=session_id, size=0) 192 # With shuffle not being set, TF defaults to a "global" shuffle when there is no cache 193 # in the picture. This causes a shuffle-injection over the TF. For clarify, this test will 194 # explicitly give the global option, even though it's the default in python. 195 # But, when caching is added in the ascendent tree above TF, we do global shuffling 196 # through the sampler over the cache, not by the shuffle op. In that case, tree prepare 197 # will remove the shuffle op that got injected by the initial tree creation. 198 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=ds.Shuffle.GLOBAL) 199 decode_op = c_vision.Decode() 200 201 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 202 ds1 = ds1.repeat(4) 203 204 num_iter = 0 205 for _ in ds1.create_dict_iterator(num_epochs=1): 206 num_iter += 1 207 208 logger.info("Number of data in ds1: {} ".format(num_iter)) 209 assert num_iter == 12 210 logger.info("test_cache_nomap_basic4 Ended.\n") 211 212 213@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 214def test_cache_nomap_basic5(): 215 """ 216 A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf 217 Same as test 3, but this one does not have shuffle arg, causing tf to default to global 218 shuffle which attempts to inject a shuffle operator. However, since there is a cache 219 we do not need global shuffle, so the shuffle will not be built. It ends up being 220 identical to test basic 3, however we arrive at the same tree in different codepaths 221 (if there was no cache, then the shuffle IS built) 222 223 Repeat 224 | 225 Map(decode) 226 | 227 Cache 228 | 229 TFReader 230 """ 231 232 logger.info("Test cache nomap basic 5") 233 if "SESSION_ID" in os.environ: 234 session_id = int(os.environ['SESSION_ID']) 235 else: 236 raise RuntimeError("Testcase requires SESSION_ID environment variable") 237 238 # This dataset has 3 records in it only 239 some_cache = ds.DatasetCache(session_id=session_id, size=0) 240 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], cache=some_cache) 241 decode_op = c_vision.Decode() 242 ds1 = ds1.map(operations=decode_op, input_columns=["image"]) 243 ds1 = ds1.repeat(4) 244 245 num_iter = 0 246 for _ in ds1.create_dict_iterator(num_epochs=1): 247 num_iter += 1 248 249 logger.info("Number of data in ds1: {} ".format(num_iter)) 250 assert num_iter == 12 251 logger.info("test_cache_nomap_basic5 Ended.\n") 252 253 254@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 255def test_cache_nomap_basic6(): 256 """ 257 A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf 258 In this one, the tf dataset will be given sharding configuration, however since a cache is 259 used, the tree prepare should undo the sharding configuration and instead, a distributed 260 sampler will be chosen with the same shard config. 261 262 Repeat 263 | 264 Map(decode) 265 | 266 Cache 267 | 268 TFReader 269 """ 270 271 logger.info("Test cache nomap basic 6") 272 if "SESSION_ID" in os.environ: 273 session_id = int(os.environ['SESSION_ID']) 274 else: 275 raise RuntimeError("Testcase requires SESSION_ID environment variable") 276 277 # This dataset has 3 records in it only 278 some_cache = ds.DatasetCache(session_id=session_id, size=0) 279 280 # With only 3 records shard into 3, we expect only 1 record returned for this shard 281 # However, the sharding will be done by the sampler, not by the tf record leaf node 282 # In this case, it is a row-based sharding, not the file-based sharding that would happen if 283 # there was not any cache. 284 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], num_shards=3, shard_id=1, cache=some_cache) 285 decode_op = c_vision.Decode() 286 ds1 = ds1.map(operations=decode_op, input_columns=["image"]) 287 ds1 = ds1.repeat(4) 288 289 num_iter = 0 290 for _ in ds1.create_dict_iterator(num_epochs=1): 291 num_iter += 1 292 293 logger.info("Number of data in ds1: {} ".format(num_iter)) 294 assert num_iter == 4 295 logger.info("test_cache_nomap_basic6 Ended.\n") 296 297 298@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 299def test_cache_nomap_basic7(): 300 """ 301 A TF reader dataset (a non mappable dataset) that uses global shuffle, and is cached followed by 302 map. 303 In this one, the tf dataset with global shuffle might want to inject a shuffle op over top of the 304 tf reader, but since a cache is given, it will choose not to. 305 306 Repeat 307 | 308 Map(decode) 309 | 310 cache 311 | 312 TFReader 313 """ 314 315 logger.info("Test cache nomap basic 7") 316 if "SESSION_ID" in os.environ: 317 session_id = int(os.environ['SESSION_ID']) 318 else: 319 raise RuntimeError("Testcase requires SESSION_ID environment variable") 320 321 some_cache = ds.DatasetCache(session_id=session_id, size=0) 322 323 # This dataset has 3 records in it only 324 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=ds.Shuffle.GLOBAL, cache=some_cache) 325 decode_op = c_vision.Decode() 326 ds1 = ds1.map(operations=decode_op, input_columns=["image"]) 327 ds1 = ds1.repeat(4) 328 329 num_iter = 0 330 for _ in ds1.create_dict_iterator(num_epochs=1): 331 num_iter += 1 332 333 logger.info("Number of data in ds1: {} ".format(num_iter)) 334 assert num_iter == 12 335 logger.info("test_cache_nomap_basic7 Ended.\n") 336 337 338@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 339def test_cache_nomap_basic8(): 340 """ 341 Test cache as root node 342 343 cache 344 | 345 TFReader 346 """ 347 logger.info("Test cache basic 8") 348 if "SESSION_ID" in os.environ: 349 session_id = int(os.environ['SESSION_ID']) 350 else: 351 raise RuntimeError("Testcase requires SESSION_ID environment variable") 352 some_cache = ds.DatasetCache(session_id=session_id, size=0) 353 354 # This dataset has 3 records in it only 355 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 356 num_iter = 0 357 for _ in ds1.create_dict_iterator(num_epochs=1): 358 logger.info("get data from dataset") 359 num_iter += 1 360 361 logger.info("Number of data in ds1: {} ".format(num_iter)) 362 assert num_iter == 3 363 logger.info('test_cache_basic8 Ended.\n') 364 365 366@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 367def test_cache_nomap_basic9(): 368 """ 369 Testing the get_stat interface for getting some info from server, but this should fail if the cache is not created 370 in a pipeline. 371 """ 372 373 logger.info("Test cache nomap basic 9") 374 if "SESSION_ID" in os.environ: 375 session_id = int(os.environ['SESSION_ID']) 376 else: 377 raise RuntimeError("Testcase requires SESSION_ID environment variable") 378 379 some_cache = ds.DatasetCache(session_id=session_id, size=0) 380 381 # Contact the server to get the statistics, this should fail because we have not used this cache in any pipeline 382 # so there will not be any cache to get stats on. 383 with pytest.raises(RuntimeError) as e: 384 stat = some_cache.get_stat() 385 cache_sz = stat.avg_cache_sz 386 logger.info("Average row cache size: {}".format(cache_sz)) 387 assert "Unexpected error" in str(e.value) 388 389 logger.info("test_cache_nomap_basic9 Ended.\n") 390 391 392@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 393def test_cache_nomap_allowed_share1(): 394 """ 395 It is allowed to share the cache between the following two trees: 396 397 Repeat Shuffle 398 | | 399 Cache Cache 400 | | 401 TFReader TFReader 402 """ 403 404 logger.info("Test cache nomap allowed share 1") 405 if "SESSION_ID" in os.environ: 406 session_id = int(os.environ['SESSION_ID']) 407 else: 408 raise RuntimeError("Testcase requires SESSION_ID environment variable") 409 410 ds.config.set_seed(1) 411 # This dataset has 3 records in it only 412 some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=32) 413 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache) 414 ds1 = ds1.repeat(4) 415 416 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache) 417 ds2 = ds2.shuffle(buffer_size=2) 418 419 num_iter = 0 420 for _ in ds1.create_dict_iterator(num_epochs=1): 421 num_iter += 1 422 assert num_iter == 12 423 logger.info("Number of data in ds1: {} ".format(num_iter)) 424 425 num_iter = 0 426 for _ in ds2.create_dict_iterator(num_epochs=1): 427 num_iter += 1 428 assert num_iter == 3 429 logger.info("test_cache_nomap_allowed_share1 Ended.\n") 430 431 432@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 433def test_cache_nomap_allowed_share2(): 434 """ 435 It is allowed to share the cache between the following two trees (with map decode): 436 437 Repeat Shuffle 438 | | 439 Cache Cache 440 | | 441 Map(decode) Map(decode) 442 | | 443 TFReader TFReader 444 """ 445 446 logger.info("Test cache nomap allowed share 2") 447 if "SESSION_ID" in os.environ: 448 session_id = int(os.environ['SESSION_ID']) 449 else: 450 raise RuntimeError("Testcase requires SESSION_ID environment variable") 451 452 ds.config.set_seed(1) 453 # This dataset has 3 records in it only 454 some_cache = ds.DatasetCache(session_id=session_id, size=0) 455 decode_op = c_vision.Decode() 456 457 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 458 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 459 ds1 = ds1.repeat(4) 460 461 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 462 ds2 = ds2.map(operations=decode_op, input_columns=["image"], cache=some_cache) 463 ds2 = ds2.shuffle(buffer_size=2) 464 465 num_iter = 0 466 for _ in ds1.create_dict_iterator(num_epochs=1): 467 num_iter += 1 468 logger.info("Number of data in ds1: {} ".format(num_iter)) 469 assert num_iter == 12 470 471 num_iter = 0 472 for _ in ds2.create_dict_iterator(num_epochs=1): 473 num_iter += 1 474 assert num_iter == 3 475 logger.info("test_cache_nomap_allowed_share2 Ended.\n") 476 477 478@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 479def test_cache_nomap_allowed_share3(): 480 """ 481 It is allowed to share the cache between the following two trees (different shard ids): 482 483 Repeat Repeat 484 | | 485 Cache Cache 486 | | 487 TFReader(shard_id = 0) TFReader(shard_id = 1) 488 """ 489 490 logger.info("Test cache nomap allowed share 3") 491 if "SESSION_ID" in os.environ: 492 session_id = int(os.environ['SESSION_ID']) 493 else: 494 raise RuntimeError("Testcase requires SESSION_ID environment variable") 495 496 some_cache = ds.DatasetCache(session_id=session_id, size=0) 497 498 tf_files = ["../data/dataset/tf_file_dataset/test1.data", "../data/dataset/tf_file_dataset/test2.data"] 499 ds1 = ds.TFRecordDataset(tf_files, num_shards=2, shard_id=0, num_samples=3, shuffle=False, cache=some_cache) 500 ds1 = ds1.repeat(4) 501 502 ds2 = ds.TFRecordDataset(tf_files, num_shards=2, shard_id=1, num_samples=3, shuffle=False, cache=some_cache) 503 ds2 = ds2.repeat(4) 504 505 num_iter = 0 506 for _ in ds1.create_dict_iterator(num_epochs=1): 507 num_iter += 1 508 logger.info("Number of data in ds1: {} ".format(num_iter)) 509 assert num_iter == 12 510 511 num_iter = 0 512 for _ in ds2.create_dict_iterator(num_epochs=1): 513 num_iter += 1 514 assert num_iter == 12 515 logger.info("test_cache_nomap_allowed_share3 Ended.\n") 516 517 518@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 519def test_cache_nomap_allowed_share4(): 520 """ 521 It is allowed to share the cache between the following two trees: 522 523 Cache Cache 524 | | 525 Map(decode, num_parallel_workers=1) Map(decode, num_parallel_workers=2) 526 | | 527 TFReader TFReader 528 """ 529 530 logger.info("Test cache nomap allowed share 4") 531 if "SESSION_ID" in os.environ: 532 session_id = int(os.environ['SESSION_ID']) 533 else: 534 raise RuntimeError("Testcase requires SESSION_ID environment variable") 535 536 # This dataset has 3 records in it only 537 some_cache = ds.DatasetCache(session_id=session_id, size=0) 538 decode_op = c_vision.Decode() 539 540 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 541 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache, num_parallel_workers=1) 542 543 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 544 ds2 = ds2.map(operations=decode_op, input_columns=["image"], cache=some_cache, num_parallel_workers=2) 545 546 num_iter = 0 547 for _ in ds1.create_dict_iterator(num_epochs=1): 548 num_iter += 1 549 logger.info("Number of data in ds1: {} ".format(num_iter)) 550 assert num_iter == 3 551 552 num_iter = 0 553 for _ in ds2.create_dict_iterator(num_epochs=1): 554 num_iter += 1 555 logger.info("Number of data in ds2: {} ".format(num_iter)) 556 assert num_iter == 3 557 558 logger.info("test_cache_nomap_allowed_share4 Ended.\n") 559 560 561@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 562def test_cache_nomap_disallowed_share1(): 563 """ 564 It is not allowed to share the cache between the following two trees: 565 566 Cache Cache 567 | | 568 Map(decode) Map(rescale) 569 | | 570 TFReader TFReader 571 """ 572 573 logger.info("Test cache nomap disallowed share1") 574 if "SESSION_ID" in os.environ: 575 session_id = int(os.environ['SESSION_ID']) 576 else: 577 raise RuntimeError("Testcase requires SESSION_ID environment variable") 578 579 # This dataset has 3 records in it only 580 some_cache = ds.DatasetCache(session_id=session_id, size=0) 581 decode_op = c_vision.Decode() 582 rescale_op = c_vision.Rescale(1.0 / 255.0, -1.0) 583 584 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 585 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 586 587 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 588 ds2 = ds2.map(operations=rescale_op, input_columns=["image"], cache=some_cache) 589 590 num_iter = 0 591 for _ in ds1.create_dict_iterator(num_epochs=1): 592 num_iter += 1 593 logger.info("Number of data in ds1: {} ".format(num_iter)) 594 assert num_iter == 3 595 596 with pytest.raises(RuntimeError) as e: 597 sum([1 for _ in ds2]) 598 assert "Cannot re-use a cache for a different tree!" in str(e.value) 599 600 logger.info("test_cache_nomap_disallowed_share1 Ended.\n") 601 602 603@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 604def test_cache_nomap_running_twice1(): 605 """ 606 Executing the same pipeline for twice (from python), with cache injected after map 607 608 Repeat 609 | 610 Cache 611 | 612 Map(decode) 613 | 614 TFRecord 615 """ 616 617 logger.info("Test cache nomap running twice 1") 618 if "SESSION_ID" in os.environ: 619 session_id = int(os.environ['SESSION_ID']) 620 else: 621 raise RuntimeError("Testcase requires SESSION_ID environment variable") 622 623 some_cache = ds.DatasetCache(session_id=session_id, size=0) 624 625 # This dataset has 3 records in it only 626 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 627 decode_op = c_vision.Decode() 628 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 629 ds1 = ds1.repeat(4) 630 631 num_iter = 0 632 for _ in ds1.create_dict_iterator(): 633 num_iter += 1 634 logger.info("Number of data in ds1: {} ".format(num_iter)) 635 assert num_iter == 12 636 637 num_iter = 0 638 for _ in ds1.create_dict_iterator(): 639 num_iter += 1 640 logger.info("Number of data in ds1: {} ".format(num_iter)) 641 assert num_iter == 12 642 643 logger.info("test_cache_nomap_running_twice1 Ended.\n") 644 645 646@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 647def test_cache_nomap_running_twice2(): 648 """ 649 Executing the same pipeline for twice (from shell), with cache injected after leaf 650 651 Repeat 652 | 653 Map(decode) 654 | 655 Cache 656 | 657 TFRecord 658 """ 659 660 logger.info("Test cache nomap running twice 2") 661 if "SESSION_ID" in os.environ: 662 session_id = int(os.environ['SESSION_ID']) 663 else: 664 raise RuntimeError("Testcase requires SESSION_ID environment variable") 665 666 some_cache = ds.DatasetCache(session_id=session_id, size=0) 667 668 # This dataset has 3 records in it only 669 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 670 decode_op = c_vision.Decode() 671 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 672 ds1 = ds1.repeat(4) 673 674 num_iter = 0 675 for _ in ds1.create_dict_iterator(): 676 num_iter += 1 677 678 logger.info("Number of data in ds1: {} ".format(num_iter)) 679 assert num_iter == 12 680 logger.info("test_cache_nomap_running_twice2 Ended.\n") 681 682 683@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 684def test_cache_nomap_extra_small_size1(): 685 """ 686 Test running pipeline with cache of extra small size and spilling true 687 688 Repeat 689 | 690 Map(decode) 691 | 692 Cache 693 | 694 TFRecord 695 """ 696 697 logger.info("Test cache nomap extra small size 1") 698 if "SESSION_ID" in os.environ: 699 session_id = int(os.environ['SESSION_ID']) 700 else: 701 raise RuntimeError("Testcase requires SESSION_ID environment variable") 702 some_cache = ds.DatasetCache(session_id=session_id, size=1, spilling=True) 703 704 # This dataset has 3 records in it only 705 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 706 decode_op = c_vision.Decode() 707 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 708 ds1 = ds1.repeat(4) 709 710 num_iter = 0 711 for _ in ds1.create_dict_iterator(): 712 num_iter += 1 713 714 logger.info("Number of data in ds1: {} ".format(num_iter)) 715 assert num_iter == 12 716 logger.info("test_cache_nomap_extra_small_size1 Ended.\n") 717 718 719@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 720def test_cache_nomap_extra_small_size2(): 721 """ 722 Test running pipeline with cache of extra small size and spilling false (failure) 723 724 Repeat 725 | 726 Cache 727 | 728 Map(decode) 729 | 730 TFRecord 731 """ 732 733 logger.info("Test cache nomap extra small size 2") 734 if "SESSION_ID" in os.environ: 735 session_id = int(os.environ['SESSION_ID']) 736 else: 737 raise RuntimeError("Testcase requires SESSION_ID environment variable") 738 some_cache = ds.DatasetCache(session_id=session_id, size=1, spilling=False) 739 740 # This dataset has 3 records in it only 741 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 742 decode_op = c_vision.Decode() 743 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 744 ds1 = ds1.repeat(4) 745 746 with pytest.raises(RuntimeError) as e: 747 sum([1 for _ in ds1]) 748 assert "Out of memory" in str(e.value) 749 logger.info("test_cache_nomap_extra_small_size2 Ended.\n") 750 751 752@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 753def test_cache_nomap_parallel_pipeline1(shard): 754 """ 755 Test running two parallel pipelines (sharing cache) with cache injected after leaf op 756 757 Repeat 758 | 759 Map(decode) 760 | 761 cache 762 | 763 TFReader 764 """ 765 766 logger.info("Test cache nomap parallel pipeline 1") 767 if "SESSION_ID" in os.environ: 768 session_id = int(os.environ['SESSION_ID']) 769 else: 770 raise RuntimeError("Testcase requires SESSION_ID environment variable") 771 some_cache = ds.DatasetCache(session_id=session_id, size=0) 772 773 # This dataset has 3 records in it only 774 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=3, shard_id=int(shard), cache=some_cache) 775 decode_op = c_vision.Decode() 776 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 777 ds1 = ds1.repeat(4) 778 779 num_iter = 0 780 for _ in ds1.create_dict_iterator(num_epochs=1): 781 num_iter += 1 782 783 logger.info("Number of data in ds1: {} ".format(num_iter)) 784 assert num_iter == 4 785 logger.info("test_cache_nomap_parallel_pipeline1 Ended.\n") 786 787 788@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 789def test_cache_nomap_parallel_pipeline2(shard): 790 """ 791 Test running two parallel pipelines (sharing cache) with cache injected after map op 792 793 Repeat 794 | 795 cache 796 | 797 Map(decode) 798 | 799 TFReader 800 """ 801 802 logger.info("Test cache nomap parallel pipeline 2") 803 if "SESSION_ID" in os.environ: 804 session_id = int(os.environ['SESSION_ID']) 805 else: 806 raise RuntimeError("Testcase requires SESSION_ID environment variable") 807 some_cache = ds.DatasetCache(session_id=session_id, size=0) 808 809 # This dataset has 3 records in it only 810 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=3, shard_id=int(shard)) 811 decode_op = c_vision.Decode() 812 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 813 ds1 = ds1.repeat(4) 814 815 num_iter = 0 816 for _ in ds1.create_dict_iterator(num_epochs=1): 817 num_iter += 1 818 819 logger.info("Number of data in ds1: {} ".format(num_iter)) 820 assert num_iter == 4 821 logger.info("test_cache_nomap_parallel_pipeline2 Ended.\n") 822 823 824@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 825def test_cache_nomap_parallel_workers(): 826 """ 827 Test cache with num_parallel_workers > 1 set for map op and leaf op 828 829 Repeat 830 | 831 Map(decode) 832 | 833 cache 834 | 835 TFReader 836 """ 837 838 logger.info("Test cache nomap parallel workers") 839 if "SESSION_ID" in os.environ: 840 session_id = int(os.environ['SESSION_ID']) 841 else: 842 raise RuntimeError("Testcase requires SESSION_ID environment variable") 843 some_cache = ds.DatasetCache(session_id=session_id, size=0) 844 845 # This dataset has 3 records in it only 846 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_parallel_workers=4) 847 decode_op = c_vision.Decode() 848 ds1 = ds1.map(input_columns=["image"], operations=decode_op, num_parallel_workers=4, cache=some_cache) 849 ds1 = ds1.repeat(4) 850 851 num_iter = 0 852 for _ in ds1.create_dict_iterator(num_epochs=1): 853 num_iter += 1 854 855 logger.info("Number of data in ds1: {} ".format(num_iter)) 856 assert num_iter == 12 857 logger.info("test_cache_nomap_parallel_workers Ended.\n") 858 859 860@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 861def test_cache_nomap_server_workers_1(): 862 """ 863 start cache server with --workers 1 and then test cache function 864 865 Repeat 866 | 867 cache 868 | 869 Map(decode) 870 | 871 TFRecord 872 """ 873 874 logger.info("Test cache nomap server workers 1") 875 if "SESSION_ID" in os.environ: 876 session_id = int(os.environ['SESSION_ID']) 877 else: 878 raise RuntimeError("Testcase requires SESSION_ID environment variable") 879 880 some_cache = ds.DatasetCache(session_id=session_id, size=0) 881 882 # This dataset has 3 records in it only 883 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 884 decode_op = c_vision.Decode() 885 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 886 ds1 = ds1.repeat(4) 887 888 num_iter = 0 889 for _ in ds1.create_dict_iterator(): 890 num_iter += 1 891 892 logger.info("Number of data in ds1: {} ".format(num_iter)) 893 assert num_iter == 12 894 logger.info("test_cache_nomap_server_workers_1 Ended.\n") 895 896 897@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 898def test_cache_nomap_server_workers_100(): 899 """ 900 start cache server with --workers 100 and then test cache function 901 902 Repeat 903 | 904 Map(decode) 905 | 906 cache 907 | 908 TFRecord 909 """ 910 911 logger.info("Test cache nomap server workers 100") 912 if "SESSION_ID" in os.environ: 913 session_id = int(os.environ['SESSION_ID']) 914 else: 915 raise RuntimeError("Testcase requires SESSION_ID environment variable") 916 917 some_cache = ds.DatasetCache(session_id=session_id, size=0) 918 919 # This dataset has 3 records in it only 920 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 921 decode_op = c_vision.Decode() 922 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 923 ds1 = ds1.repeat(4) 924 925 num_iter = 0 926 for _ in ds1.create_dict_iterator(): 927 num_iter += 1 928 929 logger.info("Number of data in ds1: {} ".format(num_iter)) 930 assert num_iter == 12 931 logger.info("test_cache_nomap_server_workers_100 Ended.\n") 932 933 934@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 935def test_cache_nomap_num_connections_1(): 936 """ 937 Test setting num_connections=1 in DatasetCache 938 939 Repeat 940 | 941 cache 942 | 943 Map(decode) 944 | 945 TFRecord 946 """ 947 948 logger.info("Test cache nomap num_connections 1") 949 if "SESSION_ID" in os.environ: 950 session_id = int(os.environ['SESSION_ID']) 951 else: 952 raise RuntimeError("Testcase requires SESSION_ID environment variable") 953 954 some_cache = ds.DatasetCache(session_id=session_id, size=0, num_connections=1) 955 956 # This dataset has 3 records in it only 957 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 958 decode_op = c_vision.Decode() 959 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 960 ds1 = ds1.repeat(4) 961 962 num_iter = 0 963 for _ in ds1.create_dict_iterator(): 964 num_iter += 1 965 966 logger.info("Number of data in ds1: {} ".format(num_iter)) 967 assert num_iter == 12 968 logger.info("test_cache_nomap_num_connections_1 Ended.\n") 969 970 971@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 972def test_cache_nomap_num_connections_100(): 973 """ 974 Test setting num_connections=100 in DatasetCache 975 976 Repeat 977 | 978 Map(decode) 979 | 980 cache 981 | 982 TFRecord 983 """ 984 985 logger.info("Test cache nomap num_connections 100") 986 if "SESSION_ID" in os.environ: 987 session_id = int(os.environ['SESSION_ID']) 988 else: 989 raise RuntimeError("Testcase requires SESSION_ID environment variable") 990 991 some_cache = ds.DatasetCache(session_id=session_id, size=0, num_connections=100) 992 993 # This dataset has 3 records in it only 994 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 995 decode_op = c_vision.Decode() 996 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 997 ds1 = ds1.repeat(4) 998 999 num_iter = 0 1000 for _ in ds1.create_dict_iterator(): 1001 num_iter += 1 1002 1003 logger.info("Number of data in ds1: {} ".format(num_iter)) 1004 assert num_iter == 12 1005 logger.info("test_cache_nomap_num_connections_100 Ended.\n") 1006 1007 1008@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1009def test_cache_nomap_prefetch_size_1(): 1010 """ 1011 Test setting prefetch_size=1 in DatasetCache 1012 1013 Repeat 1014 | 1015 cache 1016 | 1017 Map(decode) 1018 | 1019 TFRecord 1020 """ 1021 1022 logger.info("Test cache nomap prefetch_size 1") 1023 if "SESSION_ID" in os.environ: 1024 session_id = int(os.environ['SESSION_ID']) 1025 else: 1026 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1027 1028 some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=1) 1029 1030 # This dataset has 3 records in it only 1031 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1032 decode_op = c_vision.Decode() 1033 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 1034 ds1 = ds1.repeat(4) 1035 1036 num_iter = 0 1037 for _ in ds1.create_dict_iterator(): 1038 num_iter += 1 1039 1040 logger.info("Number of data in ds1: {} ".format(num_iter)) 1041 assert num_iter == 12 1042 logger.info("test_cache_nomap_prefetch_size_1 Ended.\n") 1043 1044 1045@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1046def test_cache_nomap_prefetch_size_100(): 1047 """ 1048 Test setting prefetch_size=100 in DatasetCache 1049 1050 Repeat 1051 | 1052 Map(decode) 1053 | 1054 cache 1055 | 1056 TFRecord 1057 """ 1058 1059 logger.info("Test cache nomap prefetch_size 100") 1060 if "SESSION_ID" in os.environ: 1061 session_id = int(os.environ['SESSION_ID']) 1062 else: 1063 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1064 1065 some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=100) 1066 1067 # This dataset has 3 records in it only 1068 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 1069 decode_op = c_vision.Decode() 1070 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 1071 ds1 = ds1.repeat(4) 1072 1073 num_iter = 0 1074 for _ in ds1.create_dict_iterator(): 1075 num_iter += 1 1076 1077 logger.info("Number of data in ds1: {} ".format(num_iter)) 1078 assert num_iter == 12 1079 logger.info("test_cache_nomap_prefetch_size_100 Ended.\n") 1080 1081 1082@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1083def test_cache_nomap_to_device(): 1084 """ 1085 Test cache with to_device 1086 1087 DeviceQueue 1088 | 1089 EpochCtrl 1090 | 1091 Repeat 1092 | 1093 Map(decode) 1094 | 1095 cache 1096 | 1097 TFReader 1098 """ 1099 1100 logger.info("Test cache nomap to_device") 1101 if "SESSION_ID" in os.environ: 1102 session_id = int(os.environ['SESSION_ID']) 1103 else: 1104 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1105 1106 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1107 1108 # This dataset has 3 records in it only 1109 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1110 decode_op = c_vision.Decode() 1111 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 1112 ds1 = ds1.repeat(4) 1113 ds1 = ds1.to_device() 1114 ds1.send() 1115 1116 logger.info("test_cache_nomap_to_device Ended.\n") 1117 1118 1119@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1120def test_cache_nomap_session_destroy(): 1121 """ 1122 Test executing cache_admin -d while the pipeline is running 1123 1124 Repeat 1125 | 1126 Cache 1127 | 1128 RandomDataset 1129 """ 1130 1131 logger.info("Test cache nomap session destroy") 1132 if "SESSION_ID" in os.environ: 1133 session_id = int(os.environ['SESSION_ID']) 1134 else: 1135 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1136 1137 schema = ds.Schema() 1138 schema.add_column('image', de_type=mstype.uint8, 1139 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 1140 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 1141 1142 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1143 1144 # User-created sampler here 1145 ds1 = ds.RandomDataset(schema=schema, num_parallel_workers=4, cache=some_cache) 1146 ds1 = ds1.repeat() 1147 1148 with pytest.raises(RuntimeError) as e: 1149 num_iter = 0 1150 for _ in ds1.create_dict_iterator(): 1151 num_iter += 1 1152 assert "Unexpected error" in str(e.value) 1153 1154 logger.info("test_cache_nomap_session_destroy Ended.\n") 1155 1156 1157@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1158def test_cache_nomap_server_stop(): 1159 """ 1160 Test executing cache_admin --stop while the pipeline is running 1161 1162 Repeat 1163 | 1164 Cache 1165 | 1166 RandomDataset 1167 """ 1168 1169 logger.info("Test cache nomap server stop") 1170 if "SESSION_ID" in os.environ: 1171 session_id = int(os.environ['SESSION_ID']) 1172 else: 1173 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1174 1175 schema = ds.Schema() 1176 schema.add_column('image', de_type=mstype.uint8, 1177 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 1178 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 1179 1180 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1181 1182 # User-created sampler here 1183 ds1 = ds.RandomDataset(schema=schema, num_parallel_workers=4, cache=some_cache) 1184 ds1 = ds1.repeat() 1185 1186 with pytest.raises(RuntimeError) as e: 1187 num_iter = 0 1188 for _ in ds1.create_dict_iterator(): 1189 num_iter += 1 1190 assert "Network error. Cache server with port 50052 is unreachable. Make sure the server is running." in \ 1191 str(e.value) 1192 1193 logger.info("test_cache_nomap_server_stop Ended.\n") 1194 1195 1196@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1197def test_cache_nomap_interrupt_and_rerun(): 1198 """ 1199 Test interrupt a running pipeline and then re-use the same cache to run another pipeline 1200 1201 Cache 1202 | 1203 RandomDataset 1204 """ 1205 1206 logger.info("Test cache nomap interrupt and rerun") 1207 if "SESSION_ID" in os.environ: 1208 session_id = int(os.environ['SESSION_ID']) 1209 else: 1210 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1211 1212 schema = ds.Schema() 1213 schema.add_column('image', de_type=mstype.uint8, 1214 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 1215 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 1216 1217 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1218 1219 # User-created sampler here 1220 ds1 = ds.RandomDataset(schema=schema, total_rows=10000, num_parallel_workers=4, cache=some_cache) 1221 iter1 = ds1.create_dict_iterator() 1222 1223 num_iter = 0 1224 with pytest.raises(AttributeError) as e: 1225 for _ in iter1: 1226 num_iter += 1 1227 if num_iter == 10: 1228 iter1.stop() 1229 assert "'DictIterator' object has no attribute '_runtime_context'" in str(e.value) 1230 1231 num_epoch = 2 1232 iter2 = ds1.create_dict_iterator(num_epochs=num_epoch) 1233 epoch_count = 0 1234 for _ in range(num_epoch): 1235 num_iter = 0 1236 for _ in iter2: 1237 num_iter += 1 1238 logger.info("Number of data in ds1: {} ".format(num_iter)) 1239 assert num_iter == 10000 1240 epoch_count += 1 1241 1242 cache_stat = some_cache.get_stat() 1243 assert cache_stat.num_mem_cached == 10000 1244 1245 logger.info("test_cache_nomap_interrupt_and_rerun Ended.\n") 1246 1247 1248@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1249def test_cache_nomap_epoch_ctrl1(): 1250 """ 1251 Test using two-loops method to run several epochs 1252 1253 Map(decode) 1254 | 1255 cache 1256 | 1257 TFRecord 1258 """ 1259 1260 logger.info("Test cache nomap epoch ctrl1") 1261 if "SESSION_ID" in os.environ: 1262 session_id = int(os.environ['SESSION_ID']) 1263 else: 1264 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1265 1266 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1267 1268 # This dataset has 3 records in it only 1269 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 1270 decode_op = c_vision.Decode() 1271 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 1272 1273 num_epoch = 5 1274 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch) 1275 1276 epoch_count = 0 1277 for _ in range(num_epoch): 1278 row_count = 0 1279 for _ in iter1: 1280 row_count += 1 1281 logger.info("Number of data in ds1: {} ".format(row_count)) 1282 assert row_count == 3 1283 epoch_count += 1 1284 assert epoch_count == num_epoch 1285 logger.info("test_cache_nomap_epoch_ctrl1 Ended.\n") 1286 1287 1288@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1289def test_cache_nomap_epoch_ctrl2(): 1290 """ 1291 Test using two-loops method with infinite epochs 1292 1293 cache 1294 | 1295 Map(decode) 1296 | 1297 TFRecord 1298 """ 1299 1300 logger.info("Test cache nomap epoch ctrl2") 1301 if "SESSION_ID" in os.environ: 1302 session_id = int(os.environ['SESSION_ID']) 1303 else: 1304 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1305 1306 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1307 1308 # This dataset has 3 records in it only 1309 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1310 decode_op = c_vision.Decode() 1311 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 1312 1313 num_epoch = 5 1314 # iter1 will always assume there is a next epoch and never shutdown 1315 iter1 = ds1.create_dict_iterator() 1316 1317 epoch_count = 0 1318 for _ in range(num_epoch): 1319 row_count = 0 1320 for _ in iter1: 1321 row_count += 1 1322 logger.info("Number of data in ds1: {} ".format(row_count)) 1323 assert row_count == 3 1324 epoch_count += 1 1325 assert epoch_count == num_epoch 1326 1327 # manually stop the iterator 1328 iter1.stop() 1329 logger.info("test_cache_nomap_epoch_ctrl2 Ended.\n") 1330 1331 1332@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1333def test_cache_nomap_epoch_ctrl3(): 1334 """ 1335 Test using two-loops method with infinite epochs over repeat 1336 1337 repeat 1338 | 1339 Map(decode) 1340 | 1341 cache 1342 | 1343 TFRecord 1344 """ 1345 1346 logger.info("Test cache nomap epoch ctrl3") 1347 if "SESSION_ID" in os.environ: 1348 session_id = int(os.environ['SESSION_ID']) 1349 else: 1350 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1351 1352 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1353 1354 # This dataset has 3 records in it only 1355 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 1356 decode_op = c_vision.Decode() 1357 ds1 = ds1.map(input_columns=["image"], operations=decode_op) 1358 ds1 = ds1.repeat(2) 1359 1360 num_epoch = 5 1361 # iter1 will always assume there is a next epoch and never shutdown 1362 iter1 = ds1.create_dict_iterator() 1363 1364 epoch_count = 0 1365 for _ in range(num_epoch): 1366 row_count = 0 1367 for _ in iter1: 1368 row_count += 1 1369 logger.info("Number of data in ds1: {} ".format(row_count)) 1370 assert row_count == 6 1371 epoch_count += 1 1372 assert epoch_count == num_epoch 1373 1374 # reply on garbage collector to destroy iter1 1375 1376 logger.info("test_cache_nomap_epoch_ctrl3 Ended.\n") 1377 1378 1379@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1380def test_cache_nomap_epoch_ctrl4(): 1381 """ 1382 Test using two-loops method with repeat under cache 1383 1384 cache 1385 | 1386 Map(decode) 1387 | 1388 repeat 1389 | 1390 TFRecord 1391 """ 1392 1393 logger.info("Test cache nomap epoch ctrl4") 1394 if "SESSION_ID" in os.environ: 1395 session_id = int(os.environ['SESSION_ID']) 1396 else: 1397 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1398 1399 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1400 1401 # This dataset has 3 records in it only 1402 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1403 ds1 = ds1.repeat(2) 1404 decode_op = c_vision.Decode() 1405 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 1406 1407 num_epoch = 5 1408 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch) 1409 1410 epoch_count = 0 1411 for _ in range(num_epoch): 1412 row_count = 0 1413 for _ in iter1: 1414 row_count += 1 1415 logger.info("Number of data in ds1: {} ".format(row_count)) 1416 assert row_count == 6 1417 epoch_count += 1 1418 assert epoch_count == num_epoch 1419 1420 logger.info("test_cache_nomap_epoch_ctrl4 Ended.\n") 1421 1422 1423@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1424def test_cache_nomap_multiple_cache1(): 1425 """ 1426 Test multiple cache in the same python script 1427 1428 cache cache 1429 | | 1430 Map(decode) Map(decode) 1431 | | 1432 TFRecord(train) TFRecord(eval) 1433 """ 1434 1435 logger.info("Test cache nomap multiple cache 1") 1436 if "SESSION_ID" in os.environ: 1437 session_id = int(os.environ['SESSION_ID']) 1438 else: 1439 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1440 1441 train_cache = ds.DatasetCache(session_id=session_id, size=0) 1442 eval_cache = ds.DatasetCache(session_id=session_id, size=0) 1443 1444 # This dataset has 12 records in it 1445 train_dataset = ds.TFRecordDataset(TRAIN_DATA_DIR, TRAIN_SCHEMA_DIR) 1446 decode_op = c_vision.Decode() 1447 train_dataset = train_dataset.map(input_columns=["image"], operations=decode_op, cache=train_cache) 1448 1449 # This dataset has 3 records in it only 1450 eval_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1451 eval_dataset = eval_dataset.map(input_columns=["image"], operations=decode_op, cache=eval_cache) 1452 1453 num_epoch = 5 1454 train_iter = train_dataset.create_dict_iterator(num_epochs=num_epoch) 1455 eval_iter = eval_dataset.create_dict_iterator(num_epochs=num_epoch) 1456 1457 epoch_count = 0 1458 for _ in range(num_epoch): 1459 assert sum([1 for _ in train_iter]) == 12 1460 assert sum([1 for _ in eval_iter]) == 3 1461 epoch_count += 1 1462 assert epoch_count == num_epoch 1463 1464 logger.info("test_cache_nomap_multiple_cache1 Ended.\n") 1465 1466 1467@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1468def test_cache_nomap_multiple_cache2(): 1469 """ 1470 Test multiple cache in the same python script 1471 1472 cache 1473 | 1474 Map(decode) cache 1475 | | 1476 TFRecord(image) TFRecord(text) 1477 """ 1478 1479 logger.info("Test cache nomap multiple cache 2") 1480 if "SESSION_ID" in os.environ: 1481 session_id = int(os.environ['SESSION_ID']) 1482 else: 1483 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1484 1485 image_cache = ds.DatasetCache(session_id=session_id, size=0) 1486 text_cache = ds.DatasetCache(session_id=session_id, size=0) 1487 1488 # This dataset has 3 records in it only 1489 image_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1490 decode_op = c_vision.Decode() 1491 image_dataset = image_dataset.map(input_columns=["image"], operations=decode_op, cache=image_cache) 1492 1493 # This dataset has 3 records in it only 1494 text_dataset = ds.TFRecordDataset(TEXT_TF_DATA_DIR, SCHEMA_DIR2, cache=text_cache) 1495 1496 num_epoch = 5 1497 image_iter = image_dataset.create_dict_iterator(num_epochs=num_epoch) 1498 text_iter = text_dataset.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1499 1500 epoch_count = 0 1501 for _ in range(num_epoch): 1502 row_count = 0 1503 for _, _ in itertools.zip_longest(image_iter, text_iter): 1504 row_count += 1 1505 assert row_count == 3 1506 epoch_count += 1 1507 assert epoch_count == num_epoch 1508 1509 logger.info("test_cache_nomap_multiple_cache2 Ended.\n") 1510 1511 1512@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1513def test_cache_nomap_multiple_cache3(): 1514 """ 1515 Test multiple cache in the same python script 1516 1517 cache cache 1518 | | 1519 Map(decode) Map(decode) 1520 | | 1521 TFRecord ImageFolder 1522 """ 1523 1524 logger.info("Test cache nomap multiple cache 3") 1525 if "SESSION_ID" in os.environ: 1526 session_id = int(os.environ['SESSION_ID']) 1527 else: 1528 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1529 1530 tf_cache = ds.DatasetCache(session_id=session_id, size=0) 1531 image_cache = ds.DatasetCache(session_id=session_id, size=0) 1532 1533 # This dataset has 3 records in it only 1534 tf_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1535 decode_op = c_vision.Decode() 1536 tf_dataset = tf_dataset.map(input_columns=["image"], operations=decode_op, cache=tf_cache) 1537 1538 # This DATA_DIR only has 2 images in it 1539 image_dataset = ds.ImageFolderDataset(dataset_dir=IMAGE_FOLDER_DATA_DIR) 1540 image_dataset = image_dataset.map(input_columns=["image"], operations=decode_op, cache=image_cache) 1541 1542 num_epoch = 5 1543 tf_iter = tf_dataset.create_dict_iterator(num_epochs=num_epoch) 1544 image_iter = image_dataset.create_dict_iterator(num_epochs=num_epoch) 1545 1546 epoch_count = 0 1547 for _ in range(num_epoch): 1548 assert sum([1 for _ in tf_iter]) == 3 1549 assert sum([1 for _ in image_iter]) == 2 1550 epoch_count += 1 1551 assert epoch_count == num_epoch 1552 1553 logger.info("test_cache_nomap_multiple_cache3 Ended.\n") 1554 1555 1556@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1557def test_cache_nomap_multiple_cache_train(): 1558 """ 1559 Test multiple cache in different python scripts. This test case is going to run concurrently with 1560 test_cache_nomap_multiple_cache_eval. 1561 1562 cache 1563 | 1564 Map(decode) 1565 | 1566 TFRecord(train) 1567 """ 1568 1569 logger.info("Test cache nomap multiple cache train") 1570 if "SESSION_ID" in os.environ: 1571 session_id = int(os.environ['SESSION_ID']) 1572 else: 1573 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1574 1575 train_cache = ds.DatasetCache(session_id=session_id, size=0) 1576 1577 # This dataset has 12 records in it 1578 train_dataset = ds.TFRecordDataset(TRAIN_DATA_DIR, TRAIN_SCHEMA_DIR) 1579 decode_op = c_vision.Decode() 1580 train_dataset = train_dataset.map(input_columns=["image"], operations=decode_op, cache=train_cache) 1581 1582 num_epoch = 5 1583 train_iter = train_dataset.create_dict_iterator(num_epochs=num_epoch) 1584 1585 epoch_count = 0 1586 for _ in range(num_epoch): 1587 assert sum([1 for _ in train_iter]) == 12 1588 epoch_count += 1 1589 assert epoch_count == num_epoch 1590 1591 logger.info("test_cache_nomap_multiple_cache_train Ended.\n") 1592 1593 1594@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1595def test_cache_nomap_multiple_cache_eval(): 1596 """ 1597 Test multiple cache in different python scripts. This test case is going to run concurrently with 1598 test_cache_nomap_multiple_cache_train. 1599 1600 cache 1601 | 1602 Map(decode) 1603 | 1604 TFRecord(eval) 1605 """ 1606 1607 logger.info("Test cache nomap multiple cache eval") 1608 if "SESSION_ID" in os.environ: 1609 session_id = int(os.environ['SESSION_ID']) 1610 else: 1611 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1612 1613 eval_cache = ds.DatasetCache(session_id=session_id, size=0) 1614 1615 # This dataset only has 3 records in it 1616 eval_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1617 decode_op = c_vision.Decode() 1618 eval_dataset = eval_dataset.map(input_columns=["image"], operations=decode_op, cache=eval_cache) 1619 1620 num_epoch = 5 1621 eval_iter = eval_dataset.create_dict_iterator(num_epochs=num_epoch) 1622 1623 epoch_count = 0 1624 for _ in range(num_epoch): 1625 assert sum([1 for _ in eval_iter]) == 3 1626 epoch_count += 1 1627 assert epoch_count == num_epoch 1628 1629 logger.info("test_cache_nomap_multiple_cache_eval Ended.\n") 1630 1631 1632@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1633def test_cache_nomap_clue1(): 1634 """ 1635 A clue dataset (a non mappable dataset) with a cache over it just after the leaf 1636 In this one, the clue dataset will be given sharding configuration, however since a cache is 1637 used, the tree prepare should undo the sharding configuration and instead, a distributed 1638 sampler will be chosen with the same shard config. 1639 1640 Cache 1641 | 1642 CLUE 1643 """ 1644 1645 logger.info("Test cache nomap clue 1") 1646 if "SESSION_ID" in os.environ: 1647 session_id = int(os.environ['SESSION_ID']) 1648 else: 1649 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1650 1651 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1652 1653 # With only 3 records shard into 3, we expect only 1 record returned for this shard 1654 # However, the sharding will be done by the sampler, not by the clue leaf node 1655 # In this case, it is a row-based sharding, not the file-based sharding that would happen if 1656 # there was not any cache. 1657 ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train', num_shards=3, shard_id=1, cache=some_cache) 1658 1659 num_epoch = 4 1660 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1661 1662 epoch_count = 0 1663 for _ in range(num_epoch): 1664 assert sum([1 for _ in iter1]) == 1 1665 epoch_count += 1 1666 assert epoch_count == num_epoch 1667 1668 logger.info("test_cache_nomap_clue1 Ended.\n") 1669 1670 1671@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1672def test_cache_nomap_clue2(): 1673 """ 1674 A clue dataset (a non mappable dataset) with a cache over it after map 1675 In this one, a num_samples argument is given 1676 1677 Cache 1678 | 1679 map(lambda x: x) 1680 | 1681 CLUE 1682 """ 1683 1684 logger.info("Test cache nomap clue 2") 1685 if "SESSION_ID" in os.environ: 1686 session_id = int(os.environ['SESSION_ID']) 1687 else: 1688 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1689 1690 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1691 1692 ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train', num_samples=2) 1693 ds1 = ds1.map(py_vision.not_random(lambda x: x), ["label"], cache=some_cache) 1694 1695 num_epoch = 4 1696 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1697 1698 epoch_count = 0 1699 for _ in range(num_epoch): 1700 assert sum([1 for _ in iter1]) == 2 1701 epoch_count += 1 1702 assert epoch_count == num_epoch 1703 1704 logger.info("test_cache_nomap_clue2 Ended.\n") 1705 1706 1707@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1708def test_cache_nomap_csv1(): 1709 """ 1710 A csv dataset (a non mappable dataset) with a cache over it just after the leaf 1711 In this one, the csv dataset will be given sharding configuration, however since a cache is 1712 used, the tree prepare should undo the sharding configuration and instead, a distributed 1713 sampler will be chosen with the same shard config. 1714 1715 Cache 1716 | 1717 CSV 1718 """ 1719 1720 logger.info("Test cache nomap csv 1") 1721 if "SESSION_ID" in os.environ: 1722 session_id = int(os.environ['SESSION_ID']) 1723 else: 1724 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1725 1726 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1727 1728 # With only 3 records shard into 3, we expect only 1 record returned for this shard 1729 # However, the sharding will be done by the sampler, not by the clue leaf node 1730 # In this case, it is a row-based sharding, not the file-based sharding that would happen if 1731 # there was not any cache. 1732 ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"], 1733 column_names=['col1', 'col2', 'col3', 'col4'], num_shards=3, shard_id=1, cache=some_cache) 1734 1735 num_epoch = 4 1736 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1737 1738 epoch_count = 0 1739 for _ in range(num_epoch): 1740 assert sum([1 for _ in iter1]) == 1 1741 epoch_count += 1 1742 assert epoch_count == num_epoch 1743 1744 logger.info("test_cache_nomap_csv1 Ended.\n") 1745 1746 1747@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1748def test_cache_nomap_csv2(): 1749 """ 1750 A csv dataset (a non mappable dataset) with a cache over it after map 1751 In this one, a num_samples argument is given 1752 1753 Cache 1754 | 1755 map(lambda x: x) 1756 | 1757 CSV 1758 """ 1759 1760 logger.info("Test cache nomap csv 2") 1761 if "SESSION_ID" in os.environ: 1762 session_id = int(os.environ['SESSION_ID']) 1763 else: 1764 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1765 1766 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1767 1768 ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"], 1769 column_names=['col1', 'col2', 'col3', 'col4'], num_samples=2) 1770 ds1 = ds1.map(py_vision.not_random(lambda x: x), ["col1"], cache=some_cache) 1771 1772 num_epoch = 4 1773 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1774 1775 epoch_count = 0 1776 for _ in range(num_epoch): 1777 assert sum([1 for _ in iter1]) == 2 1778 epoch_count += 1 1779 assert epoch_count == num_epoch 1780 1781 logger.info("test_cache_nomap_csv2 Ended.\n") 1782 1783 1784@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1785def test_cache_nomap_textfile1(): 1786 """ 1787 A text file dataset (a non mappable dataset) with a cache over it just after the leaf 1788 In this one, the text file dataset will be given sharding configuration, however since a cache is 1789 used, the tree prepare should undo the sharding configuration and instead, a distributed 1790 sampler will be chosen with the same shard config. 1791 1792 Cache 1793 | 1794 TextFile 1795 """ 1796 1797 logger.info("Test cache nomap textfile 1") 1798 if "SESSION_ID" in os.environ: 1799 session_id = int(os.environ['SESSION_ID']) 1800 else: 1801 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1802 1803 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1804 1805 # With only 3 records shard into 3, we expect only 1 record returned for this shard 1806 # However, the sharding will be done by the sampler, not by the clue leaf node 1807 # In this case, it is a row-based sharding, not the file-based sharding that would happen if 1808 # there was not any cache. 1809 ds1 = ds.TextFileDataset(TEXT_FILE_DATA_DIR, num_shards=3, shard_id=1, cache=some_cache) 1810 1811 num_epoch = 4 1812 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1813 1814 epoch_count = 0 1815 for _ in range(num_epoch): 1816 assert sum([1 for _ in iter1]) == 1 1817 epoch_count += 1 1818 assert epoch_count == num_epoch 1819 1820 logger.info("test_cache_nomap_textfile1 Ended.\n") 1821 1822 1823@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1824def test_cache_nomap_textfile2(): 1825 """ 1826 A text file dataset (a non mappable dataset) with a cache over it after map 1827 In this one, a num_samples argument is given 1828 1829 Cache 1830 | 1831 Map(tokenizer) 1832 | 1833 TextFile 1834 """ 1835 1836 def my_tokenizer(line): 1837 words = line.split() 1838 if not words: 1839 return [""] 1840 return words 1841 1842 logger.info("Test cache nomap textfile 2") 1843 if "SESSION_ID" in os.environ: 1844 session_id = int(os.environ['SESSION_ID']) 1845 else: 1846 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1847 1848 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1849 1850 ds1 = ds.TextFileDataset(TEXT_FILE_DATA_DIR, num_samples=2) 1851 tokenizer = text.PythonTokenizer(my_tokenizer) 1852 ds1 = ds1.map(operations=tokenizer, cache=some_cache) 1853 1854 num_epoch = 4 1855 iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) 1856 1857 epoch_count = 0 1858 for _ in range(num_epoch): 1859 assert sum([1 for _ in iter1]) == 2 1860 epoch_count += 1 1861 assert epoch_count == num_epoch 1862 1863 logger.info("test_cache_nomap_textfile2 Ended.\n") 1864 1865 1866@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1867def test_cache_nomap_nested_repeat(): 1868 """ 1869 Test cache on pipeline with nested repeat ops 1870 1871 Repeat 1872 | 1873 Cache 1874 | 1875 Map(decode) 1876 | 1877 Repeat 1878 | 1879 TFRecord 1880 """ 1881 1882 logger.info("Test cache nomap nested repeat") 1883 if "SESSION_ID" in os.environ: 1884 session_id = int(os.environ['SESSION_ID']) 1885 else: 1886 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1887 1888 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1889 1890 # This dataset has 3 records in it only 1891 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR) 1892 decode_op = c_vision.Decode() 1893 ds1 = ds1.repeat(4) 1894 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 1895 ds1 = ds1.repeat(2) 1896 1897 num_iter = 0 1898 for _ in ds1.create_dict_iterator(num_epochs=1): 1899 logger.info("get data from dataset") 1900 num_iter += 1 1901 1902 logger.info("Number of data in ds1: {} ".format(num_iter)) 1903 assert num_iter == 24 1904 logger.info('test_cache_nomap_nested_repeat Ended.\n') 1905 1906 1907@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1908def test_cache_nomap_get_repeat_count(): 1909 """ 1910 Test get_repeat_count() for a pipeline with cache and nested repeat ops 1911 1912 Cache 1913 | 1914 Map(decode) 1915 | 1916 Repeat 1917 | 1918 TFRecord 1919 """ 1920 1921 logger.info("Test cache nomap get_repeat_count") 1922 if "SESSION_ID" in os.environ: 1923 session_id = int(os.environ['SESSION_ID']) 1924 else: 1925 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1926 1927 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1928 1929 # This dataset has 3 records in it only 1930 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False) 1931 ds1 = ds1.repeat(4) 1932 decode_op = c_vision.Decode() 1933 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 1934 1935 repeat_count = ds1.get_repeat_count() 1936 logger.info("repeat_count: {}".format(repeat_count)) 1937 assert repeat_count == 4 1938 1939 num_iter = 0 1940 for _ in ds1.create_dict_iterator(num_epochs=1): 1941 logger.info("get data from dataset") 1942 num_iter += 1 1943 assert num_iter == 12 1944 1945 1946@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1947def test_cache_nomap_long_file_list(): 1948 """ 1949 Test cache after TFRecord with a long list of files as arguments 1950 1951 Cache 1952 | 1953 TFRecord 1954 """ 1955 1956 logger.info("Test cache nomap long file list") 1957 if "SESSION_ID" in os.environ: 1958 session_id = int(os.environ['SESSION_ID']) 1959 else: 1960 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1961 1962 some_cache = ds.DatasetCache(session_id=session_id, size=1) 1963 1964 ds1 = ds.TFRecordDataset([DATA_DIR[0] for _ in range(0, 1000)], SCHEMA_DIR, columns_list=["image"], 1965 cache=some_cache) 1966 1967 with pytest.raises(RuntimeError) as e: 1968 sum([1 for _ in ds1]) 1969 assert "Out of memory" in str(e.value) 1970 logger.info("test_cache_nomap_long_file_list Ended.\n") 1971 1972 1973@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 1974def test_cache_nomap_failure1(): 1975 """ 1976 Test nested cache (failure) 1977 1978 Repeat 1979 | 1980 Cache 1981 | 1982 Map(decode) 1983 | 1984 Cache 1985 | 1986 TFRecord 1987 1988 """ 1989 logger.info("Test cache nomap failure 1") 1990 if "SESSION_ID" in os.environ: 1991 session_id = int(os.environ['SESSION_ID']) 1992 else: 1993 raise RuntimeError("Testcase requires SESSION_ID environment variable") 1994 1995 some_cache = ds.DatasetCache(session_id=session_id, size=0) 1996 1997 # This dataset has 3 records in it only 1998 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache) 1999 decode_op = c_vision.Decode() 2000 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 2001 ds1 = ds1.repeat(4) 2002 2003 with pytest.raises(RuntimeError) as e: 2004 ds1.get_batch_size() 2005 assert "Nested cache operations" in str(e.value) 2006 2007 with pytest.raises(RuntimeError) as e: 2008 num_iter = 0 2009 for _ in ds1.create_dict_iterator(num_epochs=1): 2010 num_iter += 1 2011 assert "Nested cache operations" in str(e.value) 2012 2013 assert num_iter == 0 2014 logger.info('test_cache_nomap_failure1 Ended.\n') 2015 2016 2017@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2018def test_cache_nomap_failure2(): 2019 """ 2020 Test zip under cache (failure) 2021 2022 repeat 2023 | 2024 Cache 2025 | 2026 Map(decode) 2027 | 2028 Zip 2029 | | 2030 Random Random 2031 2032 """ 2033 logger.info("Test cache nomap failure 2") 2034 if "SESSION_ID" in os.environ: 2035 session_id = int(os.environ['SESSION_ID']) 2036 else: 2037 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2038 2039 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2040 2041 schema = ds.Schema() 2042 schema.add_column('image', de_type=mstype.uint8, 2043 shape=[640, 480, 3]) # 921600 bytes (a bit less than 1 MB per image) 2044 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 2045 2046 ds1 = ds.RandomDataset(schema=schema) 2047 ds2 = ds.RandomDataset(schema=schema) 2048 dsz = ds.zip((ds1, ds2)) 2049 decode_op = c_vision.Decode() 2050 dsz = dsz.map(input_columns=["image"], operations=decode_op, cache=some_cache) 2051 dsz = dsz.repeat(4) 2052 2053 with pytest.raises(RuntimeError) as e: 2054 num_iter = 0 2055 for _ in dsz.create_dict_iterator(): 2056 num_iter += 1 2057 assert "ZipNode is not supported as a descendant operator under a cache" in str(e.value) 2058 2059 assert num_iter == 0 2060 logger.info('test_cache_nomap_failure2 Ended.\n') 2061 2062 2063@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2064def test_cache_nomap_failure3(): 2065 """ 2066 Test batch under cache (failure) 2067 2068 repeat 2069 | 2070 Cache 2071 | 2072 Map(resize) 2073 | 2074 Batch 2075 | 2076 Clue 2077 """ 2078 logger.info("Test cache nomap failure 3") 2079 if "SESSION_ID" in os.environ: 2080 session_id = int(os.environ['SESSION_ID']) 2081 else: 2082 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2083 2084 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2085 2086 ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train') 2087 ds1 = ds1.batch(2) 2088 resize_op = c_vision.Resize((224, 224)) 2089 ds1 = ds1.map(input_columns=["image"], operations=resize_op, cache=some_cache) 2090 ds1 = ds1.repeat(4) 2091 2092 with pytest.raises(RuntimeError) as e: 2093 num_iter = 0 2094 for _ in ds1.create_dict_iterator(): 2095 num_iter += 1 2096 assert "BatchNode is not supported as a descendant operator under a cache" in str(e.value) 2097 2098 assert num_iter == 0 2099 logger.info('test_cache_nomap_failure3 Ended.\n') 2100 2101 2102@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2103def test_cache_nomap_failure4(): 2104 """ 2105 Test filter under cache (failure) 2106 2107 repeat 2108 | 2109 Cache 2110 | 2111 Map(decode) 2112 | 2113 Filter 2114 | 2115 CSV 2116 2117 """ 2118 logger.info("Test cache nomap failure 4") 2119 if "SESSION_ID" in os.environ: 2120 session_id = int(os.environ['SESSION_ID']) 2121 else: 2122 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2123 2124 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2125 2126 ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"], 2127 column_names=['col1', 'col2', 'col3', 'col4']) 2128 ds1 = ds1.filter(predicate=lambda data: data < 11, input_columns=["label"]) 2129 2130 decode_op = c_vision.Decode() 2131 ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache) 2132 ds1 = ds1.repeat(4) 2133 2134 with pytest.raises(RuntimeError) as e: 2135 num_iter = 0 2136 for _ in ds1.create_dict_iterator(): 2137 num_iter += 1 2138 assert "FilterNode is not supported as a descendant operator under a cache" in str(e.value) 2139 2140 assert num_iter == 0 2141 logger.info('test_cache_nomap_failure4 Ended.\n') 2142 2143 2144@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2145def test_cache_nomap_failure5(): 2146 """ 2147 Test Map containing random operation under cache (failure) 2148 2149 repeat 2150 | 2151 Cache 2152 | 2153 Map(decode, randomCrop) 2154 | 2155 TextFile 2156 2157 """ 2158 logger.info("Test cache nomap failure 5") 2159 if "SESSION_ID" in os.environ: 2160 session_id = int(os.environ['SESSION_ID']) 2161 else: 2162 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2163 2164 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2165 2166 data = ds.TextFileDataset(TEXT_FILE_DATA_DIR) 2167 random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200]) 2168 decode_op = c_vision.Decode() 2169 2170 data = data.map(input_columns=["image"], operations=decode_op) 2171 data = data.map(input_columns=["image"], operations=random_crop_op, cache=some_cache) 2172 data = data.repeat(4) 2173 2174 with pytest.raises(RuntimeError) as e: 2175 num_iter = 0 2176 for _ in data.create_dict_iterator(): 2177 num_iter += 1 2178 assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value) 2179 2180 assert num_iter == 0 2181 logger.info('test_cache_nomap_failure5 Ended.\n') 2182 2183 2184@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2185def test_cache_nomap_pyfunc_lambda(): 2186 """ 2187 Test cache after map op with a python lambda function. 2188 Only allowed if the lambda function is wrapped by 'pyvision.not_random', otherwise an error will be raised. 2189 2190 Cache 2191 | 2192 Map(lambda function1, lambda function2) 2193 | 2194 TFRecord 2195 """ 2196 logger.info("Test cache nomap pyfunc lambda") 2197 if "SESSION_ID" in os.environ: 2198 session_id = int(os.environ['SESSION_ID']) 2199 else: 2200 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2201 2202 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2203 2204 # This dataset has 12 records in it 2205 data1 = ds.TFRecordDataset(PYFUNC_DATA_DIR, PYFUNC_SCHEMA_DIR, shuffle=False) 2206 transforms = [py_vision.not_random(lambda x: x + x), py_vision.not_random(lambda x: x - 1)] 2207 data1 = data1.map(operations=transforms, input_columns="col0", cache=some_cache) 2208 2209 num_iter = 0 2210 for _ in data1.create_dict_iterator(num_epochs=1): 2211 num_iter += 1 2212 assert num_iter == 12 2213 2214 other_cache = ds.DatasetCache(session_id=session_id, size=0) 2215 ds2 = ds.TFRecordDataset(PYFUNC_DATA_DIR, PYFUNC_SCHEMA_DIR, shuffle=False) 2216 ds2 = ds2.map(operations=[(lambda x: x + x)], input_columns=["col0"], cache=other_cache) 2217 2218 with pytest.raises(RuntimeError) as e: 2219 num_iter = 0 2220 for _ in ds2.create_dict_iterator(): 2221 num_iter += 1 2222 assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value) 2223 logger.info("test_cache_nomap_pyfunc_lambda Ended.\n") 2224 2225 2226@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2227def test_cache_nomap_pyfunc_builtin(): 2228 """ 2229 Test cache after map op with a python builtin PyFunc. 2230 An error will be raised if the builtin pyfunc containing random operation. 2231 2232 Cache 2233 | 2234 Map([builtin pyfunc1, builtin pyfunc2]) 2235 | 2236 TFRecord 2237 """ 2238 logger.info("Test cache nomap pyfunc builtin") 2239 if "SESSION_ID" in os.environ: 2240 session_id = int(os.environ['SESSION_ID']) 2241 else: 2242 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2243 2244 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2245 # This dataset has 3 records in it only 2246 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"]) 2247 ds1 = ds1.map(operations=[py_vision.Decode(), py_vision.ToTensor()], input_columns=["image"], cache=some_cache) 2248 2249 num_iter = 0 2250 for _ in ds1.create_dict_iterator(num_epochs=1): 2251 num_iter += 1 2252 assert num_iter == 3 2253 2254 other_cache = ds.DatasetCache(session_id=session_id, size=0) 2255 # This dataset has 3 records in it only 2256 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"]) 2257 ds2 = ds2.map(operations=[py_vision.Decode(), py_vision.RandomCrop(224), py_vision.ToTensor()], 2258 input_columns=["image"], cache=other_cache) 2259 2260 with pytest.raises(RuntimeError) as e: 2261 num_iter = 0 2262 for _ in ds2.create_dict_iterator(): 2263 num_iter += 1 2264 assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value) 2265 logger.info("test_cache_nomap_pyfunc_builtin Ended.\n") 2266 2267 2268@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2269def test_cache_nomap_pyfunc_function(): 2270 """ 2271 Test cache after map op with a python customized function. 2272 Only allowed if the function is decorated with 'py_vision.not_random', otherwise an error will be raised. 2273 2274 Cache 2275 | 2276 Map([function1, function2]) 2277 | 2278 TFRecord 2279 """ 2280 2281 @py_vision.not_random 2282 def not_random_func(x): 2283 return np.ones(x.shape, dtype=x.dtype) 2284 2285 def normal_func(x): 2286 return np.ones(x.shape, dtype=x.dtype) 2287 2288 logger.info("Test cache nomap pyfunc function") 2289 if "SESSION_ID" in os.environ: 2290 session_id = int(os.environ['SESSION_ID']) 2291 else: 2292 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2293 2294 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2295 # This dataset has 3 records in it only 2296 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"]) 2297 ds1 = ds1.map(operations=[not_random_func, not_random_func], input_columns=["image"], cache=some_cache) 2298 2299 num_iter = 0 2300 for _ in ds1.create_dict_iterator(num_epochs=1): 2301 num_iter += 1 2302 assert num_iter == 3 2303 2304 other_cache = ds.DatasetCache(session_id=session_id, size=0) 2305 # This dataset has 3 records in it only 2306 ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"]) 2307 ds2 = ds2.map(operations=[not_random_func, normal_func], input_columns=["image"], cache=other_cache) 2308 2309 with pytest.raises(RuntimeError) as e: 2310 num_iter = 0 2311 for _ in ds2.create_dict_iterator(): 2312 num_iter += 1 2313 assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value) 2314 logger.info("test_cache_nomap_pyfunc_function Ended.\n") 2315 2316 2317@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2318def test_cache_nomap_all_rows_cached(): 2319 """ 2320 Make sure all rows are cached before we switch to the fetching phase 2321 2322 Cache 2323 | 2324 RandomDataset 2325 """ 2326 2327 logger.info("Test cache nomap all rows cached") 2328 if "SESSION_ID" in os.environ: 2329 session_id = int(os.environ['SESSION_ID']) 2330 else: 2331 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2332 2333 schema = ds.Schema() 2334 schema.add_column('image', de_type=mstype.uint8, 2335 shape=[450, 450, 3]) 2336 schema.add_column('label', de_type=mstype.uint8, shape=[1]) 2337 2338 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2339 2340 # easier to reproduce the problem with 271 total rows 2341 num_total_rows = 271 2342 # User-created sampler here 2343 ds1 = ds.RandomDataset(schema=schema, total_rows=num_total_rows, num_parallel_workers=4, cache=some_cache) 2344 iter1 = ds1.create_dict_iterator() 2345 2346 num_iter = 0 2347 for _ in iter1: 2348 num_iter += 1 2349 logger.info("Number of data in ds1: {} ".format(num_iter)) 2350 assert num_iter == num_total_rows 2351 2352 cache_stat = some_cache.get_stat() 2353 assert cache_stat.num_mem_cached == num_total_rows 2354 2355 logger.info("test_cache_nomap_all_rows_cached Ended.\n") 2356 2357 2358@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2359def test_cache_nomap_dataset_size1(): 2360 """ 2361 Test get_dataset_size() when cache is injected directly after a non-mappable leaf 2362 2363 Cache 2364 | 2365 TFRecord 2366 """ 2367 2368 logger.info("Test cache nomap dataset size 1") 2369 if "SESSION_ID" in os.environ: 2370 session_id = int(os.environ['SESSION_ID']) 2371 else: 2372 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2373 2374 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2375 2376 # This dataset has 3 records in it only 2377 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0, cache=some_cache) 2378 2379 dataset_size = ds1.get_dataset_size() 2380 assert dataset_size == 2 2381 2382 num_iter = 0 2383 for _ in ds1.create_dict_iterator(): 2384 num_iter += 1 2385 2386 logger.info("Number of data in ds1: {} ".format(num_iter)) 2387 assert num_iter == dataset_size 2388 logger.info("test_cache_nomap_dataset_size1 Ended.\n") 2389 2390 2391@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") 2392def test_cache_nomap_dataset_size2(): 2393 """ 2394 Test get_dataset_size() when cache is injected after map 2395 2396 Cache 2397 | 2398 Map(decode) 2399 | 2400 TFRecord 2401 """ 2402 2403 logger.info("Test cache nomap dataset size 2") 2404 if "SESSION_ID" in os.environ: 2405 session_id = int(os.environ['SESSION_ID']) 2406 else: 2407 raise RuntimeError("Testcase requires SESSION_ID environment variable") 2408 2409 some_cache = ds.DatasetCache(session_id=session_id, size=0) 2410 2411 # This dataset has 3 records in it only 2412 ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0) 2413 decode_op = c_vision.Decode() 2414 ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache) 2415 2416 dataset_size = ds1.get_dataset_size() 2417 assert dataset_size == 2 2418 2419 num_iter = 0 2420 for _ in ds1.create_dict_iterator(): 2421 num_iter += 1 2422 2423 logger.info("Number of data in ds1: {} ".format(num_iter)) 2424 assert num_iter == dataset_size 2425 logger.info("test_cache_nomap_dataset_size2 Ended.\n") 2426 2427 2428if __name__ == '__main__': 2429 # This is just a list of tests, don't try to run these tests with 'python test_cache_nomap.py' 2430 # since cache server is required to be brought up first 2431 test_cache_nomap_basic1() 2432 test_cache_nomap_basic2() 2433 test_cache_nomap_basic3() 2434 test_cache_nomap_basic4() 2435 test_cache_nomap_basic5() 2436 test_cache_nomap_basic6() 2437 test_cache_nomap_basic7() 2438 test_cache_nomap_basic8() 2439 test_cache_nomap_basic9() 2440 test_cache_nomap_allowed_share1() 2441 test_cache_nomap_allowed_share2() 2442 test_cache_nomap_allowed_share3() 2443 test_cache_nomap_allowed_share4() 2444 test_cache_nomap_disallowed_share1() 2445 test_cache_nomap_running_twice1() 2446 test_cache_nomap_running_twice2() 2447 test_cache_nomap_extra_small_size1() 2448 test_cache_nomap_extra_small_size2() 2449 test_cache_nomap_parallel_pipeline1(shard=0) 2450 test_cache_nomap_parallel_pipeline2(shard=1) 2451 test_cache_nomap_parallel_workers() 2452 test_cache_nomap_server_workers_1() 2453 test_cache_nomap_server_workers_100() 2454 test_cache_nomap_num_connections_1() 2455 test_cache_nomap_num_connections_100() 2456 test_cache_nomap_prefetch_size_1() 2457 test_cache_nomap_prefetch_size_100() 2458 test_cache_nomap_to_device() 2459 test_cache_nomap_session_destroy() 2460 test_cache_nomap_server_stop() 2461 test_cache_nomap_epoch_ctrl1() 2462 test_cache_nomap_epoch_ctrl2() 2463 test_cache_nomap_epoch_ctrl3() 2464 test_cache_nomap_epoch_ctrl4() 2465 test_cache_nomap_multiple_cache1() 2466 test_cache_nomap_multiple_cache2() 2467 test_cache_nomap_multiple_cache3() 2468 test_cache_nomap_multiple_cache_train() 2469 test_cache_nomap_multiple_cache_eval() 2470 test_cache_nomap_clue1() 2471 test_cache_nomap_clue2() 2472 test_cache_nomap_csv1() 2473 test_cache_nomap_csv2() 2474 test_cache_nomap_textfile1() 2475 test_cache_nomap_textfile2() 2476 test_cache_nomap_nested_repeat() 2477 test_cache_nomap_get_repeat_count() 2478 test_cache_nomap_long_file_list() 2479 test_cache_nomap_failure1() 2480 test_cache_nomap_failure2() 2481 test_cache_nomap_failure3() 2482 test_cache_nomap_failure4() 2483 test_cache_nomap_failure5() 2484 test_cache_nomap_pyfunc_lambda() 2485 test_cache_nomap_pyfunc_builtin() 2486 test_cache_nomap_pyfunc_function() 2487 test_cache_nomap_dataset_size1() 2488 test_cache_nomap_dataset_size2() 2489