1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <string>
17 #include "minddata/dataset/core/client.h"
18 #include "minddata/dataset/engine/cache/cache_client.h"
19 #include "minddata/dataset/engine/execution_tree.h"
20 #include "minddata/dataset/engine/datasetops/cache_op.h"
21 #include "minddata/dataset/engine/datasetops/cache_lookup_op.h"
22 #include "minddata/dataset/engine/datasetops/cache_merge_op.h"
23 #include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
24 #include "minddata/dataset/engine/datasetops/source/tf_reader_op.h"
25 #include "minddata/dataset/engine/jagged_connector.h"
26 #include "common/common.h"
27 #include "gtest/gtest.h"
28 #include "utils/log_adapter.h"
29 #include "minddata/dataset/engine/datasetops/source/random_data_op.h"
30 #include "minddata/dataset/engine/data_schema.h"
31
32 using namespace mindspore::dataset;
33 using mindspore::LogStream;
34 using mindspore::dataset::CacheClient;
35 using mindspore::dataset::TaskGroup;
36 using mindspore::ExceptionType::NoExceptionType;
37 using mindspore::MsLogLevel::INFO;
38
39 // Helper function to get the session id from SESSION_ID env variable
GetSessionFromEnv(session_id_type * session_id)40 Status GetSessionFromEnv(session_id_type *session_id) {
41 RETURN_UNEXPECTED_IF_NULL(session_id);
42 if (const char *session_env = std::getenv("SESSION_ID")) {
43 std::string session_id_str(session_env);
44 try {
45 *session_id = std::stoul(session_id_str);
46 } catch (const std::exception &e) {
47 std::string err_msg = "Invalid numeric value for session id in env var: " + session_id_str;
48 return Status(StatusCode::kMDSyntaxError, err_msg);
49 }
50 } else {
51 RETURN_STATUS_UNEXPECTED("Test case requires a session id to be provided via SESSION_ID environment variable.");
52 }
53 return Status::OK();
54 }
55
56 class MindDataTestCacheOp : public UT::DatasetOpTesting {
57 public:
SetUp()58 void SetUp() override {
59 DatasetOpTesting::SetUp();
60 GlobalInit();
61 }
62 };
63
TEST_F(MindDataTestCacheOp,DISABLED_TestCacheServer)64 TEST_F(MindDataTestCacheOp, DISABLED_TestCacheServer) {
65 Status rc;
66 CacheClient::Builder builder;
67 session_id_type env_session;
68 rc = GetSessionFromEnv(&env_session);
69 ASSERT_TRUE(rc.IsOk());
70
71 // use arbitrary session of 1, size of 0, spilling// is true
72 builder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
73 std::shared_ptr<CacheClient> myClient;
74 rc = builder.Build(&myClient);
75 ASSERT_TRUE(rc.IsOk());
76 // cksum value of 1 for CreateCache here...normally you do not directly create a cache and the cksum arg is generated.
77 rc = myClient->CreateCache(1, true);
78 ASSERT_TRUE(rc.IsOk());
79 std::cout << *myClient << std::endl;
80
81 // Create a schema using the C api's
82 int32_t rank = 0; // not used
83 std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();
84 // 2 columns. First column is an "image" 640,480,3
85 TensorShape c1Shape({640, 480, 3});
86 ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
87 rank, // not used
88 &c1Shape);
89 // Column 2 will just be a scalar label number
90 TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor
91 ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);
92
93 test_schema->AddColumn(c1);
94 test_schema->AddColumn(c2);
95
96 std::unordered_map<std::string, int32_t> map;
97 rc = test_schema->GetColumnNameMap(&map);
98 ASSERT_TRUE(rc.IsOk());
99
100 // Test the CacheSchema api
101 rc = myClient->CacheSchema(map);
102 ASSERT_TRUE(rc.IsOk());
103
104 // Create a tensor, take a snapshot and restore it back, and compare.
105 std::shared_ptr<Tensor> t;
106 Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
107 t->SetItemAt<uint64_t>({0, 0}, 1);
108 t->SetItemAt<uint64_t>({0, 1}, 2);
109 t->SetItemAt<uint64_t>({0, 2}, 3);
110 t->SetItemAt<uint64_t>({1, 0}, 4);
111 t->SetItemAt<uint64_t>({1, 1}, 5);
112 t->SetItemAt<uint64_t>({1, 2}, 6);
113 std::cout << *t << std::endl;
114 TensorTable tbl;
115 TensorRow row;
116 row.push_back(t);
117 int64_t row_id;
118 rc = myClient->WriteRow(row, &row_id);
119 ASSERT_TRUE(rc.IsOk());
120
121 // Switch off build phase.
122 rc = myClient->BuildPhaseDone();
123 ASSERT_TRUE(rc.IsOk());
124
125 // Now restore from cache.
126 row.clear();
127 rc = myClient->GetRows({row_id}, &tbl);
128 row = tbl.front();
129 ASSERT_TRUE(rc.IsOk());
130 auto r = row.front();
131 std::cout << *r << std::endl;
132 // Compare
133 bool cmp = (*t == *r);
134 ASSERT_TRUE(cmp);
135
136 // Get back the schema and verify
137 std::unordered_map<std::string, int32_t> map_out;
138 rc = myClient->FetchSchema(&map_out);
139 ASSERT_TRUE(rc.IsOk());
140 cmp = (map_out == map);
141 ASSERT_TRUE(cmp);
142
143 rc = myClient->DestroyCache();
144 ASSERT_TRUE(rc.IsOk());
145 }
146
TEST_F(MindDataTestCacheOp,DISABLED_TestConcurrencyRequest)147 TEST_F(MindDataTestCacheOp, DISABLED_TestConcurrencyRequest) {
148 // Clear the rc of the master thread if any
149 (void)TaskManager::GetMasterThreadRc();
150 TaskGroup vg;
151 Status rc;
152
153 session_id_type env_session;
154 rc = GetSessionFromEnv(&env_session);
155 ASSERT_TRUE(rc.IsOk());
156
157 // use arbitrary session of 1, size 1, spilling is true
158 CacheClient::Builder builder;
159 // use arbitrary session of 1, size of 0, spilling// is true
160 builder.SetSessionId(env_session).SetCacheMemSz(1).SetSpill(true);
161 std::shared_ptr<CacheClient> myClient;
162 rc = builder.Build(&myClient);
163 ASSERT_TRUE(rc.IsOk());
164 // cksum value of 1 for CreateCache here...normally you do not directly create a cache and the cksum arg is generated.
165 rc = myClient->CreateCache(1, true);
166 ASSERT_TRUE(rc.IsOk());
167 std::cout << *myClient << std::endl;
168 std::shared_ptr<Tensor> t;
169 Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
170 t->SetItemAt<uint64_t>({0, 0}, 1);
171 t->SetItemAt<uint64_t>({0, 1}, 2);
172 t->SetItemAt<uint64_t>({0, 2}, 3);
173 t->SetItemAt<uint64_t>({1, 0}, 4);
174 t->SetItemAt<uint64_t>({1, 1}, 5);
175 t->SetItemAt<uint64_t>({1, 2}, 6);
176 TensorTable tbl;
177 TensorRow row;
178 row.push_back(t);
179 // Cache tensor row t 5000 times using 10 threads.
180 for (auto k = 0; k < 10; ++k) {
181 Status vg_rc = vg.CreateAsyncTask("Test agent", [&myClient, &row]() -> Status {
182 TaskManager::FindMe()->Post();
183 for (auto i = 0; i < 500; i++) {
184 RETURN_IF_NOT_OK(myClient->WriteRow(row));
185 }
186 return Status::OK();
187 });
188 ASSERT_TRUE(vg_rc.IsOk());
189 }
190 ASSERT_TRUE(vg.join_all().IsOk());
191 ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOk());
192 rc = myClient->BuildPhaseDone();
193 ASSERT_TRUE(rc.IsOk());
194 // Get statistics from the server.
195 CacheServiceStat stat{};
196 rc = myClient->GetStat(&stat);
197 ASSERT_TRUE(rc.IsOk());
198 std::cout << stat.min_row_id << ":" << stat.max_row_id << ":" << stat.num_mem_cached << ":" << stat.num_disk_cached
199 << "\n";
200 // Expect there are 5000 rows there.
201 EXPECT_EQ(5000, stat.max_row_id - stat.min_row_id + 1);
202 // Get them all back using row id and compare with tensor t.
203 for (auto i = stat.min_row_id; i <= stat.max_row_id; ++i) {
204 tbl.clear();
205 row.clear();
206 rc = myClient->GetRows({i}, &tbl);
207 ASSERT_TRUE(rc.IsOk());
208 row = tbl.front();
209 auto r = row.front();
210 bool cmp = (*t == *r);
211 ASSERT_TRUE(cmp);
212 }
213 rc = myClient->DestroyCache();
214 ASSERT_TRUE(rc.IsOk());
215 }
216
217 // Simple test with a repeated cache op over random data producer
218 //
219 // RepeatOp
220 // |
221 // CacheOp
222 // |
223 // RandomDataOp
224 //
TEST_F(MindDataTestCacheOp,DISABLED_TestRandomDataCache1)225 TEST_F(MindDataTestCacheOp, DISABLED_TestRandomDataCache1) {
226 // Clear the rc of the master thread if any
227 (void)TaskManager::GetMasterThreadRc();
228 Status rc;
229 int32_t rank = 0; // not used
230
231 session_id_type env_session;
232 rc = GetSessionFromEnv(&env_session);
233 ASSERT_TRUE(rc.IsOk());
234
235 MS_LOG(INFO) << "UT test TestRandomDataCache1";
236 // Start with an empty execution tree
237 auto myTree = std::make_shared<ExecutionTree>();
238
239 // Create a schema using the C api's
240 std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();
241
242 // 2 columns. First column is an "image" 640,480,3
243 TensorShape c1Shape({640, 480, 3});
244 ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
245 rank, // not used
246 &c1Shape);
247
248 // Column 2 will just be a scalar label number
249 TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor
250 ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);
251
252 test_schema->AddColumn(c1);
253 test_schema->AddColumn(c2);
254
255 // RandomDataOp
256 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
257 int32_t op_connector_size = config_manager->op_connector_size();
258 std::shared_ptr<RandomDataOp> myRandomDataOp =
259 std::make_shared<RandomDataOp>(4, op_connector_size, 50, std::move(test_schema));
260
261 rc = myTree->AssociateNode(myRandomDataOp);
262 ASSERT_TRUE(rc.IsOk());
263
264 // CacheOp
265 // size of 0, spilling is true
266 CacheClient::Builder builder;
267 builder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
268 std::shared_ptr<CacheClient> myClient;
269 rc = builder.Build(&myClient);
270 ASSERT_TRUE(rc.IsOk());
271
272 int64_t num_samples = 0;
273 int64_t start_index = 0;
274 auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
275 std::shared_ptr<CacheOp> myCacheOp =
276 std::make_shared<CacheOp>(5, op_connector_size, myClient, std::move(seq_sampler));
277 ASSERT_NE(myCacheOp, nullptr);
278 rc = myTree->AssociateNode(myCacheOp);
279 ASSERT_TRUE(rc.IsOk());
280
281 // RepeatOp
282 uint32_t num_repeats = 4;
283 std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);
284 rc = myTree->AssociateNode(myRepeatOp);
285 ASSERT_TRUE(rc.IsOk());
286
287 // Assign tree relations and root
288 myCacheOp->SetTotalRepeats(num_repeats);
289 myCacheOp->SetNumRepeatsPerEpoch(num_repeats);
290 rc = myRepeatOp->AddChild(myCacheOp);
291 ASSERT_TRUE(rc.IsOk());
292 // Always set to 1 under a CacheOp because we read from it only once. The CacheOp is the one that repeats.
293 myRandomDataOp->SetTotalRepeats(1);
294 myRandomDataOp->SetNumRepeatsPerEpoch(1);
295 rc = myCacheOp->AddChild(myRandomDataOp);
296 ASSERT_TRUE(rc.IsOk());
297 rc = myTree->AssignRoot(myRepeatOp);
298 ASSERT_TRUE(rc.IsOk());
299
300 MS_LOG(INFO) << "Launching tree and begin iteration";
301 rc = myTree->Prepare();
302 ASSERT_TRUE(rc.IsOk());
303
304 // quick check to see what tree looks like
305 std::ostringstream ss;
306 ss << *myTree; // some funny const error if I try to write directly to ms log stream
307 MS_LOG(INFO) << "Here's the tree:\n" << ss.str();
308
309 std::cout << *myClient << std::endl;
310
311 rc = myTree->Launch();
312 ASSERT_TRUE(rc.IsOk());
313
314 // Start the loop of reading tensors from our pipeline
315 DatasetIterator dI(myTree);
316 TensorRow tensorList;
317 rc = dI.FetchNextTensorRow(&tensorList);
318 ASSERT_TRUE(rc.IsOk());
319 int rowCount = 0;
320 while (!tensorList.empty()) {
321 // Don't display these rows, just count them
322 MS_LOG(INFO) << "Row fetched #: " << rowCount;
323 rc = dI.FetchNextTensorRow(&tensorList);
324 ASSERT_TRUE(rc.IsOk());
325 rowCount++;
326 }
327 ASSERT_EQ(rowCount, 200);
328 rc = myClient->DestroyCache();
329 ASSERT_TRUE(rc.IsOk());
330 }
331
332 //// Simple test with a repeated cache op over random data producer.
333 //// This one will exceed memory and require a spill.
334 ////
335 //// RepeatOp
336 //// |
337 //// CacheOp
338 //// |
339 //// RandomDataOp
340 ////
TEST_F(MindDataTestCacheOp,DISABLED_TestRandomDataCacheSpill)341 TEST_F(MindDataTestCacheOp, DISABLED_TestRandomDataCacheSpill) {
342 // Clear the rc of the master thread if any
343 (void)TaskManager::GetMasterThreadRc();
344 Status rc;
345 int32_t rank = 0; // not used
346 MS_LOG(INFO) << "UT test TestRandomDataCacheSpill";
347
348 session_id_type env_session;
349 rc = GetSessionFromEnv(&env_session);
350 ASSERT_TRUE(rc.IsOk());
351
352 // Start with an empty execution tree
353 auto myTree = std::make_shared<ExecutionTree>();
354
355 // Create a schema using the C api's
356 std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();
357
358 // 2 columns. First column is an "image" 640,480,3
359 TensorShape c1Shape({640, 480, 3});
360 ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
361 rank, // not used
362 &c1Shape);
363
364 // Column 2 will just be a scalar label number
365 TensorShape c2Shape({}); // empty shape is a 1-value scalar Tensor
366 ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);
367
368 test_schema->AddColumn(c1);
369 test_schema->AddColumn(c2);
370
371 // RandomDataOp
372 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
373 int32_t op_connector_size = config_manager->op_connector_size();
374 std::shared_ptr<RandomDataOp> myRandomDataOp =
375 std::make_shared<RandomDataOp>(4, op_connector_size, 10, std::move(test_schema));
376 rc = myTree->AssociateNode(myRandomDataOp);
377 ASSERT_TRUE(rc.IsOk());
378
379 // CacheOp
380 int64_t num_samples = 0;
381 int64_t start_index = 0;
382 auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
383 CacheClient::Builder builder;
384 builder.SetSessionId(env_session).SetCacheMemSz(4).SetSpill(true);
385 std::shared_ptr<CacheClient> myClient;
386 rc = builder.Build(&myClient);
387 ASSERT_TRUE(rc.IsOk());
388 std::shared_ptr<CacheOp> myCacheOp =
389 std::make_shared<CacheOp>(4, op_connector_size, myClient, std::move(seq_sampler));
390 ASSERT_NE(myCacheOp, nullptr);
391 rc = myTree->AssociateNode(myCacheOp);
392 ASSERT_TRUE(rc.IsOk());
393
394 // RepeatOp
395 uint32_t num_repeats = 4;
396 std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);
397 rc = myTree->AssociateNode(myRepeatOp);
398 ASSERT_TRUE(rc.IsOk());
399
400 // Assign tree relations and root
401 myCacheOp->SetTotalRepeats(num_repeats);
402 myCacheOp->SetNumRepeatsPerEpoch(num_repeats);
403 rc = myRepeatOp->AddChild(myCacheOp);
404 ASSERT_TRUE(rc.IsOk());
405 // Always set to 1 under a CacheOp because we read from it only once. The CacheOp is the one that repeats.
406 myRandomDataOp->SetTotalRepeats(1);
407 myRandomDataOp->SetNumRepeatsPerEpoch(1);
408 rc = myCacheOp->AddChild(myRandomDataOp);
409 ASSERT_TRUE(rc.IsOk());
410 rc = myTree->AssignRoot(myRepeatOp);
411 ASSERT_TRUE(rc.IsOk());
412
413 MS_LOG(INFO) << "Launching tree and begin iteration";
414 rc = myTree->Prepare();
415 ASSERT_TRUE(rc.IsOk());
416
417 std::cout << *myClient << std::endl;
418
419 rc = myTree->Launch();
420 ASSERT_TRUE(rc.IsOk());
421
422 // Start the loop of reading tensors from our pipeline
423 DatasetIterator dI(myTree);
424 TensorRow tensorList;
425 rc = dI.FetchNextTensorRow(&tensorList);
426 ASSERT_TRUE(rc.IsOk());
427 int rowCount = 0;
428 while (!tensorList.empty()) {
429 // Don't display these rows, just count them
430 MS_LOG(INFO) << "Row fetched #: " << rowCount;
431 rc = dI.FetchNextTensorRow(&tensorList);
432 ASSERT_TRUE(rc.IsOk());
433 rowCount++;
434 }
435 ASSERT_EQ(rowCount, 40);
436 rc = myClient->DestroyCache();
437 ASSERT_TRUE(rc.IsOk());
438 }
439
TEST_F(MindDataTestCacheOp,DISABLED_TestImageFolderCacheMerge)440 TEST_F(MindDataTestCacheOp, DISABLED_TestImageFolderCacheMerge) {
441 // Clear the rc of the master thread if any
442 (void)TaskManager::GetMasterThreadRc();
443 Status rc;
444 int64_t num_samples = 0;
445 int64_t start_index = 0;
446
447 session_id_type env_session;
448 rc = GetSessionFromEnv(&env_session);
449 ASSERT_TRUE(rc.IsOk());
450
451 auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
452
453 CacheClient::Builder ccbuilder;
454 ccbuilder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
455 std::shared_ptr<CacheClient> myClient;
456 rc = ccbuilder.Build(&myClient);
457 ASSERT_TRUE(rc.IsOk());
458
459 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
460 int32_t op_connector_size = config_manager->op_connector_size();
461 std::shared_ptr<CacheLookupOp> myLookupOp =
462 std::make_shared<CacheLookupOp>(4, op_connector_size, myClient, std::move(seq_sampler));
463 ASSERT_NE(myLookupOp, nullptr);
464 std::shared_ptr<CacheMergeOp> myMergeOp = std::make_shared<CacheMergeOp>(4, op_connector_size, 4, myClient);
465 ASSERT_NE(myMergeOp, nullptr);
466
467 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
468 TensorShape scalar = TensorShape::CreateScalar();
469 rc = schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1));
470 ASSERT_TRUE(rc.IsOk());
471 rc = schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_INT32), TensorImpl::kFlexible, 0, &scalar));
472 ASSERT_TRUE(rc.IsOk());
473 std::string dataset_path = datasets_root_path_ + "/testPK/data";
474 std::set<std::string> ext = {".jpg", ".JPEG"};
475 bool recursive = true;
476 bool decode = false;
477 std::map<std::string, int32_t> columns_to_load = {};
478 std::shared_ptr<ImageFolderOp> so = std::make_shared<ImageFolderOp>(
479 3, dataset_path, 3, recursive, decode, ext, columns_to_load, std::move(schema), nullptr);
480 so->SetSampler(myLookupOp);
481 ASSERT_TRUE(rc.IsOk());
482
483 // RepeatOp
484 uint32_t num_repeats = 4;
485 std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);
486
487 auto myTree = std::make_shared<ExecutionTree>();
488 rc = myTree->AssociateNode(so);
489 ASSERT_TRUE(rc.IsOk());
490
491 rc = myTree->AssociateNode(myLookupOp);
492 ASSERT_TRUE(rc.IsOk());
493 rc = myTree->AssociateNode(myMergeOp);
494 ASSERT_TRUE(rc.IsOk());
495
496 rc = myTree->AssociateNode(myRepeatOp);
497 ASSERT_TRUE(rc.IsOk());
498 rc = myTree->AssignRoot(myRepeatOp);
499 ASSERT_TRUE(rc.IsOk());
500
501 myMergeOp->SetTotalRepeats(num_repeats);
502 myMergeOp->SetNumRepeatsPerEpoch(num_repeats);
503 rc = myRepeatOp->AddChild(myMergeOp);
504 ASSERT_TRUE(rc.IsOk());
505 myLookupOp->SetTotalRepeats(num_repeats);
506 myLookupOp->SetNumRepeatsPerEpoch(num_repeats);
507 rc = myMergeOp->AddChild(myLookupOp);
508 ASSERT_TRUE(rc.IsOk());
509 so->SetTotalRepeats(num_repeats);
510 so->SetNumRepeatsPerEpoch(num_repeats);
511 rc = myMergeOp->AddChild(so);
512 ASSERT_TRUE(rc.IsOk());
513
514 rc = myTree->Prepare();
515 ASSERT_TRUE(rc.IsOk());
516 rc = myTree->Launch();
517 ASSERT_TRUE(rc.IsOk());
518 // Start the loop of reading tensors from our pipeline
519 DatasetIterator dI(myTree);
520 TensorRow tensorList;
521 rc = dI.FetchNextTensorRow(&tensorList);
522 ASSERT_TRUE(rc.IsOk());
523 int rowCount = 0;
524 while (!tensorList.empty()) {
525 rc = dI.FetchNextTensorRow(&tensorList);
526 ASSERT_TRUE(rc.IsOk());
527 if (rc.IsError()) {
528 std::cout << rc << std::endl;
529 break;
530 }
531 rowCount++;
532 }
533 ASSERT_EQ(rowCount, 176);
534 std::cout << "Row count : " << rowCount << std::endl;
535 rc = myClient->DestroyCache();
536 ASSERT_TRUE(rc.IsOk());
537 }
538