• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/gnn/graph_loader.h"
17 
18 #include <future>
19 #include <tuple>
20 #include <utility>
21 
22 #include "minddata/dataset/engine/gnn/graph_data_impl.h"
23 #include "minddata/dataset/engine/gnn/local_edge.h"
24 #include "minddata/dataset/engine/gnn/local_node.h"
25 #include "minddata/dataset/util/task_manager.h"
26 #include "minddata/mindrecord/include/shard_error.h"
27 
28 using ShardTuple = std::vector<std::tuple<std::vector<uint8_t>, mindspore::mindrecord::json>>;
29 namespace mindspore {
30 namespace dataset {
31 namespace gnn {
32 
33 using mindrecord::MSRStatus;
34 
GraphLoader(GraphDataImpl * graph_impl,std::string mr_filepath,int32_t num_workers,bool server_mode)35 GraphLoader::GraphLoader(GraphDataImpl *graph_impl, std::string mr_filepath, int32_t num_workers, bool server_mode)
36     : graph_impl_(graph_impl),
37       mr_path_(mr_filepath),
38       num_workers_(num_workers),
39       row_id_(0),
40       shard_reader_(nullptr),
41       graph_feature_parser_(nullptr),
42       required_key_(
43         {"first_id", "second_id", "third_id", "attribute", "type", "node_feature_index", "edge_feature_index"}),
44       optional_key_({{"weight", false}}) {}
45 
GetNodesAndEdges()46 Status GraphLoader::GetNodesAndEdges() {
47   NodeIdMap *n_id_map = &graph_impl_->node_id_map_;
48   EdgeIdMap *e_id_map = &graph_impl_->edge_id_map_;
49   for (std::deque<std::shared_ptr<Node>> &dq : n_deques_) {
50     while (dq.empty() == false) {
51       std::shared_ptr<Node> node_ptr = dq.front();
52       n_id_map->insert({node_ptr->id(), node_ptr});
53       graph_impl_->node_type_map_[node_ptr->type()].push_back(node_ptr->id());
54       dq.pop_front();
55     }
56   }
57 
58   for (std::deque<std::shared_ptr<Edge>> &dq : e_deques_) {
59     while (dq.empty() == false) {
60       std::shared_ptr<Edge> edge_ptr = dq.front();
61       std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> p;
62       RETURN_IF_NOT_OK(edge_ptr->GetNode(&p));
63       auto src_itr = n_id_map->find(p.first->id()), dst_itr = n_id_map->find(p.second->id());
64 
65       CHECK_FAIL_RETURN_UNEXPECTED(src_itr != n_id_map->end(), "invalid src_id:" + std::to_string(src_itr->first));
66       CHECK_FAIL_RETURN_UNEXPECTED(dst_itr != n_id_map->end(), "invalid src_id:" + std::to_string(dst_itr->first));
67 
68       RETURN_IF_NOT_OK(edge_ptr->SetNode({src_itr->second, dst_itr->second}));
69       RETURN_IF_NOT_OK(src_itr->second->AddNeighbor(dst_itr->second, edge_ptr->weight()));
70       RETURN_IF_NOT_OK(src_itr->second->AddAdjacent(dst_itr->second, edge_ptr));
71 
72       e_id_map->insert({edge_ptr->id(), edge_ptr});  // add edge to edge_id_map_
73       graph_impl_->edge_type_map_[edge_ptr->type()].push_back(edge_ptr->id());
74       dq.pop_front();
75     }
76   }
77 
78   for (auto &itr : graph_impl_->node_type_map_) itr.second.shrink_to_fit();
79   for (auto &itr : graph_impl_->edge_type_map_) itr.second.shrink_to_fit();
80 
81   MergeFeatureMaps();
82   return Status::OK();
83 }
84 
InitAndLoad()85 Status GraphLoader::InitAndLoad() {
86   CHECK_FAIL_RETURN_UNEXPECTED(num_workers_ > 0, "num_reader can't be < 1\n");
87   CHECK_FAIL_RETURN_UNEXPECTED(row_id_ == 0, "InitAndLoad Can only be called once!\n");
88   n_deques_.resize(num_workers_);
89   e_deques_.resize(num_workers_);
90   n_feature_maps_.resize(num_workers_);
91   e_feature_maps_.resize(num_workers_);
92   default_node_feature_maps_.resize(num_workers_);
93   default_edge_feature_maps_.resize(num_workers_);
94   TaskGroup vg;
95 
96   shard_reader_ = std::make_unique<ShardReader>();
97   RETURN_IF_NOT_OK(shard_reader_->Open({mr_path_}, true, num_workers_));
98   CHECK_FAIL_RETURN_UNEXPECTED(shard_reader_->GetShardHeader()->GetSchemaCount() > 0, "No schema found!");
99   RETURN_IF_NOT_OK(shard_reader_->Launch(true));
100 
101   graph_impl_->data_schema_ = (shard_reader_->GetShardHeader()->GetSchemas()[0]->GetSchema());
102   mindrecord::json schema = graph_impl_->data_schema_["schema"];
103   for (const std::string &key : required_key_) {
104     if (schema.find(key) == schema.end()) {
105       RETURN_STATUS_UNEXPECTED(key + ":doesn't exist in schema:" + schema.dump());
106     }
107   }
108 
109   for (auto op_key : optional_key_) {
110     if (schema.find(op_key.first) != schema.end()) {
111       optional_key_[op_key.first] = true;
112     }
113   }
114 
115   if (graph_impl_->server_mode_) {
116 #if !defined(_WIN32) && !defined(_WIN64)
117     int64_t total_blob_size = 0;
118     RETURN_IF_NOT_OK(shard_reader_->GetTotalBlobSize(&total_blob_size));
119     graph_impl_->graph_shared_memory_ = std::make_unique<GraphSharedMemory>(total_blob_size, mr_path_);
120     RETURN_IF_NOT_OK(graph_impl_->graph_shared_memory_->CreateSharedMemory());
121 #endif
122   }
123 
124   graph_feature_parser_ = std::make_unique<GraphFeatureParser>(*shard_reader_->GetShardColumn());
125 
126   // launching worker threads
127   for (int wkr_id = 0; wkr_id < num_workers_; ++wkr_id) {
128     RETURN_IF_NOT_OK(vg.CreateAsyncTask("GraphLoader", std::bind(&GraphLoader::WorkerEntry, this, wkr_id)));
129   }
130   // wait for threads to finish and check its return code
131   RETURN_IF_NOT_OK(vg.join_all(Task::WaitFlag::kBlocking));
132   RETURN_IF_NOT_OK(vg.GetTaskErrorIfAny());
133   return Status::OK();
134 }
135 
// Builds one graph node from a single MindRecord row.
//
// col_blob holds the row's raw blob columns (the feature tensors); col_jsn
// holds the row's json columns: "first_id" (node id), "type" (node type) and,
// when the dataset provides it, "weight".  On success:
//   *node            — a new LocalNode with all of its features attached
//   *feature_map     — updated with the feature indices seen for this node type
//   *default_feature — caches one all-zero tensor per feature index, used as
//                      the fallback for nodes lacking that feature
Status GraphLoader::LoadNode(const std::vector<uint8_t> &col_blob, const mindrecord::json &col_jsn,
                             std::shared_ptr<Node> *node, NodeFeatureMap *feature_map,
                             DefaultNodeFeatureMap *default_feature) {
  NodeIdType node_id = col_jsn["first_id"];
  NodeType node_type = static_cast<NodeType>(col_jsn["type"]);
  WeightType weight = 1;  // default weight when the dataset has no "weight" column
  if (optional_key_["weight"]) {
    weight = col_jsn["weight"];
  }
  (*node) = std::make_shared<LocalNode>(node_id, node_type, weight);
  std::vector<int32_t> indices;
  RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureIndex("node_feature_index", col_blob, &indices));
  if (graph_impl_->server_mode_) {
#if !defined(_WIN32) && !defined(_WIN64)
    // Server mode (non-Windows): load each feature into the shared-memory
    // segment so client processes can access it; the Feature is marked as
    // shared via the trailing `true` argument.
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor_sm;
      RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureToSharedMemory(
        "node_feature_" + std::to_string(ind), col_blob, graph_impl_->graph_shared_memory_.get(), &tensor_sm));
      RETURN_IF_NOT_OK((*node)->UpdateFeature(std::make_shared<Feature>(ind, tensor_sm, true)));
      (*feature_map)[node_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        // First occurrence of this feature index: materialize the tensor once
        // to learn its shape/type, then build a zero tensor as the default.
        std::shared_ptr<Tensor> tensor;
        RETURN_IF_NOT_OK(
          graph_feature_parser_->LoadFeatureTensor("node_feature_" + std::to_string(ind), col_blob, &tensor));
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
#endif
  } else {
    // Local mode: keep the feature tensors in process memory.
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor;
      RETURN_IF_NOT_OK(
        graph_feature_parser_->LoadFeatureTensor("node_feature_" + std::to_string(ind), col_blob, &tensor));
      RETURN_IF_NOT_OK((*node)->UpdateFeature(std::make_shared<Feature>(ind, tensor)));
      (*feature_map)[node_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        // First occurrence of this feature index: build a same-shaped zero
        // tensor as the default value.
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
  }
  return Status::OK();
}
184 
// Builds one graph edge from a single MindRecord row.
//
// col_blob holds the row's raw blob columns; col_jsn holds the json columns:
// "first_id" (edge id), "type" (edge type), "second_id"/"third_id" (source and
// destination node ids) and, when present, "weight".  The endpoint nodes are
// created here only as lightweight placeholders (type -1, weight 1); they are
// replaced with the real loaded nodes later in GetNodesAndEdges.  Output
// parameters mirror LoadNode: the new edge, the per-edge-type feature index
// set, and the per-index zero-tensor defaults.
Status GraphLoader::LoadEdge(const std::vector<uint8_t> &col_blob, const mindrecord::json &col_jsn,
                             std::shared_ptr<Edge> *edge, EdgeFeatureMap *feature_map,
                             DefaultEdgeFeatureMap *default_feature) {
  EdgeIdType edge_id = col_jsn["first_id"];
  EdgeType edge_type = static_cast<EdgeType>(col_jsn["type"]);
  NodeIdType src_id = col_jsn["second_id"], dst_id = col_jsn["third_id"];
  WeightType edge_weight = 1;  // default weight when the dataset has no "weight" column
  if (optional_key_["weight"]) {
    edge_weight = col_jsn["weight"];
  }
  // Placeholder endpoints — resolved to the real nodes in GetNodesAndEdges.
  std::shared_ptr<Node> src = std::make_shared<LocalNode>(src_id, -1, 1);
  std::shared_ptr<Node> dst = std::make_shared<LocalNode>(dst_id, -1, 1);
  (*edge) = std::make_shared<LocalEdge>(edge_id, edge_type, edge_weight, src, dst);
  std::vector<int32_t> indices;
  RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureIndex("edge_feature_index", col_blob, &indices));
  if (graph_impl_->server_mode_) {
#if !defined(_WIN32) && !defined(_WIN64)
    // Server mode (non-Windows): load each feature into shared memory so
    // client processes can access it; the Feature is marked shared (true).
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor_sm;
      RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureToSharedMemory(
        "edge_feature_" + std::to_string(ind), col_blob, graph_impl_->graph_shared_memory_.get(), &tensor_sm));
      RETURN_IF_NOT_OK((*edge)->UpdateFeature(std::make_shared<Feature>(ind, tensor_sm, true)));
      (*feature_map)[edge_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        // First occurrence of this feature index: materialize the tensor once
        // to learn its shape/type, then build a zero tensor as the default.
        std::shared_ptr<Tensor> tensor;
        RETURN_IF_NOT_OK(
          graph_feature_parser_->LoadFeatureTensor("edge_feature_" + std::to_string(ind), col_blob, &tensor));
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
#endif
  } else {
    // Local mode: keep the feature tensors in process memory.
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor;
      RETURN_IF_NOT_OK(
        graph_feature_parser_->LoadFeatureTensor("edge_feature_" + std::to_string(ind), col_blob, &tensor));
      RETURN_IF_NOT_OK((*edge)->UpdateFeature(std::make_shared<Feature>(ind, tensor)));
      (*feature_map)[edge_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        // First occurrence of this feature index: build a same-shaped zero
        // tensor as the default value.
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
  }

  return Status::OK();
}
237 
// Worker thread body: repeatedly claims a row id, fetches that row from the
// shard reader, and converts it into a node ("attribute" == "n") or an edge
// ("e").  Results go only into this worker's own slot of the per-worker
// deques/feature maps, so no locking is needed on those containers; they are
// merged later by GetNodesAndEdges / MergeFeatureMaps.
// An empty row set from GetNextById is treated as end-of-data.
// NOTE(review): row_id_++ runs concurrently on all workers — this assumes
// row_id_ is declared atomic in graph_loader.h; confirm.
// NOTE(review): the status half of GetNextById's return pair is discarded
// here — verify a read error cannot surface as a silent empty result.
Status GraphLoader::WorkerEntry(int32_t worker_id) {
  // Handshake
  TaskManager::FindMe()->Post();
  auto ret = shard_reader_->GetNextById(row_id_++, worker_id);
  ShardTuple rows = ret.second;
  while (rows.empty() == false) {
    RETURN_IF_INTERRUPTED();
    for (const auto &tupled_row : rows) {
      std::vector<uint8_t> col_blob = std::get<0>(tupled_row);
      mindrecord::json col_jsn = std::get<1>(tupled_row);
      std::string attr = col_jsn["attribute"];
      if (attr == "n") {
        std::shared_ptr<Node> node_ptr;
        RETURN_IF_NOT_OK(LoadNode(col_blob, col_jsn, &node_ptr, &(n_feature_maps_[worker_id]),
                                  &default_node_feature_maps_[worker_id]));
        n_deques_[worker_id].emplace_back(node_ptr);
      } else if (attr == "e") {
        std::shared_ptr<Edge> edge_ptr;
        RETURN_IF_NOT_OK(LoadEdge(col_blob, col_jsn, &edge_ptr, &(e_feature_maps_[worker_id]),
                                  &default_edge_feature_maps_[worker_id]));
        e_deques_[worker_id].emplace_back(edge_ptr);
      } else {
        // Unknown row kind: log and skip rather than fail the whole load.
        MS_LOG(WARNING) << "attribute:" << attr << " is neither edge nor node.";
      }
    }
    auto rc = shard_reader_->GetNextById(row_id_++, worker_id);
    rows = rc.second;
  }
  return Status::OK();
}
268 
MergeFeatureMaps()269 void GraphLoader::MergeFeatureMaps() {
270   for (int wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
271     for (auto &m : n_feature_maps_[wkr_id]) {
272       for (auto &n : m.second) graph_impl_->node_feature_map_[m.first].insert(n);
273     }
274     for (auto &m : e_feature_maps_[wkr_id]) {
275       for (auto &n : m.second) graph_impl_->edge_feature_map_[m.first].insert(n);
276     }
277     for (auto &m : default_node_feature_maps_[wkr_id]) {
278       graph_impl_->default_node_feature_map_[m.first] = m.second;
279     }
280     for (auto &m : default_edge_feature_maps_[wkr_id]) {
281       graph_impl_->default_edge_feature_map_[m.first] = m.second;
282     }
283   }
284   n_feature_maps_.clear();
285   e_feature_maps_.clear();
286 }
287 
288 }  // namespace gnn
289 }  // namespace dataset
290 }  // namespace mindspore
291