1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <utility> 21 22 #include "absl/container/flat_hash_set.h" 23 #include "absl/hash/hash.h" 24 25 #include "src/core/lib/promise/activity.h" 26 #include "src/core/lib/promise/poll.h" 27 28 namespace grpc_core { 29 30 // Helper type that can be used to enqueue many Activities waiting for some 31 // external state. 32 // Typically the external state should be guarded by mu_, and a call to 33 // WakeAllAndUnlock should be made when the state changes. 34 // Promises should bottom out polling inside pending(), which will register for 35 // wakeup and return Pending(). 36 // Queues handles to Activities, and not Activities themselves, meaning that if 37 // an Activity is destroyed prior to wakeup we end up holding only a small 38 // amount of memory (around 16 bytes + malloc overhead) until the next wakeup 39 // occurs. 40 class WaitSet final { 41 using WakerSet = absl::flat_hash_set<Waker>; 42 43 public: 44 // Register for wakeup, return Pending(). If state is not ready to proceed, 45 // Promises should bottom out here. AddPending(Waker waker)46 Pending AddPending(Waker waker) { 47 pending_.emplace(std::move(waker)); 48 return Pending(); 49 } 50 51 class WakeupSet { 52 public: Wakeup()53 void Wakeup() { 54 while (!wakeup_.empty()) { 55 wakeup_.extract(wakeup_.begin()).value().Wakeup(); 56 } 57 } 58 59 private: 60 friend class WaitSet; WakeupSet(WakerSet && wakeup)61 explicit WakeupSet(WakerSet&& wakeup) 62 : wakeup_(std::forward<WakerSet>(wakeup)) {} 63 WakerSet wakeup_; 64 }; 65 TakeWakeupSet()66 GRPC_MUST_USE_RESULT WakeupSet TakeWakeupSet() { 67 auto ret = WakeupSet(std::move(pending_)); 68 pending_.clear(); // reinitialize after move. 69 return ret; 70 } 71 WakeupAsync()72 void WakeupAsync() { 73 while (!pending_.empty()) { 74 pending_.extract(pending_.begin()).value().WakeupAsync(); 75 } 76 } 77 78 private: 79 // Handles to activities that need to be awoken. 80 WakerSet pending_; 81 }; 82 83 } // namespace grpc_core 84 85 #endif // GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 86