/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/gnn/graph_data_impl.h"

#include <algorithm>
#include <functional>
#include <iterator>
#include <numeric>
#include <utility>

#include "minddata/dataset/core/tensor_shape.h"
#include "minddata/dataset/engine/gnn/graph_loader.h"
#include "minddata/dataset/util/random.h"

namespace mindspore {
namespace dataset {
namespace gnn {

GraphDataImpl::GraphDataImpl(std::string dataset_file, int32_t num_workers, bool server_mode)
    : dataset_file_(dataset_file),
      num_workers_(num_workers),
      rnd_(GetRandomDevice()),
      random_walk_(this),
      server_mode_(server_mode) {
  rnd_.seed(GetSeed());
  MS_LOG(INFO) << "num_workers:" << num_workers;
}

GraphDataImpl::~GraphDataImpl() {}

Status GraphDataImpl::GetAllNodes(NodeType node_type, std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto itr = node_type_map_.find(node_type);
  if (itr == node_type_map_.end()) {
    std::string err_msg = "Invalid node type:" + std::to_string(node_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>({itr->second}, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

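// Build a 2-D tensor of shape (data.size(), data[0].size()) from a vector of equally sized rows,
// then squeeze any dimension of size 1. All rows must have the same length and the requested
// DataType must be compatible with T.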
template <typename T>
Status GraphDataImpl::CreateTensorByVector(const std::vector<std::vector<T>> &data, DataType type,
                                           std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  if (!type.IsCompatible<T>()) {
    RETURN_STATUS_UNEXPECTED("Data type not compatible");
  }
  if (data.empty()) {
    RETURN_STATUS_UNEXPECTED("Input data is empty");
  }
  std::shared_ptr<Tensor> tensor;
  size_t m = data.size();
  size_t n = data[0].size();
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({static_cast<dsize_t>(m), static_cast<dsize_t>(n)}), type, &tensor));
  auto ptr = tensor->begin<T>();
  for (const auto &id_m : data) {
    CHECK_FAIL_RETURN_UNEXPECTED(id_m.size() == n, "All members of the input vector must have the same size");
    for (const auto &id_n : id_m) {
      *ptr = id_n;
      ptr++;
    }
  }
  tensor->Squeeze();
  *out = std::move(tensor);
  return Status::OK();
}

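// Pad every row in *data with default_value until its length equals max_size. Fails if any row is
// already longer than max_size.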
template <typename T>
Status GraphDataImpl::ComplementVector(std::vector<std::vector<T>> *data, size_t max_size, T default_value) {
  if (!data || data->empty()) {
    RETURN_STATUS_UNEXPECTED("Input data is empty");
  }
  for (std::vector<T> &vec : *data) {
    size_t size = vec.size();
    if (size > max_size) {
      RETURN_STATUS_UNEXPECTED("Invalid max_size, it is smaller than the size of an input vector");
    } else {
      for (size_t i = 0; i < (max_size - size); ++i) {
        vec.push_back(default_value);
      }
    }
  }
  return Status::OK();
}

Status GraphDataImpl::GetAllEdges(EdgeType edge_type, std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto itr = edge_type_map_.find(edge_type);
  if (itr == edge_type_map_.end()) {
    std::string err_msg = "Invalid edge type:" + std::to_string(edge_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    RETURN_IF_NOT_OK(CreateTensorByVector<EdgeIdType>({itr->second}, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

Status GraphDataImpl::GetNodesFromEdges(const std::vector<EdgeIdType> &edge_list, std::shared_ptr<Tensor> *out) {
  if (edge_list.empty()) {
    RETURN_STATUS_UNEXPECTED("Input edge_list is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);

  std::vector<std::vector<NodeIdType>> node_list;
  node_list.reserve(edge_list.size());
  for (const auto &edge_id : edge_list) {
    auto itr = edge_id_map_.find(edge_id);
    if (itr == edge_id_map_.end()) {
      std::string err_msg = "Invalid edge id:" + std::to_string(edge_id);
      RETURN_STATUS_UNEXPECTED(err_msg);
    } else {
      std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> nodes;
      RETURN_IF_NOT_OK(itr->second->GetNode(&nodes));
      node_list.push_back({nodes.first->id(), nodes.second->id()});
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(node_list, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::GetEdgesFromNodes(const std::vector<std::pair<NodeIdType, NodeIdType>> &node_list,
                                        std::shared_ptr<Tensor> *out) {
  if (node_list.empty()) {
    RETURN_STATUS_UNEXPECTED("Input node list is empty.");
  }

  std::vector<std::vector<EdgeIdType>> edge_list;
  edge_list.reserve(node_list.size());

  for (const auto &node_id : node_list) {
    std::shared_ptr<Node> src_node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_id.first, &src_node));

    EdgeIdType edge_id;
    RETURN_IF_NOT_OK(src_node->GetEdgeByAdjNodeId(node_id.second, &edge_id));

    std::vector<EdgeIdType> connection_edge = {edge_id};
    edge_list.emplace_back(std::move(connection_edge));
  }

  RETURN_IF_NOT_OK(CreateTensorByVector<EdgeIdType>(edge_list, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

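// Get all first-order neighbors of the given type for every node in node_list. The layout of the
// output tensor depends on the requested format:
//   kNormal: one row per input node, padded with kDefaultNodeId so that all rows share the length
//            of the longest neighbor list.
//   kCoo:    one (src, dst) row per collected edge.
//   kCsr:    the per-node offset table (prefix sums of neighbor counts) followed by the flattened
//            destination node ids.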
Status GraphDataImpl::GetAllNeighbors(const std::vector<NodeIdType> &node_list, NodeType neighbor_type,
                                      const OutputFormat &format, std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  RETURN_IF_NOT_OK(CheckNeighborType(neighbor_type));
  RETURN_UNEXPECTED_IF_NULL(out);

  std::vector<std::vector<NodeIdType>> neighbors;

  size_t max_neighbor_num = 0;                                // Special parameter for normal format
  size_t total_edge_num = 0;                                  // Special parameter for coo and csr format
  std::vector<NodeIdType> offset_table(node_list.size(), 0);  // Special parameter for csr format

  // Collect information of adjacent table
  neighbors.resize(node_list.size());
  for (size_t i = 0; i < node_list.size(); ++i) {
    std::shared_ptr<Node> node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[i], &node));
    if (format == OutputFormat::kNormal) {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i]));
      max_neighbor_num = max_neighbor_num > neighbors[i].size() ? max_neighbor_num : neighbors[i].size();
    } else if (format == OutputFormat::kCoo) {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i], true));
      total_edge_num += neighbors[i].size();
    } else {
      RETURN_IF_NOT_OK(node->GetAllNeighbors(neighbor_type, &neighbors[i], true));
      total_edge_num += neighbors[i].size();
      if (i < node_list.size() - 1) {
        offset_table[i + 1] = total_edge_num;
      }
    }
  }

  // Using the information collected above, build the output tensor in the requested format
  if (format == OutputFormat::kNormal) {
    RETURN_IF_NOT_OK(ComplementVector<NodeIdType>(&neighbors, max_neighbor_num, kDefaultNodeId));
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neighbors, DataType(DataType::DE_INT32), out));
  } else if (format == OutputFormat::kCoo) {
    std::vector<std::vector<NodeIdType>> coo_result;
    coo_result.resize(total_edge_num);
    size_t k = 0;
    for (size_t i = 0; i < neighbors.size(); ++i) {
      NodeIdType src = node_list[i];
      for (auto &dst : neighbors[i]) {
        coo_result[k] = {src, dst};
        k++;
      }
    }
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(coo_result, DataType(DataType::DE_INT32), out));
  } else {
    std::vector<std::vector<NodeIdType>> csr_result;
    csr_result.resize(node_list.size() + total_edge_num);
    for (size_t i = 0; i < offset_table.size(); ++i) {
      csr_result[i] = {offset_table[i]};
    }
    size_t edge_index = 0;
    for (auto &neighbor : neighbors) {
      for (auto &dst : neighbor) {
        csr_result[node_list.size() + edge_index] = {dst};
        edge_index++;
      }
    }
    RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(csr_result, DataType(DataType::DE_INT32), out));
  }
  return Status::OK();
}

Status GraphDataImpl::CheckSamplesNum(NodeIdType samples_num) {
  NodeIdType all_nodes_number =
    std::accumulate(node_type_map_.begin(), node_type_map_.end(), 0,
                    [](NodeIdType t1, const auto &t2) -> NodeIdType { return t1 + t2.second.size(); });
  if ((samples_num < 1) || (samples_num > all_nodes_number)) {
    std::string err_msg = "Wrong samples number, should be between 1 and " + std::to_string(all_nodes_number) +
                          ", got " + std::to_string(samples_num);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

Status GraphDataImpl::CheckNeighborType(NodeType neighbor_type) {
  if (node_type_map_.find(neighbor_type) == node_type_map_.end()) {
    std::string err_msg = "Invalid neighbor type:" + std::to_string(neighbor_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

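// Multi-hop neighbor sampling. For each node in node_list, sample neighbor_nums[0] neighbors of
// type neighbor_types[0], then neighbor_nums[1] neighbors of each of those, and so on. The output
// row for an input node starts with the node id itself, followed by the flattened samples of every
// hop; a kDefaultNodeId placeholder simply expands into more kDefaultNodeId entries at the next hop.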
Status GraphDataImpl::GetSampledNeighbors(const std::vector<NodeIdType> &node_list,
                                          const std::vector<NodeIdType> &neighbor_nums,
                                          const std::vector<NodeType> &neighbor_types, SamplingStrategy strategy,
                                          std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  CHECK_FAIL_RETURN_UNEXPECTED(neighbor_nums.size() == neighbor_types.size(),
                               "The sizes of neighbor_nums and neighbor_types are inconsistent.");
  for (const auto &num : neighbor_nums) {
    RETURN_IF_NOT_OK(CheckSamplesNum(num));
  }
  for (const auto &type : neighbor_types) {
    RETURN_IF_NOT_OK(CheckNeighborType(type));
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  std::vector<std::vector<NodeIdType>> neighbors_vec(node_list.size());
  for (size_t node_idx = 0; node_idx < node_list.size(); ++node_idx) {
    std::shared_ptr<Node> input_node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[node_idx], &input_node));
    neighbors_vec[node_idx].emplace_back(node_list[node_idx]);
    std::vector<NodeIdType> input_list = {node_list[node_idx]};
    for (size_t i = 0; i < neighbor_nums.size(); ++i) {
      std::vector<NodeIdType> neighbors;
      neighbors.reserve(input_list.size() * neighbor_nums[i]);
      for (const auto &node_id : input_list) {
        if (node_id == kDefaultNodeId) {
          for (int32_t j = 0; j < neighbor_nums[i]; ++j) {
            neighbors.emplace_back(kDefaultNodeId);
          }
        } else {
          std::shared_ptr<Node> node;
          RETURN_IF_NOT_OK(GetNodeByNodeId(node_id, &node));
          std::vector<NodeIdType> out;
          RETURN_IF_NOT_OK(node->GetSampledNeighbors(neighbor_types[i], neighbor_nums[i], strategy, &out));
          neighbors.insert(neighbors.end(), out.begin(), out.end());
        }
      }
      neighbors_vec[node_idx].insert(neighbors_vec[node_idx].end(), neighbors.begin(), neighbors.end());
      input_list = std::move(neighbors);
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neighbors_vec, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

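// Scan shuffled_ids starting from *start_index and append data[shuffled_ids[i]] to out_samples,
// skipping ids contained in exclude_data, until samples_num samples have been collected or the end
// of shuffled_ids is reached. *start_index is advanced so that the caller can resume the scan.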
Status GraphDataImpl::NegativeSample(const std::vector<NodeIdType> &data, const std::vector<NodeIdType> shuffled_ids,
                                     size_t *start_index, const std::unordered_set<NodeIdType> &exclude_data,
                                     int32_t samples_num, std::vector<NodeIdType> *out_samples) {
  CHECK_FAIL_RETURN_UNEXPECTED(!data.empty(), "Input data is empty.");
  RETURN_UNEXPECTED_IF_NULL(start_index);
  size_t index = *start_index;
  for (size_t i = index; i < shuffled_ids.size(); ++i) {
    ++index;
    if (exclude_data.find(data[shuffled_ids[i]]) != exclude_data.end()) {
      continue;
    }
    out_samples->emplace_back(data[shuffled_ids[i]]);
    if (out_samples->size() >= static_cast<size_t>(samples_num)) {
      break;
    }
  }
  *start_index = index;
  return Status::OK();
}

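// For each node in node_list, sample samples_num nodes of type neg_neighbor_type that are NOT
// neighbors of that node (negative sampling). Each output row starts with the node id itself.
// If every candidate node is already a neighbor, the row is filled with kDefaultNodeId instead.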
Status GraphDataImpl::GetNegSampledNeighbors(const std::vector<NodeIdType> &node_list, NodeIdType samples_num,
                                             NodeType neg_neighbor_type, std::shared_ptr<Tensor> *out) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  RETURN_IF_NOT_OK(CheckSamplesNum(samples_num));
  RETURN_IF_NOT_OK(CheckNeighborType(neg_neighbor_type));
  RETURN_UNEXPECTED_IF_NULL(out);

  const std::vector<NodeIdType> &all_nodes = node_type_map_[neg_neighbor_type];
  std::vector<NodeIdType> shuffled_id(all_nodes.size());
  std::iota(shuffled_id.begin(), shuffled_id.end(), 0);
  std::shuffle(shuffled_id.begin(), shuffled_id.end(), rnd_);
  size_t start_index = 0;
  bool need_shuffle = false;

  std::vector<std::vector<NodeIdType>> neg_neighbors_vec;
  neg_neighbors_vec.resize(node_list.size());
  for (size_t node_idx = 0; node_idx < node_list.size(); ++node_idx) {
    std::shared_ptr<Node> node;
    RETURN_IF_NOT_OK(GetNodeByNodeId(node_list[node_idx], &node));
    std::vector<NodeIdType> neighbors;
    RETURN_IF_NOT_OK(node->GetAllNeighbors(neg_neighbor_type, &neighbors));
    std::unordered_set<NodeIdType> exclude_nodes;
    (void)std::transform(neighbors.begin(), neighbors.end(),
                         std::insert_iterator<std::unordered_set<NodeIdType>>(exclude_nodes, exclude_nodes.begin()),
                         [](const NodeIdType node) { return node; });
    neg_neighbors_vec[node_idx].emplace_back(node->id());
    if (all_nodes.size() > exclude_nodes.size()) {
      while (neg_neighbors_vec[node_idx].size() < static_cast<size_t>(samples_num + 1)) {
        RETURN_IF_NOT_OK(NegativeSample(all_nodes, shuffled_id, &start_index, exclude_nodes, samples_num + 1,
                                        &neg_neighbors_vec[node_idx]));
        if (start_index >= shuffled_id.size()) {
          start_index = start_index % shuffled_id.size();
          need_shuffle = true;
        }
      }
    } else {
      MS_LOG(DEBUG) << "There are no negative neighbors. node_id:" << node->id()
                    << " neg_neighbor_type:" << neg_neighbor_type;
      // If there are no negative neighbors, fill the row with kDefaultNodeId
      for (int32_t i = 0; i < samples_num; ++i) {
        neg_neighbors_vec[node_idx].emplace_back(kDefaultNodeId);
      }
    }
    if (need_shuffle) {
      std::shuffle(shuffled_id.begin(), shuffled_id.end(), rnd_);
      start_index = 0;
      need_shuffle = false;
    }
  }
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>(neg_neighbors_vec, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::RandomWalk(const std::vector<NodeIdType> &node_list, const std::vector<NodeType> &meta_path,
                                 float step_home_param, float step_away_param, NodeIdType default_node,
                                 std::shared_ptr<Tensor> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  RETURN_IF_NOT_OK(random_walk_.Build(node_list, meta_path, step_home_param, step_away_param, default_node));
  std::vector<std::vector<NodeIdType>> walks;
  RETURN_IF_NOT_OK(random_walk_.SimulateWalk(&walks));
  RETURN_IF_NOT_OK(CreateTensorByVector<NodeIdType>({walks}, DataType(DataType::DE_INT32), out));
  return Status::OK();
}

Status GraphDataImpl::GetNodeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature) {
  RETURN_UNEXPECTED_IF_NULL(out_feature);
  auto itr = default_node_feature_map_.find(feature_type);
  if (itr == default_node_feature_map_.end()) {
    std::string err_msg = "Invalid feature type:" + std::to_string(feature_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *out_feature = itr->second;
  }
  return Status::OK();
}

Status GraphDataImpl::GetEdgeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature) {
  RETURN_UNEXPECTED_IF_NULL(out_feature);
  auto itr = default_edge_feature_map_.find(feature_type);
  if (itr == default_edge_feature_map_.end()) {
    std::string err_msg = "Invalid feature type:" + std::to_string(feature_type);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *out_feature = itr->second;
  }
  return Status::OK();
}

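// Look up one feature tensor per requested feature type for every node id in `nodes`. For each
// feature type, a tensor of shape [nodes->shape()..., feature_shape...] is assembled; nodes that are
// missing (kDefaultNodeId) or do not carry the feature fall back to the registered default feature.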
Status GraphDataImpl::GetNodeFeature(const std::shared_ptr<Tensor> &nodes,
                                     const std::vector<FeatureType> &feature_types, TensorRow *out) {
  if (!nodes || nodes->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input nodes is empty");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!feature_types.empty(), "Input feature_types is empty");
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorRow tensors;
  for (const auto &f_type : feature_types) {
    std::shared_ptr<Feature> default_feature;
    // If no feature can be obtained, fill in the default value
    RETURN_IF_NOT_OK(GetNodeDefaultFeature(f_type, &default_feature));

    TensorShape shape(default_feature->Value()->shape());
    auto shape_vec = nodes->shape().AsVector();
    dsize_t size = std::accumulate(shape_vec.begin(), shape_vec.end(), 1, std::multiplies<dsize_t>());
    shape = shape.PrependDim(size);
    std::shared_ptr<Tensor> fea_tensor;
    RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, default_feature->Value()->type(), &fea_tensor));

    dsize_t index = 0;
    for (auto node_itr = nodes->begin<NodeIdType>(); node_itr != nodes->end<NodeIdType>(); ++node_itr) {
      std::shared_ptr<Feature> feature;
      if (*node_itr == kDefaultNodeId) {
        feature = default_feature;
      } else {
        std::shared_ptr<Node> node;

        if (!GetNodeByNodeId(*node_itr, &node).IsOk() || !node->GetFeatures(f_type, &feature).IsOk()) {
          feature = default_feature;
        }
      }
      RETURN_IF_NOT_OK(fea_tensor->InsertTensor({index}, feature->Value()));
      index++;
    }

    TensorShape reshape(nodes->shape());
    for (auto s : default_feature->Value()->shape().AsVector()) {
      reshape = reshape.AppendDim(s);
    }
    RETURN_IF_NOT_OK(fea_tensor->Reshape(reshape));
    fea_tensor->Squeeze();
    tensors.push_back(fea_tensor);
  }
  *out = std::move(tensors);
  return Status::OK();
}

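// Shared-memory variant of the node feature lookup: for every node id the two int64 values of its
// feature of the given type are copied into an output tensor of shape [..., 2] (in server mode these
// values locate the actual feature data in shared memory, presumably as an offset/size pair, an
// assumption not verified here); a missing node or feature is encoded as the pair (-1, -1).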
Status GraphDataImpl::GetNodeFeatureSharedMemory(const std::shared_ptr<Tensor> &nodes, FeatureType type,
                                                 std::shared_ptr<Tensor> *out) {
  if (!nodes || nodes->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input nodes is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorShape shape = nodes->shape().AppendDim(2);
  std::shared_ptr<Tensor> fea_tensor;
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, DataType(DataType::DE_INT64), &fea_tensor));

  auto out_fea_itr = fea_tensor->begin<int64_t>();
  for (auto node_itr = nodes->begin<NodeIdType>(); node_itr != nodes->end<NodeIdType>(); ++node_itr) {
    if (*node_itr == kDefaultNodeId) {
      *out_fea_itr = -1;
      ++out_fea_itr;
      *out_fea_itr = -1;
      ++out_fea_itr;
    } else {
      std::shared_ptr<Node> node;
      RETURN_IF_NOT_OK(GetNodeByNodeId(*node_itr, &node));
      std::shared_ptr<Feature> feature;
      if (!node->GetFeatures(type, &feature).IsOk()) {
        *out_fea_itr = -1;
        ++out_fea_itr;
        *out_fea_itr = -1;
        ++out_fea_itr;
      } else {
        for (auto fea_itr = feature->Value()->begin<int64_t>(); fea_itr != feature->Value()->end<int64_t>();
             ++fea_itr) {
          *out_fea_itr = *fea_itr;
          ++out_fea_itr;
        }
      }
    }
  }

  fea_tensor->Squeeze();

  *out = std::move(fea_tensor);
  return Status::OK();
}

Status GraphDataImpl::GetEdgeFeature(const std::shared_ptr<Tensor> &edges,
                                     const std::vector<FeatureType> &feature_types, TensorRow *out) {
  if (!edges || edges->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input edges is empty");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!feature_types.empty(), "Input feature_types is empty");
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorRow tensors;
  for (const auto &f_type : feature_types) {
    std::shared_ptr<Feature> default_feature;
    // If no feature can be obtained, fill in the default value
    RETURN_IF_NOT_OK(GetEdgeDefaultFeature(f_type, &default_feature));

    TensorShape shape(default_feature->Value()->shape());
    auto shape_vec = edges->shape().AsVector();
    dsize_t size = std::accumulate(shape_vec.begin(), shape_vec.end(), 1, std::multiplies<dsize_t>());
    shape = shape.PrependDim(size);
    std::shared_ptr<Tensor> fea_tensor;
    RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, default_feature->Value()->type(), &fea_tensor));

    dsize_t index = 0;
    for (auto edge_itr = edges->begin<EdgeIdType>(); edge_itr != edges->end<EdgeIdType>(); ++edge_itr) {
      std::shared_ptr<Edge> edge;
      std::shared_ptr<Feature> feature;

      if (!GetEdgeByEdgeId(*edge_itr, &edge).IsOk() || !edge->GetFeatures(f_type, &feature).IsOk()) {
        feature = default_feature;
      }
      RETURN_IF_NOT_OK(fea_tensor->InsertTensor({index}, feature->Value()));
      index++;
    }

    TensorShape reshape(edges->shape());
    for (auto s : default_feature->Value()->shape().AsVector()) {
      reshape = reshape.AppendDim(s);
    }
    RETURN_IF_NOT_OK(fea_tensor->Reshape(reshape));
    fea_tensor->Squeeze();
    tensors.push_back(fea_tensor);
  }
  *out = std::move(tensors);
  return Status::OK();
}

Status GraphDataImpl::GetEdgeFeatureSharedMemory(const std::shared_ptr<Tensor> &edges, FeatureType type,
                                                 std::shared_ptr<Tensor> *out) {
  if (!edges || edges->Size() == 0) {
    RETURN_STATUS_UNEXPECTED("Input edges is empty");
  }
  RETURN_UNEXPECTED_IF_NULL(out);
  TensorShape shape = edges->shape().AppendDim(2);
  std::shared_ptr<Tensor> fea_tensor;
  RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, DataType(DataType::DE_INT64), &fea_tensor));

  auto out_fea_itr = fea_tensor->begin<int64_t>();
  for (auto edge_itr = edges->begin<EdgeIdType>(); edge_itr != edges->end<EdgeIdType>(); ++edge_itr) {
    std::shared_ptr<Edge> edge;
    RETURN_IF_NOT_OK(GetEdgeByEdgeId(*edge_itr, &edge));
    std::shared_ptr<Feature> feature;
    if (!edge->GetFeatures(type, &feature).IsOk()) {
      *out_fea_itr = -1;
      ++out_fea_itr;
      *out_fea_itr = -1;
      ++out_fea_itr;
    } else {
      for (auto fea_itr = feature->Value()->begin<int64_t>(); fea_itr != feature->Value()->end<int64_t>(); ++fea_itr) {
        *out_fea_itr = *fea_itr;
        ++out_fea_itr;
      }
    }
  }

  fea_tensor->Squeeze();

  *out = std::move(fea_tensor);
  return Status::OK();
}

Status GraphDataImpl::Init() {
  RETURN_IF_NOT_OK(LoadNodeAndEdge());
  return Status::OK();
}

Status GraphDataImpl::GetMetaInfo(MetaInfo *meta_info) {
  RETURN_UNEXPECTED_IF_NULL(meta_info);
  meta_info->node_type.resize(node_type_map_.size());
  (void)std::transform(node_type_map_.begin(), node_type_map_.end(), meta_info->node_type.begin(),
                       [](auto itr) { return itr.first; });
  std::sort(meta_info->node_type.begin(), meta_info->node_type.end());

  meta_info->edge_type.resize(edge_type_map_.size());
  (void)std::transform(edge_type_map_.begin(), edge_type_map_.end(), meta_info->edge_type.begin(),
                       [](auto itr) { return itr.first; });
  std::sort(meta_info->edge_type.begin(), meta_info->edge_type.end());

  for (const auto &node : node_type_map_) {
    meta_info->node_num[node.first] = node.second.size();
  }

  for (const auto &edge : edge_type_map_) {
    meta_info->edge_num[edge.first] = edge.second.size();
  }

  for (const auto &node_feature : node_feature_map_) {
    for (auto type : node_feature.second) {
      meta_info->node_feature_type.emplace_back(type);
    }
  }
  std::sort(meta_info->node_feature_type.begin(), meta_info->node_feature_type.end());
  auto unique_node = std::unique(meta_info->node_feature_type.begin(), meta_info->node_feature_type.end());
  meta_info->node_feature_type.erase(unique_node, meta_info->node_feature_type.end());

  for (const auto &edge_feature : edge_feature_map_) {
    for (const auto &type : edge_feature.second) {
      meta_info->edge_feature_type.emplace_back(type);
    }
  }
  std::sort(meta_info->edge_feature_type.begin(), meta_info->edge_feature_type.end());
  auto unique_edge = std::unique(meta_info->edge_feature_type.begin(), meta_info->edge_feature_type.end());
  meta_info->edge_feature_type.erase(unique_edge, meta_info->edge_feature_type.end());
  return Status::OK();
}

#ifdef ENABLE_PYTHON
Status GraphDataImpl::GraphInfo(py::dict *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  MetaInfo meta_info;
  RETURN_IF_NOT_OK(GetMetaInfo(&meta_info));
  (*out)["node_type"] = py::cast(meta_info.node_type);
  (*out)["edge_type"] = py::cast(meta_info.edge_type);
  (*out)["node_num"] = py::cast(meta_info.node_num);
  (*out)["edge_num"] = py::cast(meta_info.edge_num);
  (*out)["node_feature_type"] = py::cast(meta_info.node_feature_type);
  (*out)["edge_feature_type"] = py::cast(meta_info.edge_feature_type);
  return Status::OK();
}
#endif

Status GraphDataImpl::LoadNodeAndEdge() {
  GraphLoader gl(this, dataset_file_, num_workers_, server_mode_);
  // ask graph_loader to load everything into memory
  RETURN_IF_NOT_OK(gl.InitAndLoad());
  // get all maps
  RETURN_IF_NOT_OK(gl.GetNodesAndEdges());
  return Status::OK();
}

Status GraphDataImpl::GetNodeByNodeId(NodeIdType id, std::shared_ptr<Node> *node) {
  RETURN_UNEXPECTED_IF_NULL(node);
  auto itr = node_id_map_.find(id);
  if (itr == node_id_map_.end()) {
    std::string err_msg = "Invalid node id:" + std::to_string(id);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *node = itr->second;
  }
  return Status::OK();
}

Status GraphDataImpl::GetEdgeByEdgeId(EdgeIdType id, std::shared_ptr<Edge> *edge) {
  RETURN_UNEXPECTED_IF_NULL(edge);
  auto itr = edge_id_map_.find(id);
  if (itr == edge_id_map_.end()) {
    std::string err_msg = "Invalid edge id:" + std::to_string(id);
    RETURN_STATUS_UNEXPECTED(err_msg);
  } else {
    *edge = itr->second;
  }
  return Status::OK();
}

GraphDataImpl::RandomWalkBase::RandomWalkBase(GraphDataImpl *graph)
    : graph_(graph), step_home_param_(1.0), step_away_param_(1.0), default_node_(-1), num_walks_(1), num_workers_(1) {}

Status GraphDataImpl::RandomWalkBase::Build(const std::vector<NodeIdType> &node_list,
                                            const std::vector<NodeType> &meta_path, float step_home_param,
                                            float step_away_param, const NodeIdType default_node, int32_t num_walks,
                                            int32_t num_workers) {
  CHECK_FAIL_RETURN_UNEXPECTED(!node_list.empty(), "Input node_list is empty.");
  node_list_ = node_list;
  if (meta_path.empty() || meta_path.size() > kMaxNumWalks) {
    std::string err_msg = "Failed, the size of meta_path is required to be between 1 and " +
                          std::to_string(kMaxNumWalks) + ". The size of the input path is " +
                          std::to_string(meta_path.size());
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  for (const auto &type : meta_path) {
    RETURN_IF_NOT_OK(graph_->CheckNeighborType(type));
  }
  meta_path_ = meta_path;
  if (step_home_param < kGnnEpsilon || step_away_param < kGnnEpsilon) {
    std::string err_msg = "Failed, step_home_param and step_away_param are required to be greater than " +
                          std::to_string(kGnnEpsilon) + ". step_home_param: " + std::to_string(step_home_param) +
                          ", step_away_param: " + std::to_string(step_away_param);
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (default_node < -1) {
    std::string err_msg = "Failed, default_node is required to be greater than or equal to -1.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (num_walks <= 0) {
    std::string err_msg = "Failed, the num_walks parameter is required to be greater than 0.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  if (num_workers <= 0) {
    std::string err_msg = "Failed, the num_workers parameter is required to be greater than 0.";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  step_home_param_ = step_home_param;
  step_away_param_ = step_away_param;
  default_node_ = default_node;
  num_walks_ = num_walks;
  num_workers_ = num_workers;
  return Status::OK();
}

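// Simulate one node2vec-style walk starting from start_node and following meta_path. At each step
// the next node is drawn from an alias table built over the sorted neighbors of the current node,
// where step_home_param_ plays the role of the node2vec return parameter p and step_away_param_ the
// in-out parameter q (see GetEdgeProbability below). If the walk terminates early because a node has
// no neighbors of the required type, it is padded with default_node_ to the full meta-path length.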
Status GraphDataImpl::RandomWalkBase::Node2vecWalk(const NodeIdType &start_node, std::vector<NodeIdType> *walk_path) {
  RETURN_UNEXPECTED_IF_NULL(walk_path);
  // Simulate a random walk starting from the start node.
  auto walk = std::vector<NodeIdType>(1, start_node);  // walk is a vector
  // walk simulation
  while (walk.size() - 1 < meta_path_.size()) {
    // current node
    auto cur_node_id = walk.back();
    std::shared_ptr<Node> cur_node;
    RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(cur_node_id, &cur_node));

    // current neighbors
    std::vector<NodeIdType> cur_neighbors;
    RETURN_IF_NOT_OK(cur_node->GetAllNeighbors(meta_path_[walk.size() - 1], &cur_neighbors, true));
    std::sort(cur_neighbors.begin(), cur_neighbors.end());

    // break if no neighbors
    if (cur_neighbors.empty()) {
      break;
    }

    // the first step uses the node probabilities; later steps use the previous two nodes (edge probabilities)
    std::shared_ptr<StochasticIndex> stochastic_index;
    if (walk.size() == 1) {
      RETURN_IF_NOT_OK(GetNodeProbability(cur_node_id, meta_path_[0], &stochastic_index));
    } else {
      NodeIdType prev_node_id = walk[walk.size() - 2];
      RETURN_IF_NOT_OK(GetEdgeProbability(prev_node_id, cur_node_id, walk.size() - 2, &stochastic_index));
    }
    NodeIdType next_node_id = cur_neighbors[WalkToNextNode(*stochastic_index)];
    walk.push_back(next_node_id);
  }

  while (walk.size() - 1 < meta_path_.size()) {
    walk.push_back(default_node_);
  }

  *walk_path = std::move(walk);
  return Status::OK();
}

Status GraphDataImpl::RandomWalkBase::SimulateWalk(std::vector<std::vector<NodeIdType>> *walks) {
  RETURN_UNEXPECTED_IF_NULL(walks);
  for (int32_t i = 0; i < num_walks_; ++i) {
    for (const auto &node : node_list_) {
      std::vector<NodeIdType> walk;
      RETURN_IF_NOT_OK(Node2vecWalk(node, &walk));
      walks->push_back(walk);
    }
  }
  return Status::OK();
}

Status GraphDataImpl::RandomWalkBase::GetNodeProbability(const NodeIdType &node_id, const NodeType &node_type,
                                                         std::shared_ptr<StochasticIndex> *node_probability) {
  RETURN_UNEXPECTED_IF_NULL(node_probability);
  // Generate alias nodes
  std::shared_ptr<Node> node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(node_id, &node));
  std::vector<NodeIdType> neighbors;
  RETURN_IF_NOT_OK(node->GetAllNeighbors(node_type, &neighbors, true));
  std::sort(neighbors.begin(), neighbors.end());
  auto non_normalized_probability = std::vector<float>(neighbors.size(), 1.0);
  *node_probability =
    std::make_shared<StochasticIndex>(GenerateProbability(Normalize<float>(non_normalized_probability)));
  return Status::OK();
}

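// Build the unnormalized transition probabilities for moving from dst after having arrived there
// from src, following the node2vec scheme: returning to src is weighted 1 / step_home_param_ (the
// return parameter p), stepping to a node adjacent to both src and dst is weighted 1, and stepping
// further away is weighted 1 / step_away_param_ (the in-out parameter q). The result is converted
// into an alias table via GenerateProbability.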
Status GraphDataImpl::RandomWalkBase::GetEdgeProbability(const NodeIdType &src, const NodeIdType &dst,
                                                         uint32_t meta_path_index,
                                                         std::shared_ptr<StochasticIndex> *edge_probability) {
  RETURN_UNEXPECTED_IF_NULL(edge_probability);
  // Get the alias edge setup lists for a given edge.
  std::shared_ptr<Node> src_node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(src, &src_node));
  std::vector<NodeIdType> src_neighbors;
  RETURN_IF_NOT_OK(src_node->GetAllNeighbors(meta_path_[meta_path_index], &src_neighbors, true));

  std::shared_ptr<Node> dst_node;
  RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(dst, &dst_node));
  std::vector<NodeIdType> dst_neighbors;
  RETURN_IF_NOT_OK(dst_node->GetAllNeighbors(meta_path_[meta_path_index + 1], &dst_neighbors, true));

  CHECK_FAIL_RETURN_UNEXPECTED(step_home_param_ != 0, "Invalid data, step home parameter can't be zero.");
  CHECK_FAIL_RETURN_UNEXPECTED(step_away_param_ != 0, "Invalid data, step away parameter can't be zero.");
  std::sort(dst_neighbors.begin(), dst_neighbors.end());
  std::vector<float> non_normalized_probability;
  for (const auto &dst_nbr : dst_neighbors) {
    if (dst_nbr == src) {
      non_normalized_probability.push_back(1.0 / step_home_param_);  // replace 1.0 with G[dst][dst_nbr]['weight']
      continue;
    }
    auto it = std::find(src_neighbors.begin(), src_neighbors.end(), dst_nbr);
    if (it != src_neighbors.end()) {
      // stay close, this node connects to both src and dst
      non_normalized_probability.push_back(1.0);  // replace 1.0 with G[dst][dst_nbr]['weight']
    } else {
      // step far away
      non_normalized_probability.push_back(1.0 / step_away_param_);  // replace 1.0 with G[dst][dst_nbr]['weight']
    }
  }

  *edge_probability =
    std::make_shared<StochasticIndex>(GenerateProbability(Normalize<float>(non_normalized_probability)));
  return Status::OK();
}

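// Build an alias table (Walker/Vose alias method) for O(1) sampling from the given discrete
// probability distribution. A small random jitter (within kGnnEpsilon) is added to the scaled
// weights, and each under-full bucket records the index of an over-full bucket to switch to.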
StochasticIndex GraphDataImpl::RandomWalkBase::GenerateProbability(const std::vector<float> &probability) {
  uint32_t K = probability.size();
  std::vector<int32_t> switch_to_large_index(K, 0);
  std::vector<float> weight(K, .0);
  std::vector<int32_t> smaller;
  std::vector<int32_t> larger;
  auto random_device = GetRandomDevice();
  std::uniform_real_distribution<> distribution(-kGnnEpsilon, kGnnEpsilon);
  float accumulate_threshold = 0.0;
  for (uint32_t i = 0; i < K; i++) {
    float threshold_one = distribution(random_device);
    accumulate_threshold += threshold_one;
    weight[i] = i < K - 1 ? probability[i] * K + threshold_one : probability[i] * K - accumulate_threshold;
    weight[i] < 1.0 ? smaller.push_back(i) : larger.push_back(i);
  }

  while ((!smaller.empty()) && (!larger.empty())) {
    uint32_t small = smaller.back();
    smaller.pop_back();
    uint32_t large = larger.back();
    larger.pop_back();
    switch_to_large_index[small] = large;
    weight[large] = weight[large] + weight[small] - 1.0;
    weight[large] < 1.0 ? smaller.push_back(large) : larger.push_back(large);
  }
  return StochasticIndex(switch_to_large_index, weight);
}

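// Draw one index from the alias table: pick a bucket uniformly at random, then either keep it
// (with probability equal to its weight) or switch to the larger bucket recorded for it.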
uint32_t GraphDataImpl::RandomWalkBase::WalkToNextNode(const StochasticIndex &stochastic_index) {
  auto switch_to_large_index = stochastic_index.first;
  auto weight = stochastic_index.second;
  const uint32_t size_of_index = switch_to_large_index.size();

  auto random_device = GetRandomDevice();
  std::uniform_real_distribution<> distribution(0.0, 1.0);

  // Generate a random integer in [0, K)
  uint32_t random_idx = std::floor(distribution(random_device) * size_of_index);

  if (distribution(random_device) < weight[random_idx]) {
    return random_idx;
  }
  return switch_to_large_index[random_idx];
}

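// Normalize the input weights so that they sum to 1; a (near-)zero sum is replaced by 1 to avoid
// division by zero.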
template <typename T>
std::vector<float> GraphDataImpl::RandomWalkBase::Normalize(const std::vector<T> &non_normalized_probability) {
  float sum_probability =
    std::accumulate(non_normalized_probability.begin(), non_normalized_probability.end(), 0.0f);
  if (sum_probability < kGnnEpsilon) {
    sum_probability = 1.0;
  }
  std::vector<float> normalized_probability;
  std::transform(non_normalized_probability.begin(), non_normalized_probability.end(),
                 std::back_inserter(normalized_probability), [&](T value) -> float { return value / sum_probability; });
  return normalized_probability;
}
}  // namespace gnn
}  // namespace dataset
}  // namespace mindspore