• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <atomic>
18 #include "src/actor/actormgr.h"
19 #include "src/actor/iomgr.h"
20 #include "include/mindrt.hpp"
21 #include "include/mindrt.h"
22 
23 extern "C" {
MindrtInitializeC(const struct MindrtConfig * config)24 int MindrtInitializeC(const struct MindrtConfig *config) {
25   if (config == nullptr) {
26     return -1;
27   }
28 
29   if (config->threadCount == 0) {
30     return -1;
31   }
32 
33   if (config->httpKmsgFlag != 0 && config->httpKmsgFlag != 1) {
34     return -1;
35   }
36   mindspore::SetHttpKmsgFlag(config->httpKmsgFlag);
37 
38   return mindspore::Initialize(std::string(config->tcpUrl), std::string(config->tcpUrlAdv), std::string(config->udpUrl),
39                                std::string(config->udpUrlAdv), config->threadCount);
40 }
41 
MindrtFinalizeC()42 void MindrtFinalizeC() { mindspore::Finalize(); }
43 }
44 
45 namespace mindspore {
46 
47 namespace local {
48 
49 static MindrtAddress g_mindrtAddress = MindrtAddress();
50 static std::atomic_bool g_finalizeMindrtStatus(false);
51 
52 }  // namespace local
53 
GetMindrtAddress()54 const MindrtAddress &GetMindrtAddress() { return local::g_mindrtAddress; }
55 
56 class MindrtExit {
57  public:
MindrtExit()58   MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()."; }
~MindrtExit()59   ~MindrtExit() {
60     MS_LOG(DEBUG) << "trace: enter ~MindrtExit().";
61     mindspore::Finalize();
62   }
63 };
64 
InitializeImp(const std::string & tcpUrl,const std::string & tcpUrlAdv,const std::string & udpUrl,const std::string & udpUrlAdv,int threadCount)65 int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
66                   const std::string &udpUrlAdv, int threadCount) {
67   MS_LOG(DEBUG) << "mindrt starts.";
68   auto ret = ActorMgr::GetActorMgrRef()->Initialize();
69   MS_LOG(DEBUG) << "mindrt has started.";
70   return ret;
71 }
72 
Initialize(const std::string & tcpUrl,const std::string & tcpUrlAdv,const std::string & udpUrl,const std::string & udpUrlAdv,int threadCount)73 int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl,
74                const std::string &udpUrlAdv, int threadCount) {
75   /* support repeat initialize  */
76   int result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount);
77   static MindrtExit mindrtExit;
78 
79   return result;
80 }
81 
Spawn(const ActorReference actor,bool sharedThread)82 AID Spawn(const ActorReference actor, bool sharedThread) {
83   if (actor == nullptr) {
84     MS_LOG(ERROR) << "Actor is nullptr.";
85     MINDRT_EXIT("Actor is nullptr.");
86   }
87 
88   if (local::g_finalizeMindrtStatus.load() == true) {
89     return actor->GetAID();
90   } else {
91     return ActorMgr::GetActorMgrRef()->Spawn(actor, sharedThread);
92   }
93 }
94 
Await(const ActorReference & actor)95 void Await(const ActorReference &actor) { ActorMgr::GetActorMgrRef()->Wait(actor->GetAID()); }
96 
Await(const AID & actor)97 void Await(const AID &actor) { ActorMgr::GetActorMgrRef()->Wait(actor); }
98 
99 // brief get actor with aid
GetActor(const AID & actor)100 ActorReference GetActor(const AID &actor) { return ActorMgr::GetActorMgrRef()->GetActor(actor); }
101 
Terminate(const AID & actor)102 void Terminate(const AID &actor) { ActorMgr::GetActorMgrRef()->Terminate(actor); }
103 
TerminateAll()104 void TerminateAll() { mindspore::ActorMgr::GetActorMgrRef()->TerminateAll(); }
105 
Finalize()106 void Finalize() {
107   bool inite = false;
108   if (local::g_finalizeMindrtStatus.compare_exchange_strong(inite, true) == false) {
109     MS_LOG(DEBUG) << "mindrt has been Finalized.";
110     return;
111   }
112 
113   MS_LOG(DEBUG) << "mindrt starts to finalize.";
114   mindspore::ActorMgr::GetActorMgrRef()->Finalize();
115 
116   MS_LOG(DEBUG) << "mindrt has been finalized.";
117   // flush the log in cache to disk before exiting.
118   FlushHLogCache();
119 }
120 
SetDelegate(const std::string & delegate)121 void SetDelegate(const std::string &delegate) { mindspore::ActorMgr::GetActorMgrRef()->SetDelegate(delegate); }
122 
123 static int g_mindrtLogPid = 1;
SetLogPID(int pid)124 void SetLogPID(int pid) {
125   MS_LOG(DEBUG) << "Set Mindrt log PID:" << pid;
126   g_mindrtLogPid = pid;
127 }
GetLogPID()128 int GetLogPID() { return g_mindrtLogPid; }
129 
130 static int g_httpKmsgEnable = -1;
SetHttpKmsgFlag(int flag)131 void SetHttpKmsgFlag(int flag) {
132   MS_LOG(DEBUG) << "Set Mindrt http message format:" << flag;
133   g_httpKmsgEnable = flag;
134 }
135 
GetHttpKmsgFlag()136 int GetHttpKmsgFlag() { return g_httpKmsgEnable; }
137 
138 }  // namespace mindspore
139