• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/sleep.h"
16 
17 #include <grpc/grpc.h>
18 
19 #include <chrono>
20 #include <cstddef>
21 #include <memory>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/log/log.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "src/core/lib/event_engine/default_event_engine.h"
29 #include "src/core/lib/event_engine/event_engine_context.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
32 #include "src/core/lib/promise/race.h"
33 #include "src/core/lib/resource_quota/arena.h"
34 #include "src/core/util/notification.h"
35 #include "src/core/util/orphanable.h"
36 #include "test/core/event_engine/mock_event_engine.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38 
39 using grpc_event_engine::experimental::EventEngine;
40 using grpc_event_engine::experimental::GetDefaultEventEngine;
41 using grpc_event_engine::experimental::MockEventEngine;
42 using testing::_;
43 using testing::DoAll;
44 using testing::Matcher;
45 using testing::Mock;
46 using testing::Return;
47 using testing::SaveArg;
48 using testing::StrictMock;
49 
50 namespace grpc_core {
51 namespace {
52 
ArenaWithEventEngine(EventEngine * ee)53 RefCountedPtr<Arena> ArenaWithEventEngine(EventEngine* ee) {
54   auto arena = SimpleArenaAllocator()->MakeArena();
55   arena->SetContext<grpc_event_engine::experimental::EventEngine>(ee);
56   return arena;
57 }
58 
TEST(Sleep,Zzzz)59 TEST(Sleep, Zzzz) {
60   ExecCtx exec_ctx;
61   Notification done;
62   Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
63   auto engine = GetDefaultEventEngine();
64   // Sleep for one second then set done to true.
65   auto activity = MakeActivity(
66       Sleep(done_time), InlineWakeupScheduler(),
67       [&done](absl::Status r) {
68         EXPECT_EQ(r, absl::OkStatus());
69         done.Notify();
70       },
71       ArenaWithEventEngine(engine.get()));
72   done.WaitForNotification();
73   exec_ctx.InvalidateNow();
74   EXPECT_GE(Timestamp::Now(), done_time);
75 }
76 
TEST(Sleep,OverlyEagerEventEngine)77 TEST(Sleep, OverlyEagerEventEngine) {
78   StrictMock<MockEventEngine> mock_event_engine;
79 
80   ExecCtx exec_ctx;
81   bool done = false;
82   // Schedule a sleep for a very long time.
83   Timestamp done_time = Timestamp::Now() + Duration::Seconds(1e6);
84   EventEngine::Closure* wakeup = nullptr;
85   EXPECT_CALL(mock_event_engine, RunAfter(_, Matcher<EventEngine::Closure*>(_)))
86       .WillOnce(
87           DoAll(SaveArg<1>(&wakeup), Return(EventEngine::TaskHandle{42, 123})));
88   auto activity = MakeActivity(
89       Sleep(done_time), InlineWakeupScheduler(),
90       [&done](absl::Status r) {
91         EXPECT_EQ(r, absl::OkStatus());
92         done = true;
93       },
94       ArenaWithEventEngine(static_cast<EventEngine*>(&mock_event_engine)));
95   Mock::VerifyAndClearExpectations(&mock_event_engine);
96   EXPECT_NE(wakeup, nullptr);
97   EXPECT_FALSE(done);
98   // Schedule the wakeup instantaneously - It won't have passed the scheduled
99   // time yet, but sleep should believe the EventEngine.
100   wakeup->Run();
101   EXPECT_TRUE(done);
102 }
103 
TEST(Sleep,AlreadyDone)104 TEST(Sleep, AlreadyDone) {
105   ExecCtx exec_ctx;
106   Notification done;
107   Timestamp done_time = Timestamp::Now() - Duration::Seconds(1);
108   auto engine = GetDefaultEventEngine();
109   // Sleep for no time at all then set done to true.
110   auto activity = MakeActivity(
111       Sleep(done_time), InlineWakeupScheduler(),
112       [&done](absl::Status r) {
113         EXPECT_EQ(r, absl::OkStatus());
114         done.Notify();
115       },
116       ArenaWithEventEngine(engine.get()));
117   done.WaitForNotification();
118 }
119 
TEST(Sleep,Cancel)120 TEST(Sleep, Cancel) {
121   ExecCtx exec_ctx;
122   Notification done;
123   Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
124   auto engine = GetDefaultEventEngine();
125   // Sleep for one second but race it to complete immediately
126   auto activity = MakeActivity(
127       Race(Sleep(done_time), [] { return absl::CancelledError(); }),
128       InlineWakeupScheduler(),
129       [&done](absl::Status r) {
130         EXPECT_EQ(r, absl::CancelledError());
131         done.Notify();
132       },
133       ArenaWithEventEngine(engine.get()));
134   done.WaitForNotification();
135   exec_ctx.InvalidateNow();
136   EXPECT_LT(Timestamp::Now(), done_time);
137 }
138 
TEST(Sleep,MoveSemantics)139 TEST(Sleep, MoveSemantics) {
140   // ASAN should help determine if there are any memory leaks here
141   ExecCtx exec_ctx;
142   Notification done;
143   Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111);
144   Sleep donor(done_time);
145   Sleep sleeper = std::move(donor);
146   auto engine = GetDefaultEventEngine();
147   auto activity = MakeActivity(
148       std::move(sleeper), InlineWakeupScheduler(),
149       [&done](absl::Status r) {
150         EXPECT_EQ(r, absl::OkStatus());
151         done.Notify();
152       },
153       ArenaWithEventEngine(engine.get()));
154   done.WaitForNotification();
155   exec_ctx.InvalidateNow();
156   EXPECT_GE(Timestamp::Now(), done_time);
157 }
158 
TEST(Sleep,StressTest)159 TEST(Sleep, StressTest) {
160   // Kick off a bunch sleeps for one second.
161   static const int kNumActivities = 100000;
162   ExecCtx exec_ctx;
163   std::vector<std::shared_ptr<Notification>> notifications;
164   std::vector<ActivityPtr> activities;
165   auto engine = GetDefaultEventEngine();
166   LOG(INFO) << "Starting " << kNumActivities << " sleeps for 1sec";
167   for (int i = 0; i < kNumActivities; i++) {
168     auto notification = std::make_shared<Notification>();
169     auto activity = MakeActivity(
170         Sleep(Timestamp::Now() + Duration::Seconds(1)),
171         ExecCtxWakeupScheduler(),
172         [notification](absl::Status /*r*/) { notification->Notify(); },
173         ArenaWithEventEngine(engine.get()));
174     notifications.push_back(std::move(notification));
175     activities.push_back(std::move(activity));
176   }
177   LOG(INFO) << "Waiting for the first " << (kNumActivities / 2)
178             << " sleeps, whilst cancelling the other half";
179   for (size_t i = 0; i < kNumActivities / 2; i++) {
180     notifications[i]->WaitForNotification();
181     activities[i].reset();
182     activities[i + (kNumActivities / 2)].reset();
183     exec_ctx.Flush();
184   }
185 }
186 
187 }  // namespace
188 }  // namespace grpc_core
189 
main(int argc,char ** argv)190 int main(int argc, char** argv) {
191   ::testing::InitGoogleTest(&argc, argv);
192   grpc_init();
193   int r = RUN_ALL_TESTS();
194   grpc_shutdown();
195   return r;
196 }
197