1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/event_engine/endpoint_config.h>
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/event_engine/memory_allocator.h>
17 #include <grpc/grpc.h>
18 #include <grpc/support/port_platform.h>
19
20 #include <algorithm>
21 #include <memory>
22 #include <thread>
23 #include <vector>
24
25 #include "absl/functional/any_invocable.h"
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "test/core/test_util/test_config.h"
33
34 namespace {
35
36 using ::grpc_event_engine::experimental::EventEngine;
37 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
38
39 class DefaultEngineTest : public testing::Test {
40 protected:
41 // Does nothing, fills space that a nullptr could not
42 class FakeEventEngine : public EventEngine {
43 public:
44 FakeEventEngine() = default;
45 ~FakeEventEngine() override = default;
CreateListener(Listener::AcceptCallback,absl::AnyInvocable<void (absl::Status)>,const grpc_event_engine::experimental::EndpointConfig &,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>)46 absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
47 Listener::AcceptCallback /* on_accept */,
48 absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
49 const grpc_event_engine::experimental::EndpointConfig& /* config */,
50 std::unique_ptr<
51 grpc_event_engine::experimental::
52 MemoryAllocatorFactory> /* memory_allocator_factory */)
53 override {
54 return absl::UnimplementedError("test");
55 };
Connect(OnConnectCallback,const ResolvedAddress &,const grpc_event_engine::experimental::EndpointConfig &,grpc_event_engine::experimental::MemoryAllocator,Duration)56 ConnectionHandle Connect(
57 OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
58 const grpc_event_engine::experimental::EndpointConfig& /* args */,
59 grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */,
60 Duration /* timeout */) override {
61 return {-1, -1};
62 };
CancelConnect(ConnectionHandle)63 bool CancelConnect(ConnectionHandle /* handle */) override {
64 return false;
65 };
IsWorkerThread()66 bool IsWorkerThread() override { return false; };
GetDNSResolver(const DNSResolver::ResolverOptions &)67 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
68 const DNSResolver::ResolverOptions& /* options */) override {
69 return nullptr;
70 };
Run(Closure *)71 void Run(Closure* /* closure */) override {};
Run(absl::AnyInvocable<void ()>)72 void Run(absl::AnyInvocable<void()> /* closure */) override{};
RunAfter(Duration,Closure *)73 TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override {
74 return {-1, -1};
75 }
RunAfter(Duration,absl::AnyInvocable<void ()>)76 TaskHandle RunAfter(Duration /* when */,
77 absl::AnyInvocable<void()> /* closure */) override {
78 return {-1, -1};
79 }
Cancel(TaskHandle)80 bool Cancel(TaskHandle /* handle */) override { return false; };
81 };
82 };
83
TEST_F(DefaultEngineTest,SharedPtrGlobalEventEngineLifetimesAreValid)84 TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) {
85 int create_count = 0;
86 grpc_event_engine::experimental::SetEventEngineFactory([&create_count] {
87 ++create_count;
88 return std::make_unique<FakeEventEngine>();
89 });
90 std::shared_ptr<EventEngine> ee2;
91 {
92 std::shared_ptr<EventEngine> ee1 = GetDefaultEventEngine();
93 ASSERT_EQ(1, create_count);
94 ee2 = GetDefaultEventEngine();
95 ASSERT_EQ(1, create_count);
96 ASSERT_EQ(ee2.use_count(), 2);
97 }
98 // Ensure the first shared_ptr did not delete the global
99 ASSERT_TRUE(ee2.unique());
100 ASSERT_FALSE(ee2->IsWorkerThread()); // useful for ASAN
101 // destroy the global engine via the last shared_ptr, and create a new one.
102 ee2.reset();
103 ee2 = GetDefaultEventEngine();
104 ASSERT_EQ(2, create_count);
105 ASSERT_TRUE(ee2.unique());
106 grpc_event_engine::experimental::EventEngineFactoryReset();
107 }
108
TEST_F(DefaultEngineTest,StressTestSharedPtr)109 TEST_F(DefaultEngineTest, StressTestSharedPtr) {
110 constexpr int thread_count = 13;
111 constexpr absl::Duration spin_time = absl::Seconds(3);
112 std::vector<std::thread> threads;
113 threads.reserve(thread_count);
114 for (int i = 0; i < thread_count; i++) {
115 threads.emplace_back([&spin_time] {
116 auto timeout = absl::Now() + spin_time;
117 do {
118 GetDefaultEventEngine().reset();
119 } while (timeout > absl::Now());
120 });
121 }
122 for (auto& thd : threads) {
123 thd.join();
124 }
125 }
126 } // namespace
127
main(int argc,char ** argv)128 int main(int argc, char** argv) {
129 testing::InitGoogleTest(&argc, argv);
130 grpc::testing::TestEnvironment env(&argc, argv);
131 grpc_init();
132 auto result = RUN_ALL_TESTS();
133 grpc_shutdown();
134 return result;
135 }
136