1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/sleep.h"
16
17 #include <grpc/grpc.h>
18
19 #include <chrono>
20 #include <cstddef>
21 #include <memory>
22 #include <utility>
23 #include <vector>
24
25 #include "absl/log/log.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "src/core/lib/event_engine/default_event_engine.h"
29 #include "src/core/lib/event_engine/event_engine_context.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
32 #include "src/core/lib/promise/race.h"
33 #include "src/core/lib/resource_quota/arena.h"
34 #include "src/core/util/notification.h"
35 #include "src/core/util/orphanable.h"
36 #include "test/core/event_engine/mock_event_engine.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38
39 using grpc_event_engine::experimental::EventEngine;
40 using grpc_event_engine::experimental::GetDefaultEventEngine;
41 using grpc_event_engine::experimental::MockEventEngine;
42 using testing::_;
43 using testing::DoAll;
44 using testing::Matcher;
45 using testing::Mock;
46 using testing::Return;
47 using testing::SaveArg;
48 using testing::StrictMock;
49
50 namespace grpc_core {
51 namespace {
52
ArenaWithEventEngine(EventEngine * ee)53 RefCountedPtr<Arena> ArenaWithEventEngine(EventEngine* ee) {
54 auto arena = SimpleArenaAllocator()->MakeArena();
55 arena->SetContext<grpc_event_engine::experimental::EventEngine>(ee);
56 return arena;
57 }
58
TEST(Sleep,Zzzz)59 TEST(Sleep, Zzzz) {
60 ExecCtx exec_ctx;
61 Notification done;
62 Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
63 auto engine = GetDefaultEventEngine();
64 // Sleep for one second then set done to true.
65 auto activity = MakeActivity(
66 Sleep(done_time), InlineWakeupScheduler(),
67 [&done](absl::Status r) {
68 EXPECT_EQ(r, absl::OkStatus());
69 done.Notify();
70 },
71 ArenaWithEventEngine(engine.get()));
72 done.WaitForNotification();
73 exec_ctx.InvalidateNow();
74 EXPECT_GE(Timestamp::Now(), done_time);
75 }
76
TEST(Sleep,OverlyEagerEventEngine)77 TEST(Sleep, OverlyEagerEventEngine) {
78 StrictMock<MockEventEngine> mock_event_engine;
79
80 ExecCtx exec_ctx;
81 bool done = false;
82 // Schedule a sleep for a very long time.
83 Timestamp done_time = Timestamp::Now() + Duration::Seconds(1e6);
84 EventEngine::Closure* wakeup = nullptr;
85 EXPECT_CALL(mock_event_engine, RunAfter(_, Matcher<EventEngine::Closure*>(_)))
86 .WillOnce(
87 DoAll(SaveArg<1>(&wakeup), Return(EventEngine::TaskHandle{42, 123})));
88 auto activity = MakeActivity(
89 Sleep(done_time), InlineWakeupScheduler(),
90 [&done](absl::Status r) {
91 EXPECT_EQ(r, absl::OkStatus());
92 done = true;
93 },
94 ArenaWithEventEngine(static_cast<EventEngine*>(&mock_event_engine)));
95 Mock::VerifyAndClearExpectations(&mock_event_engine);
96 EXPECT_NE(wakeup, nullptr);
97 EXPECT_FALSE(done);
98 // Schedule the wakeup instantaneously - It won't have passed the scheduled
99 // time yet, but sleep should believe the EventEngine.
100 wakeup->Run();
101 EXPECT_TRUE(done);
102 }
103
TEST(Sleep,AlreadyDone)104 TEST(Sleep, AlreadyDone) {
105 ExecCtx exec_ctx;
106 Notification done;
107 Timestamp done_time = Timestamp::Now() - Duration::Seconds(1);
108 auto engine = GetDefaultEventEngine();
109 // Sleep for no time at all then set done to true.
110 auto activity = MakeActivity(
111 Sleep(done_time), InlineWakeupScheduler(),
112 [&done](absl::Status r) {
113 EXPECT_EQ(r, absl::OkStatus());
114 done.Notify();
115 },
116 ArenaWithEventEngine(engine.get()));
117 done.WaitForNotification();
118 }
119
TEST(Sleep,Cancel)120 TEST(Sleep, Cancel) {
121 ExecCtx exec_ctx;
122 Notification done;
123 Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
124 auto engine = GetDefaultEventEngine();
125 // Sleep for one second but race it to complete immediately
126 auto activity = MakeActivity(
127 Race(Sleep(done_time), [] { return absl::CancelledError(); }),
128 InlineWakeupScheduler(),
129 [&done](absl::Status r) {
130 EXPECT_EQ(r, absl::CancelledError());
131 done.Notify();
132 },
133 ArenaWithEventEngine(engine.get()));
134 done.WaitForNotification();
135 exec_ctx.InvalidateNow();
136 EXPECT_LT(Timestamp::Now(), done_time);
137 }
138
TEST(Sleep,MoveSemantics)139 TEST(Sleep, MoveSemantics) {
140 // ASAN should help determine if there are any memory leaks here
141 ExecCtx exec_ctx;
142 Notification done;
143 Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111);
144 Sleep donor(done_time);
145 Sleep sleeper = std::move(donor);
146 auto engine = GetDefaultEventEngine();
147 auto activity = MakeActivity(
148 std::move(sleeper), InlineWakeupScheduler(),
149 [&done](absl::Status r) {
150 EXPECT_EQ(r, absl::OkStatus());
151 done.Notify();
152 },
153 ArenaWithEventEngine(engine.get()));
154 done.WaitForNotification();
155 exec_ctx.InvalidateNow();
156 EXPECT_GE(Timestamp::Now(), done_time);
157 }
158
TEST(Sleep,StressTest)159 TEST(Sleep, StressTest) {
160 // Kick off a bunch sleeps for one second.
161 static const int kNumActivities = 100000;
162 ExecCtx exec_ctx;
163 std::vector<std::shared_ptr<Notification>> notifications;
164 std::vector<ActivityPtr> activities;
165 auto engine = GetDefaultEventEngine();
166 LOG(INFO) << "Starting " << kNumActivities << " sleeps for 1sec";
167 for (int i = 0; i < kNumActivities; i++) {
168 auto notification = std::make_shared<Notification>();
169 auto activity = MakeActivity(
170 Sleep(Timestamp::Now() + Duration::Seconds(1)),
171 ExecCtxWakeupScheduler(),
172 [notification](absl::Status /*r*/) { notification->Notify(); },
173 ArenaWithEventEngine(engine.get()));
174 notifications.push_back(std::move(notification));
175 activities.push_back(std::move(activity));
176 }
177 LOG(INFO) << "Waiting for the first " << (kNumActivities / 2)
178 << " sleeps, whilst cancelling the other half";
179 for (size_t i = 0; i < kNumActivities / 2; i++) {
180 notifications[i]->WaitForNotification();
181 activities[i].reset();
182 activities[i + (kNumActivities / 2)].reset();
183 exec_ctx.Flush();
184 }
185 }
186
187 } // namespace
188 } // namespace grpc_core
189
main(int argc,char ** argv)190 int main(int argc, char** argv) {
191 ::testing::InitGoogleTest(&argc, argv);
192 grpc_init();
193 int r = RUN_ALL_TESTS();
194 grpc_shutdown();
195 return r;
196 }
197