1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/observable.h"
16
17 #include <grpc/support/log.h>
18
19 #include <cstdint>
20 #include <limits>
21 #include <thread>
22 #include <vector>
23
24 #include "absl/strings/str_join.h"
25 #include "gmock/gmock.h"
26 #include "gtest/gtest.h"
27 #include "src/core/lib/promise/loop.h"
28 #include "src/core/lib/promise/map.h"
29 #include "src/core/util/notification.h"
30 #include "test/core/promise/poll_matcher.h"
31
32 using testing::Mock;
33 using testing::StrictMock;
34
35 namespace grpc_core {
36 namespace {
37
38 class MockActivity : public Activity, public Wakeable {
39 public:
40 MOCK_METHOD(void, WakeupRequested, ());
41
ForceImmediateRepoll(WakeupMask)42 void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()43 void Orphan() override {}
MakeOwningWaker()44 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()45 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)46 void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)47 void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)48 void Drop(WakeupMask) override {}
DebugTag() const49 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const50 std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
51
Activate()52 void Activate() {
53 if (scoped_activity_ != nullptr) return;
54 scoped_activity_ = std::make_unique<ScopedActivity>(this);
55 }
56
Deactivate()57 void Deactivate() { scoped_activity_.reset(); }
58
59 private:
60 std::unique_ptr<ScopedActivity> scoped_activity_;
61 };
62
TEST(ObservableTest,ImmediateNext)63 TEST(ObservableTest, ImmediateNext) {
64 Observable<int> observable(1);
65 auto next = observable.Next(0);
66 EXPECT_THAT(next(), IsReady(1));
67 }
68
TEST(ObservableTest,SetBecomesImmediateNext1)69 TEST(ObservableTest, SetBecomesImmediateNext1) {
70 Observable<int> observable(0);
71 auto next = observable.Next(0);
72 observable.Set(1);
73 EXPECT_THAT(next(), IsReady(1));
74 }
75
TEST(ObservableTest,SetBecomesImmediateNext2)76 TEST(ObservableTest, SetBecomesImmediateNext2) {
77 Observable<int> observable(0);
78 observable.Set(1);
79 auto next = observable.Next(0);
80 EXPECT_THAT(next(), IsReady(1));
81 }
82
TEST(ObservableTest,SameValueGetsPending)83 TEST(ObservableTest, SameValueGetsPending) {
84 StrictMock<MockActivity> activity;
85 activity.Activate();
86 Observable<int> observable(1);
87 auto next = observable.Next(1);
88 EXPECT_THAT(next(), IsPending());
89 EXPECT_THAT(next(), IsPending());
90 EXPECT_THAT(next(), IsPending());
91 EXPECT_THAT(next(), IsPending());
92 }
93
TEST(ObservableTest,ChangeValueWakesUp)94 TEST(ObservableTest, ChangeValueWakesUp) {
95 StrictMock<MockActivity> activity;
96 activity.Activate();
97 Observable<int> observable(1);
98 auto next = observable.Next(1);
99 EXPECT_THAT(next(), IsPending());
100 EXPECT_CALL(activity, WakeupRequested());
101 observable.Set(2);
102 Mock::VerifyAndClearExpectations(&activity);
103 EXPECT_THAT(next(), IsReady(2));
104 }
105
TEST(ObservableTest,NextWhen)106 TEST(ObservableTest, NextWhen) {
107 StrictMock<MockActivity> activity;
108 activity.Activate();
109 Observable<int> observable(1);
110 auto next = observable.NextWhen([](int i) { return i == 3; });
111 EXPECT_THAT(next(), IsPending());
112 EXPECT_CALL(activity, WakeupRequested());
113 observable.Set(2);
114 EXPECT_THAT(next(), IsPending());
115 EXPECT_CALL(activity, WakeupRequested());
116 observable.Set(3);
117 Mock::VerifyAndClearExpectations(&activity);
118 EXPECT_THAT(next(), IsReady(3));
119 }
120
TEST(ObservableTest,MultipleActivitiesWakeUp)121 TEST(ObservableTest, MultipleActivitiesWakeUp) {
122 StrictMock<MockActivity> activity1;
123 StrictMock<MockActivity> activity2;
124 Observable<int> observable(1);
125 auto next1 = observable.Next(1);
126 auto next2 = observable.Next(1);
127 {
128 activity1.Activate();
129 EXPECT_THAT(next1(), IsPending());
130 }
131 {
132 activity2.Activate();
133 EXPECT_THAT(next2(), IsPending());
134 }
135 EXPECT_CALL(activity1, WakeupRequested());
136 EXPECT_CALL(activity2, WakeupRequested());
137 observable.Set(2);
138 Mock::VerifyAndClearExpectations(&activity1);
139 Mock::VerifyAndClearExpectations(&activity2);
140 EXPECT_THAT(next1(), IsReady(2));
141 EXPECT_THAT(next2(), IsReady(2));
142 }
143
TEST(ObservableTest,NoDeadlockOnDestruction)144 TEST(ObservableTest, NoDeadlockOnDestruction) {
145 StrictMock<MockActivity> activity;
146 Observable<int> observable(1);
147 activity.Activate();
148 {
149 auto next = observable.Next(1);
150 EXPECT_THAT(next(), IsPending());
151 }
152 }
153
154 class ThreadWakeupScheduler {
155 public:
156 template <typename ActivityType>
157 class BoundScheduler {
158 public:
BoundScheduler(ThreadWakeupScheduler)159 explicit BoundScheduler(ThreadWakeupScheduler) {}
ScheduleWakeup()160 void ScheduleWakeup() {
161 std::thread t(
162 [this] { static_cast<ActivityType*>(this)->RunScheduledWakeup(); });
163 t.detach();
164 }
165 };
166 };
167
TEST(ObservableTest,Stress)168 TEST(ObservableTest, Stress) {
169 static constexpr uint64_t kEnd = std::numeric_limits<uint64_t>::max();
170 std::vector<uint64_t> values1;
171 std::vector<uint64_t> values2;
172 uint64_t current1 = 0;
173 uint64_t current2 = 0;
174 Notification done1;
175 Notification done2;
176 Observable<uint64_t> observable(0);
177 auto activity1 = MakeActivity(
178 Loop([&observable, ¤t1, &values1] {
179 return Map(
180 observable.Next(current1),
181 [&values1, ¤t1](uint64_t value) -> LoopCtl<absl::Status> {
182 values1.push_back(value);
183 current1 = value;
184 if (value == kEnd) return absl::OkStatus();
185 return Continue{};
186 });
187 }),
188 ThreadWakeupScheduler(), [&done1](absl::Status status) {
189 EXPECT_TRUE(status.ok()) << status.ToString();
190 done1.Notify();
191 });
192 auto activity2 = MakeActivity(
193 Loop([&observable, ¤t2, &values2] {
194 return Map(
195 observable.Next(current2),
196 [&values2, ¤t2](uint64_t value) -> LoopCtl<absl::Status> {
197 values2.push_back(value);
198 current2 = value;
199 if (value == kEnd) return absl::OkStatus();
200 return Continue{};
201 });
202 }),
203 ThreadWakeupScheduler(), [&done2](absl::Status status) {
204 EXPECT_TRUE(status.ok()) << status.ToString();
205 done2.Notify();
206 });
207 for (uint64_t i = 0; i < 1000000; i++) {
208 observable.Set(i);
209 }
210 observable.Set(kEnd);
211 done1.WaitForNotification();
212 done2.WaitForNotification();
213 ASSERT_GE(values1.size(), 1);
214 ASSERT_GE(values2.size(), 1);
215 EXPECT_EQ(values1.back(), kEnd);
216 EXPECT_EQ(values2.back(), kEnd);
217 }
218
219 } // namespace
220 } // namespace grpc_core
221
main(int argc,char ** argv)222 int main(int argc, char** argv) {
223 gpr_log_verbosity_init();
224 ::testing::InitGoogleTest(&argc, argv);
225 return RUN_ALL_TESTS();
226 }
227