• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020-2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""
16Testing cache operator with non-mappable datasets
17"""
18import os
19import itertools
20import numpy as np
21import pytest
22import mindspore.common.dtype as mstype
23import mindspore.dataset as ds
24import mindspore.dataset.text as text
25import mindspore.dataset.vision.c_transforms as c_vision
26import mindspore.dataset.vision.py_transforms as py_vision
27from mindspore import log as logger
28
29DATA_DIR = ["../data/dataset/test_tf_file_3_images/train-0000-of-0001.data"]
30SCHEMA_DIR = "../data/dataset/test_tf_file_3_images/datasetSchema.json"
31
32TEXT_TF_DATA_DIR = ["../data/dataset/testTextTFRecord/text.tfrecord"]
33SCHEMA_DIR2 = "../data/dataset/testTextTFRecord/datasetSchema.json"
34
35TRAIN_DATA_DIR = ["../data/dataset/test_tf_file_3_images2/train-0000-of-0001.data",
36                  "../data/dataset/test_tf_file_3_images2/train-0000-of-0002.data",
37                  "../data/dataset/test_tf_file_3_images2/train-0000-of-0003.data",
38                  "../data/dataset/test_tf_file_3_images2/train-0000-of-0004.data"]
39TRAIN_SCHEMA_DIR = "../data/dataset/test_tf_file_3_images2/datasetSchema.json"
40
41IMAGE_FOLDER_DATA_DIR = "../data/dataset/testImageNetData/train/"
42CLUE_DATA_DIR = '../data/dataset/testCLUE/afqmc/train.json'
43CSV_DATA_DIR = '../data/dataset/testCSV/1.csv'
44TEXT_FILE_DATA_DIR = "../data/dataset/testTextFileDataset/1.txt"
45
46PYFUNC_DATA_DIR = ["../data/dataset/testPyfuncMap/data.data"]
47PYFUNC_SCHEMA_DIR = "../data/dataset/testPyfuncMap/schema.json"
48
49GENERATE_GOLDEN = False
50
51
52@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
53def test_cache_nomap_basic1():
54    """
55    A random dataset (a non mappable dataset) with a cache over it just after the leaf
56    """
57
58    logger.info("Test cache nomap basic 1")
59    if "SESSION_ID" in os.environ:
60        session_id = int(os.environ['SESSION_ID'])
61    else:
62        raise RuntimeError("Testcase requires SESSION_ID environment variable")
63
64    schema = ds.Schema()
65    schema.add_column('image', de_type=mstype.uint8,
66                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
67    schema.add_column('label', de_type=mstype.uint8, shape=[1])
68
69    # create a cache.  arbitrary session_id for now
70    some_cache = ds.DatasetCache(session_id=session_id, size=0)
71
72    # User-created sampler here
73    ds1 = ds.RandomDataset(schema=schema, total_rows=10, num_parallel_workers=4, cache=some_cache)
74    ds1 = ds1.repeat(4)
75
76    num_iter = 0
77    for data in ds1.create_dict_iterator(num_epochs=1):
78        logger.info("printing the label: {}".format(data["label"]))
79        num_iter += 1
80
81    logger.info("Number of data in ds1: {} ".format(num_iter))
82    assert num_iter == 40
83    logger.info("test_cache_nomap_basic1 Ended.\n")
84
85
86@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
87def test_cache_nomap_basic2():
88    """
89    A random dataset (a non mappable dataset) with a cache over it just after the leaf
90    """
91
92    logger.info("Test cache nomap basic 2")
93    if "SESSION_ID" in os.environ:
94        session_id = int(os.environ['SESSION_ID'])
95    else:
96        raise RuntimeError("Testcase requires SESSION_ID environment variable")
97
98    schema = ds.Schema()
99    schema.add_column('image', de_type=mstype.uint8,
100                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
101    schema.add_column('label', de_type=mstype.uint8, shape=[1])
102
103    # create a cache.  arbitrary session_id for now
104    some_cache = ds.DatasetCache(session_id=session_id, size=0)
105
106    # sampler arg not given directly, however any of these args will auto-generate an appropriate sampler:
107    # num_samples, shuffle, num_shards, shard_id
108    # In this case, the presence of num_samples chooses a sampler.
109    ds1 = ds.RandomDataset(schema=schema, total_rows=20, num_samples=20, num_parallel_workers=4, cache=some_cache)
110    ds1 = ds1.repeat(2)
111
112    num_iter = 0
113    for data in ds1.create_dict_iterator(num_epochs=1):
114        logger.info("printing the label: {}".format(data["label"]))
115        num_iter += 1
116
117    logger.info("Number of data in ds1: {} ".format(num_iter))
118    assert num_iter == 40
119    logger.info("test_cache_nomap_basic2 Ended.\n")
120
121
122@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
123def test_cache_nomap_basic3():
124    """
125    A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf
126
127       Repeat
128         |
129     Map(decode)
130         |
131       Cache
132         |
133      TFReader
134    """
135
136    logger.info("Test cache nomap basic 3")
137    if "SESSION_ID" in os.environ:
138        session_id = int(os.environ['SESSION_ID'])
139    else:
140        raise RuntimeError("Testcase requires SESSION_ID environment variable")
141
142    some_cache = ds.DatasetCache(session_id=session_id, size=0)
143    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache)
144    decode_op = c_vision.Decode()
145    ds1 = ds1.map(operations=decode_op, input_columns=["image"])
146    ds1 = ds1.repeat(4)
147
148    num_iter = 0
149    for _ in ds1.create_dict_iterator(num_epochs=1):
150        num_iter += 1
151
152    logger.info("Number of data in ds1: {} ".format(num_iter))
153    assert num_iter == 12
154
155    # Contact the server to get the statistics
156    stat = some_cache.get_stat()
157    cache_sz = stat.avg_cache_sz
158    num_mem_cached = stat.num_mem_cached
159    num_disk_cached = stat.num_disk_cached
160
161    logger.info("Number of rows cached in memory: {}".format(num_mem_cached))
162    logger.info("Number of rows spilled to disk: {}".format(num_disk_cached))
163    logger.info("Average row cache size: {}".format(cache_sz))
164
165    logger.info("test_cache_nomap_basic3 Ended.\n")
166
167
168@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
169def test_cache_nomap_basic4():
170    """
171    A TF reader dataset (a non mappable dataset) with a map decode and cache after it
172    Since a global shuffle is used for the tf reader, it will inject a shuffle op over the tf.
173    But, if there's a cache later, that shuffle becomes invalid and should be removed.
174
175       Repeat
176         |
177       Cache
178         |
179     Map(decode)
180         |
181      TFReader
182    """
183
184    logger.info("Test cache nomap basic 4")
185    if "SESSION_ID" in os.environ:
186        session_id = int(os.environ['SESSION_ID'])
187    else:
188        raise RuntimeError("Testcase requires SESSION_ID environment variable")
189
190    # This dataset has 3 records in it only
191    some_cache = ds.DatasetCache(session_id=session_id, size=0)
192    # With shuffle not being set, TF defaults to a "global" shuffle when there is no cache
193    # in the picture.  This causes a shuffle-injection over the TF.  For clarify, this test will
194    # explicitly give the global option, even though it's the default in python.
195    # But, when caching is added in the ascendent tree above TF, we do global shuffling
196    # through the sampler over the cache, not by the shuffle op.  In that case, tree prepare
197    # will remove the shuffle op that got injected by the initial tree creation.
198    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=ds.Shuffle.GLOBAL)
199    decode_op = c_vision.Decode()
200
201    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
202    ds1 = ds1.repeat(4)
203
204    num_iter = 0
205    for _ in ds1.create_dict_iterator(num_epochs=1):
206        num_iter += 1
207
208    logger.info("Number of data in ds1: {} ".format(num_iter))
209    assert num_iter == 12
210    logger.info("test_cache_nomap_basic4 Ended.\n")
211
212
213@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
214def test_cache_nomap_basic5():
215    """
216    A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf
217    Same as test 3, but this one does not have shuffle arg, causing tf to default to global
218    shuffle which attempts to inject a shuffle operator.  However, since there is a cache
219    we do not need global shuffle, so the shuffle will not be built.  It ends up being
220    identical to test basic 3, however we arrive at the same tree in different codepaths
221    (if there was no cache, then the shuffle IS built)
222
223       Repeat
224         |
225     Map(decode)
226         |
227       Cache
228         |
229      TFReader
230    """
231
232    logger.info("Test cache nomap basic 5")
233    if "SESSION_ID" in os.environ:
234        session_id = int(os.environ['SESSION_ID'])
235    else:
236        raise RuntimeError("Testcase requires SESSION_ID environment variable")
237
238    # This dataset has 3 records in it only
239    some_cache = ds.DatasetCache(session_id=session_id, size=0)
240    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], cache=some_cache)
241    decode_op = c_vision.Decode()
242    ds1 = ds1.map(operations=decode_op, input_columns=["image"])
243    ds1 = ds1.repeat(4)
244
245    num_iter = 0
246    for _ in ds1.create_dict_iterator(num_epochs=1):
247        num_iter += 1
248
249    logger.info("Number of data in ds1: {} ".format(num_iter))
250    assert num_iter == 12
251    logger.info("test_cache_nomap_basic5 Ended.\n")
252
253
254@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
255def test_cache_nomap_basic6():
256    """
257    A TF reader dataset (a non mappable dataset) with a cache over it just after the leaf
258    In this one, the tf dataset will be given sharding configuration, however since a cache is
259    used, the tree prepare should undo the sharding configuration and instead, a distributed
260    sampler will be chosen with the same shard config.
261
262       Repeat
263         |
264     Map(decode)
265         |
266       Cache
267         |
268      TFReader
269    """
270
271    logger.info("Test cache nomap basic 6")
272    if "SESSION_ID" in os.environ:
273        session_id = int(os.environ['SESSION_ID'])
274    else:
275        raise RuntimeError("Testcase requires SESSION_ID environment variable")
276
277    # This dataset has 3 records in it only
278    some_cache = ds.DatasetCache(session_id=session_id, size=0)
279
280    # With only 3 records shard into 3, we expect only 1 record returned for this shard
281    # However, the sharding will be done by the sampler, not by the tf record leaf node
282    # In this case, it is a row-based sharding, not the file-based sharding that would happen if
283    # there was not any cache.
284    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], num_shards=3, shard_id=1, cache=some_cache)
285    decode_op = c_vision.Decode()
286    ds1 = ds1.map(operations=decode_op, input_columns=["image"])
287    ds1 = ds1.repeat(4)
288
289    num_iter = 0
290    for _ in ds1.create_dict_iterator(num_epochs=1):
291        num_iter += 1
292
293    logger.info("Number of data in ds1: {} ".format(num_iter))
294    assert num_iter == 4
295    logger.info("test_cache_nomap_basic6 Ended.\n")
296
297
298@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
299def test_cache_nomap_basic7():
300    """
301    A TF reader dataset (a non mappable dataset) that uses global shuffle, and is cached followed by
302    map.
303    In this one, the tf dataset with global shuffle might want to inject a shuffle op over top of the
304    tf reader, but since a cache is given, it will choose not to.
305
306       Repeat
307         |
308     Map(decode)
309         |
310       cache
311         |
312      TFReader
313    """
314
315    logger.info("Test cache nomap basic 7")
316    if "SESSION_ID" in os.environ:
317        session_id = int(os.environ['SESSION_ID'])
318    else:
319        raise RuntimeError("Testcase requires SESSION_ID environment variable")
320
321    some_cache = ds.DatasetCache(session_id=session_id, size=0)
322
323    # This dataset has 3 records in it only
324    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=ds.Shuffle.GLOBAL, cache=some_cache)
325    decode_op = c_vision.Decode()
326    ds1 = ds1.map(operations=decode_op, input_columns=["image"])
327    ds1 = ds1.repeat(4)
328
329    num_iter = 0
330    for _ in ds1.create_dict_iterator(num_epochs=1):
331        num_iter += 1
332
333    logger.info("Number of data in ds1: {} ".format(num_iter))
334    assert num_iter == 12
335    logger.info("test_cache_nomap_basic7 Ended.\n")
336
337
338@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
339def test_cache_nomap_basic8():
340    """
341    Test cache as root node
342
343       cache
344         |
345      TFReader
346    """
347    logger.info("Test cache basic 8")
348    if "SESSION_ID" in os.environ:
349        session_id = int(os.environ['SESSION_ID'])
350    else:
351        raise RuntimeError("Testcase requires SESSION_ID environment variable")
352    some_cache = ds.DatasetCache(session_id=session_id, size=0)
353
354    # This dataset has 3 records in it only
355    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
356    num_iter = 0
357    for _ in ds1.create_dict_iterator(num_epochs=1):
358        logger.info("get data from dataset")
359        num_iter += 1
360
361    logger.info("Number of data in ds1: {} ".format(num_iter))
362    assert num_iter == 3
363    logger.info('test_cache_basic8 Ended.\n')
364
365
366@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
367def test_cache_nomap_basic9():
368    """
369    Testing the get_stat interface for getting some info from server, but this should fail if the cache is not created
370    in a pipeline.
371    """
372
373    logger.info("Test cache nomap basic 9")
374    if "SESSION_ID" in os.environ:
375        session_id = int(os.environ['SESSION_ID'])
376    else:
377        raise RuntimeError("Testcase requires SESSION_ID environment variable")
378
379    some_cache = ds.DatasetCache(session_id=session_id, size=0)
380
381    # Contact the server to get the statistics, this should fail because we have not used this cache in any pipeline
382    # so there will not be any cache to get stats on.
383    with pytest.raises(RuntimeError) as e:
384        stat = some_cache.get_stat()
385        cache_sz = stat.avg_cache_sz
386        logger.info("Average row cache size: {}".format(cache_sz))
387    assert "Unexpected error" in str(e.value)
388
389    logger.info("test_cache_nomap_basic9 Ended.\n")
390
391
392@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
393def test_cache_nomap_allowed_share1():
394    """
395    It is allowed to share the cache between the following two trees:
396
397       Repeat     Shuffle
398         |           |
399       Cache       Cache
400         |           |
401      TFReader    TFReader
402    """
403
404    logger.info("Test cache nomap allowed share 1")
405    if "SESSION_ID" in os.environ:
406        session_id = int(os.environ['SESSION_ID'])
407    else:
408        raise RuntimeError("Testcase requires SESSION_ID environment variable")
409
410    ds.config.set_seed(1)
411    # This dataset has 3 records in it only
412    some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=32)
413    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache)
414    ds1 = ds1.repeat(4)
415
416    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False, cache=some_cache)
417    ds2 = ds2.shuffle(buffer_size=2)
418
419    num_iter = 0
420    for _ in ds1.create_dict_iterator(num_epochs=1):
421        num_iter += 1
422    assert num_iter == 12
423    logger.info("Number of data in ds1: {} ".format(num_iter))
424
425    num_iter = 0
426    for _ in ds2.create_dict_iterator(num_epochs=1):
427        num_iter += 1
428    assert num_iter == 3
429    logger.info("test_cache_nomap_allowed_share1 Ended.\n")
430
431
432@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
433def test_cache_nomap_allowed_share2():
434    """
435    It is allowed to share the cache between the following two trees (with map decode):
436
437       Repeat     Shuffle
438         |           |
439       Cache       Cache
440         |           |
441     Map(decode) Map(decode)
442         |           |
443      TFReader    TFReader
444    """
445
446    logger.info("Test cache nomap allowed share 2")
447    if "SESSION_ID" in os.environ:
448        session_id = int(os.environ['SESSION_ID'])
449    else:
450        raise RuntimeError("Testcase requires SESSION_ID environment variable")
451
452    ds.config.set_seed(1)
453    # This dataset has 3 records in it only
454    some_cache = ds.DatasetCache(session_id=session_id, size=0)
455    decode_op = c_vision.Decode()
456
457    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
458    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
459    ds1 = ds1.repeat(4)
460
461    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
462    ds2 = ds2.map(operations=decode_op, input_columns=["image"], cache=some_cache)
463    ds2 = ds2.shuffle(buffer_size=2)
464
465    num_iter = 0
466    for _ in ds1.create_dict_iterator(num_epochs=1):
467        num_iter += 1
468    logger.info("Number of data in ds1: {} ".format(num_iter))
469    assert num_iter == 12
470
471    num_iter = 0
472    for _ in ds2.create_dict_iterator(num_epochs=1):
473        num_iter += 1
474    assert num_iter == 3
475    logger.info("test_cache_nomap_allowed_share2 Ended.\n")
476
477
478@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
479def test_cache_nomap_allowed_share3():
480    """
481    It is allowed to share the cache between the following two trees (different shard ids):
482
483       Repeat                     Repeat
484         |                          |
485       Cache                      Cache
486         |                          |
487      TFReader(shard_id = 0)     TFReader(shard_id = 1)
488    """
489
490    logger.info("Test cache nomap allowed share 3")
491    if "SESSION_ID" in os.environ:
492        session_id = int(os.environ['SESSION_ID'])
493    else:
494        raise RuntimeError("Testcase requires SESSION_ID environment variable")
495
496    some_cache = ds.DatasetCache(session_id=session_id, size=0)
497
498    tf_files = ["../data/dataset/tf_file_dataset/test1.data", "../data/dataset/tf_file_dataset/test2.data"]
499    ds1 = ds.TFRecordDataset(tf_files, num_shards=2, shard_id=0, num_samples=3, shuffle=False, cache=some_cache)
500    ds1 = ds1.repeat(4)
501
502    ds2 = ds.TFRecordDataset(tf_files, num_shards=2, shard_id=1, num_samples=3, shuffle=False, cache=some_cache)
503    ds2 = ds2.repeat(4)
504
505    num_iter = 0
506    for _ in ds1.create_dict_iterator(num_epochs=1):
507        num_iter += 1
508    logger.info("Number of data in ds1: {} ".format(num_iter))
509    assert num_iter == 12
510
511    num_iter = 0
512    for _ in ds2.create_dict_iterator(num_epochs=1):
513        num_iter += 1
514    assert num_iter == 12
515    logger.info("test_cache_nomap_allowed_share3 Ended.\n")
516
517
518@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
519def test_cache_nomap_allowed_share4():
520    """
521    It is allowed to share the cache between the following two trees:
522
523       Cache                                  Cache
524         |                                      |
525     Map(decode, num_parallel_workers=1)    Map(decode, num_parallel_workers=2)
526         |                                      |
527      TFReader                              TFReader
528    """
529
530    logger.info("Test cache nomap allowed share 4")
531    if "SESSION_ID" in os.environ:
532        session_id = int(os.environ['SESSION_ID'])
533    else:
534        raise RuntimeError("Testcase requires SESSION_ID environment variable")
535
536    # This dataset has 3 records in it only
537    some_cache = ds.DatasetCache(session_id=session_id, size=0)
538    decode_op = c_vision.Decode()
539
540    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
541    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache, num_parallel_workers=1)
542
543    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
544    ds2 = ds2.map(operations=decode_op, input_columns=["image"], cache=some_cache, num_parallel_workers=2)
545
546    num_iter = 0
547    for _ in ds1.create_dict_iterator(num_epochs=1):
548        num_iter += 1
549    logger.info("Number of data in ds1: {} ".format(num_iter))
550    assert num_iter == 3
551
552    num_iter = 0
553    for _ in ds2.create_dict_iterator(num_epochs=1):
554        num_iter += 1
555    logger.info("Number of data in ds2: {} ".format(num_iter))
556    assert num_iter == 3
557
558    logger.info("test_cache_nomap_allowed_share4 Ended.\n")
559
560
561@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
562def test_cache_nomap_disallowed_share1():
563    """
564    It is not allowed to share the cache between the following two trees:
565
566       Cache       Cache
567         |           |
568     Map(decode) Map(rescale)
569         |           |
570      TFReader    TFReader
571    """
572
573    logger.info("Test cache nomap disallowed share1")
574    if "SESSION_ID" in os.environ:
575        session_id = int(os.environ['SESSION_ID'])
576    else:
577        raise RuntimeError("Testcase requires SESSION_ID environment variable")
578
579    # This dataset has 3 records in it only
580    some_cache = ds.DatasetCache(session_id=session_id, size=0)
581    decode_op = c_vision.Decode()
582    rescale_op = c_vision.Rescale(1.0 / 255.0, -1.0)
583
584    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
585    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
586
587    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
588    ds2 = ds2.map(operations=rescale_op, input_columns=["image"], cache=some_cache)
589
590    num_iter = 0
591    for _ in ds1.create_dict_iterator(num_epochs=1):
592        num_iter += 1
593    logger.info("Number of data in ds1: {} ".format(num_iter))
594    assert num_iter == 3
595
596    with pytest.raises(RuntimeError) as e:
597        sum([1 for _ in ds2])
598    assert "Cannot re-use a cache for a different tree!" in str(e.value)
599
600    logger.info("test_cache_nomap_disallowed_share1 Ended.\n")
601
602
603@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
604def test_cache_nomap_running_twice1():
605    """
606    Executing the same pipeline for twice (from python), with cache injected after map
607
608       Repeat
609         |
610       Cache
611         |
612     Map(decode)
613         |
614     TFRecord
615    """
616
617    logger.info("Test cache nomap running twice 1")
618    if "SESSION_ID" in os.environ:
619        session_id = int(os.environ['SESSION_ID'])
620    else:
621        raise RuntimeError("Testcase requires SESSION_ID environment variable")
622
623    some_cache = ds.DatasetCache(session_id=session_id, size=0)
624
625    # This dataset has 3 records in it only
626    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
627    decode_op = c_vision.Decode()
628    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
629    ds1 = ds1.repeat(4)
630
631    num_iter = 0
632    for _ in ds1.create_dict_iterator():
633        num_iter += 1
634    logger.info("Number of data in ds1: {} ".format(num_iter))
635    assert num_iter == 12
636
637    num_iter = 0
638    for _ in ds1.create_dict_iterator():
639        num_iter += 1
640    logger.info("Number of data in ds1: {} ".format(num_iter))
641    assert num_iter == 12
642
643    logger.info("test_cache_nomap_running_twice1 Ended.\n")
644
645
646@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
647def test_cache_nomap_running_twice2():
648    """
649    Executing the same pipeline for twice (from shell), with cache injected after leaf
650
651       Repeat
652         |
653     Map(decode)
654         |
655       Cache
656         |
657     TFRecord
658    """
659
660    logger.info("Test cache nomap running twice 2")
661    if "SESSION_ID" in os.environ:
662        session_id = int(os.environ['SESSION_ID'])
663    else:
664        raise RuntimeError("Testcase requires SESSION_ID environment variable")
665
666    some_cache = ds.DatasetCache(session_id=session_id, size=0)
667
668    # This dataset has 3 records in it only
669    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
670    decode_op = c_vision.Decode()
671    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
672    ds1 = ds1.repeat(4)
673
674    num_iter = 0
675    for _ in ds1.create_dict_iterator():
676        num_iter += 1
677
678    logger.info("Number of data in ds1: {} ".format(num_iter))
679    assert num_iter == 12
680    logger.info("test_cache_nomap_running_twice2 Ended.\n")
681
682
683@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
684def test_cache_nomap_extra_small_size1():
685    """
686    Test running pipeline with cache of extra small size and spilling true
687
688       Repeat
689         |
690     Map(decode)
691         |
692       Cache
693         |
694     TFRecord
695    """
696
697    logger.info("Test cache nomap extra small size 1")
698    if "SESSION_ID" in os.environ:
699        session_id = int(os.environ['SESSION_ID'])
700    else:
701        raise RuntimeError("Testcase requires SESSION_ID environment variable")
702    some_cache = ds.DatasetCache(session_id=session_id, size=1, spilling=True)
703
704    # This dataset has 3 records in it only
705    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
706    decode_op = c_vision.Decode()
707    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
708    ds1 = ds1.repeat(4)
709
710    num_iter = 0
711    for _ in ds1.create_dict_iterator():
712        num_iter += 1
713
714    logger.info("Number of data in ds1: {} ".format(num_iter))
715    assert num_iter == 12
716    logger.info("test_cache_nomap_extra_small_size1 Ended.\n")
717
718
719@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
720def test_cache_nomap_extra_small_size2():
721    """
722    Test running pipeline with cache of extra small size and spilling false (failure)
723
724       Repeat
725         |
726       Cache
727         |
728     Map(decode)
729         |
730     TFRecord
731    """
732
733    logger.info("Test cache nomap extra small size 2")
734    if "SESSION_ID" in os.environ:
735        session_id = int(os.environ['SESSION_ID'])
736    else:
737        raise RuntimeError("Testcase requires SESSION_ID environment variable")
738    some_cache = ds.DatasetCache(session_id=session_id, size=1, spilling=False)
739
740    # This dataset has 3 records in it only
741    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
742    decode_op = c_vision.Decode()
743    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
744    ds1 = ds1.repeat(4)
745
746    with pytest.raises(RuntimeError) as e:
747        sum([1 for _ in ds1])
748    assert "Out of memory" in str(e.value)
749    logger.info("test_cache_nomap_extra_small_size2 Ended.\n")
750
751
752@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
753def test_cache_nomap_parallel_pipeline1(shard):
754    """
755    Test running two parallel pipelines (sharing cache) with cache injected after leaf op
756
757       Repeat
758         |
759     Map(decode)
760         |
761       cache
762         |
763      TFReader
764    """
765
766    logger.info("Test cache nomap parallel pipeline 1")
767    if "SESSION_ID" in os.environ:
768        session_id = int(os.environ['SESSION_ID'])
769    else:
770        raise RuntimeError("Testcase requires SESSION_ID environment variable")
771    some_cache = ds.DatasetCache(session_id=session_id, size=0)
772
773    # This dataset has 3 records in it only
774    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=3, shard_id=int(shard), cache=some_cache)
775    decode_op = c_vision.Decode()
776    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
777    ds1 = ds1.repeat(4)
778
779    num_iter = 0
780    for _ in ds1.create_dict_iterator(num_epochs=1):
781        num_iter += 1
782
783    logger.info("Number of data in ds1: {} ".format(num_iter))
784    assert num_iter == 4
785    logger.info("test_cache_nomap_parallel_pipeline1 Ended.\n")
786
787
788@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
789def test_cache_nomap_parallel_pipeline2(shard):
790    """
791    Test running two parallel pipelines (sharing cache) with cache injected after map op
792
793       Repeat
794         |
795       cache
796         |
797     Map(decode)
798         |
799      TFReader
800    """
801
802    logger.info("Test cache nomap parallel pipeline 2")
803    if "SESSION_ID" in os.environ:
804        session_id = int(os.environ['SESSION_ID'])
805    else:
806        raise RuntimeError("Testcase requires SESSION_ID environment variable")
807    some_cache = ds.DatasetCache(session_id=session_id, size=0)
808
809    # This dataset has 3 records in it only
810    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=3, shard_id=int(shard))
811    decode_op = c_vision.Decode()
812    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
813    ds1 = ds1.repeat(4)
814
815    num_iter = 0
816    for _ in ds1.create_dict_iterator(num_epochs=1):
817        num_iter += 1
818
819    logger.info("Number of data in ds1: {} ".format(num_iter))
820    assert num_iter == 4
821    logger.info("test_cache_nomap_parallel_pipeline2 Ended.\n")
822
823
824@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
825def test_cache_nomap_parallel_workers():
826    """
827    Test cache with num_parallel_workers > 1 set for map op and leaf op
828
829       Repeat
830         |
831     Map(decode)
832         |
833       cache
834         |
835      TFReader
836    """
837
838    logger.info("Test cache nomap parallel workers")
839    if "SESSION_ID" in os.environ:
840        session_id = int(os.environ['SESSION_ID'])
841    else:
842        raise RuntimeError("Testcase requires SESSION_ID environment variable")
843    some_cache = ds.DatasetCache(session_id=session_id, size=0)
844
845    # This dataset has 3 records in it only
846    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_parallel_workers=4)
847    decode_op = c_vision.Decode()
848    ds1 = ds1.map(input_columns=["image"], operations=decode_op, num_parallel_workers=4, cache=some_cache)
849    ds1 = ds1.repeat(4)
850
851    num_iter = 0
852    for _ in ds1.create_dict_iterator(num_epochs=1):
853        num_iter += 1
854
855    logger.info("Number of data in ds1: {} ".format(num_iter))
856    assert num_iter == 12
857    logger.info("test_cache_nomap_parallel_workers Ended.\n")
858
859
860@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
861def test_cache_nomap_server_workers_1():
862    """
863    start cache server with --workers 1 and then test cache function
864
865       Repeat
866         |
867       cache
868         |
869     Map(decode)
870         |
871      TFRecord
872    """
873
874    logger.info("Test cache nomap server workers 1")
875    if "SESSION_ID" in os.environ:
876        session_id = int(os.environ['SESSION_ID'])
877    else:
878        raise RuntimeError("Testcase requires SESSION_ID environment variable")
879
880    some_cache = ds.DatasetCache(session_id=session_id, size=0)
881
882    # This dataset has 3 records in it only
883    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
884    decode_op = c_vision.Decode()
885    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
886    ds1 = ds1.repeat(4)
887
888    num_iter = 0
889    for _ in ds1.create_dict_iterator():
890        num_iter += 1
891
892    logger.info("Number of data in ds1: {} ".format(num_iter))
893    assert num_iter == 12
894    logger.info("test_cache_nomap_server_workers_1 Ended.\n")
895
896
897@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
898def test_cache_nomap_server_workers_100():
899    """
900    start cache server with --workers 100 and then test cache function
901
902       Repeat
903         |
904     Map(decode)
905         |
906       cache
907         |
908      TFRecord
909    """
910
911    logger.info("Test cache nomap server workers 100")
912    if "SESSION_ID" in os.environ:
913        session_id = int(os.environ['SESSION_ID'])
914    else:
915        raise RuntimeError("Testcase requires SESSION_ID environment variable")
916
917    some_cache = ds.DatasetCache(session_id=session_id, size=0)
918
919    # This dataset has 3 records in it only
920    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
921    decode_op = c_vision.Decode()
922    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
923    ds1 = ds1.repeat(4)
924
925    num_iter = 0
926    for _ in ds1.create_dict_iterator():
927        num_iter += 1
928
929    logger.info("Number of data in ds1: {} ".format(num_iter))
930    assert num_iter == 12
931    logger.info("test_cache_nomap_server_workers_100 Ended.\n")
932
933
934@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
935def test_cache_nomap_num_connections_1():
936    """
937    Test setting num_connections=1 in DatasetCache
938
939       Repeat
940         |
941       cache
942         |
943     Map(decode)
944         |
945      TFRecord
946    """
947
948    logger.info("Test cache nomap num_connections 1")
949    if "SESSION_ID" in os.environ:
950        session_id = int(os.environ['SESSION_ID'])
951    else:
952        raise RuntimeError("Testcase requires SESSION_ID environment variable")
953
954    some_cache = ds.DatasetCache(session_id=session_id, size=0, num_connections=1)
955
956    # This dataset has 3 records in it only
957    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
958    decode_op = c_vision.Decode()
959    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
960    ds1 = ds1.repeat(4)
961
962    num_iter = 0
963    for _ in ds1.create_dict_iterator():
964        num_iter += 1
965
966    logger.info("Number of data in ds1: {} ".format(num_iter))
967    assert num_iter == 12
968    logger.info("test_cache_nomap_num_connections_1 Ended.\n")
969
970
971@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
972def test_cache_nomap_num_connections_100():
973    """
974    Test setting num_connections=100 in DatasetCache
975
976       Repeat
977         |
978     Map(decode)
979         |
980       cache
981         |
982      TFRecord
983    """
984
985    logger.info("Test cache nomap num_connections 100")
986    if "SESSION_ID" in os.environ:
987        session_id = int(os.environ['SESSION_ID'])
988    else:
989        raise RuntimeError("Testcase requires SESSION_ID environment variable")
990
991    some_cache = ds.DatasetCache(session_id=session_id, size=0, num_connections=100)
992
993    # This dataset has 3 records in it only
994    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
995    decode_op = c_vision.Decode()
996    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
997    ds1 = ds1.repeat(4)
998
999    num_iter = 0
1000    for _ in ds1.create_dict_iterator():
1001        num_iter += 1
1002
1003    logger.info("Number of data in ds1: {} ".format(num_iter))
1004    assert num_iter == 12
1005    logger.info("test_cache_nomap_num_connections_100 Ended.\n")
1006
1007
1008@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1009def test_cache_nomap_prefetch_size_1():
1010    """
1011    Test setting prefetch_size=1 in DatasetCache
1012
1013       Repeat
1014         |
1015       cache
1016         |
1017     Map(decode)
1018         |
1019      TFRecord
1020    """
1021
1022    logger.info("Test cache nomap prefetch_size 1")
1023    if "SESSION_ID" in os.environ:
1024        session_id = int(os.environ['SESSION_ID'])
1025    else:
1026        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1027
1028    some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=1)
1029
1030    # This dataset has 3 records in it only
1031    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1032    decode_op = c_vision.Decode()
1033    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
1034    ds1 = ds1.repeat(4)
1035
1036    num_iter = 0
1037    for _ in ds1.create_dict_iterator():
1038        num_iter += 1
1039
1040    logger.info("Number of data in ds1: {} ".format(num_iter))
1041    assert num_iter == 12
1042    logger.info("test_cache_nomap_prefetch_size_1 Ended.\n")
1043
1044
1045@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1046def test_cache_nomap_prefetch_size_100():
1047    """
1048    Test setting prefetch_size=100 in DatasetCache
1049
1050       Repeat
1051         |
1052     Map(decode)
1053         |
1054       cache
1055         |
1056      TFRecord
1057    """
1058
1059    logger.info("Test cache nomap prefetch_size 100")
1060    if "SESSION_ID" in os.environ:
1061        session_id = int(os.environ['SESSION_ID'])
1062    else:
1063        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1064
1065    some_cache = ds.DatasetCache(session_id=session_id, size=0, prefetch_size=100)
1066
1067    # This dataset has 3 records in it only
1068    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
1069    decode_op = c_vision.Decode()
1070    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
1071    ds1 = ds1.repeat(4)
1072
1073    num_iter = 0
1074    for _ in ds1.create_dict_iterator():
1075        num_iter += 1
1076
1077    logger.info("Number of data in ds1: {} ".format(num_iter))
1078    assert num_iter == 12
1079    logger.info("test_cache_nomap_prefetch_size_100 Ended.\n")
1080
1081
1082@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1083def test_cache_nomap_to_device():
1084    """
1085    Test cache with to_device
1086
1087     DeviceQueue
1088         |
1089      EpochCtrl
1090         |
1091       Repeat
1092         |
1093     Map(decode)
1094         |
1095       cache
1096         |
1097      TFReader
1098    """
1099
1100    logger.info("Test cache nomap to_device")
1101    if "SESSION_ID" in os.environ:
1102        session_id = int(os.environ['SESSION_ID'])
1103    else:
1104        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1105
1106    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1107
1108    # This dataset has 3 records in it only
1109    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1110    decode_op = c_vision.Decode()
1111    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
1112    ds1 = ds1.repeat(4)
1113    ds1 = ds1.to_device()
1114    ds1.send()
1115
1116    logger.info("test_cache_nomap_to_device Ended.\n")
1117
1118
1119@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1120def test_cache_nomap_session_destroy():
1121    """
1122    Test executing cache_admin -d while the pipeline is running
1123
1124       Repeat
1125         |
1126       Cache
1127         |
1128     RandomDataset
1129    """
1130
1131    logger.info("Test cache nomap session destroy")
1132    if "SESSION_ID" in os.environ:
1133        session_id = int(os.environ['SESSION_ID'])
1134    else:
1135        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1136
1137    schema = ds.Schema()
1138    schema.add_column('image', de_type=mstype.uint8,
1139                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
1140    schema.add_column('label', de_type=mstype.uint8, shape=[1])
1141
1142    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1143
1144    # User-created sampler here
1145    ds1 = ds.RandomDataset(schema=schema, num_parallel_workers=4, cache=some_cache)
1146    ds1 = ds1.repeat()
1147
1148    with pytest.raises(RuntimeError) as e:
1149        num_iter = 0
1150        for _ in ds1.create_dict_iterator():
1151            num_iter += 1
1152    assert "Unexpected error" in str(e.value)
1153
1154    logger.info("test_cache_nomap_session_destroy Ended.\n")
1155
1156
1157@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1158def test_cache_nomap_server_stop():
1159    """
1160    Test executing cache_admin --stop while the pipeline is running
1161
1162       Repeat
1163         |
1164       Cache
1165         |
1166     RandomDataset
1167    """
1168
1169    logger.info("Test cache nomap server stop")
1170    if "SESSION_ID" in os.environ:
1171        session_id = int(os.environ['SESSION_ID'])
1172    else:
1173        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1174
1175    schema = ds.Schema()
1176    schema.add_column('image', de_type=mstype.uint8,
1177                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
1178    schema.add_column('label', de_type=mstype.uint8, shape=[1])
1179
1180    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1181
1182    # User-created sampler here
1183    ds1 = ds.RandomDataset(schema=schema, num_parallel_workers=4, cache=some_cache)
1184    ds1 = ds1.repeat()
1185
1186    with pytest.raises(RuntimeError) as e:
1187        num_iter = 0
1188        for _ in ds1.create_dict_iterator():
1189            num_iter += 1
1190    assert "Network error. Cache server with port 50052 is unreachable. Make sure the server is running." in \
1191           str(e.value)
1192
1193    logger.info("test_cache_nomap_server_stop Ended.\n")
1194
1195
1196@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1197def test_cache_nomap_interrupt_and_rerun():
1198    """
1199    Test interrupt a running pipeline and then re-use the same cache to run another pipeline
1200
1201       Cache
1202         |
1203     RandomDataset
1204    """
1205
1206    logger.info("Test cache nomap interrupt and rerun")
1207    if "SESSION_ID" in os.environ:
1208        session_id = int(os.environ['SESSION_ID'])
1209    else:
1210        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1211
1212    schema = ds.Schema()
1213    schema.add_column('image', de_type=mstype.uint8,
1214                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
1215    schema.add_column('label', de_type=mstype.uint8, shape=[1])
1216
1217    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1218
1219    # User-created sampler here
1220    ds1 = ds.RandomDataset(schema=schema, total_rows=10000, num_parallel_workers=4, cache=some_cache)
1221    iter1 = ds1.create_dict_iterator()
1222
1223    num_iter = 0
1224    with pytest.raises(AttributeError) as e:
1225        for _ in iter1:
1226            num_iter += 1
1227            if num_iter == 10:
1228                iter1.stop()
1229    assert "'DictIterator' object has no attribute '_runtime_context'" in str(e.value)
1230
1231    num_epoch = 2
1232    iter2 = ds1.create_dict_iterator(num_epochs=num_epoch)
1233    epoch_count = 0
1234    for _ in range(num_epoch):
1235        num_iter = 0
1236        for _ in iter2:
1237            num_iter += 1
1238        logger.info("Number of data in ds1: {} ".format(num_iter))
1239        assert num_iter == 10000
1240        epoch_count += 1
1241
1242    cache_stat = some_cache.get_stat()
1243    assert cache_stat.num_mem_cached == 10000
1244
1245    logger.info("test_cache_nomap_interrupt_and_rerun Ended.\n")
1246
1247
1248@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1249def test_cache_nomap_epoch_ctrl1():
1250    """
1251    Test using two-loops method to run several epochs
1252
1253     Map(decode)
1254         |
1255       cache
1256         |
1257      TFRecord
1258    """
1259
1260    logger.info("Test cache nomap epoch ctrl1")
1261    if "SESSION_ID" in os.environ:
1262        session_id = int(os.environ['SESSION_ID'])
1263    else:
1264        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1265
1266    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1267
1268    # This dataset has 3 records in it only
1269    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
1270    decode_op = c_vision.Decode()
1271    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
1272
1273    num_epoch = 5
1274    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch)
1275
1276    epoch_count = 0
1277    for _ in range(num_epoch):
1278        row_count = 0
1279        for _ in iter1:
1280            row_count += 1
1281        logger.info("Number of data in ds1: {} ".format(row_count))
1282        assert row_count == 3
1283        epoch_count += 1
1284    assert epoch_count == num_epoch
1285    logger.info("test_cache_nomap_epoch_ctrl1 Ended.\n")
1286
1287
1288@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1289def test_cache_nomap_epoch_ctrl2():
1290    """
1291    Test using two-loops method with infinite epochs
1292
1293        cache
1294         |
1295     Map(decode)
1296         |
1297      TFRecord
1298    """
1299
1300    logger.info("Test cache nomap epoch ctrl2")
1301    if "SESSION_ID" in os.environ:
1302        session_id = int(os.environ['SESSION_ID'])
1303    else:
1304        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1305
1306    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1307
1308    # This dataset has 3 records in it only
1309    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1310    decode_op = c_vision.Decode()
1311    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
1312
1313    num_epoch = 5
1314    # iter1 will always assume there is a next epoch and never shutdown
1315    iter1 = ds1.create_dict_iterator()
1316
1317    epoch_count = 0
1318    for _ in range(num_epoch):
1319        row_count = 0
1320        for _ in iter1:
1321            row_count += 1
1322        logger.info("Number of data in ds1: {} ".format(row_count))
1323        assert row_count == 3
1324        epoch_count += 1
1325    assert epoch_count == num_epoch
1326
1327    # manually stop the iterator
1328    iter1.stop()
1329    logger.info("test_cache_nomap_epoch_ctrl2 Ended.\n")
1330
1331
1332@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1333def test_cache_nomap_epoch_ctrl3():
1334    """
1335    Test using two-loops method with infinite epochs over repeat
1336
1337       repeat
1338         |
1339     Map(decode)
1340         |
1341       cache
1342         |
1343      TFRecord
1344    """
1345
1346    logger.info("Test cache nomap epoch ctrl3")
1347    if "SESSION_ID" in os.environ:
1348        session_id = int(os.environ['SESSION_ID'])
1349    else:
1350        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1351
1352    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1353
1354    # This dataset has 3 records in it only
1355    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
1356    decode_op = c_vision.Decode()
1357    ds1 = ds1.map(input_columns=["image"], operations=decode_op)
1358    ds1 = ds1.repeat(2)
1359
1360    num_epoch = 5
1361    # iter1 will always assume there is a next epoch and never shutdown
1362    iter1 = ds1.create_dict_iterator()
1363
1364    epoch_count = 0
1365    for _ in range(num_epoch):
1366        row_count = 0
1367        for _ in iter1:
1368            row_count += 1
1369        logger.info("Number of data in ds1: {} ".format(row_count))
1370        assert row_count == 6
1371        epoch_count += 1
1372    assert epoch_count == num_epoch
1373
1374    # reply on garbage collector to destroy iter1
1375
1376    logger.info("test_cache_nomap_epoch_ctrl3 Ended.\n")
1377
1378
1379@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1380def test_cache_nomap_epoch_ctrl4():
1381    """
1382    Test using two-loops method with repeat under cache
1383
1384        cache
1385         |
1386     Map(decode)
1387         |
1388       repeat
1389         |
1390      TFRecord
1391    """
1392
1393    logger.info("Test cache nomap epoch ctrl4")
1394    if "SESSION_ID" in os.environ:
1395        session_id = int(os.environ['SESSION_ID'])
1396    else:
1397        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1398
1399    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1400
1401    # This dataset has 3 records in it only
1402    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1403    ds1 = ds1.repeat(2)
1404    decode_op = c_vision.Decode()
1405    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
1406
1407    num_epoch = 5
1408    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch)
1409
1410    epoch_count = 0
1411    for _ in range(num_epoch):
1412        row_count = 0
1413        for _ in iter1:
1414            row_count += 1
1415        logger.info("Number of data in ds1: {} ".format(row_count))
1416        assert row_count == 6
1417        epoch_count += 1
1418    assert epoch_count == num_epoch
1419
1420    logger.info("test_cache_nomap_epoch_ctrl4 Ended.\n")
1421
1422
1423@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1424def test_cache_nomap_multiple_cache1():
1425    """
1426    Test multiple cache in the same python script
1427
1428       cache                  cache
1429         |                      |
1430    Map(decode)             Map(decode)
1431         |                      |
1432    TFRecord(train)        TFRecord(eval)
1433    """
1434
1435    logger.info("Test cache nomap multiple cache 1")
1436    if "SESSION_ID" in os.environ:
1437        session_id = int(os.environ['SESSION_ID'])
1438    else:
1439        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1440
1441    train_cache = ds.DatasetCache(session_id=session_id, size=0)
1442    eval_cache = ds.DatasetCache(session_id=session_id, size=0)
1443
1444    # This dataset has 12 records in it
1445    train_dataset = ds.TFRecordDataset(TRAIN_DATA_DIR, TRAIN_SCHEMA_DIR)
1446    decode_op = c_vision.Decode()
1447    train_dataset = train_dataset.map(input_columns=["image"], operations=decode_op, cache=train_cache)
1448
1449    # This dataset has 3 records in it only
1450    eval_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1451    eval_dataset = eval_dataset.map(input_columns=["image"], operations=decode_op, cache=eval_cache)
1452
1453    num_epoch = 5
1454    train_iter = train_dataset.create_dict_iterator(num_epochs=num_epoch)
1455    eval_iter = eval_dataset.create_dict_iterator(num_epochs=num_epoch)
1456
1457    epoch_count = 0
1458    for _ in range(num_epoch):
1459        assert sum([1 for _ in train_iter]) == 12
1460        assert sum([1 for _ in eval_iter]) == 3
1461        epoch_count += 1
1462    assert epoch_count == num_epoch
1463
1464    logger.info("test_cache_nomap_multiple_cache1 Ended.\n")
1465
1466
1467@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1468def test_cache_nomap_multiple_cache2():
1469    """
1470    Test multiple cache in the same python script
1471
1472       cache
1473         |
1474    Map(decode)               cache
1475         |                      |
1476    TFRecord(image)        TFRecord(text)
1477    """
1478
1479    logger.info("Test cache nomap multiple cache 2")
1480    if "SESSION_ID" in os.environ:
1481        session_id = int(os.environ['SESSION_ID'])
1482    else:
1483        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1484
1485    image_cache = ds.DatasetCache(session_id=session_id, size=0)
1486    text_cache = ds.DatasetCache(session_id=session_id, size=0)
1487
1488    # This dataset has 3 records in it only
1489    image_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1490    decode_op = c_vision.Decode()
1491    image_dataset = image_dataset.map(input_columns=["image"], operations=decode_op, cache=image_cache)
1492
1493    # This dataset has 3 records in it only
1494    text_dataset = ds.TFRecordDataset(TEXT_TF_DATA_DIR, SCHEMA_DIR2, cache=text_cache)
1495
1496    num_epoch = 5
1497    image_iter = image_dataset.create_dict_iterator(num_epochs=num_epoch)
1498    text_iter = text_dataset.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1499
1500    epoch_count = 0
1501    for _ in range(num_epoch):
1502        row_count = 0
1503        for _, _ in itertools.zip_longest(image_iter, text_iter):
1504            row_count += 1
1505        assert row_count == 3
1506        epoch_count += 1
1507    assert epoch_count == num_epoch
1508
1509    logger.info("test_cache_nomap_multiple_cache2 Ended.\n")
1510
1511
1512@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1513def test_cache_nomap_multiple_cache3():
1514    """
1515    Test multiple cache in the same python script
1516
1517       cache                   cache
1518         |                      |
1519    Map(decode)             Map(decode)
1520         |                      |
1521    TFRecord                ImageFolder
1522    """
1523
1524    logger.info("Test cache nomap multiple cache 3")
1525    if "SESSION_ID" in os.environ:
1526        session_id = int(os.environ['SESSION_ID'])
1527    else:
1528        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1529
1530    tf_cache = ds.DatasetCache(session_id=session_id, size=0)
1531    image_cache = ds.DatasetCache(session_id=session_id, size=0)
1532
1533    # This dataset has 3 records in it only
1534    tf_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1535    decode_op = c_vision.Decode()
1536    tf_dataset = tf_dataset.map(input_columns=["image"], operations=decode_op, cache=tf_cache)
1537
1538    # This DATA_DIR only has 2 images in it
1539    image_dataset = ds.ImageFolderDataset(dataset_dir=IMAGE_FOLDER_DATA_DIR)
1540    image_dataset = image_dataset.map(input_columns=["image"], operations=decode_op, cache=image_cache)
1541
1542    num_epoch = 5
1543    tf_iter = tf_dataset.create_dict_iterator(num_epochs=num_epoch)
1544    image_iter = image_dataset.create_dict_iterator(num_epochs=num_epoch)
1545
1546    epoch_count = 0
1547    for _ in range(num_epoch):
1548        assert sum([1 for _ in tf_iter]) == 3
1549        assert sum([1 for _ in image_iter]) == 2
1550        epoch_count += 1
1551    assert epoch_count == num_epoch
1552
1553    logger.info("test_cache_nomap_multiple_cache3 Ended.\n")
1554
1555
1556@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1557def test_cache_nomap_multiple_cache_train():
1558    """
1559    Test multiple cache in different python scripts. This test case is going to run concurrently with
1560    test_cache_nomap_multiple_cache_eval.
1561
1562       cache
1563         |
1564    Map(decode)
1565         |
1566    TFRecord(train)
1567    """
1568
1569    logger.info("Test cache nomap multiple cache train")
1570    if "SESSION_ID" in os.environ:
1571        session_id = int(os.environ['SESSION_ID'])
1572    else:
1573        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1574
1575    train_cache = ds.DatasetCache(session_id=session_id, size=0)
1576
1577    # This dataset has 12 records in it
1578    train_dataset = ds.TFRecordDataset(TRAIN_DATA_DIR, TRAIN_SCHEMA_DIR)
1579    decode_op = c_vision.Decode()
1580    train_dataset = train_dataset.map(input_columns=["image"], operations=decode_op, cache=train_cache)
1581
1582    num_epoch = 5
1583    train_iter = train_dataset.create_dict_iterator(num_epochs=num_epoch)
1584
1585    epoch_count = 0
1586    for _ in range(num_epoch):
1587        assert sum([1 for _ in train_iter]) == 12
1588        epoch_count += 1
1589    assert epoch_count == num_epoch
1590
1591    logger.info("test_cache_nomap_multiple_cache_train Ended.\n")
1592
1593
1594@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1595def test_cache_nomap_multiple_cache_eval():
1596    """
1597    Test multiple cache in different python scripts. This test case is going to run concurrently with
1598    test_cache_nomap_multiple_cache_train.
1599
1600       cache
1601         |
1602    Map(decode)
1603         |
1604    TFRecord(eval)
1605    """
1606
1607    logger.info("Test cache nomap multiple cache eval")
1608    if "SESSION_ID" in os.environ:
1609        session_id = int(os.environ['SESSION_ID'])
1610    else:
1611        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1612
1613    eval_cache = ds.DatasetCache(session_id=session_id, size=0)
1614
1615    # This dataset only has 3 records in it
1616    eval_dataset = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1617    decode_op = c_vision.Decode()
1618    eval_dataset = eval_dataset.map(input_columns=["image"], operations=decode_op, cache=eval_cache)
1619
1620    num_epoch = 5
1621    eval_iter = eval_dataset.create_dict_iterator(num_epochs=num_epoch)
1622
1623    epoch_count = 0
1624    for _ in range(num_epoch):
1625        assert sum([1 for _ in eval_iter]) == 3
1626        epoch_count += 1
1627    assert epoch_count == num_epoch
1628
1629    logger.info("test_cache_nomap_multiple_cache_eval Ended.\n")
1630
1631
1632@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1633def test_cache_nomap_clue1():
1634    """
1635    A clue dataset (a non mappable dataset) with a cache over it just after the leaf
1636    In this one, the clue dataset will be given sharding configuration, however since a cache is
1637    used, the tree prepare should undo the sharding configuration and instead, a distributed
1638    sampler will be chosen with the same shard config.
1639
1640       Cache
1641         |
1642       CLUE
1643    """
1644
1645    logger.info("Test cache nomap clue 1")
1646    if "SESSION_ID" in os.environ:
1647        session_id = int(os.environ['SESSION_ID'])
1648    else:
1649        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1650
1651    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1652
1653    # With only 3 records shard into 3, we expect only 1 record returned for this shard
1654    # However, the sharding will be done by the sampler, not by the clue leaf node
1655    # In this case, it is a row-based sharding, not the file-based sharding that would happen if
1656    # there was not any cache.
1657    ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train', num_shards=3, shard_id=1, cache=some_cache)
1658
1659    num_epoch = 4
1660    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1661
1662    epoch_count = 0
1663    for _ in range(num_epoch):
1664        assert sum([1 for _ in iter1]) == 1
1665        epoch_count += 1
1666    assert epoch_count == num_epoch
1667
1668    logger.info("test_cache_nomap_clue1 Ended.\n")
1669
1670
1671@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1672def test_cache_nomap_clue2():
1673    """
1674    A clue dataset (a non mappable dataset) with a cache over it after map
1675    In this one, a num_samples argument is given
1676
1677       Cache
1678         |
1679    map(lambda x: x)
1680         |
1681       CLUE
1682    """
1683
1684    logger.info("Test cache nomap clue 2")
1685    if "SESSION_ID" in os.environ:
1686        session_id = int(os.environ['SESSION_ID'])
1687    else:
1688        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1689
1690    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1691
1692    ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train', num_samples=2)
1693    ds1 = ds1.map(py_vision.not_random(lambda x: x), ["label"], cache=some_cache)
1694
1695    num_epoch = 4
1696    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1697
1698    epoch_count = 0
1699    for _ in range(num_epoch):
1700        assert sum([1 for _ in iter1]) == 2
1701        epoch_count += 1
1702    assert epoch_count == num_epoch
1703
1704    logger.info("test_cache_nomap_clue2 Ended.\n")
1705
1706
1707@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1708def test_cache_nomap_csv1():
1709    """
1710    A csv dataset (a non mappable dataset) with a cache over it just after the leaf
1711    In this one, the csv dataset will be given sharding configuration, however since a cache is
1712    used, the tree prepare should undo the sharding configuration and instead, a distributed
1713    sampler will be chosen with the same shard config.
1714
1715       Cache
1716         |
1717       CSV
1718    """
1719
1720    logger.info("Test cache nomap csv 1")
1721    if "SESSION_ID" in os.environ:
1722        session_id = int(os.environ['SESSION_ID'])
1723    else:
1724        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1725
1726    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1727
1728    # With only 3 records shard into 3, we expect only 1 record returned for this shard
1729    # However, the sharding will be done by the sampler, not by the clue leaf node
1730    # In this case, it is a row-based sharding, not the file-based sharding that would happen if
1731    # there was not any cache.
1732    ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"],
1733                        column_names=['col1', 'col2', 'col3', 'col4'], num_shards=3, shard_id=1, cache=some_cache)
1734
1735    num_epoch = 4
1736    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1737
1738    epoch_count = 0
1739    for _ in range(num_epoch):
1740        assert sum([1 for _ in iter1]) == 1
1741        epoch_count += 1
1742    assert epoch_count == num_epoch
1743
1744    logger.info("test_cache_nomap_csv1 Ended.\n")
1745
1746
1747@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1748def test_cache_nomap_csv2():
1749    """
1750    A csv dataset (a non mappable dataset) with a cache over it after map
1751    In this one, a num_samples argument is given
1752
1753       Cache
1754         |
1755    map(lambda x: x)
1756         |
1757       CSV
1758    """
1759
1760    logger.info("Test cache nomap csv 2")
1761    if "SESSION_ID" in os.environ:
1762        session_id = int(os.environ['SESSION_ID'])
1763    else:
1764        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1765
1766    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1767
1768    ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"],
1769                        column_names=['col1', 'col2', 'col3', 'col4'], num_samples=2)
1770    ds1 = ds1.map(py_vision.not_random(lambda x: x), ["col1"], cache=some_cache)
1771
1772    num_epoch = 4
1773    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1774
1775    epoch_count = 0
1776    for _ in range(num_epoch):
1777        assert sum([1 for _ in iter1]) == 2
1778        epoch_count += 1
1779    assert epoch_count == num_epoch
1780
1781    logger.info("test_cache_nomap_csv2 Ended.\n")
1782
1783
1784@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1785def test_cache_nomap_textfile1():
1786    """
1787    A text file dataset (a non mappable dataset) with a cache over it just after the leaf
1788    In this one, the text file dataset will be given sharding configuration, however since a cache is
1789    used, the tree prepare should undo the sharding configuration and instead, a distributed
1790    sampler will be chosen with the same shard config.
1791
1792       Cache
1793         |
1794     TextFile
1795    """
1796
1797    logger.info("Test cache nomap textfile 1")
1798    if "SESSION_ID" in os.environ:
1799        session_id = int(os.environ['SESSION_ID'])
1800    else:
1801        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1802
1803    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1804
1805    # With only 3 records shard into 3, we expect only 1 record returned for this shard
1806    # However, the sharding will be done by the sampler, not by the clue leaf node
1807    # In this case, it is a row-based sharding, not the file-based sharding that would happen if
1808    # there was not any cache.
1809    ds1 = ds.TextFileDataset(TEXT_FILE_DATA_DIR, num_shards=3, shard_id=1, cache=some_cache)
1810
1811    num_epoch = 4
1812    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1813
1814    epoch_count = 0
1815    for _ in range(num_epoch):
1816        assert sum([1 for _ in iter1]) == 1
1817        epoch_count += 1
1818    assert epoch_count == num_epoch
1819
1820    logger.info("test_cache_nomap_textfile1 Ended.\n")
1821
1822
1823@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1824def test_cache_nomap_textfile2():
1825    """
1826    A text file dataset (a non mappable dataset) with a cache over it after map
1827    In this one, a num_samples argument is given
1828
1829       Cache
1830         |
1831    Map(tokenizer)
1832         |
1833     TextFile
1834    """
1835
1836    def my_tokenizer(line):
1837        words = line.split()
1838        if not words:
1839            return [""]
1840        return words
1841
1842    logger.info("Test cache nomap textfile 2")
1843    if "SESSION_ID" in os.environ:
1844        session_id = int(os.environ['SESSION_ID'])
1845    else:
1846        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1847
1848    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1849
1850    ds1 = ds.TextFileDataset(TEXT_FILE_DATA_DIR, num_samples=2)
1851    tokenizer = text.PythonTokenizer(my_tokenizer)
1852    ds1 = ds1.map(operations=tokenizer, cache=some_cache)
1853
1854    num_epoch = 4
1855    iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True)
1856
1857    epoch_count = 0
1858    for _ in range(num_epoch):
1859        assert sum([1 for _ in iter1]) == 2
1860        epoch_count += 1
1861    assert epoch_count == num_epoch
1862
1863    logger.info("test_cache_nomap_textfile2 Ended.\n")
1864
1865
1866@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1867def test_cache_nomap_nested_repeat():
1868    """
1869    Test cache on pipeline with nested repeat ops
1870
1871        Repeat
1872          |
1873        Cache
1874          |
1875      Map(decode)
1876          |
1877        Repeat
1878          |
1879      TFRecord
1880    """
1881
1882    logger.info("Test cache nomap nested repeat")
1883    if "SESSION_ID" in os.environ:
1884        session_id = int(os.environ['SESSION_ID'])
1885    else:
1886        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1887
1888    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1889
1890    # This dataset has 3 records in it only
1891    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR)
1892    decode_op = c_vision.Decode()
1893    ds1 = ds1.repeat(4)
1894    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
1895    ds1 = ds1.repeat(2)
1896
1897    num_iter = 0
1898    for _ in ds1.create_dict_iterator(num_epochs=1):
1899        logger.info("get data from dataset")
1900        num_iter += 1
1901
1902    logger.info("Number of data in ds1: {} ".format(num_iter))
1903    assert num_iter == 24
1904    logger.info('test_cache_nomap_nested_repeat Ended.\n')
1905
1906
1907@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1908def test_cache_nomap_get_repeat_count():
1909    """
1910    Test get_repeat_count() for a pipeline with cache and nested repeat ops
1911
1912        Cache
1913          |
1914      Map(decode)
1915          |
1916        Repeat
1917          |
1918      TFRecord
1919    """
1920
1921    logger.info("Test cache nomap get_repeat_count")
1922    if "SESSION_ID" in os.environ:
1923        session_id = int(os.environ['SESSION_ID'])
1924    else:
1925        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1926
1927    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1928
1929    # This dataset has 3 records in it only
1930    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"], shuffle=False)
1931    ds1 = ds1.repeat(4)
1932    decode_op = c_vision.Decode()
1933    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
1934
1935    repeat_count = ds1.get_repeat_count()
1936    logger.info("repeat_count: {}".format(repeat_count))
1937    assert repeat_count == 4
1938
1939    num_iter = 0
1940    for _ in ds1.create_dict_iterator(num_epochs=1):
1941        logger.info("get data from dataset")
1942        num_iter += 1
1943    assert num_iter == 12
1944
1945
1946@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1947def test_cache_nomap_long_file_list():
1948    """
1949    Test cache after TFRecord with a long list of files as arguments
1950
1951        Cache
1952          |
1953      TFRecord
1954    """
1955
1956    logger.info("Test cache nomap long file list")
1957    if "SESSION_ID" in os.environ:
1958        session_id = int(os.environ['SESSION_ID'])
1959    else:
1960        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1961
1962    some_cache = ds.DatasetCache(session_id=session_id, size=1)
1963
1964    ds1 = ds.TFRecordDataset([DATA_DIR[0] for _ in range(0, 1000)], SCHEMA_DIR, columns_list=["image"],
1965                             cache=some_cache)
1966
1967    with pytest.raises(RuntimeError) as e:
1968        sum([1 for _ in ds1])
1969    assert "Out of memory" in str(e.value)
1970    logger.info("test_cache_nomap_long_file_list Ended.\n")
1971
1972
1973@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
1974def test_cache_nomap_failure1():
1975    """
1976    Test nested cache (failure)
1977
1978        Repeat
1979          |
1980        Cache
1981          |
1982      Map(decode)
1983          |
1984        Cache
1985          |
1986      TFRecord
1987
1988    """
1989    logger.info("Test cache nomap failure 1")
1990    if "SESSION_ID" in os.environ:
1991        session_id = int(os.environ['SESSION_ID'])
1992    else:
1993        raise RuntimeError("Testcase requires SESSION_ID environment variable")
1994
1995    some_cache = ds.DatasetCache(session_id=session_id, size=0)
1996
1997    # This dataset has 3 records in it only
1998    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, cache=some_cache)
1999    decode_op = c_vision.Decode()
2000    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
2001    ds1 = ds1.repeat(4)
2002
2003    with pytest.raises(RuntimeError) as e:
2004        ds1.get_batch_size()
2005    assert "Nested cache operations" in str(e.value)
2006
2007    with pytest.raises(RuntimeError) as e:
2008        num_iter = 0
2009        for _ in ds1.create_dict_iterator(num_epochs=1):
2010            num_iter += 1
2011    assert "Nested cache operations" in str(e.value)
2012
2013    assert num_iter == 0
2014    logger.info('test_cache_nomap_failure1 Ended.\n')
2015
2016
2017@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2018def test_cache_nomap_failure2():
2019    """
2020    Test zip under cache (failure)
2021
2022               repeat
2023                  |
2024                Cache
2025                  |
2026             Map(decode)
2027                  |
2028                 Zip
2029                |    |
2030           Random    Random
2031
2032    """
2033    logger.info("Test cache nomap failure 2")
2034    if "SESSION_ID" in os.environ:
2035        session_id = int(os.environ['SESSION_ID'])
2036    else:
2037        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2038
2039    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2040
2041    schema = ds.Schema()
2042    schema.add_column('image', de_type=mstype.uint8,
2043                      shape=[640, 480, 3])  # 921600 bytes (a bit less than 1 MB per image)
2044    schema.add_column('label', de_type=mstype.uint8, shape=[1])
2045
2046    ds1 = ds.RandomDataset(schema=schema)
2047    ds2 = ds.RandomDataset(schema=schema)
2048    dsz = ds.zip((ds1, ds2))
2049    decode_op = c_vision.Decode()
2050    dsz = dsz.map(input_columns=["image"], operations=decode_op, cache=some_cache)
2051    dsz = dsz.repeat(4)
2052
2053    with pytest.raises(RuntimeError) as e:
2054        num_iter = 0
2055        for _ in dsz.create_dict_iterator():
2056            num_iter += 1
2057    assert "ZipNode is not supported as a descendant operator under a cache" in str(e.value)
2058
2059    assert num_iter == 0
2060    logger.info('test_cache_nomap_failure2 Ended.\n')
2061
2062
2063@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2064def test_cache_nomap_failure3():
2065    """
2066    Test batch under cache (failure)
2067
2068               repeat
2069                  |
2070                Cache
2071                  |
2072             Map(resize)
2073                  |
2074                Batch
2075                  |
2076                Clue
2077    """
2078    logger.info("Test cache nomap failure 3")
2079    if "SESSION_ID" in os.environ:
2080        session_id = int(os.environ['SESSION_ID'])
2081    else:
2082        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2083
2084    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2085
2086    ds1 = ds.CLUEDataset(CLUE_DATA_DIR, task='AFQMC', usage='train')
2087    ds1 = ds1.batch(2)
2088    resize_op = c_vision.Resize((224, 224))
2089    ds1 = ds1.map(input_columns=["image"], operations=resize_op, cache=some_cache)
2090    ds1 = ds1.repeat(4)
2091
2092    with pytest.raises(RuntimeError) as e:
2093        num_iter = 0
2094        for _ in ds1.create_dict_iterator():
2095            num_iter += 1
2096    assert "BatchNode is not supported as a descendant operator under a cache" in str(e.value)
2097
2098    assert num_iter == 0
2099    logger.info('test_cache_nomap_failure3 Ended.\n')
2100
2101
2102@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2103def test_cache_nomap_failure4():
2104    """
2105    Test filter under cache (failure)
2106
2107               repeat
2108                  |
2109                Cache
2110                  |
2111             Map(decode)
2112                  |
2113                Filter
2114                  |
2115                 CSV
2116
2117    """
2118    logger.info("Test cache nomap failure 4")
2119    if "SESSION_ID" in os.environ:
2120        session_id = int(os.environ['SESSION_ID'])
2121    else:
2122        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2123
2124    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2125
2126    ds1 = ds.CSVDataset(CSV_DATA_DIR, column_defaults=["1", "2", "3", "4"],
2127                        column_names=['col1', 'col2', 'col3', 'col4'])
2128    ds1 = ds1.filter(predicate=lambda data: data < 11, input_columns=["label"])
2129
2130    decode_op = c_vision.Decode()
2131    ds1 = ds1.map(input_columns=["image"], operations=decode_op, cache=some_cache)
2132    ds1 = ds1.repeat(4)
2133
2134    with pytest.raises(RuntimeError) as e:
2135        num_iter = 0
2136        for _ in ds1.create_dict_iterator():
2137            num_iter += 1
2138    assert "FilterNode is not supported as a descendant operator under a cache" in str(e.value)
2139
2140    assert num_iter == 0
2141    logger.info('test_cache_nomap_failure4 Ended.\n')
2142
2143
2144@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2145def test_cache_nomap_failure5():
2146    """
2147    Test Map containing random operation under cache (failure)
2148
2149               repeat
2150                  |
2151                Cache
2152                  |
2153             Map(decode, randomCrop)
2154                  |
2155              TextFile
2156
2157    """
2158    logger.info("Test cache nomap failure 5")
2159    if "SESSION_ID" in os.environ:
2160        session_id = int(os.environ['SESSION_ID'])
2161    else:
2162        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2163
2164    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2165
2166    data = ds.TextFileDataset(TEXT_FILE_DATA_DIR)
2167    random_crop_op = c_vision.RandomCrop([512, 512], [200, 200, 200, 200])
2168    decode_op = c_vision.Decode()
2169
2170    data = data.map(input_columns=["image"], operations=decode_op)
2171    data = data.map(input_columns=["image"], operations=random_crop_op, cache=some_cache)
2172    data = data.repeat(4)
2173
2174    with pytest.raises(RuntimeError) as e:
2175        num_iter = 0
2176        for _ in data.create_dict_iterator():
2177            num_iter += 1
2178    assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value)
2179
2180    assert num_iter == 0
2181    logger.info('test_cache_nomap_failure5 Ended.\n')
2182
2183
2184@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2185def test_cache_nomap_pyfunc_lambda():
2186    """
2187    Test cache after map op with a python lambda function.
2188    Only allowed if the lambda function is wrapped by 'pyvision.not_random', otherwise an error will be raised.
2189
2190        Cache
2191          |
2192        Map(lambda function1, lambda function2)
2193          |
2194      TFRecord
2195    """
2196    logger.info("Test cache nomap pyfunc lambda")
2197    if "SESSION_ID" in os.environ:
2198        session_id = int(os.environ['SESSION_ID'])
2199    else:
2200        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2201
2202    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2203
2204    # This dataset has 12 records in it
2205    data1 = ds.TFRecordDataset(PYFUNC_DATA_DIR, PYFUNC_SCHEMA_DIR, shuffle=False)
2206    transforms = [py_vision.not_random(lambda x: x + x), py_vision.not_random(lambda x: x - 1)]
2207    data1 = data1.map(operations=transforms, input_columns="col0", cache=some_cache)
2208
2209    num_iter = 0
2210    for _ in data1.create_dict_iterator(num_epochs=1):
2211        num_iter += 1
2212    assert num_iter == 12
2213
2214    other_cache = ds.DatasetCache(session_id=session_id, size=0)
2215    ds2 = ds.TFRecordDataset(PYFUNC_DATA_DIR, PYFUNC_SCHEMA_DIR, shuffle=False)
2216    ds2 = ds2.map(operations=[(lambda x: x + x)], input_columns=["col0"], cache=other_cache)
2217
2218    with pytest.raises(RuntimeError) as e:
2219        num_iter = 0
2220        for _ in ds2.create_dict_iterator():
2221            num_iter += 1
2222    assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value)
2223    logger.info("test_cache_nomap_pyfunc_lambda Ended.\n")
2224
2225
2226@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2227def test_cache_nomap_pyfunc_builtin():
2228    """
2229    Test cache after map op with a python builtin PyFunc.
2230    An error will be raised if the builtin pyfunc containing random operation.
2231
2232        Cache
2233          |
2234     Map([builtin pyfunc1, builtin pyfunc2])
2235          |
2236      TFRecord
2237    """
2238    logger.info("Test cache nomap pyfunc builtin")
2239    if "SESSION_ID" in os.environ:
2240        session_id = int(os.environ['SESSION_ID'])
2241    else:
2242        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2243
2244    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2245    # This dataset has 3 records in it only
2246    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"])
2247    ds1 = ds1.map(operations=[py_vision.Decode(), py_vision.ToTensor()], input_columns=["image"], cache=some_cache)
2248
2249    num_iter = 0
2250    for _ in ds1.create_dict_iterator(num_epochs=1):
2251        num_iter += 1
2252    assert num_iter == 3
2253
2254    other_cache = ds.DatasetCache(session_id=session_id, size=0)
2255    # This dataset has 3 records in it only
2256    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"])
2257    ds2 = ds2.map(operations=[py_vision.Decode(), py_vision.RandomCrop(224), py_vision.ToTensor()],
2258                  input_columns=["image"], cache=other_cache)
2259
2260    with pytest.raises(RuntimeError) as e:
2261        num_iter = 0
2262        for _ in ds2.create_dict_iterator():
2263            num_iter += 1
2264    assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value)
2265    logger.info("test_cache_nomap_pyfunc_builtin Ended.\n")
2266
2267
2268@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2269def test_cache_nomap_pyfunc_function():
2270    """
2271    Test cache after map op with a python customized function.
2272    Only allowed if the function is decorated with 'py_vision.not_random', otherwise an error will be raised.
2273
2274        Cache
2275          |
2276     Map([function1, function2])
2277          |
2278      TFRecord
2279    """
2280
2281    @py_vision.not_random
2282    def not_random_func(x):
2283        return np.ones(x.shape, dtype=x.dtype)
2284
2285    def normal_func(x):
2286        return np.ones(x.shape, dtype=x.dtype)
2287
2288    logger.info("Test cache nomap pyfunc function")
2289    if "SESSION_ID" in os.environ:
2290        session_id = int(os.environ['SESSION_ID'])
2291    else:
2292        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2293
2294    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2295    # This dataset has 3 records in it only
2296    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"])
2297    ds1 = ds1.map(operations=[not_random_func, not_random_func], input_columns=["image"], cache=some_cache)
2298
2299    num_iter = 0
2300    for _ in ds1.create_dict_iterator(num_epochs=1):
2301        num_iter += 1
2302    assert num_iter == 3
2303
2304    other_cache = ds.DatasetCache(session_id=session_id, size=0)
2305    # This dataset has 3 records in it only
2306    ds2 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, columns_list=["image"])
2307    ds2 = ds2.map(operations=[not_random_func, normal_func], input_columns=["image"], cache=other_cache)
2308
2309    with pytest.raises(RuntimeError) as e:
2310        num_iter = 0
2311        for _ in ds2.create_dict_iterator():
2312            num_iter += 1
2313    assert "MapNode containing random operation is not supported as a descendant of cache" in str(e.value)
2314    logger.info("test_cache_nomap_pyfunc_function Ended.\n")
2315
2316
2317@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2318def test_cache_nomap_all_rows_cached():
2319    """
2320    Make sure all rows are cached before we switch to the fetching phase
2321
2322       Cache
2323         |
2324     RandomDataset
2325    """
2326
2327    logger.info("Test cache nomap all rows cached")
2328    if "SESSION_ID" in os.environ:
2329        session_id = int(os.environ['SESSION_ID'])
2330    else:
2331        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2332
2333    schema = ds.Schema()
2334    schema.add_column('image', de_type=mstype.uint8,
2335                      shape=[450, 450, 3])
2336    schema.add_column('label', de_type=mstype.uint8, shape=[1])
2337
2338    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2339
2340    # easier to reproduce the problem with 271 total rows
2341    num_total_rows = 271
2342    # User-created sampler here
2343    ds1 = ds.RandomDataset(schema=schema, total_rows=num_total_rows, num_parallel_workers=4, cache=some_cache)
2344    iter1 = ds1.create_dict_iterator()
2345
2346    num_iter = 0
2347    for _ in iter1:
2348        num_iter += 1
2349    logger.info("Number of data in ds1: {} ".format(num_iter))
2350    assert num_iter == num_total_rows
2351
2352    cache_stat = some_cache.get_stat()
2353    assert cache_stat.num_mem_cached == num_total_rows
2354
2355    logger.info("test_cache_nomap_all_rows_cached Ended.\n")
2356
2357
2358@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2359def test_cache_nomap_dataset_size1():
2360    """
2361    Test get_dataset_size() when cache is injected directly after a non-mappable leaf
2362
2363       Cache
2364         |
2365      TFRecord
2366    """
2367
2368    logger.info("Test cache nomap dataset size 1")
2369    if "SESSION_ID" in os.environ:
2370        session_id = int(os.environ['SESSION_ID'])
2371    else:
2372        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2373
2374    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2375
2376    # This dataset has 3 records in it only
2377    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0, cache=some_cache)
2378
2379    dataset_size = ds1.get_dataset_size()
2380    assert dataset_size == 2
2381
2382    num_iter = 0
2383    for _ in ds1.create_dict_iterator():
2384        num_iter += 1
2385
2386    logger.info("Number of data in ds1: {} ".format(num_iter))
2387    assert num_iter == dataset_size
2388    logger.info("test_cache_nomap_dataset_size1 Ended.\n")
2389
2390
2391@pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server")
2392def test_cache_nomap_dataset_size2():
2393    """
2394    Test get_dataset_size() when cache is injected after map
2395
2396       Cache
2397         |
2398    Map(decode)
2399         |
2400     TFRecord
2401    """
2402
2403    logger.info("Test cache nomap dataset size 2")
2404    if "SESSION_ID" in os.environ:
2405        session_id = int(os.environ['SESSION_ID'])
2406    else:
2407        raise RuntimeError("Testcase requires SESSION_ID environment variable")
2408
2409    some_cache = ds.DatasetCache(session_id=session_id, size=0)
2410
2411    # This dataset has 3 records in it only
2412    ds1 = ds.TFRecordDataset(DATA_DIR, SCHEMA_DIR, num_shards=2, shard_id=0)
2413    decode_op = c_vision.Decode()
2414    ds1 = ds1.map(operations=decode_op, input_columns=["image"], cache=some_cache)
2415
2416    dataset_size = ds1.get_dataset_size()
2417    assert dataset_size == 2
2418
2419    num_iter = 0
2420    for _ in ds1.create_dict_iterator():
2421        num_iter += 1
2422
2423    logger.info("Number of data in ds1: {} ".format(num_iter))
2424    assert num_iter == dataset_size
2425    logger.info("test_cache_nomap_dataset_size2 Ended.\n")
2426
2427
2428if __name__ == '__main__':
2429    # This is just a list of tests, don't try to run these tests with 'python test_cache_nomap.py'
2430    # since cache server is required to be brought up first
2431    test_cache_nomap_basic1()
2432    test_cache_nomap_basic2()
2433    test_cache_nomap_basic3()
2434    test_cache_nomap_basic4()
2435    test_cache_nomap_basic5()
2436    test_cache_nomap_basic6()
2437    test_cache_nomap_basic7()
2438    test_cache_nomap_basic8()
2439    test_cache_nomap_basic9()
2440    test_cache_nomap_allowed_share1()
2441    test_cache_nomap_allowed_share2()
2442    test_cache_nomap_allowed_share3()
2443    test_cache_nomap_allowed_share4()
2444    test_cache_nomap_disallowed_share1()
2445    test_cache_nomap_running_twice1()
2446    test_cache_nomap_running_twice2()
2447    test_cache_nomap_extra_small_size1()
2448    test_cache_nomap_extra_small_size2()
2449    test_cache_nomap_parallel_pipeline1(shard=0)
2450    test_cache_nomap_parallel_pipeline2(shard=1)
2451    test_cache_nomap_parallel_workers()
2452    test_cache_nomap_server_workers_1()
2453    test_cache_nomap_server_workers_100()
2454    test_cache_nomap_num_connections_1()
2455    test_cache_nomap_num_connections_100()
2456    test_cache_nomap_prefetch_size_1()
2457    test_cache_nomap_prefetch_size_100()
2458    test_cache_nomap_to_device()
2459    test_cache_nomap_session_destroy()
2460    test_cache_nomap_server_stop()
2461    test_cache_nomap_epoch_ctrl1()
2462    test_cache_nomap_epoch_ctrl2()
2463    test_cache_nomap_epoch_ctrl3()
2464    test_cache_nomap_epoch_ctrl4()
2465    test_cache_nomap_multiple_cache1()
2466    test_cache_nomap_multiple_cache2()
2467    test_cache_nomap_multiple_cache3()
2468    test_cache_nomap_multiple_cache_train()
2469    test_cache_nomap_multiple_cache_eval()
2470    test_cache_nomap_clue1()
2471    test_cache_nomap_clue2()
2472    test_cache_nomap_csv1()
2473    test_cache_nomap_csv2()
2474    test_cache_nomap_textfile1()
2475    test_cache_nomap_textfile2()
2476    test_cache_nomap_nested_repeat()
2477    test_cache_nomap_get_repeat_count()
2478    test_cache_nomap_long_file_list()
2479    test_cache_nomap_failure1()
2480    test_cache_nomap_failure2()
2481    test_cache_nomap_failure3()
2482    test_cache_nomap_failure4()
2483    test_cache_nomap_failure5()
2484    test_cache_nomap_pyfunc_lambda()
2485    test_cache_nomap_pyfunc_builtin()
2486    test_cache_nomap_pyfunc_function()
2487    test_cache_nomap_dataset_size1()
2488    test_cache_nomap_dataset_size2()
2489