/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <string>
#include "minddata/dataset/core/client.h"
#include "minddata/dataset/engine/cache/cache_client.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/datasetops/cache_op.h"
#include "minddata/dataset/engine/datasetops/cache_lookup_op.h"
#include "minddata/dataset/engine/datasetops/cache_merge_op.h"
#include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
#include "minddata/dataset/engine/datasetops/source/tf_reader_op.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "common/common.h"
#include "gtest/gtest.h"
#include "utils/log_adapter.h"
#include "minddata/dataset/engine/datasetops/source/random_data_op.h"
#include "minddata/dataset/engine/data_schema.h"

using namespace mindspore::dataset;
using mindspore::LogStream;
using mindspore::dataset::CacheClient;
using mindspore::dataset::TaskGroup;
using mindspore::ExceptionType::NoExceptionType;
using mindspore::MsLogLevel::INFO;

// Helper function to get the session id from SESSION_ID env variable
Status GetSessionFromEnv(session_id_type *session_id) {
  RETURN_UNEXPECTED_IF_NULL(session_id);
  if (const char *session_env = std::getenv("SESSION_ID")) {
    std::string session_id_str(session_env);
    try {
      *session_id = std::stoul(session_id_str);
    } catch (const std::exception &e) {
      std::string err_msg = "Invalid numeric value for session id in env var: " + session_id_str;
      return Status(StatusCode::kMDSyntaxError, err_msg);
    }
  } else {
    RETURN_STATUS_UNEXPECTED("Test case requires a session id to be provided via SESSION_ID environment variable.");
  }
  return Status::OK();
}
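
// Note (assumed workflow; the exact commands may differ between versions): these tests expect a
// cache server to be running and a session id to be exported before execution, e.g.
//   cache_admin --start              # start the cache server
//   cache_admin -g                   # generate a session id
//   export SESSION_ID=<generated session id>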

class MindDataTestCacheOp : public UT::DatasetOpTesting {
 public:
  void SetUp() override {
    DatasetOpTesting::SetUp();
    GlobalInit();
  }
};

TEST_F(MindDataTestCacheOp, DISABLED_TestCacheServer) {
  Status rc;
  CacheClient::Builder builder;
  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  // Use the session id from the environment, a cache memory size of 0, and spilling enabled
  builder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());
  // Pass a cksum value of 1 to CreateCache here. Normally a cache is not created directly, and the cksum argument is generated internally.
  rc = myClient->CreateCache(1, true);
  ASSERT_TRUE(rc.IsOk());
  std::cout << *myClient << std::endl;

  // Create a schema using the C++ APIs
  int32_t rank = 0;  // not used
  std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();
  // Two columns. The first column is an "image" with shape 640x480x3
  TensorShape c1Shape({640, 480, 3});
  ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
                   rank,  // not used
                   &c1Shape);
  // Column 2 will just be a scalar label number
  TensorShape c2Shape({});  // an empty shape creates a single-value scalar Tensor
  ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);

  test_schema->AddColumn(c1);
  test_schema->AddColumn(c2);

  std::unordered_map<std::string, int32_t> map;
  rc = test_schema->GetColumnNameMap(&map);
  ASSERT_TRUE(rc.IsOk());

  // Test the CacheSchema api
  rc = myClient->CacheSchema(map);
  ASSERT_TRUE(rc.IsOk());

  // Create a tensor, take a snapshot and restore it back, and compare.
  std::shared_ptr<Tensor> t;
  Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
  t->SetItemAt<uint64_t>({0, 0}, 1);
  t->SetItemAt<uint64_t>({0, 1}, 2);
  t->SetItemAt<uint64_t>({0, 2}, 3);
  t->SetItemAt<uint64_t>({1, 0}, 4);
  t->SetItemAt<uint64_t>({1, 1}, 5);
  t->SetItemAt<uint64_t>({1, 2}, 6);
  std::cout << *t << std::endl;
  TensorTable tbl;
  TensorRow row;
  row.push_back(t);
  int64_t row_id;
  rc = myClient->WriteRow(row, &row_id);
  ASSERT_TRUE(rc.IsOk());

  // Switch off build phase.
  rc = myClient->BuildPhaseDone();
  ASSERT_TRUE(rc.IsOk());

  // Now restore from cache.
  row.clear();
  rc = myClient->GetRows({row_id}, &tbl);
  ASSERT_TRUE(rc.IsOk());
  row = tbl.front();
  auto r = row.front();
  std::cout << *r << std::endl;
  // Compare
  bool cmp = (*t == *r);
  ASSERT_TRUE(cmp);

  // Get back the schema and verify
  std::unordered_map<std::string, int32_t> map_out;
  rc = myClient->FetchSchema(&map_out);
  ASSERT_TRUE(rc.IsOk());
  cmp = (map_out == map);
  ASSERT_TRUE(cmp);

  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

TEST_F(MindDataTestCacheOp, DISABLED_TestConcurrencyRequest) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  TaskGroup vg;
  Status rc;

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  // Use the session id from the environment, a cache memory size of 1, and spilling enabled
  CacheClient::Builder builder;
  builder.SetSessionId(env_session).SetCacheMemSz(1).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());
  // Pass a cksum value of 1 to CreateCache here. Normally a cache is not created directly, and the cksum argument is generated internally.
  rc = myClient->CreateCache(1, true);
  ASSERT_TRUE(rc.IsOk());
  std::cout << *myClient << std::endl;
  std::shared_ptr<Tensor> t;
  Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
  t->SetItemAt<uint64_t>({0, 0}, 1);
  t->SetItemAt<uint64_t>({0, 1}, 2);
  t->SetItemAt<uint64_t>({0, 2}, 3);
  t->SetItemAt<uint64_t>({1, 0}, 4);
  t->SetItemAt<uint64_t>({1, 1}, 5);
  t->SetItemAt<uint64_t>({1, 2}, 6);
  TensorTable tbl;
  TensorRow row;
  row.push_back(t);
  // Cache tensor row t 5000 times using 10 threads.
  for (auto k = 0; k < 10; ++k) {
    Status vg_rc = vg.CreateAsyncTask("Test agent", [&myClient, &row]() -> Status {
      TaskManager::FindMe()->Post();
      for (auto i = 0; i < 500; i++) {
        RETURN_IF_NOT_OK(myClient->WriteRow(row));
      }
      return Status::OK();
    });
    ASSERT_TRUE(vg_rc.IsOk());
  }
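  // Wait for all writer tasks to finish, then surface any error raised inside them.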
  ASSERT_TRUE(vg.join_all().IsOk());
  ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOk());
  rc = myClient->BuildPhaseDone();
  ASSERT_TRUE(rc.IsOk());
  // Get statistics from the server.
  CacheServiceStat stat{};
  rc = myClient->GetStat(&stat);
  ASSERT_TRUE(rc.IsOk());
  std::cout << stat.min_row_id << ":" << stat.max_row_id << ":" << stat.num_mem_cached << ":" << stat.num_disk_cached
            << "\n";
  // Expect 5000 rows in total (10 threads x 500 writes each).
  EXPECT_EQ(5000, stat.max_row_id - stat.min_row_id + 1);
  // Get them all back using row id and compare with tensor t.
  for (auto i = stat.min_row_id; i <= stat.max_row_id; ++i) {
    tbl.clear();
    row.clear();
    rc = myClient->GetRows({i}, &tbl);
    ASSERT_TRUE(rc.IsOk());
    row = tbl.front();
    auto r = row.front();
    bool cmp = (*t == *r);
    ASSERT_TRUE(cmp);
  }
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

// Simple test with a repeated cache op over random data producer
//
//     RepeatOp
//        |
//     CacheOp
//        |
//   RandomDataOp
//
TEST_F(MindDataTestCacheOp, DISABLED_TestRandomDataCache1) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  Status rc;
  int32_t rank = 0;  // not used

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "UT test TestRandomDataCache1";
  // Start with an empty execution tree
  auto myTree = std::make_shared<ExecutionTree>();

  // Create a schema using the C++ APIs
  std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();

  // Two columns. The first column is an "image" with shape 640x480x3
  TensorShape c1Shape({640, 480, 3});
  ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
                   rank,  // not used
                   &c1Shape);

  // Column 2 will just be a scalar label number
  TensorShape c2Shape({});  // an empty shape creates a single-value scalar Tensor
  ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);

  test_schema->AddColumn(c1);
  test_schema->AddColumn(c2);

  // RandomDataOp
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::shared_ptr<RandomDataOp> myRandomDataOp =
    std::make_shared<RandomDataOp>(4, op_connector_size, 50, std::move(test_schema));

  rc = myTree->AssociateNode(myRandomDataOp);
  ASSERT_TRUE(rc.IsOk());

  // CacheOp
  // size of 0, spilling is true
  CacheClient::Builder builder;
  builder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());

  int64_t num_samples = 0;
  int64_t start_index = 0;
  auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
  std::shared_ptr<CacheOp> myCacheOp =
    std::make_shared<CacheOp>(5, op_connector_size, myClient, std::move(seq_sampler));
  ASSERT_NE(myCacheOp, nullptr);
  rc = myTree->AssociateNode(myCacheOp);
  ASSERT_TRUE(rc.IsOk());

  // RepeatOp
  uint32_t num_repeats = 4;
  std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);
  rc = myTree->AssociateNode(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

  // Assign tree relations and root
  myCacheOp->SetTotalRepeats(num_repeats);
  myCacheOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myRepeatOp->AddChild(myCacheOp);
  ASSERT_TRUE(rc.IsOk());
  // Always set to 1 under a CacheOp because we read from it only once. The CacheOp is the one that repeats.
  myRandomDataOp->SetTotalRepeats(1);
  myRandomDataOp->SetNumRepeatsPerEpoch(1);
  rc = myCacheOp->AddChild(myRandomDataOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssignRoot(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration";
  rc = myTree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  // Quick check of what the tree looks like
  std::ostringstream ss;
  ss << *myTree;  // writing directly to the MS log stream triggers a const error, so buffer it here first
  MS_LOG(INFO) << "Here's the tree:\n" << ss.str();

  std::cout << *myClient << std::endl;

  rc = myTree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator dI(myTree);
  TensorRow tensorList;
  rc = dI.FetchNextTensorRow(&tensorList);
  ASSERT_TRUE(rc.IsOk());
  int rowCount = 0;
  while (!tensorList.empty()) {
    // Don't display these rows, just count them
    MS_LOG(INFO) << "Row fetched #: " << rowCount;
    rc = dI.FetchNextTensorRow(&tensorList);
    ASSERT_TRUE(rc.IsOk());
    rowCount++;
  }
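  // The RandomDataOp produces 50 rows and the tree repeats 4 times, so 200 rows are expected.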
  ASSERT_EQ(rowCount, 200);
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

// Simple test with a repeated cache op over random data producer.
// This one will exceed memory and require a spill.
//
//     RepeatOp
//        |
//     CacheOp
//        |
//   RandomDataOp
//
TEST_F(MindDataTestCacheOp, DISABLED_TestRandomDataCacheSpill) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  Status rc;
  int32_t rank = 0;  // not used
  MS_LOG(INFO) << "UT test TestRandomDataCacheSpill";

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  // Start with an empty execution tree
  auto myTree = std::make_shared<ExecutionTree>();

  // Create a schema using the C++ APIs
  std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();

  // Two columns. The first column is an "image" with shape 640x480x3
  TensorShape c1Shape({640, 480, 3});
  ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
                   rank,  // not used
                   &c1Shape);

  // Column 2 will just be a scalar label number
  TensorShape c2Shape({});  // an empty shape creates a single-value scalar Tensor
  ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);

  test_schema->AddColumn(c1);
  test_schema->AddColumn(c2);

  // RandomDataOp
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::shared_ptr<RandomDataOp> myRandomDataOp =
    std::make_shared<RandomDataOp>(4, op_connector_size, 10, std::move(test_schema));
  rc = myTree->AssociateNode(myRandomDataOp);
  ASSERT_TRUE(rc.IsOk());

  // CacheOp
  int64_t num_samples = 0;
  int64_t start_index = 0;
  auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
  CacheClient::Builder builder;
  builder.SetSessionId(env_session).SetCacheMemSz(4).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());
  std::shared_ptr<CacheOp> myCacheOp =
    std::make_shared<CacheOp>(4, op_connector_size, myClient, std::move(seq_sampler));
  ASSERT_NE(myCacheOp, nullptr);
  rc = myTree->AssociateNode(myCacheOp);
  ASSERT_TRUE(rc.IsOk());

  // RepeatOp
  uint32_t num_repeats = 4;
  std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);
  rc = myTree->AssociateNode(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

  // Assign tree relations and root
  myCacheOp->SetTotalRepeats(num_repeats);
  myCacheOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myRepeatOp->AddChild(myCacheOp);
  ASSERT_TRUE(rc.IsOk());
  // Always set to 1 under a CacheOp because we read from it only once. The CacheOp is the one that repeats.
  myRandomDataOp->SetTotalRepeats(1);
  myRandomDataOp->SetNumRepeatsPerEpoch(1);
  rc = myCacheOp->AddChild(myRandomDataOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssignRoot(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration";
  rc = myTree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  std::cout << *myClient << std::endl;

  rc = myTree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator dI(myTree);
  TensorRow tensorList;
  rc = dI.FetchNextTensorRow(&tensorList);
  ASSERT_TRUE(rc.IsOk());
  int rowCount = 0;
  while (!tensorList.empty()) {
    // Don't display these rows, just count them
    MS_LOG(INFO) << "Row fetched #: " << rowCount;
    rc = dI.FetchNextTensorRow(&tensorList);
    ASSERT_TRUE(rc.IsOk());
    rowCount++;
  }
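  // The RandomDataOp produces 10 rows and the tree repeats 4 times, so 40 rows are expected.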
  ASSERT_EQ(rowCount, 40);
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

TEST_F(MindDataTestCacheOp, DISABLED_TestImageFolderCacheMerge) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  Status rc;
  int64_t num_samples = 0;
  int64_t start_index = 0;

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);

  CacheClient::Builder ccbuilder;
  ccbuilder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = ccbuilder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::shared_ptr<CacheLookupOp> myLookupOp =
    std::make_shared<CacheLookupOp>(4, op_connector_size, myClient, std::move(seq_sampler));
  ASSERT_NE(myLookupOp, nullptr);
  std::shared_ptr<CacheMergeOp> myMergeOp = std::make_shared<CacheMergeOp>(4, op_connector_size, 4, myClient);
  ASSERT_NE(myMergeOp, nullptr);

  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  TensorShape scalar = TensorShape::CreateScalar();
  rc = schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1));
  ASSERT_TRUE(rc.IsOk());
  rc = schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_INT32), TensorImpl::kFlexible, 0, &scalar));
  ASSERT_TRUE(rc.IsOk());
  std::string dataset_path = datasets_root_path_ + "/testPK/data";
  std::set<std::string> ext = {".jpg", ".JPEG"};
  bool recursive = true;
  bool decode = false;
  std::map<std::string, int32_t> columns_to_load = {};
  std::shared_ptr<ImageFolderOp> so = std::make_shared<ImageFolderOp>(
    3, dataset_path, 3, recursive, decode, ext, columns_to_load, std::move(schema), nullptr);
  so->SetSampler(myLookupOp);
  ASSERT_TRUE(rc.IsOk());
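  // The CacheLookupOp doubles as the sampler for the ImageFolderOp: rows already in the cache come back
  // through the lookup path, while cache misses are produced by the ImageFolderOp and stitched back in
  // by the CacheMergeOp wired up below.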

  // RepeatOp
  uint32_t num_repeats = 4;
  std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);

  auto myTree = std::make_shared<ExecutionTree>();
  rc = myTree->AssociateNode(so);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->AssociateNode(myLookupOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssociateNode(myMergeOp);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->AssociateNode(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssignRoot(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

  myMergeOp->SetTotalRepeats(num_repeats);
  myMergeOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myRepeatOp->AddChild(myMergeOp);
  ASSERT_TRUE(rc.IsOk());
  myLookupOp->SetTotalRepeats(num_repeats);
  myLookupOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myMergeOp->AddChild(myLookupOp);
  ASSERT_TRUE(rc.IsOk());
  so->SetTotalRepeats(num_repeats);
  so->SetNumRepeatsPerEpoch(num_repeats);
  rc = myMergeOp->AddChild(so);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->Launch();
  ASSERT_TRUE(rc.IsOk());
  // Start the loop of reading tensors from our pipeline
  DatasetIterator dI(myTree);
  TensorRow tensorList;
  rc = dI.FetchNextTensorRow(&tensorList);
  ASSERT_TRUE(rc.IsOk());
  int rowCount = 0;
  while (!tensorList.empty()) {
    rc = dI.FetchNextTensorRow(&tensorList);
    ASSERT_TRUE(rc.IsOk());
    if (rc.IsError()) {
      std::cout << rc << std::endl;
      break;
    }
    rowCount++;
  }
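  // The testPK/data image folder yields 44 rows per epoch; with 4 repeats, 176 rows are expected.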
  ASSERT_EQ(rowCount, 176);
  std::cout << "Row count : " << rowCount << std::endl;
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}