• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/util/intrp_service.h"
17 #include <sstream>
18 #include "minddata/dataset/util/services.h"
19 #include "minddata/dataset/util/task_manager.h"
20 
21 namespace mindspore {
22 namespace dataset {
IntrpService()23 IntrpService::IntrpService() try : high_water_mark_(0) { (void)ServiceStart(); } catch (const std::exception &e) {
24   MS_LOG(ERROR) << "Interrupt service failed: " << e.what() << ".";
25   std::terminate();
26 }
27 
~IntrpService()28 IntrpService::~IntrpService() noexcept {
29   MS_LOG(INFO) << "Number of registered resources is " << high_water_mark_ << ".";
30   if (!all_intrp_resources_.empty()) {
31     try {
32       InterruptAll();
33     } catch (const std::exception &e) {
34       // Ignore all error as we can't throw in the destructor.
35     }
36   }
37   (void)ServiceStop();
38 }
39 
Register(const std::string & name,IntrpResource * res)40 Status IntrpService::Register(const std::string &name, IntrpResource *res) {
41   SharedLock stateLck(&state_lock_);
42   // Now double check the state
43   if (ServiceState() != STATE::kRunning) {
44     return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "Interrupt service is shutting down");
45   } else {
46     std::lock_guard<std::mutex> lck(mutex_);
47     try {
48       std::ostringstream ss;
49       ss << this_thread::get_id();
50       MS_LOG(DEBUG) << "Register resource with name " << name << ". Thread ID " << ss.str() << ".";
51       auto it = all_intrp_resources_.emplace(name, res);
52       if (it.second == false) {
53         return Status(StatusCode::kMDDuplicateKey, __LINE__, __FILE__, name);
54       }
55       high_water_mark_++;
56     } catch (std::exception &e) {
57       RETURN_STATUS_UNEXPECTED(e.what());
58     }
59   }
60   return Status::OK();
61 }
62 
Deregister(const std::string & name)63 Status IntrpService::Deregister(const std::string &name) noexcept {
64   std::lock_guard<std::mutex> lck(mutex_);
65   try {
66     std::ostringstream ss;
67     ss << this_thread::get_id();
68     MS_LOG(DEBUG) << "De-register resource with name " << name << ". Thread ID is " << ss.str() << ".";
69     auto n = all_intrp_resources_.erase(name);
70     if (n == 0) {
71       MS_LOG(INFO) << "Key " << name << " not found.";
72     }
73   } catch (std::exception &e) {
74     RETURN_STATUS_UNEXPECTED(e.what());
75   }
76   return Status::OK();
77 }
78 
InterruptAll()79 void IntrpService::InterruptAll() noexcept {
80   std::lock_guard<std::mutex> lck(mutex_);
81   for (auto const &it : all_intrp_resources_) {
82     std::string kName = it.first;
83     try {
84       it.second->Interrupt();
85     } catch (const std::exception &e) {
86       // continue the clean up.
87     }
88   }
89 }
90 }  // namespace dataset
91 }  // namespace mindspore
92