• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/util/intrp_service.h"
17 #include <sstream>
18 #include "minddata/dataset/util/services.h"
19 #include "minddata/dataset/util/task_manager.h"
20 
21 namespace mindspore {
22 namespace dataset {
// Back-off, in milliseconds, applied between attempts to obtain a non-colliding unique id
// when a resource is registered under a name that is already taken.
constexpr int64_t kServiceRetryGetUniqueIdInterVal = 10;
IntrpService()24 IntrpService::IntrpService() try : high_water_mark_(0) { (void)ServiceStart(); } catch (const std::exception &e) {
25   MS_LOG(ERROR) << "Interrupt service failed: " << e.what() << ".";
26   std::terminate();
27 }
28 
~IntrpService()29 IntrpService::~IntrpService() noexcept {
30   MS_LOG(DEBUG) << "Number of registered resources is " << high_water_mark_ << ".";
31   if (!all_intrp_resources_.empty()) {
32     try {
33       InterruptAll();
34     } catch (const std::exception &e) {
35       // Ignore all error as we can't throw in the destructor.
36     }
37   }
38   (void)ServiceStop();
39 }
40 
Register(std::string * name,IntrpResource * res)41 Status IntrpService::Register(std::string *name, IntrpResource *res) {
42   SharedLock stateLck(&state_lock_);
43   // Now double check the state
44   if (ServiceState() != STATE::kRunning) {
45     RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "Interrupt service is shutting down");
46   } else {
47     std::lock_guard<std::mutex> lck(mutex_);
48     try {
49       std::ostringstream ss;
50       std::string uuid = std::string("");
51       ss << this_thread::get_id();
52       MS_LOG(DEBUG) << "Register resource with name " << *name << ". Thread ID " << ss.str() << ".";
53       auto it = all_intrp_resources_.emplace(*name, res);
54       while (it.second == false) {
55         uuid = Services::GetUniqueID();
56         it = all_intrp_resources_.emplace(uuid, res);
57         MS_LOG(INFO) << "The name(" << *name << ") of register resource is duplicate, get new uuid: " << uuid;
58         std::this_thread::sleep_for(std::chrono::milliseconds(kServiceRetryGetUniqueIdInterVal));
59       }
60       if (!uuid.empty()) {
61         *name = uuid;
62       }
63       high_water_mark_++;
64     } catch (std::exception &e) {
65       RETURN_STATUS_UNEXPECTED(e.what());
66     }
67   }
68   return Status::OK();
69 }
70 
Deregister(const std::string & name)71 Status IntrpService::Deregister(const std::string &name) noexcept {
72   std::lock_guard<std::mutex> lck(mutex_);
73   try {
74     std::ostringstream ss;
75     ss << this_thread::get_id();
76     MS_LOG(DEBUG) << "De-register resource with name " << name << ". Thread ID is " << ss.str() << ".";
77     auto n = all_intrp_resources_.erase(name);
78     if (n == 0) {
79       MS_LOG(INFO) << "Key " << name << " not found.";
80     }
81   } catch (std::exception &e) {
82     RETURN_STATUS_UNEXPECTED(e.what());
83   }
84   return Status::OK();
85 }
86 
InterruptAll()87 void IntrpService::InterruptAll() noexcept {
88   std::lock_guard<std::mutex> lck(mutex_);
89   for (auto const &it : all_intrp_resources_) {
90     std::string kName = it.first;
91     try {
92       it.second->Interrupt();
93     } catch (const std::exception &e) {
94       // continue the clean up.
95     }
96   }
97 }
98 }  // namespace dataset
99 }  // namespace mindspore
100