1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GNN_GRAPH_DATA_IMPL_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GNN_GRAPH_DATA_IMPL_H_ 18 19 #include <algorithm> 20 #include <memory> 21 #include <string> 22 #include <map> 23 #include <unordered_map> 24 #include <unordered_set> 25 #include <vector> 26 #include <utility> 27 28 #include "minddata/dataset/engine/gnn/graph_data.h" 29 #if !defined(_WIN32) && !defined(_WIN64) 30 #include "minddata/dataset/engine/gnn/graph_shared_memory.h" 31 #endif 32 #include "minddata/mindrecord/include/common/shard_utils.h" 33 34 namespace mindspore { 35 namespace dataset { 36 namespace gnn { 37 38 const float kGnnEpsilon = 0.0001; 39 const uint32_t kMaxNumWalks = 80; 40 using StochasticIndex = std::pair<std::vector<int32_t>, std::vector<float>>; 41 42 class GraphDataImpl : public GraphData { 43 public: 44 // Constructor 45 // @param std::string dataset_file - 46 // @param int32_t num_workers - number of parallel threads 47 GraphDataImpl(std::string dataset_file, int32_t num_workers, bool server_mode = false); 48 49 ~GraphDataImpl(); 50 51 // Get all nodes from the graph. 52 // @param NodeType node_type - type of node 53 // @param std::shared_ptr<Tensor> *out - Returned nodes id 54 // @return Status The status code returned 55 Status GetAllNodes(NodeType node_type, std::shared_ptr<Tensor> *out) override; 56 57 // Get all edges from the graph. 58 // @param NodeType edge_type - type of edge 59 // @param std::shared_ptr<Tensor> *out - Returned edge ids 60 // @return Status The status code returned 61 Status GetAllEdges(EdgeType edge_type, std::shared_ptr<Tensor> *out) override; 62 63 // Get the node id from the edge. 64 // @param std::vector<EdgeIdType> edge_list - List of edges 65 // @param std::shared_ptr<Tensor> *out - Returned node ids 66 // @return Status The status code returned 67 Status GetNodesFromEdges(const std::vector<EdgeIdType> &edge_list, std::shared_ptr<Tensor> *out) override; 68 69 // Get the edge id from connected node pair 70 // @param std::vector<std::pair<NodeIdType, NodeIdType>> node_list - List of pair nodes 71 // @param std::shared_ptr<Tensor> *out - Returned edge ids 72 // @return Status - The status code that indicate the result of function execution 73 Status GetEdgesFromNodes(const std::vector<std::pair<NodeIdType, NodeIdType>> &node_list, 74 std::shared_ptr<Tensor> *out) override; 75 76 // All neighbors of the acquisition node. 77 // @param std::vector<NodeType> node_list - List of nodes 78 // @param NodeType neighbor_type - The type of neighbor. If the type does not exist, an error will be reported 79 // @param OutputFormat format - The storage format for output, normal, COO or CSR are valid 80 // @param std::shared_ptr<Tensor> *out - Returned neighbor's id. Because the number of neighbors at different nodes is 81 // different, the returned tensor is output according to the maximum number of neighbors. If the number of neighbors 82 // is not enough, fill in tensor as -1. 83 // @return Status The status code returned 84 Status GetAllNeighbors(const std::vector<NodeIdType> &node_list, NodeType neighbor_type, const OutputFormat &format, 85 std::shared_ptr<Tensor> *out) override; 86 87 // Get sampled neighbors. 88 // @param std::vector<NodeType> node_list - List of nodes 89 // @param std::vector<NodeIdType> neighbor_nums - Number of neighbors sampled per hop 90 // @param std::vector<NodeType> neighbor_types - Neighbor type sampled per hop 91 // @param std::SamplingStrategy strategy - Sampling strategy 92 // @param std::shared_ptr<Tensor> *out - Returned neighbor's id. 93 // @return Status The status code returned 94 Status GetSampledNeighbors(const std::vector<NodeIdType> &node_list, const std::vector<NodeIdType> &neighbor_nums, 95 const std::vector<NodeType> &neighbor_types, SamplingStrategy strategy, 96 std::shared_ptr<Tensor> *out) override; 97 98 // Get negative sampled neighbors. 99 // @param std::vector<NodeType> node_list - List of nodes 100 // @param NodeIdType samples_num - Number of neighbors sampled 101 // @param NodeType neg_neighbor_type - The type of negative neighbor. 102 // @param std::shared_ptr<Tensor> *out - Returned negative neighbor's id. 103 // @return Status The status code returned 104 Status GetNegSampledNeighbors(const std::vector<NodeIdType> &node_list, NodeIdType samples_num, 105 NodeType neg_neighbor_type, std::shared_ptr<Tensor> *out) override; 106 107 // Node2vec random walk. 108 // @param std::vector<NodeIdType> node_list - List of nodes 109 // @param std::vector<NodeType> meta_path - node type of each step 110 // @param float step_home_param - return hyper parameter in node2vec algorithm 111 // @param float step_away_param - in out hyper parameter in node2vec algorithm 112 // @param NodeIdType default_node - default node id 113 // @param std::shared_ptr<Tensor> *out - Returned nodes id in walk path 114 // @return Status The status code returned 115 Status RandomWalk(const std::vector<NodeIdType> &node_list, const std::vector<NodeType> &meta_path, 116 float step_home_param, float step_away_param, NodeIdType default_node, 117 std::shared_ptr<Tensor> *out) override; 118 119 // Get the feature of a node 120 // @param std::shared_ptr<Tensor> nodes - List of nodes 121 // @param std::vector<FeatureType> feature_types - Types of features, An error will be reported if the feature type 122 // does not exist. 123 // @param TensorRow *out - Returned features 124 // @return Status The status code returned 125 Status GetNodeFeature(const std::shared_ptr<Tensor> &nodes, const std::vector<FeatureType> &feature_types, 126 TensorRow *out) override; 127 128 Status GetNodeFeatureSharedMemory(const std::shared_ptr<Tensor> &nodes, FeatureType type, 129 std::shared_ptr<Tensor> *out); 130 131 // Get the feature of a edge 132 // @param std::shared_ptr<Tensor> edges - List of edges 133 // @param std::vector<FeatureType> feature_types - Types of features, An error will be reported if the feature type 134 // does not exist. 135 // @param Tensor *out - Returned features 136 // @return Status The status code returned 137 Status GetEdgeFeature(const std::shared_ptr<Tensor> &edges, const std::vector<FeatureType> &feature_types, 138 TensorRow *out) override; 139 140 Status GetEdgeFeatureSharedMemory(const std::shared_ptr<Tensor> &edges, FeatureType type, 141 std::shared_ptr<Tensor> *out); 142 143 // Get meta information of graph 144 // @param MetaInfo *meta_info - Returned meta information 145 // @return Status The status code returned 146 Status GetMetaInfo(MetaInfo *meta_info); 147 148 #ifdef ENABLE_PYTHON 149 // Return meta information to python layer 150 Status GraphInfo(py::dict *out) override; 151 #endif 152 GetAllDefaultNodeFeatures()153 const std::unordered_map<FeatureType, std::shared_ptr<Feature>> *GetAllDefaultNodeFeatures() { 154 return &default_node_feature_map_; 155 } 156 GetAllDefaultEdgeFeatures()157 const std::unordered_map<FeatureType, std::shared_ptr<Feature>> *GetAllDefaultEdgeFeatures() { 158 return &default_edge_feature_map_; 159 } 160 161 Status Init() override; 162 Stop()163 Status Stop() override { return Status::OK(); } 164 GetDataSchema()165 std::string GetDataSchema() { return data_schema_.dump(); } 166 167 #if !defined(_WIN32) && !defined(_WIN64) GetSharedMemoryKey()168 key_t GetSharedMemoryKey() { return graph_shared_memory_->memory_key(); } 169 GetSharedMemorySize()170 int64_t GetSharedMemorySize() { return graph_shared_memory_->memory_size(); } 171 #endif 172 173 private: 174 friend class GraphLoader; 175 class RandomWalkBase { 176 public: 177 explicit RandomWalkBase(GraphDataImpl *graph); 178 179 Status Build(const std::vector<NodeIdType> &node_list, const std::vector<NodeType> &meta_path, 180 float step_home_param = 1.0, float step_away_param = 1.0, NodeIdType default_node = -1, 181 int32_t num_walks = 1, int32_t num_workers = 1); 182 183 ~RandomWalkBase() = default; 184 185 Status SimulateWalk(std::vector<std::vector<NodeIdType>> *walks); 186 187 private: 188 Status Node2vecWalk(const NodeIdType &start_node, std::vector<NodeIdType> *walk_path); 189 190 Status GetNodeProbability(const NodeIdType &node_id, const NodeType &node_type, 191 std::shared_ptr<StochasticIndex> *node_probability); 192 193 Status GetEdgeProbability(const NodeIdType &src, const NodeIdType &dst, uint32_t meta_path_index, 194 std::shared_ptr<StochasticIndex> *edge_probability); 195 196 static StochasticIndex GenerateProbability(const std::vector<float> &probability); 197 198 static uint32_t WalkToNextNode(const StochasticIndex &stochastic_index); 199 200 template <typename T> 201 std::vector<float> Normalize(const std::vector<T> &non_normalized_probability); 202 203 GraphDataImpl *graph_; 204 std::vector<NodeIdType> node_list_; 205 std::vector<NodeType> meta_path_; 206 float step_home_param_; // Return hyper parameter. Default is 1.0 207 float step_away_param_; // In out hyper parameter. Default is 1.0 208 NodeIdType default_node_; 209 210 int32_t num_walks_; // Number of walks per source. Default is 1 211 int32_t num_workers_; // The number of worker threads. Default is 1 212 }; 213 214 // Load graph data from mindrecord file 215 // @return Status The status code returned 216 Status LoadNodeAndEdge(); 217 218 // Create Tensor By Vector 219 // @param std::vector<std::vector<T>> &data - 220 // @param DataType type - 221 // @param std::shared_ptr<Tensor> *out - 222 // @return Status The status code returned 223 template <typename T> 224 Status CreateTensorByVector(const std::vector<std::vector<T>> &data, DataType type, std::shared_ptr<Tensor> *out); 225 226 // Complete vector 227 // @param std::vector<std::vector<T>> *data - To be completed vector 228 // @param size_t max_size - The size of the completed vector 229 // @param T default_value - Filled default 230 // @return Status The status code returned 231 template <typename T> 232 Status ComplementVector(std::vector<std::vector<T>> *data, size_t max_size, T default_value); 233 234 // Get the default feature of a node 235 // @param FeatureType feature_type - 236 // @param std::shared_ptr<Feature> *out_feature - Returned feature 237 // @return Status The status code returned 238 Status GetNodeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature); 239 240 // Get the default feature of a edge 241 // @param FeatureType feature_type - 242 // @param std::shared_ptr<Feature> *out_feature - Returned feature 243 // @return Status The status code returned 244 Status GetEdgeDefaultFeature(FeatureType feature_type, std::shared_ptr<Feature> *out_feature); 245 246 // Find node object using node id 247 // @param NodeIdType id - 248 // @param std::shared_ptr<Node> *node - Returned node object 249 // @return Status The status code returned 250 Status GetNodeByNodeId(NodeIdType id, std::shared_ptr<Node> *node); 251 252 // Find edge object using edge id 253 // @param EdgeIdType id - 254 // @param std::shared_ptr<Node> *edge - Returned edge object 255 // @return Status The status code returned 256 Status GetEdgeByEdgeId(EdgeIdType id, std::shared_ptr<Edge> *edge); 257 258 // Negative sampling 259 // @param std::vector<NodeIdType> &input_data - The data set to be sampled 260 // @param std::unordered_set<NodeIdType> &exclude_data - Data to be excluded 261 // @param int32_t samples_num - 262 // @param std::vector<NodeIdType> *out_samples - Sampling results returned 263 // @return Status The status code returned 264 Status NegativeSample(const std::vector<NodeIdType> &data, const std::vector<NodeIdType> shuffled_ids, 265 size_t *start_index, const std::unordered_set<NodeIdType> &exclude_data, int32_t samples_num, 266 std::vector<NodeIdType> *out_samples); 267 268 Status CheckSamplesNum(NodeIdType samples_num); 269 270 Status CheckNeighborType(NodeType neighbor_type); 271 272 std::string dataset_file_; 273 int32_t num_workers_; // The number of worker threads 274 std::mt19937 rnd_; 275 RandomWalkBase random_walk_; 276 mindrecord::json data_schema_; 277 bool server_mode_; 278 #if !defined(_WIN32) && !defined(_WIN64) 279 std::unique_ptr<GraphSharedMemory> graph_shared_memory_; 280 #endif 281 std::unordered_map<NodeType, std::vector<NodeIdType>> node_type_map_; 282 std::unordered_map<NodeIdType, std::shared_ptr<Node>> node_id_map_; 283 284 std::unordered_map<EdgeType, std::vector<EdgeIdType>> edge_type_map_; 285 std::unordered_map<EdgeIdType, std::shared_ptr<Edge>> edge_id_map_; 286 287 std::unordered_map<NodeType, std::unordered_set<FeatureType>> node_feature_map_; 288 std::unordered_map<EdgeType, std::unordered_set<FeatureType>> edge_feature_map_; 289 290 std::unordered_map<FeatureType, std::shared_ptr<Feature>> default_node_feature_map_; 291 std::unordered_map<FeatureType, std::shared_ptr<Feature>> default_edge_feature_map_; 292 }; 293 } // namespace gnn 294 } // namespace dataset 295 } // namespace mindspore 296 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GNN_GRAPH_DATA_IMPL_H_ 297