• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/observable.h"
16 
17 #include <grpc/support/log.h>
18 
19 #include <cstdint>
20 #include <limits>
21 #include <thread>
22 #include <vector>
23 
24 #include "absl/strings/str_join.h"
25 #include "gmock/gmock.h"
26 #include "gtest/gtest.h"
27 #include "src/core/lib/promise/loop.h"
28 #include "src/core/lib/promise/map.h"
29 #include "src/core/util/notification.h"
30 #include "test/core/promise/poll_matcher.h"
31 
32 using testing::Mock;
33 using testing::StrictMock;
34 
35 namespace grpc_core {
36 namespace {
37 
38 class MockActivity : public Activity, public Wakeable {
39  public:
40   MOCK_METHOD(void, WakeupRequested, ());
41 
ForceImmediateRepoll(WakeupMask)42   void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); }
Orphan()43   void Orphan() override {}
MakeOwningWaker()44   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()45   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)46   void Wakeup(WakeupMask) override { WakeupRequested(); }
WakeupAsync(WakeupMask)47   void WakeupAsync(WakeupMask) override { WakeupRequested(); }
Drop(WakeupMask)48   void Drop(WakeupMask) override {}
DebugTag() const49   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const50   std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
51 
Activate()52   void Activate() {
53     if (scoped_activity_ != nullptr) return;
54     scoped_activity_ = std::make_unique<ScopedActivity>(this);
55   }
56 
Deactivate()57   void Deactivate() { scoped_activity_.reset(); }
58 
59  private:
60   std::unique_ptr<ScopedActivity> scoped_activity_;
61 };
62 
// Asking for the next value after 0 when the observable already holds 1
// resolves on the first poll.
TEST(ObservableTest, ImmediateNext) {
  Observable<int> obs(1);
  auto poll_next = obs.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
68 
// The Next() promise is created before Set(); the first poll happens after
// Set() and must see the new value immediately.
TEST(ObservableTest, SetBecomesImmediateNext1) {
  Observable<int> obs(0);
  auto poll_next = obs.Next(0);
  obs.Set(1);
  EXPECT_THAT(poll_next(), IsReady(1));
}
75 
// Set() happens before the Next() promise is even created; the promise must
// still resolve to the new value on its first poll.
TEST(ObservableTest, SetBecomesImmediateNext2) {
  Observable<int> obs(0);
  obs.Set(1);
  auto poll_next = obs.Next(0);
  EXPECT_THAT(poll_next(), IsReady(1));
}
82 
// While the observable still holds the value passed to Next(), every poll
// stays pending. The StrictMock fails the test if any wakeup is requested.
TEST(ObservableTest, SameValueGetsPending) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  Observable<int> obs(1);
  auto poll_next = obs.Next(1);
  for (int attempt = 0; attempt < 4; ++attempt) {
    EXPECT_THAT(poll_next(), IsPending());
  }
}
93 
// A pending Next() must request a wakeup of the polling activity when the
// value changes, and the following poll must yield the new value.
TEST(ObservableTest, ChangeValueWakesUp) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  Observable<int> obs(1);
  auto poll_next = obs.Next(1);
  EXPECT_THAT(poll_next(), IsPending());

  // Changing the value is expected to trigger exactly one wakeup request.
  EXPECT_CALL(mock_activity, WakeupRequested());
  obs.Set(2);
  Mock::VerifyAndClearExpectations(&mock_activity);

  EXPECT_THAT(poll_next(), IsReady(2));
}
105 
// NextWhen() stays pending until the predicate accepts the current value.
// Every Set() still requests a wakeup of the waiting activity, even when the
// new value does not satisfy the predicate.
TEST(ObservableTest, NextWhen) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  Observable<int> observable(1);
  auto next = observable.NextWhen([](int i) { return i == 3; });
  EXPECT_THAT(next(), IsPending());

  // First change: a wakeup is requested but 2 does not satisfy the predicate.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(2);
  // Verify here so that (a) this wakeup is pinned to Set(2) rather than being
  // satisfiable later, and (b) the next EXPECT_CALL is not registered after
  // the mock has already been exercised against a live expectation, which
  // gMock documents as undefined behaviour.
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsPending());

  // Second change: a wakeup is requested and 3 satisfies the predicate.
  EXPECT_CALL(activity, WakeupRequested());
  observable.Set(3);
  Mock::VerifyAndClearExpectations(&activity);
  EXPECT_THAT(next(), IsReady(3));
}
120 
// Two promises on the same observable, each first polled while a different
// mock activity has been activated; a single Set() must request a wakeup on
// both activities.
TEST(ObservableTest, MultipleActivitiesWakeUp) {
  StrictMock<MockActivity> first_activity;
  StrictMock<MockActivity> second_activity;
  Observable<int> obs(1);
  auto first_next = obs.Next(1);
  auto second_next = obs.Next(1);

  // Poll each promise right after activating its corresponding activity.
  first_activity.Activate();
  EXPECT_THAT(first_next(), IsPending());
  second_activity.Activate();
  EXPECT_THAT(second_next(), IsPending());

  EXPECT_CALL(first_activity, WakeupRequested());
  EXPECT_CALL(second_activity, WakeupRequested());
  obs.Set(2);
  Mock::VerifyAndClearExpectations(&first_activity);
  Mock::VerifyAndClearExpectations(&second_activity);

  EXPECT_THAT(first_next(), IsReady(2));
  EXPECT_THAT(second_next(), IsReady(2));
}
143 
// Destroying a promise that has been polled and is still pending must complete
// (the test passes simply by returning).
TEST(ObservableTest, NoDeadlockOnDestruction) {
  StrictMock<MockActivity> mock_activity;
  Observable<int> obs(1);
  mock_activity.Activate();
  {
    // The inner scope destroys the pending promise before the observable.
    auto pending_next = obs.Next(1);
    EXPECT_THAT(pending_next(), IsPending());
  }
}
153 
// Wakeup scheduler used by the stress test: each scheduled wakeup is executed
// on a freshly spawned, detached thread.
class ThreadWakeupScheduler {
 public:
  // Mixin parameterised on the concrete activity type; `this` is downcast to
  // ActivityType in order to invoke RunScheduledWakeup().
  template <typename ActivityType>
  class BoundScheduler {
   public:
    explicit BoundScheduler(ThreadWakeupScheduler) {}

