1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "server_executor/include/future_factory.h" 17 18 #include "platform/time/include/time.h" 19 #include "protocol/data_channel/include/i_request.h" 20 #include "protocol/data_channel/include/i_response.h" 21 #include "protocol/retcode_inner/aie_retcode_inner.h" 22 #include "server_executor/include/future.h" 23 #include "server_executor/include/i_future_listener.h" 24 #include "utils/log/aie_log.h" 25 26 namespace OHOS { 27 namespace AI { 28 namespace { 29 const int MAX_NUM_FUTURES = 1024; 30 const int INVALID_SEQUENCE_ID = -1; 31 const int MIN_SEQUENCE_ID = 1; 32 } 33 34 std::mutex FutureFactory::mutex_; 35 FutureFactory *FutureFactory::instance_ = nullptr; 36 GetInstance()37 FutureFactory *FutureFactory::GetInstance() 38 { 39 CHK_RET(instance_ != nullptr, instance_); 40 41 std::lock_guard<std::mutex> lock(mutex_); 42 CHK_RET(instance_ != nullptr, instance_); 43 44 AIE_NEW(instance_, FutureFactory); 45 46 return instance_; 47 } 48 ReleaseInstance()49 void FutureFactory::ReleaseInstance() 50 { 51 std::lock_guard<std::mutex> lock(mutex_); 52 AIE_DELETE(instance_); 53 } 54 FutureFactory()55 FutureFactory::FutureFactory() : sequenceId_(0) 56 { 57 } 58 ~FutureFactory()59 FutureFactory::~FutureFactory() 60 { 61 HILOGI("[FutureFactory]Begin to release FutureFactory."); 62 for (auto &iter : listeners_) { 63 AIE_DELETE(iter.second); 64 } 65 listeners_.clear(); 66 for (auto &iter : futures_) { 67 AIE_DELETE(iter.second); 68 } 69 futures_.clear(); 70 } 71 FindSequenceId()72 long long FutureFactory::FindSequenceId() 73 { 74 std::lock_guard<std::mutex> lock(innerMutex_); 75 76 if (futures_.size() > MAX_NUM_FUTURES) { 77 HILOGE("[FutureFactory]Num of valid futures reaches max."); 78 return INVALID_SEQUENCE_ID; 79 } 80 81 do { 82 ++sequenceId_; 83 if (sequenceId_ < MIN_SEQUENCE_ID) { 84 HILOGI("[FutureFactory]The sequenceId_ is smaller than MIN_SEQUENCE_ID."); 85 sequenceId_ = MIN_SEQUENCE_ID; 86 } 87 } while (futures_.find(sequenceId_) != futures_.end()); 88 89 return sequenceId_; 90 } 91 CreateFuture(IRequest * request)92 int FutureFactory::CreateFuture(IRequest *request) 93 { 94 long long sequenceId = FindSequenceId(); 95 if (sequenceId == INVALID_SEQUENCE_ID) { 96 HILOGE("[FutureFactory]Invalid sequence id generated."); 97 return RETCODE_NULL_PARAM; 98 } 99 100 if (request == nullptr) { 101 HILOGE("[FutureFactory]Param request is nullptr."); 102 return RETCODE_NULL_PARAM; 103 } 104 Request *req = reinterpret_cast<Request*>(request); 105 req->SetInnerSequenceId(sequenceId); 106 107 Future *future = nullptr; 108 AIE_NEW(future, Future(request, sequenceId, request->GetTransactionId())); 109 CHK_RET(future == nullptr, RETCODE_OUT_OF_MEMORY); 110 111 AddFuture(sequenceId, future); 112 return RETCODE_SUCCESS; 113 } 114 AddFuture(long long sequenceId,Future * future)115 void FutureFactory::AddFuture(long long sequenceId, Future* future) 116 { 117 std::lock_guard<std::mutex> lock(innerMutex_); 118 119 futures_[sequenceId] = future; 120 } 121 Release(long long sequenceId)122 void FutureFactory::Release(long long sequenceId) 123 { 124 DeleteFuture(sequenceId); 125 } 126 DeleteFuture(long long sequenceId)127 void FutureFactory::DeleteFuture(long long sequenceId) 128 { 129 std::lock_guard<std::mutex> lock(innerMutex_); 130 auto iter = futures_.find(sequenceId); 131 if (iter != futures_.end()) { 132 delete iter->second; 133 iter->second = nullptr; 134 futures_.erase(sequenceId); 135 } 136 } 137 RegisterListener(IFutureListener * listener,long long transactionId)138 void FutureFactory::RegisterListener(IFutureListener *listener, long long transactionId) 139 { 140 std::lock_guard<std::mutex> lock(innerMutex_); 141 listeners_[transactionId] = listener; 142 } 143 UnregisterListener(long long transactionId)144 void FutureFactory::UnregisterListener(long long transactionId) 145 { 146 std::lock_guard<std::mutex> lock(innerMutex_); 147 auto iter = listeners_.find(transactionId); 148 if (iter != listeners_.end()) { 149 delete iter->second; 150 iter->second = nullptr; 151 listeners_.erase(transactionId); 152 } 153 } 154 ProcessResponse(PluginEvent event,IResponse * response)155 int FutureFactory::ProcessResponse(PluginEvent event, IResponse *response) 156 { 157 CHK_RET(response == nullptr, RETCODE_NULL_PARAM); 158 HILOGI("[FutureFactory]Begin to Process Response."); 159 Response *res = reinterpret_cast<Response *>(response); 160 Future *future = FetchFuture(res); 161 if (!future) { 162 HILOGE("[FutureFactory][transactionId:%lld]No matched future found, seqId=%lld.", 163 res->GetTransactionId(), res->GetInnerSequenceId()); 164 return RETCODE_NULL_PARAM; 165 } 166 167 FutureStatus status = Future::ConvertPluginStatus(event); 168 future->SetResponse(status, response); 169 170 IFutureListener *listener = FindListener(response->GetTransactionId()); 171 if (listener == nullptr) { 172 HILOGE("[FutureFactory][transactionId:%lld]No matched listener found.", response->GetTransactionId()); 173 return RETCODE_NO_LISTENER_FOUND; 174 } 175 176 listener->OnReply(future); 177 future->DetachResponse(); 178 DeleteFuture(future->GetSequenceId()); 179 180 return RETCODE_SUCCESS; 181 } 182 FetchFuture(Response * response)183 Future *FutureFactory::FetchFuture(Response *response) 184 { 185 long long sequenceId = response->GetInnerSequenceId(); 186 std::lock_guard<std::mutex> lock(innerMutex_); 187 188 auto findProc = futures_.find(sequenceId); 189 CHK_RET(findProc == futures_.end(), nullptr); 190 191 return findProc->second; 192 } 193 FindListener(long long transactionId)194 IFutureListener *FutureFactory::FindListener(long long transactionId) 195 { 196 std::lock_guard<std::mutex> lock(innerMutex_); 197 198 auto findProc = listeners_.find(transactionId); 199 CHK_RET(findProc == listeners_.end(), nullptr); 200 201 return findProc->second; 202 } 203 } // namespace AI 204 } // namespace OHOS