1 /* 2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef HISTREAMER_FOUNDATION_OSAL_BASE_SYNCHRONIZER_H 17 #define HISTREAMER_FOUNDATION_OSAL_BASE_SYNCHRONIZER_H 18 19 #include <functional> 20 #include <map> 21 #include <set> 22 #include <string> 23 24 #include "foundation/log.h" 25 #include "foundation/osal/thread/condition_variable.h" 26 #include "foundation/osal/thread/mutex.h" 27 28 namespace OHOS { 29 namespace Media { 30 namespace OSAL { 31 template <typename SyncIdType, typename ResultType = void> 32 class Synchronizer { 33 public: Synchronizer(std::string name)34 explicit Synchronizer(std::string name) : name_(std::move(name)) 35 { 36 } 37 38 Synchronizer(const Synchronizer<SyncIdType, ResultType>&) = delete; 39 40 Synchronizer<SyncIdType, ResultType>& operator=(const Synchronizer<SyncIdType, ResultType>&) = delete; 41 42 virtual ~Synchronizer() = default; 43 Wait(SyncIdType syncId,const std::function<void ()> & asyncOps)44 void Wait(SyncIdType syncId, const std::function<void()>& asyncOps) 45 { 46 MEDIA_LOG_I("Synchronizer " PUBLIC_LOG_S " Wait for " PUBLIC_LOG_D32, 47 name_.c_str(), static_cast<int>(syncId)); 48 if (asyncOps) { 49 OSAL::ScopedLock lock(mutex_); 50 waitSet_.insert(syncId); 51 asyncOps(); 52 cv_.Wait(lock, [this, syncId] { return syncMap_.find(syncId) != syncMap_.end(); }); 53 syncMap_.erase(syncId); 54 } 55 } 56 WaitFor(SyncIdType syncId,const std::function<void ()> & asyncOps,int timeoutMs)57 bool WaitFor(SyncIdType syncId, const std::function<void()>& asyncOps, int timeoutMs) 58 { 59 MEDIA_LOG_I("Synchronizer " PUBLIC_LOG_S " Wait for " PUBLIC_LOG_D32 ", timeout: " PUBLIC_LOG_D32, 60 name_.c_str(), static_cast<int>(syncId), timeoutMs); 61 if (!asyncOps) { 62 return false; 63 } 64 OSAL::ScopedLock lock(mutex_); 65 waitSet_.insert(syncId); 66 asyncOps(); 67 auto rtv = cv_.WaitFor(lock, timeoutMs, [this, syncId] { return syncMap_.find(syncId) != syncMap_.end(); }); 68 if (rtv) { 69 syncMap_.erase(syncId); 70 } else { 71 waitSet_.erase(syncId); 72 } 73 return rtv; 74 } 75 Wait(SyncIdType syncId,const std::function<void ()> & asyncOps,ResultType & result)76 void Wait(SyncIdType syncId, const std::function<void()>& asyncOps, ResultType& result) 77 { 78 MEDIA_LOG_I("Synchronizer " PUBLIC_LOG_S " Wait for " PUBLIC_LOG_D32, 79 name_.c_str(), static_cast<int>(syncId)); 80 if (asyncOps) { 81 OSAL::ScopedLock lock(mutex_); 82 waitSet_.insert(syncId); 83 asyncOps(); 84 cv_.Wait(lock, [this, syncId] { return syncMap_.find(syncId) != syncMap_.end(); }); 85 result = syncMap_[syncId]; 86 syncMap_.erase(syncId); 87 } 88 } 89 WaitFor(SyncIdType syncId,const std::function<bool ()> & asyncOps,int timeoutMs,ResultType & result)90 bool WaitFor(SyncIdType syncId, const std::function<bool()>& asyncOps, int timeoutMs, ResultType& result) 91 { 92 MEDIA_LOG_I("Synchronizer " PUBLIC_LOG_S " Wait for " PUBLIC_LOG_D32 ", timeout: " PUBLIC_LOG_D32, 93 name_.c_str(), static_cast<int>(syncId), timeoutMs); 94 if (!asyncOps) { 95 return false; 96 } 97 OSAL::ScopedLock lock(mutex_); 98 waitSet_.insert(syncId); 99 if (!asyncOps()) { 100 waitSet_.erase(syncId); 101 return false; 102 } 103 auto rtv = cv_.WaitFor(lock, timeoutMs, [this, syncId] { return syncMap_.find(syncId) != syncMap_.end(); }); 104 if (rtv) { 105 result = syncMap_[syncId]; 106 syncMap_.erase(syncId); 107 MEDIA_LOG_D("Synchronizer " PUBLIC_LOG_S " Wait for " PUBLIC_LOG_D32 " return.", name_.c_str(), 108 static_cast<int>(syncId)); 109 } else { 110 waitSet_.erase(syncId); 111 } 112 return rtv; 113 } 114 115 void Notify(SyncIdType syncId, ResultType result = ResultType()) 116 { 117 MEDIA_LOG_I("Synchronizer " PUBLIC_LOG_S " Notify: " PUBLIC_LOG_D32, 118 name_.c_str(), static_cast<int>(syncId)); 119 OSAL::ScopedLock lock(mutex_); 120 if (waitSet_.find(syncId) != waitSet_.end()) { 121 waitSet_.erase(syncId); 122 syncMap_.insert({syncId, result}); 123 cv_.NotifyAll(); 124 } 125 } 126 127 private: 128 Mutex mutex_; 129 ConditionVariable cv_; 130 std::string name_; 131 std::map<SyncIdType, ResultType> syncMap_; 132 std::set<SyncIdType> waitSet_; 133 }; 134 } // namespace OSAL 135 } // namespace Media 136 } // namespace OHOS 137 #endif // HISTREAMER_FOUNDATION_OSAL_BASE_SYNCHRONIZER_H 138