/**
 * Copyright 2020-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <string>
#include "minddata/dataset/core/client.h"
#include "minddata/dataset/engine/cache/cache_client.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/datasetops/cache_op.h"
#include "minddata/dataset/engine/datasetops/cache_lookup_op.h"
#include "minddata/dataset/engine/datasetops/cache_merge_op.h"
#include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
#include "minddata/dataset/engine/datasetops/source/tf_reader_op.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "common/common.h"
#include "gtest/gtest.h"
#include "utils/log_adapter.h"
#include "minddata/dataset/engine/datasetops/source/random_data_op.h"
#include "minddata/dataset/engine/data_schema.h"

using namespace mindspore::dataset;
using mindspore::dataset::CacheClient;
using mindspore::dataset::TaskGroup;

class MindDataTestCacheOp : public UT::DatasetOpTesting {
 public:
  void SetUp() override {
    DatasetOpTesting::SetUp();
    GlobalInit();
  }
};
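
// NOTE: the tests below are prefixed with DISABLED_ because they expect an external cache server to be
// running and a cache session id to be available in the environment (see GetSessionFromEnv); they can be
// run explicitly with --gtest_also_run_disabled_tests once that setup is in place.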

/// Feature: Cache
/// Description: Test basic usage of Cache server
/// Expectation: Runs successfully
TEST_F(MindDataTestCacheOp, DISABLED_TestCacheServer) {
  Status rc;
  CacheClient::Builder builder;
  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  // use the session id from the environment, a cache memory size of 0 and spilling enabled
  builder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());
  // cksum value of 1 for CreateCache here... normally you do not directly create a cache
  // and the cksum arg is generated.
  rc = myClient->CreateCache(1, true);
  ASSERT_TRUE(rc.IsOk());
  std::cout << *myClient << std::endl;

  // Create a schema using the C++ API
  int32_t rank = 0;  // not used
  std::unique_ptr<DataSchema> test_schema = std::make_unique<DataSchema>();
  // 2 columns. First column is an "image" with shape 640x480x3
  TensorShape c1Shape({640, 480, 3});
  ColDescriptor c1("image", DataType(DataType::DE_INT8), TensorImpl::kFlexible,
                   rank,  // not used
                   &c1Shape);
  // Column 2 will just be a scalar label number
  TensorShape c2Shape({});  // empty shape is a 1-value scalar Tensor
  ColDescriptor c2("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, rank, &c2Shape);

  test_schema->AddColumn(c1);
  test_schema->AddColumn(c2);

  std::unordered_map<std::string, int32_t> map;
  rc = test_schema->GetColumnNameMap(&map);
  ASSERT_TRUE(rc.IsOk());

  // Test the CacheSchema API
  rc = myClient->CacheSchema(map);
  ASSERT_TRUE(rc.IsOk());

  // Create a tensor, take a snapshot and restore it back, and compare.
  std::shared_ptr<Tensor> t;
  Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
  t->SetItemAt<uint64_t>({0, 0}, 1);
  t->SetItemAt<uint64_t>({0, 1}, 2);
  t->SetItemAt<uint64_t>({0, 2}, 3);
  t->SetItemAt<uint64_t>({1, 0}, 4);
  t->SetItemAt<uint64_t>({1, 1}, 5);
  t->SetItemAt<uint64_t>({1, 2}, 6);
  std::cout << *t << std::endl;
  TensorTable tbl;
  TensorRow row;
  row.push_back(t);
  int64_t row_id;
  rc = myClient->WriteRow(row, &row_id);
  ASSERT_TRUE(rc.IsOk());

  // Switch off build phase.
  rc = myClient->BuildPhaseDone();
  ASSERT_TRUE(rc.IsOk());

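  // After BuildPhaseDone the write (build) phase is over; from here on rows are read back by row id.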
  // Now restore from cache.
  row.clear();
  rc = myClient->GetRows({row_id}, &tbl);
  ASSERT_TRUE(rc.IsOk());
  row = tbl.front();
  auto r = row.front();
  std::cout << *r << std::endl;
  // Compare
  bool cmp = (*t == *r);
  ASSERT_TRUE(cmp);

  // Get back the schema and verify
  std::unordered_map<std::string, int32_t> map_out;
  rc = myClient->FetchSchema(&map_out);
  ASSERT_TRUE(rc.IsOk());
  cmp = (map_out == map);
  ASSERT_TRUE(cmp);

  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

/// Feature: Cache
/// Description: Test Cache with concurrent write requests
/// Expectation: Runs successfully
TEST_F(MindDataTestCacheOp, DISABLED_TestConcurrencyRequest) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  TaskGroup vg;
  Status rc;

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  // use the session id from the environment, a cache memory size of 1 and spilling enabled
  CacheClient::Builder builder;
  builder.SetSessionId(env_session).SetCacheMemSz(1).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = builder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());
  // cksum value of 1 for CreateCache here... normally you do not directly create a cache
  // and the cksum arg is generated.
  rc = myClient->CreateCache(1, true);
  ASSERT_TRUE(rc.IsOk());
  std::cout << *myClient << std::endl;
  std::shared_ptr<Tensor> t;
  Tensor::CreateEmpty(TensorShape({2, 3}), DataType(DataType::DE_UINT64), &t);
  t->SetItemAt<uint64_t>({0, 0}, 1);
  t->SetItemAt<uint64_t>({0, 1}, 2);
  t->SetItemAt<uint64_t>({0, 2}, 3);
  t->SetItemAt<uint64_t>({1, 0}, 4);
  t->SetItemAt<uint64_t>({1, 1}, 5);
  t->SetItemAt<uint64_t>({1, 2}, 6);
  TensorTable tbl;
  TensorRow row;
  row.push_back(t);
  // Cache tensor row t 5000 times using 10 threads.
  for (auto k = 0; k < 10; ++k) {
    Status vg_rc = vg.CreateAsyncTask("Test agent", [&myClient, &row]() -> Status {
      TaskManager::FindMe()->Post();
      for (auto i = 0; i < 500; i++) {
        RETURN_IF_NOT_OK(myClient->WriteRow(row));
      }
      return Status::OK();
    });
    ASSERT_TRUE(vg_rc.IsOk());
  }
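  // join_all() blocks until the 10 writer tasks finish; GetTaskErrorIfAny() surfaces the first
  // non-OK Status returned from any of the task bodies.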
  ASSERT_TRUE(vg.join_all().IsOk());
  ASSERT_TRUE(vg.GetTaskErrorIfAny().IsOk());
  rc = myClient->BuildPhaseDone();
  ASSERT_TRUE(rc.IsOk());
  // Get statistics from the server.
  CacheServiceStat stat{};
  rc = myClient->GetStat(&stat);
  ASSERT_TRUE(rc.IsOk());
  std::cout << stat.min_row_id << ":" << stat.max_row_id << ":" << stat.num_mem_cached << ":" << stat.num_disk_cached
            << "\n";
  // Expect there are 5000 rows there (10 threads x 500 writes each).
  EXPECT_EQ(5000, stat.max_row_id - stat.min_row_id + 1);
  // Get them all back using row id and compare with tensor t.
  for (auto i = stat.min_row_id; i <= stat.max_row_id; ++i) {
    tbl.clear();
    row.clear();
    rc = myClient->GetRows({i}, &tbl);
    ASSERT_TRUE(rc.IsOk());
    row = tbl.front();
    auto r = row.front();
    bool cmp = (*t == *r);
    ASSERT_TRUE(cmp);
  }
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}

/// Feature: Cache
/// Description: Test Cache with ImageFolderOp and CacheMergeOp
/// Expectation: Runs successfully
TEST_F(MindDataTestCacheOp, DISABLED_TestImageFolderCacheMerge) {
  // Clear the rc of the master thread if any
  (void)TaskManager::GetMasterThreadRc();
  Status rc;
  int64_t num_samples = 0;
  int64_t start_index = 0;

  session_id_type env_session;
  rc = GetSessionFromEnv(&env_session);
  ASSERT_TRUE(rc.IsOk());

  auto seq_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);

  CacheClient::Builder ccbuilder;
  ccbuilder.SetSessionId(env_session).SetCacheMemSz(0).SetSpill(true);
  std::shared_ptr<CacheClient> myClient;
  rc = ccbuilder.Build(&myClient);
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::shared_ptr<CacheLookupOp> myLookupOp =
    std::make_shared<CacheLookupOp>(4, op_connector_size, myClient, std::move(seq_sampler));
  ASSERT_NE(myLookupOp, nullptr);
  std::shared_ptr<CacheMergeOp> myMergeOp = std::make_shared<CacheMergeOp>(4, op_connector_size, 4, myClient);
  ASSERT_NE(myMergeOp, nullptr);

  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  TensorShape scalar = TensorShape::CreateScalar();
  rc = schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1));
  ASSERT_TRUE(rc.IsOk());
  rc = schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_INT32), TensorImpl::kFlexible, 0, &scalar));
  ASSERT_TRUE(rc.IsOk());
  std::string dataset_path = datasets_root_path_ + "/testPK/data";
  std::set<std::string> ext = {".jpg", ".JPEG"};
  bool recursive = true;
  bool decode = false;
  std::map<std::string, int32_t> columns_to_load = {};
  std::shared_ptr<ImageFolderOp> so = std::make_shared<ImageFolderOp>(
    3, dataset_path, 3, recursive, decode, ext, columns_to_load, std::move(schema), nullptr);
  so->SetSampler(myLookupOp);
  ASSERT_TRUE(rc.IsOk());

  // RepeatOp
  uint32_t num_repeats = 4;
  std::shared_ptr<RepeatOp> myRepeatOp = std::make_shared<RepeatOp>(num_repeats);

  auto myTree = std::make_shared<ExecutionTree>();
  rc = myTree->AssociateNode(so);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->AssociateNode(myLookupOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssociateNode(myMergeOp);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->AssociateNode(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->AssignRoot(myRepeatOp);
  ASSERT_TRUE(rc.IsOk());

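  // Wire up the cache-merge pattern:
  //   RepeatOp -> CacheMergeOp -> { CacheLookupOp, ImageFolderOp }
  // The lookup op also acts as the ImageFolderOp's sampler, so rows missing from the cache are produced
  // by the ImageFolderOp and handed to the merge op, which caches them on the way through.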
  myMergeOp->SetTotalRepeats(num_repeats);
  myMergeOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myRepeatOp->AddChild(myMergeOp);
  ASSERT_TRUE(rc.IsOk());
  myLookupOp->SetTotalRepeats(num_repeats);
  myLookupOp->SetNumRepeatsPerEpoch(num_repeats);
  rc = myMergeOp->AddChild(myLookupOp);
  ASSERT_TRUE(rc.IsOk());
  so->SetTotalRepeats(num_repeats);
  so->SetNumRepeatsPerEpoch(num_repeats);
  rc = myMergeOp->AddChild(so);
  ASSERT_TRUE(rc.IsOk());

  rc = myTree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = myTree->Launch();
  ASSERT_TRUE(rc.IsOk());
  // Start the loop of reading tensors from our pipeline
  DatasetIterator dI(myTree);
  TensorRow tensorList;
  rc = dI.FetchNextTensorRow(&tensorList);
  ASSERT_TRUE(rc.IsOk());
  int rowCount = 0;
  while (!tensorList.empty()) {
    rc = dI.FetchNextTensorRow(&tensorList);
    ASSERT_TRUE(rc.IsOk());
    rowCount++;
  }
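  // Expected row count: images under testPK/data (44 per pass) x num_repeats (4) = 176.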
  ASSERT_EQ(rowCount, 176);
  std::cout << "Row count : " << rowCount << std::endl;
  rc = myClient->DestroyCache();
  ASSERT_TRUE(rc.IsOk());
}