1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <utility> 21 22 #include "absl/container/flat_hash_set.h" 23 #include "absl/hash/hash.h" 24 #include "src/core/lib/promise/activity.h" 25 #include "src/core/lib/promise/poll.h" 26 27 namespace grpc_core { 28 29 // Helper type that can be used to enqueue many Activities waiting for some 30 // external state. 31 // Typically the external state should be guarded by mu_, and a call to 32 // WakeAllAndUnlock should be made when the state changes. 33 // Promises should bottom out polling inside pending(), which will register for 34 // wakeup and return Pending(). 35 // Queues handles to Activities, and not Activities themselves, meaning that if 36 // an Activity is destroyed prior to wakeup we end up holding only a small 37 // amount of memory (around 16 bytes + malloc overhead) until the next wakeup 38 // occurs. 39 class WaitSet final { 40 using WakerSet = absl::flat_hash_set<Waker>; 41 42 public: 43 // Register for wakeup, return Pending(). If state is not ready to proceed, 44 // Promises should bottom out here. AddPending(Waker waker)45 Pending AddPending(Waker waker) { 46 pending_.emplace(std::move(waker)); 47 return Pending(); 48 } 49 50 class WakeupSet { 51 public: Wakeup()52 void Wakeup() { 53 while (!wakeup_.empty()) { 54 wakeup_.extract(wakeup_.begin()).value().Wakeup(); 55 } 56 } 57 58 private: 59 friend class WaitSet; WakeupSet(WakerSet && wakeup)60 explicit WakeupSet(WakerSet&& wakeup) 61 : wakeup_(std::forward<WakerSet>(wakeup)) {} 62 WakerSet wakeup_; 63 }; 64 TakeWakeupSet()65 GRPC_MUST_USE_RESULT WakeupSet TakeWakeupSet() { 66 auto ret = WakeupSet(std::move(pending_)); 67 pending_.clear(); // reinitialize after move. 68 return ret; 69 } 70 WakeupAsync()71 void WakeupAsync() { 72 while (!pending_.empty()) { 73 pending_.extract(pending_.begin()).value().WakeupAsync(); 74 } 75 } 76 77 private: 78 // Handles to activities that need to be awoken. 79 WakerSet pending_; 80 }; 81 82 } // namespace grpc_core 83 84 #endif // GRPC_SRC_CORE_LIB_PROMISE_WAIT_SET_H 85