• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ps/core/instance_manager.h"
18 
19 namespace mindspore {
20 namespace ps {
21 namespace core {
NewInstanceAsync(const std::shared_ptr<TcpClient> & client,const NodeManager &,const std::string & body,const uint64_t & request_id,const NodeInfo & node_info)22 void InstanceManager::NewInstanceAsync(const std::shared_ptr<TcpClient> &client, const NodeManager &,
23                                        const std::string &body, const uint64_t &request_id, const NodeInfo &node_info) {
24   MS_EXCEPTION_IF_NULL(client);
25   MS_EXCEPTION_IF_NULL(node_);
26   auto message_meta = std::make_shared<MessageMeta>();
27   MS_EXCEPTION_IF_NULL(message_meta);
28   message_meta->set_cmd(NodeCommand::SEND_DATA);
29   message_meta->set_request_id(request_id);
30   message_meta->set_rank_id(node_info.rank_id_);
31   message_meta->set_role(node_info.node_role_);
32   message_meta->set_user_cmd(static_cast<int32_t>(TcpUserCommand::kNewInstance));
33 
34   if (!client->SendMessage(message_meta, Protos::RAW, body.data(), body.length())) {
35     MS_LOG(WARNING) << "Send new instance timeout!";
36   }
37 
38   MS_LOG(INFO) << "The scheduler is sending new instance to workers and servers!";
39 }
40 
QueryInstanceAsync(const std::shared_ptr<TcpClient> & client,const NodeManager &,const uint64_t & request_id,const NodeInfo & node_info)41 void InstanceManager::QueryInstanceAsync(const std::shared_ptr<TcpClient> &client, const NodeManager &,
42                                          const uint64_t &request_id, const NodeInfo &node_info) {
43   MS_EXCEPTION_IF_NULL(client);
44   MS_EXCEPTION_IF_NULL(node_);
45   auto message_meta = std::make_shared<MessageMeta>();
46   MS_EXCEPTION_IF_NULL(message_meta);
47   message_meta->set_cmd(NodeCommand::SEND_DATA);
48   message_meta->set_request_id(request_id);
49   message_meta->set_rank_id(node_info.rank_id_);
50   message_meta->set_role(node_info.node_role_);
51   message_meta->set_user_cmd(static_cast<int32_t>(TcpUserCommand::kQueryInstance));
52 
53   std::string res;
54   if (!client->SendMessage(message_meta, Protos::RAW, res.data(), res.length())) {
55     MS_LOG(WARNING) << "Send query instance timeout!";
56   }
57 
58   MS_LOG(INFO) << "The scheduler is sending query instance to workers and servers!";
59 }
60 
EnableFLSAsync(const std::shared_ptr<TcpClient> & client,const NodeManager &,const uint64_t & request_id,const NodeInfo & node_info)61 void InstanceManager::EnableFLSAsync(const std::shared_ptr<TcpClient> &client, const NodeManager &,
62                                      const uint64_t &request_id, const NodeInfo &node_info) {
63   MS_EXCEPTION_IF_NULL(client);
64   MS_EXCEPTION_IF_NULL(node_);
65   auto message_meta = std::make_shared<MessageMeta>();
66   MS_EXCEPTION_IF_NULL(message_meta);
67   message_meta->set_cmd(NodeCommand::SEND_DATA);
68   message_meta->set_request_id(request_id);
69   message_meta->set_rank_id(node_info.rank_id_);
70   message_meta->set_role(node_info.node_role_);
71   message_meta->set_user_cmd(static_cast<int32_t>(TcpUserCommand::kEnableFLS));
72 
73   std::string res;
74   if (!client->SendMessage(message_meta, Protos::RAW, res.data(), res.length())) {
75     MS_LOG(WARNING) << "Send query instance timeout!";
76   }
77 
78   MS_LOG(INFO) << "The scheduler is sending query instance to workers and servers!";
79 }
80 
DisableFLSAsync(const std::shared_ptr<TcpClient> & client,const NodeManager &,const uint64_t & request_id,const NodeInfo & node_info)81 void InstanceManager::DisableFLSAsync(const std::shared_ptr<TcpClient> &client, const NodeManager &,
82                                       const uint64_t &request_id, const NodeInfo &node_info) {
83   MS_EXCEPTION_IF_NULL(client);
84   MS_EXCEPTION_IF_NULL(node_);
85   auto message_meta = std::make_shared<MessageMeta>();
86   MS_EXCEPTION_IF_NULL(message_meta);
87   message_meta->set_cmd(NodeCommand::SEND_DATA);
88   message_meta->set_request_id(request_id);
89   message_meta->set_rank_id(node_info.rank_id_);
90   message_meta->set_role(node_info.node_role_);
91   message_meta->set_user_cmd(static_cast<int32_t>(TcpUserCommand::kDisableFLS));
92 
93   std::string res;
94   if (!client->SendMessage(message_meta, Protos::RAW, res.data(), res.length())) {
95     MS_LOG(WARNING) << "Send query instance timeout!";
96   }
97 
98   MS_LOG(INFO) << "The scheduler is sending query instance to workers and servers!";
99 }
100 }  // namespace core
101 }  // namespace ps
102 }  // namespace mindspore
103