    // Runs ActivityType::RunScheduledWakeup() on a new detached thread.
    // NOTE(review): the thread is not joined, so the activity presumably keeps
    // itself alive until RunScheduledWakeup() returns - confirm.
    void ScheduleWakeup() {
      auto* self = static_cast<ActivityType*>(this);
      std::thread([self] { self->RunScheduledWakeup(); }).detach();
    }
  };
};
167 
// Two activities concurrently follow one observable while the test thread
// publishes a long run of values followed by the kEnd sentinel. Each activity
// records every value it observes and finishes once it sees kEnd.
TEST(ObservableTest, Stress) {
  static constexpr uint64_t kEnd = std::numeric_limits<uint64_t>::max();
  static constexpr uint64_t kNumUpdates = 1000000;
  std::vector<uint64_t> values1;
  std::vector<uint64_t> values2;
  uint64_t current1 = 0;
  uint64_t current2 = 0;
  Notification done1;
  Notification done2;
  Observable<uint64_t> observable(0);

  // Builds an activity that repeatedly waits for a value different from
  // `*current`, appends it to `*values`, and completes with OK once kEnd is
  // observed; `*done` is notified when the activity finishes. The pointers
  // refer to locals of this test, which outlive the activities.
  auto start_observer = [&observable](std::vector<uint64_t>* values,
                                      uint64_t* current, Notification* done) {
    return MakeActivity(
        Loop([&observable, values, current] {
          return Map(
              observable.Next(*current),
              [values, current](uint64_t value) -> LoopCtl<absl::Status> {
                values->push_back(value);
                *current = value;
                if (value == kEnd) return absl::OkStatus();
                return Continue{};
              });
        }),
        ThreadWakeupScheduler(), [done](absl::Status status) {
          EXPECT_TRUE(status.ok()) << status.ToString();
          done->Notify();
        });
  };
  auto activity1 = start_observer(&values1, &current1, &done1);
  auto activity2 = start_observer(&values2, &current2, &done2);

  for (uint64_t i = 0; i < kNumUpdates; i++) {
    observable.Set(i);
  }
  observable.Set(kEnd);
  done1.WaitForNotification();
  done2.WaitForNotification();

  // Observers may skip intermediate values, so only the final value is
  // checked. Unsigned literals avoid a signed/unsigned comparison with size().
  ASSERT_GE(values1.size(), 1u);
  ASSERT_GE(values2.size(), 1u);
  EXPECT_EQ(values1.back(), kEnd);
  EXPECT_EQ(values2.back(), kEnd);
}
218 
219 }  // namespace
220 }  // namespace grpc_core
221 
main(int argc,char ** argv)222 int main(int argc, char** argv) {
223   gpr_log_verbosity_init();
224   ::testing::InitGoogleTest(&argc, argv);
225   return RUN_ALL_TESTS();
226 }
227