1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/util/intrp_service.h"
17 #include <sstream>
18 #include "minddata/dataset/util/services.h"
19 #include "minddata/dataset/util/task_manager.h"
20
21 namespace mindspore {
22 namespace dataset {
23 const int64_t kServiceRetryGetUniqueIdInterVal = 10;
IntrpService()24 IntrpService::IntrpService() try : high_water_mark_(0) { (void)ServiceStart(); } catch (const std::exception &e) {
25 MS_LOG(ERROR) << "Interrupt service failed: " << e.what() << ".";
26 std::terminate();
27 }
28
~IntrpService()29 IntrpService::~IntrpService() noexcept {
30 MS_LOG(DEBUG) << "Number of registered resources is " << high_water_mark_ << ".";
31 if (!all_intrp_resources_.empty()) {
32 try {
33 InterruptAll();
34 } catch (const std::exception &e) {
35 // Ignore all error as we can't throw in the destructor.
36 }
37 }
38 (void)ServiceStop();
39 }
40
Register(std::string * name,IntrpResource * res)41 Status IntrpService::Register(std::string *name, IntrpResource *res) {
42 SharedLock stateLck(&state_lock_);
43 // Now double check the state
44 if (ServiceState() != STATE::kRunning) {
45 RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "Interrupt service is shutting down");
46 } else {
47 std::lock_guard<std::mutex> lck(mutex_);
48 try {
49 std::ostringstream ss;
50 std::string uuid = std::string("");
51 ss << this_thread::get_id();
52 MS_LOG(DEBUG) << "Register resource with name " << *name << ". Thread ID " << ss.str() << ".";
53 auto it = all_intrp_resources_.emplace(*name, res);
54 while (it.second == false) {
55 uuid = Services::GetUniqueID();
56 it = all_intrp_resources_.emplace(uuid, res);
57 MS_LOG(INFO) << "The name(" << *name << ") of register resource is duplicate, get new uuid: " << uuid;
58 std::this_thread::sleep_for(std::chrono::milliseconds(kServiceRetryGetUniqueIdInterVal));
59 }
60 if (!uuid.empty()) {
61 *name = uuid;
62 }
63 high_water_mark_++;
64 } catch (std::exception &e) {
65 RETURN_STATUS_UNEXPECTED(e.what());
66 }
67 }
68 return Status::OK();
69 }
70
Deregister(const std::string & name)71 Status IntrpService::Deregister(const std::string &name) noexcept {
72 std::lock_guard<std::mutex> lck(mutex_);
73 try {
74 std::ostringstream ss;
75 ss << this_thread::get_id();
76 MS_LOG(DEBUG) << "De-register resource with name " << name << ". Thread ID is " << ss.str() << ".";
77 auto n = all_intrp_resources_.erase(name);
78 if (n == 0) {
79 MS_LOG(INFO) << "Key " << name << " not found.";
80 }
81 } catch (std::exception &e) {
82 RETURN_STATUS_UNEXPECTED(e.what());
83 }
84 return Status::OK();
85 }
86
InterruptAll()87 void IntrpService::InterruptAll() noexcept {
88 std::lock_guard<std::mutex> lck(mutex_);
89 for (auto const &it : all_intrp_resources_) {
90 std::string kName = it.first;
91 try {
92 it.second->Interrupt();
93 } catch (const std::exception &e) {
94 // continue the clean up.
95 }
96 }
97 }
98 } // namespace dataset
99 } // namespace mindspore
100