• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/event_engine/endpoint_config.h>
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/event_engine/memory_allocator.h>
17 #include <grpc/grpc.h>
18 #include <grpc/support/port_platform.h>
19 
20 #include <algorithm>
21 #include <memory>
22 #include <thread>
23 #include <vector>
24 
25 #include "absl/functional/any_invocable.h"
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/time/clock.h"
29 #include "absl/time/time.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "test/core/test_util/test_config.h"
33 
34 namespace {
35 
36 using ::grpc_event_engine::experimental::EventEngine;
37 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
38 
39 class DefaultEngineTest : public testing::Test {
40  protected:
41   // Does nothing, fills space that a nullptr could not
42   class FakeEventEngine : public EventEngine {
43    public:
44     FakeEventEngine() = default;
45     ~FakeEventEngine() override = default;
CreateListener(Listener::AcceptCallback,absl::AnyInvocable<void (absl::Status)>,const grpc_event_engine::experimental::EndpointConfig &,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>)46     absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
47         Listener::AcceptCallback /* on_accept */,
48         absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
49         const grpc_event_engine::experimental::EndpointConfig& /* config */,
50         std::unique_ptr<
51             grpc_event_engine::experimental::
52                 MemoryAllocatorFactory> /* memory_allocator_factory */)
53         override {
54       return absl::UnimplementedError("test");
55     };
Connect(OnConnectCallback,const ResolvedAddress &,const grpc_event_engine::experimental::EndpointConfig &,grpc_event_engine::experimental::MemoryAllocator,Duration)56     ConnectionHandle Connect(
57         OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
58         const grpc_event_engine::experimental::EndpointConfig& /* args */,
59         grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */,
60         Duration /* timeout */) override {
61       return {-1, -1};
62     };
CancelConnect(ConnectionHandle)63     bool CancelConnect(ConnectionHandle /* handle */) override {
64       return false;
65     };
IsWorkerThread()66     bool IsWorkerThread() override { return false; };
GetDNSResolver(const DNSResolver::ResolverOptions &)67     absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
68         const DNSResolver::ResolverOptions& /* options */) override {
69       return nullptr;
70     };
Run(Closure *)71     void Run(Closure* /* closure */) override {};
Run(absl::AnyInvocable<void ()>)72     void Run(absl::AnyInvocable<void()> /* closure */) override{};
RunAfter(Duration,Closure *)73     TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override {
74       return {-1, -1};
75     }
RunAfter(Duration,absl::AnyInvocable<void ()>)76     TaskHandle RunAfter(Duration /* when */,
77                         absl::AnyInvocable<void()> /* closure */) override {
78       return {-1, -1};
79     }
Cancel(TaskHandle)80     bool Cancel(TaskHandle /* handle */) override { return false; };
81   };
82 };
83 
// Checks the ownership behaviour of the shared_ptr returned by
// GetDefaultEventEngine() when a custom factory is installed:
//   * repeated calls while a reference is alive return the same engine and do
//     not invoke the factory again;
//   * dropping one reference leaves the engine alive for the remaining holder;
//   * once the last reference is dropped, the next call invokes the factory
//     again.
TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) {
  int create_count = 0;
  grpc_event_engine::experimental::SetEventEngineFactory([&create_count] {
    ++create_count;
    return std::make_unique<FakeEventEngine>();
  });
  std::shared_ptr<EventEngine> ee2;
  // The factory installed above references the stack-local `create_count`.
  // ASSERT_* returns from the test body on failure, so the factory is reset
  // from a scope guard: that way it is never left installed (with a dangling
  // reference) for later tests. The guard is declared after `ee2` so that, as
  // before, the reset happens while `ee2` is still alive.
  struct FactoryResetGuard {
    ~FactoryResetGuard() {
      grpc_event_engine::experimental::EventEngineFactoryReset();
    }
  } factory_reset_guard;
  {
    std::shared_ptr<EventEngine> ee1 = GetDefaultEventEngine();
    ASSERT_EQ(1, create_count);
    ee2 = GetDefaultEventEngine();
    ASSERT_EQ(1, create_count);
    ASSERT_EQ(ee2.use_count(), 2);
  }
  // Ensure the first shared_ptr did not delete the global.
  // (shared_ptr::unique() is deprecated in C++17 and removed in C++20, so the
  // reference count is checked directly.)
  ASSERT_EQ(ee2.use_count(), 1);
  ASSERT_FALSE(ee2->IsWorkerThread());  // useful for ASAN
  // Destroy the global engine via the last shared_ptr, and create a new one.
  ee2.reset();
  ee2 = GetDefaultEventEngine();
  ASSERT_EQ(2, create_count);
  ASSERT_EQ(ee2.use_count(), 1);
}
108 
// Starts 13 threads that each spend about three seconds repeatedly fetching
// the default EventEngine and immediately dropping the returned reference.
// The test contains no assertions of its own; presumably it relies on crashes
// or sanitizer reports to surface lifetime races - confirm with the owners.
TEST_F(DefaultEngineTest, StressTestSharedPtr) {
  constexpr int kThreadCount = 13;
  constexpr absl::Duration kSpinTime = absl::Seconds(3);
  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int n = 0; n < kThreadCount; ++n) {
    workers.emplace_back([&kSpinTime] {
      // Each worker measures its own deadline from the moment it starts.
      const absl::Time deadline = absl::Now() + kSpinTime;
      do {
        GetDefaultEventEngine().reset();
      } while (absl::Now() < deadline);
    });
  }
  // Wait for every worker to finish before the test returns.
  for (std::thread& worker : workers) {
    worker.join();
  }
}
126 }  // namespace
127 
main(int argc,char ** argv)128 int main(int argc, char** argv) {
129   testing::InitGoogleTest(&argc, argv);
130   grpc::testing::TestEnvironment env(&argc, argv);
131   grpc_init();
132   auto result = RUN_ALL_TESTS();
133   grpc_shutdown();
134   return result;
135 }
136