1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/util/intrp_service.h"
17 #include <sstream>
18 #include "minddata/dataset/util/services.h"
19 #include "minddata/dataset/util/task_manager.h"
20
21 namespace mindspore {
22 namespace dataset {
IntrpService()23 IntrpService::IntrpService() try : high_water_mark_(0) { (void)ServiceStart(); } catch (const std::exception &e) {
24 MS_LOG(ERROR) << "Interrupt service failed: " << e.what() << ".";
25 std::terminate();
26 }
27
~IntrpService()28 IntrpService::~IntrpService() noexcept {
29 MS_LOG(INFO) << "Number of registered resources is " << high_water_mark_ << ".";
30 if (!all_intrp_resources_.empty()) {
31 try {
32 InterruptAll();
33 } catch (const std::exception &e) {
34 // Ignore all error as we can't throw in the destructor.
35 }
36 }
37 (void)ServiceStop();
38 }
39
Register(const std::string & name,IntrpResource * res)40 Status IntrpService::Register(const std::string &name, IntrpResource *res) {
41 SharedLock stateLck(&state_lock_);
42 // Now double check the state
43 if (ServiceState() != STATE::kRunning) {
44 return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "Interrupt service is shutting down");
45 } else {
46 std::lock_guard<std::mutex> lck(mutex_);
47 try {
48 std::ostringstream ss;
49 ss << this_thread::get_id();
50 MS_LOG(DEBUG) << "Register resource with name " << name << ". Thread ID " << ss.str() << ".";
51 auto it = all_intrp_resources_.emplace(name, res);
52 if (it.second == false) {
53 return Status(StatusCode::kMDDuplicateKey, __LINE__, __FILE__, name);
54 }
55 high_water_mark_++;
56 } catch (std::exception &e) {
57 RETURN_STATUS_UNEXPECTED(e.what());
58 }
59 }
60 return Status::OK();
61 }
62
Deregister(const std::string & name)63 Status IntrpService::Deregister(const std::string &name) noexcept {
64 std::lock_guard<std::mutex> lck(mutex_);
65 try {
66 std::ostringstream ss;
67 ss << this_thread::get_id();
68 MS_LOG(DEBUG) << "De-register resource with name " << name << ". Thread ID is " << ss.str() << ".";
69 auto n = all_intrp_resources_.erase(name);
70 if (n == 0) {
71 MS_LOG(INFO) << "Key " << name << " not found.";
72 }
73 } catch (std::exception &e) {
74 RETURN_STATUS_UNEXPECTED(e.what());
75 }
76 return Status::OK();
77 }
78
InterruptAll()79 void IntrpService::InterruptAll() noexcept {
80 std::lock_guard<std::mutex> lck(mutex_);
81 for (auto const &it : all_intrp_resources_) {
82 std::string kName = it.first;
83 try {
84 it.second->Interrupt();
85 } catch (const std::exception &e) {
86 // continue the clean up.
87 }
88 }
89 }
90 } // namespace dataset
91 } // namespace mindspore
92