/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/gnn/graph_loader.h"

#include <future>
#include <tuple>
#include <utility>

#include "minddata/dataset/engine/gnn/graph_data_impl.h"
#include "minddata/dataset/engine/gnn/local_edge.h"
#include "minddata/dataset/engine/gnn/local_node.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/mindrecord/include/shard_error.h"

using ShardTuple = std::vector<std::tuple<std::vector<uint8_t>, mindspore::mindrecord::json>>;

namespace mindspore {
namespace dataset {
namespace gnn {

using mindrecord::MSRStatus;

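// GraphLoader reads graph rows out of a MindRecord file and assembles them into the
// in-memory graph held by GraphDataImpl. The constructor only records the file path,
// worker count, and the required/optional schema keys; no I/O happens until InitAndLoad().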
GraphLoader::GraphLoader(GraphDataImpl *graph_impl, std::string mr_filepath, int32_t num_workers, bool server_mode)
    : graph_impl_(graph_impl),
      mr_path_(mr_filepath),
      num_workers_(num_workers),
      row_id_(0),
      shard_reader_(nullptr),
      graph_feature_parser_(nullptr),
      required_key_(
        {"first_id", "second_id", "third_id", "attribute", "type", "node_feature_index", "edge_feature_index"}),
      optional_key_({{"weight", false}}) {}

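// Drains the per-worker node/edge deques into the global id maps on graph_impl_, then
// resolves every edge's placeholder endpoints against node_id_map_ and registers the
// edge as a neighbor/adjacent entry on its source node.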
Status GraphLoader::GetNodesAndEdges() {
  NodeIdMap *n_id_map = &graph_impl_->node_id_map_;
  EdgeIdMap *e_id_map = &graph_impl_->edge_id_map_;
  for (std::deque<std::shared_ptr<Node>> &dq : n_deques_) {
    while (dq.empty() == false) {
      std::shared_ptr<Node> node_ptr = dq.front();
      n_id_map->insert({node_ptr->id(), node_ptr});
      graph_impl_->node_type_map_[node_ptr->type()].push_back(node_ptr->id());
      dq.pop_front();
    }
  }

  for (std::deque<std::shared_ptr<Edge>> &dq : e_deques_) {
    while (dq.empty() == false) {
      std::shared_ptr<Edge> edge_ptr = dq.front();
      std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> p;
      RETURN_IF_NOT_OK(edge_ptr->GetNode(&p));
      auto src_itr = n_id_map->find(p.first->id()), dst_itr = n_id_map->find(p.second->id());

      // Report the offending id from the edge itself; the iterators are not dereferenceable on failure.
      CHECK_FAIL_RETURN_UNEXPECTED(src_itr != n_id_map->end(), "invalid src_id:" + std::to_string(p.first->id()));
      CHECK_FAIL_RETURN_UNEXPECTED(dst_itr != n_id_map->end(), "invalid dst_id:" + std::to_string(p.second->id()));

      RETURN_IF_NOT_OK(edge_ptr->SetNode({src_itr->second, dst_itr->second}));
      RETURN_IF_NOT_OK(src_itr->second->AddNeighbor(dst_itr->second, edge_ptr->weight()));
      RETURN_IF_NOT_OK(src_itr->second->AddAdjacent(dst_itr->second, edge_ptr));

      e_id_map->insert({edge_ptr->id(), edge_ptr});  // add edge to edge_id_map_
      graph_impl_->edge_type_map_[edge_ptr->type()].push_back(edge_ptr->id());
      dq.pop_front();
    }
  }

  for (auto &itr : graph_impl_->node_type_map_) itr.second.shrink_to_fit();
  for (auto &itr : graph_impl_->edge_type_map_) itr.second.shrink_to_fit();

  MergeFeatureMaps();
  return Status::OK();
}

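// Opens the MindRecord file, validates that every required column is present in the
// schema, optionally creates a shared-memory block for server mode, and then spawns
// num_workers_ worker threads that each load a slice of the rows.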
Status GraphLoader::InitAndLoad() {
  CHECK_FAIL_RETURN_UNEXPECTED(num_workers_ > 0, "num_workers must be greater than 0.\n");
  CHECK_FAIL_RETURN_UNEXPECTED(row_id_ == 0, "InitAndLoad can only be called once!\n");
  n_deques_.resize(num_workers_);
  e_deques_.resize(num_workers_);
  n_feature_maps_.resize(num_workers_);
  e_feature_maps_.resize(num_workers_);
  default_node_feature_maps_.resize(num_workers_);
  default_edge_feature_maps_.resize(num_workers_);
  TaskGroup vg;

  shard_reader_ = std::make_unique<ShardReader>();
  RETURN_IF_NOT_OK(shard_reader_->Open({mr_path_}, true, num_workers_));
  CHECK_FAIL_RETURN_UNEXPECTED(shard_reader_->GetShardHeader()->GetSchemaCount() > 0, "No schema found!");
  RETURN_IF_NOT_OK(shard_reader_->Launch(true));

  graph_impl_->data_schema_ = (shard_reader_->GetShardHeader()->GetSchemas()[0]->GetSchema());
  mindrecord::json schema = graph_impl_->data_schema_["schema"];
  for (const std::string &key : required_key_) {
    if (schema.find(key) == schema.end()) {
      RETURN_STATUS_UNEXPECTED(key + " doesn't exist in schema: " + schema.dump());
    }
  }

  // Mark which optional columns (currently only "weight") are actually present in this file.
  for (auto &op_key : optional_key_) {
    if (schema.find(op_key.first) != schema.end()) {
      op_key.second = true;
    }
  }

  if (graph_impl_->server_mode_) {
#if !defined(_WIN32) && !defined(_WIN64)
    // In server mode, feature data is kept in a shared-memory block sized to the blob data
    // so that client processes can read it without copying.
    int64_t total_blob_size = 0;
    RETURN_IF_NOT_OK(shard_reader_->GetTotalBlobSize(&total_blob_size));
    graph_impl_->graph_shared_memory_ = std::make_unique<GraphSharedMemory>(total_blob_size, mr_path_);
    RETURN_IF_NOT_OK(graph_impl_->graph_shared_memory_->CreateSharedMemory());
#endif
  }

  graph_feature_parser_ = std::make_unique<GraphFeatureParser>(*shard_reader_->GetShardColumn());

  // launching worker threads
  for (int wkr_id = 0; wkr_id < num_workers_; ++wkr_id) {
    RETURN_IF_NOT_OK(vg.CreateAsyncTask("GraphLoader", std::bind(&GraphLoader::WorkerEntry, this, wkr_id)));
  }
  // wait for threads to finish and check their return code
  RETURN_IF_NOT_OK(vg.join_all(Task::WaitFlag::kBlocking));
  RETURN_IF_NOT_OK(vg.GetTaskErrorIfAny());
  return Status::OK();
}

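// Parses one "n" (node) row: builds a LocalNode from the id/type/weight columns, then
// loads every feature listed in "node_feature_index". In server mode the feature tensors
// are written into shared memory; otherwise they are kept in-process. A zero-filled
// tensor of the same shape and type is cached as the default feature for each index.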
Status GraphLoader::LoadNode(const std::vector<uint8_t> &col_blob, const mindrecord::json &col_jsn,
                             std::shared_ptr<Node> *node, NodeFeatureMap *feature_map,
                             DefaultNodeFeatureMap *default_feature) {
  NodeIdType node_id = col_jsn["first_id"];
  NodeType node_type = static_cast<NodeType>(col_jsn["type"]);
  WeightType weight = 1;
  if (optional_key_["weight"]) {
    weight = col_jsn["weight"];
  }
  (*node) = std::make_shared<LocalNode>(node_id, node_type, weight);
  std::vector<int32_t> indices;
  RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureIndex("node_feature_index", col_blob, &indices));
  if (graph_impl_->server_mode_) {
#if !defined(_WIN32) && !defined(_WIN64)
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor_sm;
      RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureToSharedMemory(
        "node_feature_" + std::to_string(ind), col_blob, graph_impl_->graph_shared_memory_.get(), &tensor_sm));
      RETURN_IF_NOT_OK((*node)->UpdateFeature(std::make_shared<Feature>(ind, tensor_sm, true)));
      (*feature_map)[node_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        std::shared_ptr<Tensor> tensor;
        RETURN_IF_NOT_OK(
          graph_feature_parser_->LoadFeatureTensor("node_feature_" + std::to_string(ind), col_blob, &tensor));
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
#endif
  } else {
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor;
      RETURN_IF_NOT_OK(
        graph_feature_parser_->LoadFeatureTensor("node_feature_" + std::to_string(ind), col_blob, &tensor));
      RETURN_IF_NOT_OK((*node)->UpdateFeature(std::make_shared<Feature>(ind, tensor)));
      (*feature_map)[node_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
  }
  return Status::OK();
}

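// Parses one "e" (edge) row: builds a LocalEdge whose endpoints are lightweight
// placeholder nodes (type -1) carrying only the ids; the real nodes are substituted
// later in GetNodesAndEdges(). Edge features are loaded the same way as node features.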
Status GraphLoader::LoadEdge(const std::vector<uint8_t> &col_blob, const mindrecord::json &col_jsn,
                             std::shared_ptr<Edge> *edge, EdgeFeatureMap *feature_map,
                             DefaultEdgeFeatureMap *default_feature) {
  EdgeIdType edge_id = col_jsn["first_id"];
  EdgeType edge_type = static_cast<EdgeType>(col_jsn["type"]);
  NodeIdType src_id = col_jsn["second_id"], dst_id = col_jsn["third_id"];
  WeightType edge_weight = 1;
  if (optional_key_["weight"]) {
    edge_weight = col_jsn["weight"];
  }
  std::shared_ptr<Node> src = std::make_shared<LocalNode>(src_id, -1, 1);
  std::shared_ptr<Node> dst = std::make_shared<LocalNode>(dst_id, -1, 1);
  (*edge) = std::make_shared<LocalEdge>(edge_id, edge_type, edge_weight, src, dst);
  std::vector<int32_t> indices;
  RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureIndex("edge_feature_index", col_blob, &indices));
  if (graph_impl_->server_mode_) {
#if !defined(_WIN32) && !defined(_WIN64)
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor_sm;
      RETURN_IF_NOT_OK(graph_feature_parser_->LoadFeatureToSharedMemory(
        "edge_feature_" + std::to_string(ind), col_blob, graph_impl_->graph_shared_memory_.get(), &tensor_sm));
      RETURN_IF_NOT_OK((*edge)->UpdateFeature(std::make_shared<Feature>(ind, tensor_sm, true)));
      (*feature_map)[edge_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        std::shared_ptr<Tensor> tensor;
        RETURN_IF_NOT_OK(
          graph_feature_parser_->LoadFeatureTensor("edge_feature_" + std::to_string(ind), col_blob, &tensor));
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
#endif
  } else {
    for (int32_t ind : indices) {
      std::shared_ptr<Tensor> tensor;
      RETURN_IF_NOT_OK(
        graph_feature_parser_->LoadFeatureTensor("edge_feature_" + std::to_string(ind), col_blob, &tensor));
      RETURN_IF_NOT_OK((*edge)->UpdateFeature(std::make_shared<Feature>(ind, tensor)));
      (*feature_map)[edge_type].insert(ind);
      if ((*default_feature)[ind] == nullptr) {
        std::shared_ptr<Tensor> zero_tensor;
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(tensor->shape(), tensor->type(), &zero_tensor));
        RETURN_IF_NOT_OK(zero_tensor->Zero());
        (*default_feature)[ind] = std::make_shared<Feature>(ind, zero_tensor);
      }
    }
  }

  return Status::OK();
}

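// Worker thread body: repeatedly claims the next row id from the shared row counter,
// fetches that row from the ShardReader, and dispatches it to LoadNode() or LoadEdge()
// based on the "attribute" column, appending the result to this worker's own deque.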
Status GraphLoader::WorkerEntry(int32_t worker_id) {
  // Handshake
  TaskManager::FindMe()->Post();
  auto ret = shard_reader_->GetNextById(row_id_++, worker_id);
  ShardTuple rows = ret.second;
  while (rows.empty() == false) {
    RETURN_IF_INTERRUPTED();
    for (const auto &tupled_row : rows) {
      std::vector<uint8_t> col_blob = std::get<0>(tupled_row);
      mindrecord::json col_jsn = std::get<1>(tupled_row);
      std::string attr = col_jsn["attribute"];
      if (attr == "n") {
        std::shared_ptr<Node> node_ptr;
        RETURN_IF_NOT_OK(LoadNode(col_blob, col_jsn, &node_ptr, &(n_feature_maps_[worker_id]),
                                  &default_node_feature_maps_[worker_id]));
        n_deques_[worker_id].emplace_back(node_ptr);
      } else if (attr == "e") {
        std::shared_ptr<Edge> edge_ptr;
        RETURN_IF_NOT_OK(LoadEdge(col_blob, col_jsn, &edge_ptr, &(e_feature_maps_[worker_id]),
                                  &default_edge_feature_maps_[worker_id]));
        e_deques_[worker_id].emplace_back(edge_ptr);
      } else {
        MS_LOG(WARNING) << "attribute:" << attr << " is neither edge nor node.";
      }
    }
    auto rc = shard_reader_->GetNextById(row_id_++, worker_id);
    rows = rc.second;
  }
  return Status::OK();
}

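// Called from GetNodesAndEdges() after all workers have finished: folds every per-worker
// feature-index map and default-feature map into the global maps on graph_impl_, then
// drops the per-worker copies.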
void GraphLoader::MergeFeatureMaps() {
  for (int wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
    for (auto &m : n_feature_maps_[wkr_id]) {
      for (auto &n : m.second) graph_impl_->node_feature_map_[m.first].insert(n);
    }
    for (auto &m : e_feature_maps_[wkr_id]) {
      for (auto &n : m.second) graph_impl_->edge_feature_map_[m.first].insert(n);
    }
    for (auto &m : default_node_feature_maps_[wkr_id]) {
      graph_impl_->default_node_feature_map_[m.first] = m.second;
    }
    for (auto &m : default_edge_feature_maps_[wkr_id]) {
      graph_impl_->default_edge_feature_map_[m.first] = m.second;
    }
  }
  n_feature_maps_.clear();
  e_feature_maps_.clear();
}

}  // namespace gnn
}  // namespace dataset
}  // namespace mindspore