• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/default_event_engine.h"
15 
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/support/port_platform.h>
18 
19 #include <atomic>
20 #include <memory>
21 #include <utility>
22 
23 #include "absl/functional/any_invocable.h"
24 #include "src/core/config/core_configuration.h"
25 #include "src/core/lib/channel/channel_args.h"
26 #include "src/core/lib/debug/trace.h"
27 #include "src/core/lib/event_engine/default_event_engine_factory.h"
28 #include "src/core/util/debug_location.h"
29 #include "src/core/util/no_destruct.h"
30 #include "src/core/util/sync.h"
31 
32 #ifdef GRPC_MAXIMIZE_THREADYNESS
33 #include "src/core/lib/event_engine/thready_event_engine/thready_event_engine.h"  // IWYU pragma: keep
34 #endif
35 
36 namespace grpc_event_engine {
37 namespace experimental {
38 
39 namespace {
40 std::atomic<absl::AnyInvocable<std::unique_ptr<EventEngine>()>*>
41     g_event_engine_factory{nullptr};
42 grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
43 grpc_core::NoDestruct<std::weak_ptr<EventEngine>> g_event_engine;
44 }  // namespace
45 
SetEventEngineFactory(absl::AnyInvocable<std::unique_ptr<EventEngine> ()> factory)46 void SetEventEngineFactory(
47     absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory) {
48   delete g_event_engine_factory.exchange(
49       new absl::AnyInvocable<std::unique_ptr<EventEngine>()>(
50           std::move(factory)));
51   // Forget any previous EventEngines
52   grpc_core::MutexLock lock(&*g_mu);
53   g_event_engine->reset();
54 }
55 
EventEngineFactoryReset()56 void EventEngineFactoryReset() {
57   delete g_event_engine_factory.exchange(nullptr);
58   g_event_engine->reset();
59 }
60 
CreateEventEngineInner()61 std::unique_ptr<EventEngine> CreateEventEngineInner() {
62   if (auto* factory = g_event_engine_factory.load()) {
63     return (*factory)();
64   }
65   return DefaultEventEngineFactory();
66 }
67 
CreateEventEngine()68 std::unique_ptr<EventEngine> CreateEventEngine() {
69 #ifdef GRPC_MAXIMIZE_THREADYNESS
70   return std::make_unique<ThreadyEventEngine>(CreateEventEngineInner());
71 #else
72   return CreateEventEngineInner();
73 #endif
74 }
75 
GetDefaultEventEngine(grpc_core::SourceLocation location)76 std::shared_ptr<EventEngine> GetDefaultEventEngine(
77     grpc_core::SourceLocation location) {
78   grpc_core::MutexLock lock(&*g_mu);
79   if (std::shared_ptr<EventEngine> engine = g_event_engine->lock()) {
80     GRPC_TRACE_LOG(event_engine, INFO)
81         << "Returning existing EventEngine::" << engine.get()
82         << ". use_count:" << engine.use_count() << ". Called from " << location;
83     return engine;
84   }
85   std::shared_ptr<EventEngine> engine{CreateEventEngine()};
86   GRPC_TRACE_LOG(event_engine, INFO)
87       << "Created DefaultEventEngine::" << engine.get() << ". Called from "
88       << location;
89   *g_event_engine = engine;
90   return engine;
91 }
92 
93 namespace {
EnsureEventEngineInChannelArgs(grpc_core::ChannelArgs args)94 grpc_core::ChannelArgs EnsureEventEngineInChannelArgs(
95     grpc_core::ChannelArgs args) {
96   if (args.ContainsObject<EventEngine>()) return args;
97   return args.SetObject<EventEngine>(GetDefaultEventEngine());
98 }
99 }  // namespace
100 
RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder * builder)101 void RegisterEventEngineChannelArgPreconditioning(
102     grpc_core::CoreConfiguration::Builder* builder) {
103   builder->channel_args_preconditioning()->RegisterStage(
104       grpc_event_engine::experimental::EnsureEventEngineInChannelArgs);
105 }
106 
107 }  // namespace experimental
108 }  // namespace grpc_event_engine
109