/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/gnn/graph_data_impl.h"

#include <algorithm>
#include <functional>
#include <iterator>
#include <numeric>
#include <utility>

#include "minddata/dataset/core/tensor_shape.h"
#include "minddata/dataset/engine/gnn/graph_loader.h"
#include "minddata/dataset/util/random.h"

namespace mindspore {
namespace dataset {
namespace gnn {

GraphDataImpl::GraphDataImpl(std::string dataset_file, int32_t num_workers, bool server_mode)
    : dataset_file_(dataset_file),
      num_workers_(num_workers),
      rnd_(GetRandomDevice()),
      random_walk_(this),
      server_mode_(server_mode) {
  rnd_.seed(GetSeed());
  MS_LOG(INFO) << "num_workers:" << num_workers;
}

GraphDataImpl::~GraphDataImpl() {}

Status GraphDataImpl::GetAllNodes(NodeType node_type, std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto itr = node_type_map_.find(node_type);
  if (itr == node_type_map_.end()) {
    std::string err_msg = "Invalid node type:" + std::to_string(node_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>({itr->second}, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

template <typename T>
Status GraphDataImpl::CreateTensorByVector(const std::vector<std::vector<T>> &data, DataType type,
                                           std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  if (!type.IsCompatible<T>()) {
    RETURN_STATUS_UNEXPECTED("Data type not compatible");
  }
  if (data.empty()) {
    RETURN_STATUS_UNEXPECTED("Input data is empty");
  }
  std::shared_ptr<Tensor> tensor;
  size_t m = data.size();
  size_t n = data[0].size();
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({static_cast<dsize_t>(m), static_cast<dsize_t>(n)}), type, &tensor));
  auto ptr = tensor->begin<T>();
  for (const auto &id_m : data) {
    CHECK_FAIL_RETURN_UNEXPECTED(id_m.size() == n, "Each member of the vector has a different size");
    for (const auto &id_n : id_m) {
      *ptr = id_n;
      ptr++;
    }
  }
  tensor->Squeeze();
  *out = std::move(tensor);
  return Status::OK();
}

template <typename T>
Status GraphDataImpl::ComplementVector(std::vector<std::vector<T>> *data, size_t max_size, T default_value) {
  if (!data || data->empty()) {
    RETURN_STATUS_UNEXPECTED("Input data is empty");
  }
  for (std::vector<T> &vec : *data) {
    size_t size = vec.size();
    if (size > max_size) {
      RETURN_STATUS_UNEXPECTED("The max_size parameter is abnormal");
    } else {
      for (size_t i = 0; i < (max_size - size); ++i) {
        vec.push_back(default_value);
      }
    }
  }
  return Status::OK();
}

Status GraphDataImpl::GetAllEdges(EdgeType edge_type, std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto itr = edge_type_map_.find(edge_type);
  if (itr == edge_type_map_.end()) {
    std::string err_msg = "Invalid edge type:" + std::to_string(edge_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    RETURN_IF_NOT_OK(CreateTensorByVector<EdgeIdType>({itr->second}, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

Status GraphDataImpl::GetNodesFromEdges(const std::vector<EdgeIdType> &edge_list, std::shared_ptr<Tensor> *out) {
  if (edge_list.empty()) {
    RETURN_STATUS_UNEXPECTED("Input edge_list is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);

  std::vector<std::vector<NodeIdType>> node_list;
  node_list.reserve(edge_list.size());
  for (const auto &edge_id : edge_list) {
    auto itr = edge_id_map_.find(edge_id);
    if (itr == edge_id_map_.end()) {
      std::string err_msg = "Invalid edge id:" + std::to_string(edge_id);
      RETURN_STATUS_UNEXPECTED(err_msg);
    } else {
      std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> nodes;
      RETURN_IF_NOT_OK(itr->second->GetNode(&nodes));
      node_list.push_back({nodes.first->id(), nodes.second->id()});
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(node_list, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::GetEdgesFromNodes(const std::vector<std::pair<NodeIdType, NodeIdType>> &node_list,
                                        std::shared_ptr<Tensor> *out) {
  if (node_list.empty()) {
    RETURN_STATUS_UNEXPECTED("Input node list is empty.");
  }

  std::vector<std::vector<EdgeIdType>> edge_list;
  edge_list.reserve(node_list.size());

  for (const auto &node_id : node_list) {
    std::shared_ptr<Node> src_node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_id.first, &src_node));

    EdgeIdType edge_id;
    src_node->GetEdgeByAdjNodeId(node_id.second, &edge_id);

    std::vector<EdgeIdType> connection_edge = {edge_id};
    edge_list.emplace_back(std::move(connection_edge));
  }

  RETURN_IF_NOT_OK(CreateTensorByVector<EdgeIdType>(edge_list, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

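// GetAllNeighbors returns the neighbors of every node in node_list in one of three layouts:
//   kNormal - one row per input node, padded with kDefaultNodeId up to the longest neighbor list
//   kCoo    - one (src, dst) pair per edge
//   kCsr    - a per-node offset table followed by all destination node ids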
Status GraphDataImpl::GetAllNeighbors(const std::vector<NodeIdType> &node_list, NodeType neighbor_type,
                                      const OutputFormat &format, std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  RETURN_IF_NOT_OK(CheckNeighborType(neighbor_type));
  RETURN_UNEXPECTED_IF_NULL(out);

  std::vector<std::vector<NodeIdType>> neighbors;

  size_t max_neighbor_num = 0;                                // Special parameter for normal format
  size_t total_edge_num = 0;                                  // Special parameter for coo and csr format
  std::vector<NodeIdType> offset_table(node_list.size(), 0);  // Special parameter for csr format

  // Collect the neighbors of every node in the adjacency table
  neighbors.resize(node_list.size());
  for (size_t i = 0; i < node_list.size(); ++i) {
    std::shared_ptr<Node> node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[i], &node));
    if (format == OutputFormat::kNormal) {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i]));
      max_neighbor_num = max_neighbor_num > neighbors[i].size() ? max_neighbor_num : neighbors[i].size();
    } else if (format == OutputFormat::kCoo) {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i], true));
      total_edge_num += neighbors[i].size();
    } else {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i], true));
      total_edge_num += neighbors[i].size();
      if (i < node_list.size() - 1) {
        offset_table[i + 1] = total_edge_num;
      }
    }
  }

  // Using the information collected above, build the output in the requested format
  if (format == OutputFormat::kNormal) {
    RETURN_IF_NOT_OK(ComplementVector<NodeIdType>(&neighbors, max_neighbor_num, kDefaultNodeId));
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neighbors, DataType(DataType::DE_INT32), out));
  } else if (format == OutputFormat::kCoo) {
    std::vector<std::vector<NodeIdType>> coo_result;
    coo_result.resize(total_edge_num);
    size_t k = 0;
    for (size_t i = 0; i < neighbors.size(); ++i) {
      NodeIdType src = node_list[i];
      for (auto &dst : neighbors[i]) {
        coo_result[k] = {src, dst};
        k++;
      }
    }
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(coo_result, DataType(DataType::DE_INT32), out));
  } else {
    std::vector<std::vector<NodeIdType>> csr_result;
    csr_result.resize(node_list.size() + total_edge_num);
    for (size_t i = 0; i < offset_table.size(); ++i) {
      csr_result[i] = {offset_table[i]};
    }
    size_t edge_index = 0;
    for (auto &neighbor : neighbors) {
      for (auto &dst : neighbor) {
        csr_result[node_list.size() + edge_index] = {dst};
        edge_index++;
      }
    }
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(csr_result, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

Status GraphDataImpl::CheckSamplesNum(NodeIdType samples_num) {
  NodeIdType all_nodes_number =
    std::accumulate(node_type_map_.begin(), node_type_map_.end(), 0,
                    [](NodeIdType t1, const auto &t2) -> NodeIdType { return t1 + t2.second.size(); });
  if ((samples_num < 1) || (samples_num > all_nodes_number)) {
    std::string err_msg = "Wrong samples number, should be between 1 and " + std::to_string(all_nodes_number) +
                          ", got " + std::to_string(samples_num);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

Status GraphDataImpl::CheckNeighborType(NodeType neighbor_type) {
  if (node_type_map_.find(neighbor_type) == node_type_map_.end()) {
    std::string err_msg = "Invalid neighbor type:" + std::to_string(neighbor_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

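// GetSampledNeighbors performs multi-hop sampling: hop i samples neighbor_nums[i] neighbors of type
// neighbor_types[i] from every node produced by the previous hop. Each output row starts with the
// input node itself, followed by the sampled neighbors of every hop; a kDefaultNodeId input to a hop
// simply yields kDefaultNodeId placeholders for that hop.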
Status GraphDataImpl::GetSampledNeighbors(const std::vector<NodeIdType> &node_list,
                                          const std::vector<NodeIdType> &neighbor_nums,
                                          const std::vector<NodeType> &neighbor_types, SamplingStrategy strategy,
                                          std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  CHECK_FAIL_RETURN_UNEXPECTED(neighbor_nums.size() == neighbor_types.size(),
                               "The sizes of neighbor_nums and neighbor_types are inconsistent.");
  for (const auto &num : neighbor_nums) {
    RETURN_IF_NOT_OK(CheckSamplesNum(num));
  }
  for (const auto &type : neighbor_types) {
    RETURN_IF_NOT_OK(CheckNeighborType(type));
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  std::vector<std::vector<NodeIdType>> neighbors_vec(node_list.size());
  for (size_t node_idx = 0; node_idx < node_list.size(); ++node_idx) {
    std::shared_ptr<Node> input_node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[node_idx], &input_node));
    neighbors_vec[node_idx].emplace_back(node_list[node_idx]);
    std::vector<NodeIdType> input_list = {node_list[node_idx]};
    for (size_t i = 0; i < neighbor_nums.size(); ++i) {
      std::vector<NodeIdType> neighbors;
      neighbors.reserve(input_list.size() * neighbor_nums[i]);
      for (const auto &node_id : input_list) {
        if (node_id == kDefaultNodeId) {
          for (int32_t j = 0; j < neighbor_nums[i]; ++j) {
            neighbors.emplace_back(kDefaultNodeId);
          }
        } else {
          std::shared_ptr<Node> node;
          RETURN_IF_NOT_OK(GetNodeByNodeId(node_id, &node));
          std::vector<NodeIdType> out;
          RETURN_IF_NOT_OK(node->GetSampledNeighbors(neighbor_types[i], neighbor_nums[i], strategy, &out));
          neighbors.insert(neighbors.end(), out.begin(), out.end());
        }
      }
      neighbors_vec[node_idx].insert(neighbors_vec[node_idx].end(), neighbors.begin(), neighbors.end());
      input_list = std::move(neighbors);
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neighbors_vec, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

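// NegativeSample scans shuffled_ids starting at *start_index and appends data[shuffled_ids[i]] to
// out_samples, skipping ids found in exclude_data, until samples_num samples are collected or the
// shuffled list is exhausted. *start_index is advanced so that the next call continues the scan.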
Status GraphDataImpl::NegativeSample(const std::vector<NodeIdType> &data, const std::vector<NodeIdType> shuffled_ids,
                                     size_t *start_index, const std::unordered_set<NodeIdType> &exclude_data,
                                     int32_t samples_num, std::vector<NodeIdType> *out_samples) {
  CHECK_FAIL_RETURN_UNEXPECTED(!data.empty(), "Input data is empty.");
  RETURN_UNEXPECTED_IF_NULL(start_index);
  size_t index = *start_index;
  for (size_t i = index; i < shuffled_ids.size(); ++i) {
    ++index;
    if (exclude_data.find(data[shuffled_ids[i]]) != exclude_data.end()) {
      continue;
    }
    out_samples->emplace_back(data[shuffled_ids[i]]);
    if (out_samples->size() >= samples_num) {
      break;
    }
  }
  *start_index = index;
  return Status::OK();
}

Status GraphDataImpl::GetNegSampledNeighbors(const std::vector<NodeIdType> &node_list, NodeIdType samples_num,
                                             NodeType neg_neighbor_type, std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  RETURN_IF_NOT_OK(CheckSamplesNum(samples_num));
  RETURN_IF_NOT_OK(CheckNeighborType(neg_neighbor_type));
  RETURN_UNEXPECTED_IF_NULL(out);

  const std::vector<NodeIdType> &all_nodes = node_type_map_[neg_neighbor_type];
  std::vector<NodeIdType> shuffled_id(all_nodes.size());
  std::iota(shuffled_id.begin(), shuffled_id.end(), 0);
  std::shuffle(shuffled_id.begin(), shuffled_id.end(), rnd_);
  size_t start_index = 0;
  bool need_shuffle = false;

  std::vector<std::vector<NodeIdType>> neg_neighbors_vec;
  neg_neighbors_vec.resize(node_list.size());
  for (size_t node_idx = 0; node_idx < node_list.size(); ++node_idx) {
    std::shared_ptr<Node> node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[node_idx], &node));
    std::vector<NodeIdType> neighbors;
    RETURN_IF_NOT_OK(node->GetAllNeighbors(neg_neighbor_type, &neighbors));
    std::unordered_set<NodeIdType> exclude_nodes;
    (void)std::transform(neighbors.begin(), neighbors.end(),
                         std::insert_iterator<std::unordered_set<NodeIdType>>(exclude_nodes, exclude_nodes.begin()),
                         [](const NodeIdType node) { return node; });
    neg_neighbors_vec[node_idx].emplace_back(node->id());
    if (all_nodes.size() > exclude_nodes.size()) {
      while (neg_neighbors_vec[node_idx].size() < samples_num + 1) {
        RETURN_IF_NOT_OK(NegativeSample(all_nodes, shuffled_id, &start_index, exclude_nodes, samples_num + 1,
                                        &neg_neighbors_vec[node_idx]));
        if (start_index >= shuffled_id.size()) {
          start_index = start_index % shuffled_id.size();
          need_shuffle = true;
        }
      }
    } else {
      MS_LOG(DEBUG) << "There are no negative neighbors. node_id:" << node->id()
                    << " neg_neighbor_type:" << neg_neighbor_type;
      // If there are no negative neighbors, they are filled with kDefaultNodeId
      for (int32_t i = 0; i < samples_num; ++i) {
        neg_neighbors_vec[node_idx].emplace_back(kDefaultNodeId);
      }
    }
    if (need_shuffle) {
      std::shuffle(shuffled_id.begin(), shuffled_id.end(), rnd_);
      start_index = 0;
      need_shuffle = false;
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neg_neighbors_vec, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::RandomWalk(const std::vector<NodeIdType> &node_list, const std::vector<NodeType> &meta_path,
                                 float step_home_param, float step_away_param, NodeIdType default_node,
                                 std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  RETURN_IF_NOT_OK(random_walk_.Build(node_list, meta_path, step_home_param, step_away_param, default_node));
  std::vector<std::vector<NodeIdType>> walks;
  RETURN_IF_NOT_OK(random_walk_.SimulateWalk(&walks));
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>({walks}, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::GetNodeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature) {
  RETURN_UNEXPECTED_IF_NULL(out_feature);
  auto itr = default_node_feature_map_.find(feature_type);
  if (itr == default_node_feature_map_.end()) {
    std::string err_msg = "Invalid feature type:" + std::to_string(feature_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *out_feature = itr->second;
  }
  return Status::OK();
}

Status GraphDataImpl::GetEdgeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature) {
  RETURN_UNEXPECTED_IF_NULL(out_feature);
  auto itr = default_edge_feature_map_.find(feature_type);
  if (itr == default_edge_feature_map_.end()) {
    std::string err_msg = "Invalid feature type:" + std::to_string(feature_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *out_feature = itr->second;
  }
  return Status::OK();
}

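// GetNodeFeature gathers, for every requested feature type, the feature of each node in `nodes` into
// one tensor per type. Nodes equal to kDefaultNodeId, unknown node ids, and nodes that do not carry
// the feature fall back to the default feature registered for that type.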
Status GraphDataImpl::GetNodeFeature(const std::shared_ptr<Tensor> &nodes,
                                     const std::vector<FeatureType> &feature_types, TensorRow *out) {
  if (!nodes || nodes->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input nodes is empty");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!feature_types.empty(), "Input feature_types is empty");
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorRow tensors;
  for (const auto &f_type : feature_types) {
    std::shared_ptr<Feature> default_feature;
    // If no feature can be obtained, fill in the default value
    RETURN_IF_NOT_OK(GetNodeDefaultFeature(f_type, &default_feature));

    TensorShape shape(default_feature->Value()->shape());
    auto shape_vec = nodes->shape().AsVector();
    dsize_t size = std::accumulate(shape_vec.begin(), shape_vec.end(), 1, std::multiplies<dsize_t>());
    shape = shape.PrependDim(size);
    std::shared_ptr<Tensor> fea_tensor;
    RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, default_feature->Value()->type(), &fea_tensor));

    dsize_t index = 0;
    for (auto node_itr = nodes->begin<NodeIdType>(); node_itr != nodes->end<NodeIdType>(); ++node_itr) {
      std::shared_ptr<Feature> feature;
      if (*node_itr == kDefaultNodeId) {
        feature = default_feature;
      } else {
        std::shared_ptr<Node> node;

        if (!GetNodeByNodeId(*node_itr, &node).IsOk() || !node->GetFeatures(f_type, &feature).IsOk()) {
          feature = default_feature;
        }
      }
      RETURN_IF_NOT_OK(fea_tensor->InsertTensor({index}, feature->Value()));
      index++;
    }

    TensorShape reshape(nodes->shape());
    for (auto s : default_feature->Value()->shape().AsVector()) {
      reshape = reshape.AppendDim(s);
    }
    RETURN_IF_NOT_OK(fea_tensor->Reshape(reshape));
    fea_tensor->Squeeze();
    tensors.push_back(fea_tensor);
  }
  *out = std::move(tensors);
  return Status::OK();
}

Status GraphDataImpl::GetNodeFeatureSharedMemory(const std::shared_ptr<Tensor> &nodes, FeatureType type,
                                                 std::shared_ptr<Tensor> *out) {
  if (!nodes || nodes->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input nodes is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorShape shape = nodes->shape().AppendDim(2);
  std::shared_ptr<Tensor> fea_tensor;
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, DataType(DataType::DE_INT64), &fea_tensor));

  auto out_fea_itr = fea_tensor->begin<int64_t>();
  for (auto node_itr = nodes->begin<NodeIdType>(); node_itr != nodes->end<NodeIdType>(); ++node_itr) {
    if (*node_itr == kDefaultNodeId) {
      *out_fea_itr = -1;
      ++out_fea_itr;
      *out_fea_itr = -1;
      ++out_fea_itr;
    } else {
      std::shared_ptr<Node> node;
      RETURN_IF_NOT_OK(GetNodeByNodeId(*node_itr, &node));
      std::shared_ptr<Feature> feature;
      if (!node->GetFeatures(type, &feature).IsOk()) {
        *out_fea_itr = -1;
        ++out_fea_itr;
        *out_fea_itr = -1;
        ++out_fea_itr;
      } else {
        for (auto fea_itr = feature->Value()->begin<int64_t>(); fea_itr != feature->Value()->end<int64_t>();
             ++fea_itr) {
          *out_fea_itr = *fea_itr;
          ++out_fea_itr;
        }
      }
    }
  }

  fea_tensor->Squeeze();

  *out = std::move(fea_tensor);
  return Status::OK();
}

Status GraphDataImpl::GetEdgeFeature(const std::shared_ptr<Tensor> &edges,
                                     const std::vector<FeatureType> &feature_types, TensorRow *out) {
  if (!edges || edges->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input edges is empty");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!feature_types.empty(), "Input feature_types is empty");
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorRow tensors;
  for (const auto &f_type : feature_types) {
    std::shared_ptr<Feature> default_feature;
    // If no feature can be obtained, fill in the default value
    RETURN_IF_NOT_OK(GetEdgeDefaultFeature(f_type, &default_feature));

    TensorShape shape(default_feature->Value()->shape());
    auto shape_vec = edges->shape().AsVector();
    dsize_t size = std::accumulate(shape_vec.begin(), shape_vec.end(), 1, std::multiplies<dsize_t>());
    shape = shape.PrependDim(size);
    std::shared_ptr<Tensor> fea_tensor;
    RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, default_feature->Value()->type(), &fea_tensor));

    dsize_t index = 0;
    for (auto edge_itr = edges->begin<EdgeIdType>(); edge_itr != edges->end<EdgeIdType>(); ++edge_itr) {
      std::shared_ptr<Edge> edge;
      std::shared_ptr<Feature> feature;

      if (!GetEdgeByEdgeId(*edge_itr, &edge).IsOk() || !edge->GetFeatures(f_type, &feature).IsOk()) {
        feature = default_feature;
      }
      RETURN_IF_NOT_OK(fea_tensor->InsertTensor({index}, feature->Value()));
      index++;
    }

    TensorShape reshape(edges->shape());
    for (auto s : default_feature->Value()->shape().AsVector()) {
      reshape = reshape.AppendDim(s);
    }
    RETURN_IF_NOT_OK(fea_tensor->Reshape(reshape));
    fea_tensor->Squeeze();
    tensors.push_back(fea_tensor);
  }
  *out = std::move(tensors);
  return Status::OK();
}

Status GraphDataImpl::GetEdgeFeatureSharedMemory(const std::shared_ptr<Tensor> &edges, FeatureType type,
                                                 std::shared_ptr<Tensor> *out) {
  if (!edges || edges->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input edges is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorShape shape = edges->shape().AppendDim(2);
  std::shared_ptr<Tensor> fea_tensor;
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, DataType(DataType::DE_INT64), &fea_tensor));

  auto out_fea_itr = fea_tensor->begin<int64_t>();
  for (auto edge_itr = edges->begin<EdgeIdType>(); edge_itr != edges->end<EdgeIdType>(); ++edge_itr) {
    std::shared_ptr<Edge> edge;
    RETURN_IF_NOT_OK(GetEdgeByEdgeId(*edge_itr, &edge));
    std::shared_ptr<Feature> feature;
    if (!edge->GetFeatures(type, &feature).IsOk()) {
      *out_fea_itr = -1;
      ++out_fea_itr;
      *out_fea_itr = -1;
      ++out_fea_itr;
    } else {
      for (auto fea_itr = feature->Value()->begin<int64_t>(); fea_itr != feature->Value()->end<int64_t>(); ++fea_itr) {
        *out_fea_itr = *fea_itr;
        ++out_fea_itr;
      }
    }
  }

  fea_tensor->Squeeze();

  *out = std::move(fea_tensor);
  return Status::OK();
}

Status GraphDataImpl::Init() {
  RETURN_IF_NOT_OK(LoadNodeAndEdge());
  return Status::OK();
}

Status GraphDataImpl::GetMetaInfo(MetaInfo *meta_info) {
  RETURN_UNEXPECTED_IF_NULL(meta_info);
  meta_info->node_type.resize(node_type_map_.size());
  (void)std::transform(node_type_map_.begin(), node_type_map_.end(), meta_info->node_type.begin(),
                       [](auto itr) { return itr.first; });
  std::sort(meta_info->node_type.begin(), meta_info->node_type.end());

  meta_info->edge_type.resize(edge_type_map_.size());
  (void)std::transform(edge_type_map_.begin(), edge_type_map_.end(), meta_info->edge_type.begin(),
                       [](auto itr) { return itr.first; });
  std::sort(meta_info->edge_type.begin(), meta_info->edge_type.end());

  for (const auto &node : node_type_map_) {
    meta_info->node_num[node.first] = node.second.size();
  }

  for (const auto &edge : edge_type_map_) {
    meta_info->edge_num[edge.first] = edge.second.size();
  }

  for (const auto &node_feature : node_feature_map_) {
    for (auto type : node_feature.second) {
      meta_info->node_feature_type.emplace_back(type);
    }
  }
  std::sort(meta_info->node_feature_type.begin(), meta_info->node_feature_type.end());
  auto unique_node = std::unique(meta_info->node_feature_type.begin(), meta_info->node_feature_type.end());
  meta_info->node_feature_type.erase(unique_node, meta_info->node_feature_type.end());

  for (const auto &edge_feature : edge_feature_map_) {
    for (const auto &type : edge_feature.second) {
      meta_info->edge_feature_type.emplace_back(type);
    }
  }
  std::sort(meta_info->edge_feature_type.begin(), meta_info->edge_feature_type.end());
  auto unique_edge = std::unique(meta_info->edge_feature_type.begin(), meta_info->edge_feature_type.end());
  meta_info->edge_feature_type.erase(unique_edge, meta_info->edge_feature_type.end());
  return Status::OK();
}

#ifdef ENABLE_PYTHON
Status GraphDataImpl::GraphInfo(py::dict *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  MetaInfo meta_info;
  RETURN_IF_NOT_OK(GetMetaInfo(&meta_info));
  (*out)["node_type"] = py::cast(meta_info.node_type);
  (*out)["edge_type"] = py::cast(meta_info.edge_type);
  (*out)["node_num"] = py::cast(meta_info.node_num);
  (*out)["edge_num"] = py::cast(meta_info.edge_num);
  (*out)["node_feature_type"] = py::cast(meta_info.node_feature_type);
  (*out)["edge_feature_type"] = py::cast(meta_info.edge_feature_type);
  return Status::OK();
}
#endif

Status GraphDataImpl::LoadNodeAndEdge() {
  GraphLoader gl(this, dataset_file_, num_workers_, server_mode_);
  // ask graph_loader to load everything into memory
  RETURN_IF_NOT_OK(gl.InitAndLoad());
  // get all maps
  RETURN_IF_NOT_OK(gl.GetNodesAndEdges());
  return Status::OK();
}

Status GraphDataImpl::GetNodeByNodeId(NodeIdType id, std::shared_ptr<Node> *node) {
  RETURN_UNEXPECTED_IF_NULL(node);
  auto itr = node_id_map_.find(id);
  if (itr == node_id_map_.end()) {
    std::string err_msg = "Invalid node id:" + std::to_string(id);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *node = itr->second;
  }
  return Status::OK();
}

Status GraphDataImpl::GetEdgeByEdgeId(EdgeIdType id, std::shared_ptr<Edge> *edge) {
  RETURN_UNEXPECTED_IF_NULL(edge);
  auto itr = edge_id_map_.find(id);
  if (itr == edge_id_map_.end()) {
    std::string err_msg = "Invalid edge id:" + std::to_string(id);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *edge = itr->second;
  }
  return Status::OK();
}

GraphDataImpl::RandomWalkBase::RandomWalkBase(GraphDataImpl *graph)
    : graph_(graph), step_home_param_(1.0), step_away_param_(1.0), default_node_(-1), num_walks_(1), num_workers_(1) {}

Status GraphDataImpl::RandomWalkBase::Build(const std::vector<NodeIdType> &node_list,
                                            const std::vector<NodeType> &meta_path, float step_home_param,
                                            float step_away_param, const NodeIdType default_node, int32_t num_walks,
                                            int32_t num_workers) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  node_list_ = node_list;
  if (meta_path.empty() || meta_path.size() > kMaxNumWalks) {
    std::string err_msg = "Failed, the size of meta_path is required to be between 1 and " +
                          std::to_string(kMaxNumWalks) + ". The size of the input path is " +
                          std::to_string(meta_path.size());
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  for (const auto &type : meta_path) {
    RETURN_IF_NOT_OK(graph_->CheckNeighborType(type));
  }
  meta_path_ = meta_path;
  if (step_home_param < kGnnEpsilon || step_away_param < kGnnEpsilon) {
    std::string err_msg = "Failed, step_home_param and step_away_param are required to be greater than " +
                          std::to_string(kGnnEpsilon) + ". step_home_param: " + std::to_string(step_home_param) +
                          ", step_away_param: " + std::to_string(step_away_param);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (default_node < -1) {
    std::string err_msg = "Failed, default_node is required to be greater than or equal to -1.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (num_walks <= 0) {
    std::string err_msg = "Failed, the num_walks parameter is required to be greater than 0.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (num_workers <= 0) {
    std::string err_msg = "Failed, the num_workers parameter is required to be greater than 0.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  step_home_param_ = step_home_param;
  step_away_param_ = step_away_param;
  default_node_ = default_node;
  num_walks_ = num_walks;
  num_workers_ = num_workers;
  return Status::OK();
}

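// Node2vecWalk simulates one node2vec-style walk from start_node. At each step, candidate neighbors
// are restricted by the current position in meta_path_, and the transition distribution is biased by
// step_home_param_ (return parameter) and step_away_param_ (in-out parameter). If a node has no
// neighbors, the walk stops early and the remaining positions are padded with default_node_.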
Status GraphDataImpl::RandomWalkBase::Node2vecWalk(const NodeIdType &start_node, std::vector<NodeIdType> *walk_path) {
  RETURN_UNEXPECTED_IF_NULL(walk_path);
  // Simulate a random walk starting from the start node
  auto walk = std::vector<NodeIdType>(1, start_node);  // walk is a vector
  // Simulate the walk
  while (walk.size() - 1 < meta_path_.size()) {
    // Current node
    auto cur_node_id = walk.back();
    std::shared_ptr<Node> cur_node;
    RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(cur_node_id, &cur_node));

    // Current neighbors
    std::vector<NodeIdType> cur_neighbors;
    RETURN_IF_NOT_OK(cur_node->GetAllNeighbors(meta_path_[walk.size() - 1], &cur_neighbors, true));
    std::sort(cur_neighbors.begin(), cur_neighbors.end());

    // Break if there are no neighbors
    if (cur_neighbors.empty()) {
      break;
    }

    // The first step is sampled from the node probability; later steps use the edge probability
    // conditioned on the previous two nodes
    std::shared_ptr<StochasticIndex> stochastic_index;
    if (walk.size() == 1) {
      RETURN_IF_NOT_OK(GetNodeProbability(cur_node_id, meta_path_[0], &stochastic_index));
    } else {
      NodeIdType prev_node_id = walk[walk.size() - 2];
      RETURN_IF_NOT_OK(GetEdgeProbability(prev_node_id, cur_node_id, walk.size() - 2, &stochastic_index));
    }
    NodeIdType next_node_id = cur_neighbors[WalkToNextNode(*stochastic_index)];
    walk.push_back(next_node_id);
  }

  while (walk.size() - 1 < meta_path_.size()) {
    walk.push_back(default_node_);
  }

  *walk_path = std::move(walk);
  return Status::OK();
}

Status GraphDataImpl::RandomWalkBase::SimulateWalk(std::vector<std::vector<NodeIdType>> *walks) {
  RETURN_UNEXPECTED_IF_NULL(walks);
  for (int32_t i = 0; i < num_walks_; ++i) {
    for (const auto &node : node_list_) {
      std::vector<NodeIdType> walk;
      RETURN_IF_NOT_OK(Node2vecWalk(node, &walk));
      walks->push_back(walk);
    }
  }
  return Status::OK();
}

Status GraphDataImpl::RandomWalkBase::GetNodeProbability(const NodeIdType &node_id, const NodeType &node_type,
                                                         std::shared_ptr<StochasticIndex> *node_probability) {
  RETURN_UNEXPECTED_IF_NULL(node_probability);
  // Generate alias nodes
  std::shared_ptr<Node> node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(node_id, &node));
  std::vector<NodeIdType> neighbors;
  RETURN_IF_NOT_OK(node->GetAllNeighbors(node_type, &neighbors, true));
  std::sort(neighbors.begin(), neighbors.end());
  auto non_normalized_probability = std::vector<float>(neighbors.size(), 1.0);
  *node_probability =
    std::make_shared<StochasticIndex>(GenerateProbability(Normalize<float>(non_normalized_probability)));
  return Status::OK();
}

Status GraphDataImpl::RandomWalkBase::GetEdgeProbability(const NodeIdType &src, const NodeIdType &dst,
                                                         uint32_t meta_path_index,
                                                         std::shared_ptr<StochasticIndex> *edge_probability) {
  RETURN_UNEXPECTED_IF_NULL(edge_probability);
  // Get the alias edge setup lists for a given edge.
  std::shared_ptr<Node> src_node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(src, &src_node));
  std::vector<NodeIdType> src_neighbors;
  RETURN_IF_NOT_OK(src_node->GetAllNeighbors(meta_path_[meta_path_index], &src_neighbors, true));

  std::shared_ptr<Node> dst_node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(dst, &dst_node));
  std::vector<NodeIdType> dst_neighbors;
  RETURN_IF_NOT_OK(dst_node->GetAllNeighbors(meta_path_[meta_path_index + 1], &dst_neighbors, true));

  CHECK_FAIL_RETURN_UNEXPECTED(step_home_param_ != 0, "Invalid data, step home parameter can't be zero.");
  CHECK_FAIL_RETURN_UNEXPECTED(step_away_param_ != 0, "Invalid data, step away parameter can't be zero.");
  std::sort(dst_neighbors.begin(), dst_neighbors.end());
  std::vector<float> non_normalized_probability;
  for (const auto &dst_nbr : dst_neighbors) {
    if (dst_nbr == src) {
      non_normalized_probability.push_back(1.0 / step_home_param_);  // replace 1.0 with G[dst][dst_nbr]['weight']
      continue;
    }
    auto it = std::find(src_neighbors.begin(), src_neighbors.end(), dst_nbr);
    if (it != src_neighbors.end()) {
      // Stay close, this node connects both src and dst
      non_normalized_probability.push_back(1.0);  // replace 1.0 with G[dst][dst_nbr]['weight']
    } else {
      // Step far away
      non_normalized_probability.push_back(1.0 / step_away_param_);  // replace 1.0 with G[dst][dst_nbr]['weight']
    }
  }

  *edge_probability =
    std::make_shared<StochasticIndex>(GenerateProbability(Normalize<float>(non_normalized_probability)));
  return Status::OK();
}

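// GenerateProbability builds an alias table (Walker's alias method) from a normalized probability
// vector so that WalkToNextNode can sample an index in O(1). A small jitter drawn from
// [-kGnnEpsilon, kGnnEpsilon) perturbs each scaled weight, and the last weight absorbs the
// accumulated perturbation.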
StochasticIndex GraphDataImpl::RandomWalkBase::GenerateProbability(const std::vector<float> &probability) {
  uint32_t K = probability.size();
  std::vector<int32_t> switch_to_large_index(K, 0);
  std::vector<float> weight(K, .0);
  std::vector<int32_t> smaller;
  std::vector<int32_t> larger;
  auto random_device = GetRandomDevice();
  std::uniform_real_distribution<> distribution(-kGnnEpsilon, kGnnEpsilon);
  float accumulate_threshold = 0.0;
  for (uint32_t i = 0; i < K; i++) {
    float threshold_one = distribution(random_device);
    accumulate_threshold += threshold_one;
    weight[i] = i < K - 1 ? probability[i] * K + threshold_one : probability[i] * K - accumulate_threshold;
    weight[i] < 1.0 ? smaller.push_back(i) : larger.push_back(i);
  }

  while ((!smaller.empty()) && (!larger.empty())) {
    uint32_t small = smaller.back();
    smaller.pop_back();
    uint32_t large = larger.back();
    larger.pop_back();
    switch_to_large_index[small] = large;
    weight[large] = weight[large] + weight[small] - 1.0;
    weight[large] < 1.0 ? smaller.push_back(large) : larger.push_back(large);
  }
  return StochasticIndex(switch_to_large_index, weight);
}

uint32_t GraphDataImpl::RandomWalkBase::WalkToNextNode(const StochasticIndex &stochastic_index) {
  auto switch_to_large_index = stochastic_index.first;
  auto weight = stochastic_index.second;
  const uint32_t size_of_index = switch_to_large_index.size();

  auto random_device = GetRandomDevice();
  std::uniform_real_distribution<> distribution(0.0, 1.0);

  // Generate a random integer in [0, K)
  uint32_t random_idx = std::floor(distribution(random_device) * size_of_index);

  if (distribution(random_device) < weight[random_idx]) {
    return random_idx;
  }
  return switch_to_large_index[random_idx];
}

template <typename T>
std::vector<float> GraphDataImpl::RandomWalkBase::Normalize(const std::vector<T> &non_normalized_probability) {
  // Accumulate in floating point; an integer init value would truncate each partial sum
  float sum_probability =
    std::accumulate(non_normalized_probability.begin(), non_normalized_probability.end(), 0.0f);
  if (sum_probability < kGnnEpsilon) {
    sum_probability = 1.0;
  }
  std::vector<float> normalized_probability;
  std::transform(non_normalized_probability.begin(), non_normalized_probability.end(),
                 std::back_inserter(normalized_probability), [&](T value) -> float { return value / sum_probability; });
  return normalized_probability;
}
}  // namespace gnn
}  // namespace dataset
}  // namespace mindspore