• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15  
16  #include "server_executor/include/future_factory.h"
17  
18  #include "platform/time/include/time.h"
19  #include "protocol/data_channel/include/i_request.h"
20  #include "protocol/data_channel/include/i_response.h"
21  #include "protocol/retcode_inner/aie_retcode_inner.h"
22  #include "server_executor/include/future.h"
23  #include "server_executor/include/i_future_listener.h"
24  #include "utils/log/aie_log.h"
25  
26  namespace OHOS {
27  namespace AI {
namespace {
// Limit that FindSequenceId() compares the number of stored futures against.
constexpr int MAX_NUM_FUTURES = 1024;
// Returned by FindSequenceId() when no sequence id can be handed out.
constexpr int INVALID_SEQUENCE_ID = -1;
// Smallest sequence id FindSequenceId() hands out; the counter is reset to this
// value whenever it falls below it.
constexpr int MIN_SEQUENCE_ID = 1;
} // namespace
33  
34  std::mutex FutureFactory::mutex_;
35  FutureFactory *FutureFactory::instance_ = nullptr;
36  
GetInstance()37  FutureFactory *FutureFactory::GetInstance()
38  {
39      CHK_RET(instance_ != nullptr, instance_);
40  
41      std::lock_guard<std::mutex> lock(mutex_);
42      CHK_RET(instance_ != nullptr, instance_);
43  
44      AIE_NEW(instance_, FutureFactory);
45  
46      return instance_;
47  }
48  
ReleaseInstance()49  void FutureFactory::ReleaseInstance()
50  {
51      std::lock_guard<std::mutex> lock(mutex_);
52      AIE_DELETE(instance_);
53  }
54  
FutureFactory()55  FutureFactory::FutureFactory() : sequenceId_(0)
56  {
57  }
58  
~FutureFactory()59  FutureFactory::~FutureFactory()
60  {
61      HILOGI("[FutureFactory]Begin to release FutureFactory.");
62      for (auto &iter : listeners_) {
63          AIE_DELETE(iter.second);
64      }
65      listeners_.clear();
66      for (auto &iter : futures_) {
67          AIE_DELETE(iter.second);
68      }
69      futures_.clear();
70  }
71  
FindSequenceId()72  long long FutureFactory::FindSequenceId()
73  {
74      std::lock_guard<std::mutex> lock(innerMutex_);
75  
76      if (futures_.size() > MAX_NUM_FUTURES) {
77          HILOGE("[FutureFactory]Num of valid futures reaches max.");
78          return INVALID_SEQUENCE_ID;
79      }
80  
81      do {
82          ++sequenceId_;
83          if (sequenceId_ < MIN_SEQUENCE_ID) {
84              HILOGI("[FutureFactory]The sequenceId_ is smaller than MIN_SEQUENCE_ID.");
85              sequenceId_ = MIN_SEQUENCE_ID;
86          }
87      } while (futures_.find(sequenceId_) != futures_.end());
88  
89      return sequenceId_;
90  }
91  
CreateFuture(IRequest * request)92  int FutureFactory::CreateFuture(IRequest *request)
93  {
94      long long sequenceId = FindSequenceId();
95      if (sequenceId == INVALID_SEQUENCE_ID) {
96          HILOGE("[FutureFactory]Invalid sequence id generated.");
97          return RETCODE_NULL_PARAM;
98      }
99  
100      if (request == nullptr) {
101          HILOGE("[FutureFactory]Param request is nullptr.");
102          return RETCODE_NULL_PARAM;
103      }
104      Request *req = reinterpret_cast<Request*>(request);
105      req->SetInnerSequenceId(sequenceId);
106  
107      Future *future = nullptr;
108      AIE_NEW(future, Future(request, sequenceId, request->GetTransactionId()));
109      CHK_RET(future == nullptr, RETCODE_OUT_OF_MEMORY);
110  
111      AddFuture(sequenceId, future);
112      return RETCODE_SUCCESS;
113  }
114  
AddFuture(long long sequenceId,Future * future)115  void FutureFactory::AddFuture(long long sequenceId, Future* future)
116  {
117      std::lock_guard<std::mutex> lock(innerMutex_);
118  
119      futures_[sequenceId] = future;
120  }
121  
Release(long long sequenceId)122  void FutureFactory::Release(long long sequenceId)
123  {
124      DeleteFuture(sequenceId);
125  }
126  
DeleteFuture(long long sequenceId)127  void FutureFactory::DeleteFuture(long long sequenceId)
128  {
129      std::lock_guard<std::mutex> lock(innerMutex_);
130      auto iter = futures_.find(sequenceId);
131      if (iter != futures_.end()) {
132          delete iter->second;
133          iter->second = nullptr;
134          futures_.erase(sequenceId);
135      }
136  }
137  
RegisterListener(IFutureListener * listener,long long transactionId)138  void FutureFactory::RegisterListener(IFutureListener *listener, long long transactionId)
139  {
140      std::lock_guard<std::mutex> lock(innerMutex_);
141      listeners_[transactionId] = listener;
142  }
143  
UnregisterListener(long long transactionId)144  void FutureFactory::UnregisterListener(long long transactionId)
145  {
146      std::lock_guard<std::mutex> lock(innerMutex_);
147      auto iter = listeners_.find(transactionId);
148      if (iter != listeners_.end()) {
149          delete iter->second;
150          iter->second = nullptr;
151          listeners_.erase(transactionId);
152      }
153  }
154  
ProcessResponse(PluginEvent event,IResponse * response)155  int FutureFactory::ProcessResponse(PluginEvent event, IResponse *response)
156  {
157      CHK_RET(response == nullptr, RETCODE_NULL_PARAM);
158      HILOGI("[FutureFactory]Begin to Process Response.");
159      Response *res = reinterpret_cast<Response *>(response);
160      Future *future = FetchFuture(res);
161      if (!future) {
162          HILOGE("[FutureFactory][transactionId:%lld]No matched future found, seqId=%lld.",
163              res->GetTransactionId(), res->GetInnerSequenceId());
164          return RETCODE_NULL_PARAM;
165      }
166  
167      FutureStatus status = Future::ConvertPluginStatus(event);
168      future->SetResponse(status, response);
169  
170      IFutureListener *listener = FindListener(response->GetTransactionId());
171      if (listener == nullptr) {
172          HILOGE("[FutureFactory][transactionId:%lld]No matched listener found.", response->GetTransactionId());
173          return RETCODE_NO_LISTENER_FOUND;
174      }
175  
176      listener->OnReply(future);
177      future->DetachResponse();
178      DeleteFuture(future->GetSequenceId());
179  
180      return RETCODE_SUCCESS;
181  }
182  
FetchFuture(Response * response)183  Future *FutureFactory::FetchFuture(Response *response)
184  {
185      long long sequenceId = response->GetInnerSequenceId();
186      std::lock_guard<std::mutex> lock(innerMutex_);
187  
188      auto findProc = futures_.find(sequenceId);
189      CHK_RET(findProc == futures_.end(), nullptr);
190  
191      return findProc->second;
192  }
193  
FindListener(long long transactionId)194  IFutureListener *FutureFactory::FindListener(long long transactionId)
195  {
196      std::lock_guard<std::mutex> lock(innerMutex_);
197  
198      auto findProc = listeners_.find(transactionId);
199      CHK_RET(findProc == listeners_.end(), nullptr);
200  
201      return findProc->second;
202  }
203  } // namespace AI
204  } // namespace OHOS