1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/ext/transport/inproc/inproc_transport.h"
16
17 #include <grpc/grpc.h>
18 #include <grpc/support/port_platform.h>
19
20 #include <atomic>
21 #include <memory>
22
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "src/core/config/core_configuration.h"
27 #include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
28 #include "src/core/lib/event_engine/event_engine_context.h"
29 #include "src/core/lib/experiments/experiments.h"
30 #include "src/core/lib/promise/promise.h"
31 #include "src/core/lib/promise/try_seq.h"
32 #include "src/core/lib/resource_quota/resource_quota.h"
33 #include "src/core/lib/surface/channel_create.h"
34 #include "src/core/lib/transport/metadata.h"
35 #include "src/core/lib/transport/transport.h"
36 #include "src/core/server/server.h"
37 #include "src/core/util/crash.h"
38 #include "src/core/util/debug_location.h"
39
40 namespace grpc_core {
41
42 namespace {
43 class InprocClientTransport;
44
45 class InprocServerTransport final : public ServerTransport {
46 public:
InprocServerTransport(const ChannelArgs & args)47 explicit InprocServerTransport(const ChannelArgs& args)
48 : event_engine_(
49 args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
50 call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
51 args.GetObject<ResourceQuota>()
52 ->memory_quota()
53 ->CreateMemoryAllocator("inproc_server"),
54 1024)) {}
55
SetCallDestination(RefCountedPtr<UnstartedCallDestination> unstarted_call_handler)56 void SetCallDestination(
57 RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) override {
58 unstarted_call_handler_ = unstarted_call_handler;
59 ConnectionState expect = ConnectionState::kInitial;
60 state_.compare_exchange_strong(expect, ConnectionState::kReady,
61 std::memory_order_acq_rel,
62 std::memory_order_acquire);
63 connected_state()->SetReady();
64 }
65
Orphan()66 void Orphan() override {
67 GRPC_TRACE_LOG(inproc, INFO) << "InprocServerTransport::Orphan(): " << this;
68 Disconnect(absl::UnavailableError("Server transport closed"));
69 Unref();
70 }
71
filter_stack_transport()72 FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()73 ClientTransport* client_transport() override { return nullptr; }
server_transport()74 ServerTransport* server_transport() override { return this; }
GetTransportName() const75 absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)76 void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)77 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op * op)78 void PerformOp(grpc_transport_op* op) override {
79 GRPC_TRACE_LOG(inproc, INFO)
80 << "inproc server op: " << grpc_transport_op_string(op);
81 if (op->start_connectivity_watch != nullptr) {
82 connected_state()->AddWatcher(op->start_connectivity_watch_state,
83 std::move(op->start_connectivity_watch));
84 }
85 if (op->stop_connectivity_watch != nullptr) {
86 connected_state()->RemoveWatcher(op->stop_connectivity_watch);
87 }
88 if (op->set_accept_stream) {
89 Crash("set_accept_stream not supported on inproc transport");
90 }
91 ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
92 }
93
Disconnect(absl::Status error)94 void Disconnect(absl::Status error) {
95 RefCountedPtr<ConnectedState> connected_state;
96 {
97 MutexLock lock(&connected_state_mu_);
98 connected_state = std::move(connected_state_);
99 }
100 if (connected_state == nullptr) return;
101 connected_state->Disconnect(std::move(error));
102 state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
103 }
104
AcceptCall(ClientMetadataHandle md)105 absl::StatusOr<CallInitiator> AcceptCall(ClientMetadataHandle md) {
106 switch (state_.load(std::memory_order_acquire)) {
107 case ConnectionState::kInitial:
108 return absl::InternalError(
109 "inproc transport hasn't started accepting calls");
110 case ConnectionState::kDisconnected:
111 return absl::UnavailableError("inproc transport is disconnected");
112 case ConnectionState::kReady:
113 break;
114 }
115 auto arena = call_arena_allocator_->MakeArena();
116 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
117 event_engine_.get());
118 auto server_call = MakeCallPair(std::move(md), std::move(arena));
119 unstarted_call_handler_->StartCall(std::move(server_call.handler));
120 return std::move(server_call.initiator);
121 }
122
123 OrphanablePtr<InprocClientTransport> MakeClientTransport();
124
125 class ConnectedState : public RefCounted<ConnectedState> {
126 public:
~ConnectedState()127 ~ConnectedState() override {
128 state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
129 "inproc transport disconnected");
130 }
131
SetReady()132 void SetReady() {
133 MutexLock lock(&state_tracker_mu_);
134 state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
135 "accept function set");
136 }
137
Disconnect(absl::Status error)138 void Disconnect(absl::Status error) {
139 disconnect_error_ = std::move(error);
140 }
141
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)142 void AddWatcher(grpc_connectivity_state initial_state,
143 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
144 MutexLock lock(&state_tracker_mu_);
145 state_tracker_.AddWatcher(initial_state, std::move(watcher));
146 }
147
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)148 void RemoveWatcher(ConnectivityStateWatcherInterface* watcher) {
149 MutexLock lock(&state_tracker_mu_);
150 state_tracker_.RemoveWatcher(watcher);
151 }
152
153 private:
154 absl::Status disconnect_error_;
155 Mutex state_tracker_mu_;
ABSL_GUARDED_BY(state_tracker_mu_)156 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
157 "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
158 };
159
connected_state()160 RefCountedPtr<ConnectedState> connected_state() {
161 MutexLock lock(&connected_state_mu_);
162 return connected_state_;
163 }
164
165 private:
166 enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
167
168 std::atomic<ConnectionState> state_{ConnectionState::kInitial};
169 RefCountedPtr<UnstartedCallDestination> unstarted_call_handler_;
170 Mutex connected_state_mu_;
171 RefCountedPtr<ConnectedState> connected_state_
172 ABSL_GUARDED_BY(connected_state_mu_) = MakeRefCounted<ConnectedState>();
173 const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
174 event_engine_;
175 const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
176 };
177
178 class InprocClientTransport final : public ClientTransport {
179 public:
InprocClientTransport(RefCountedPtr<InprocServerTransport> server_transport)180 explicit InprocClientTransport(
181 RefCountedPtr<InprocServerTransport> server_transport)
182 : server_transport_(std::move(server_transport)) {}
183
StartCall(CallHandler child_call_handler)184 void StartCall(CallHandler child_call_handler) override {
185 child_call_handler.SpawnGuarded(
186 "pull_initial_metadata",
187 TrySeq(child_call_handler.PullClientInitialMetadata(),
188 [server_transport = server_transport_,
189 connected_state = server_transport_->connected_state(),
190 child_call_handler](ClientMetadataHandle md) mutable {
191 auto server_call_initiator =
192 server_transport->AcceptCall(std::move(md));
193 if (!server_call_initiator.ok()) {
194 return server_call_initiator.status();
195 }
196 ForwardCall(
197 child_call_handler, std::move(*server_call_initiator),
198 [connected_state =
199 std::move(connected_state)](ServerMetadata& md) {
200 md.Set(GrpcStatusFromWire(), true);
201 });
202 return absl::OkStatus();
203 }));
204 }
205
Orphan()206 void Orphan() override {
207 GRPC_TRACE_LOG(inproc, INFO) << "InprocClientTransport::Orphan(): " << this;
208 Unref();
209 }
210
filter_stack_transport()211 FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()212 ClientTransport* client_transport() override { return this; }
server_transport()213 ServerTransport* server_transport() override { return nullptr; }
GetTransportName() const214 absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)215 void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)216 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op *)217 void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
218
219 private:
~InprocClientTransport()220 ~InprocClientTransport() override {
221 server_transport_->Disconnect(
222 absl::UnavailableError("Client transport closed"));
223 }
224
225 const RefCountedPtr<InprocServerTransport> server_transport_;
226 };
227
UsePromiseBasedTransport(const ChannelArgs & channel_args)228 bool UsePromiseBasedTransport(const ChannelArgs& channel_args) {
229 return channel_args
230 .GetBool("grpc.experimental.promise_based_inproc_transport")
231 .value_or(IsPromiseBasedInprocTransportEnabled());
232 }
233
234 OrphanablePtr<InprocClientTransport>
MakeClientTransport()235 InprocServerTransport::MakeClientTransport() {
236 return MakeOrphanable<InprocClientTransport>(
237 RefAsSubclass<InprocServerTransport>());
238 }
239
MakeLameChannel(absl::string_view why,absl::Status error)240 RefCountedPtr<Channel> MakeLameChannel(absl::string_view why,
241 absl::Status error) {
242 LOG(ERROR) << why << ": " << error.message();
243 intptr_t integer;
244 grpc_status_code status = GRPC_STATUS_INTERNAL;
245 if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
246 status = static_cast<grpc_status_code>(integer);
247 }
248 return RefCountedPtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
249 nullptr, status, std::string(why).c_str())));
250 }
251
MakeInprocChannel(Server * server,ChannelArgs client_channel_args)252 RefCountedPtr<Channel> MakeInprocChannel(Server* server,
253 ChannelArgs client_channel_args) {
254 auto transports = MakeInProcessTransportPair(server->channel_args());
255 auto client_transport = std::move(transports.first);
256 auto server_transport = std::move(transports.second);
257 auto error =
258 server->SetupTransport(server_transport.get(), nullptr,
259 server->channel_args()
260 .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
261 .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS),
262 nullptr);
263 if (!error.ok()) {
264 return MakeLameChannel("Failed to create server channel", std::move(error));
265 }
266 std::ignore = server_transport.release(); // consumed by SetupTransport
267 auto channel = ChannelCreate(
268 "inproc",
269 client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority")
270 .Set(GRPC_ARG_USE_V3_STACK, true),
271 GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
272 if (!channel.ok()) {
273 return MakeLameChannel("Failed to create client channel", channel.status());
274 }
275 return std::move(*channel);
276 }
277 } // namespace
278
279 std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair(const ChannelArgs & server_channel_args)280 MakeInProcessTransportPair(const ChannelArgs& server_channel_args) {
281 auto server_transport =
282 MakeOrphanable<InprocServerTransport>(server_channel_args);
283 auto client_transport = server_transport->MakeClientTransport();
284 return std::make_pair(std::move(client_transport),
285 std::move(server_transport));
286 }
287
288 } // namespace grpc_core
289
grpc_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void * reserved)290 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
291 const grpc_channel_args* args,
292 void* reserved) {
293 grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
294 grpc_core::ExecCtx exec_ctx;
295 const auto channel_args = grpc_core::CoreConfiguration::Get()
296 .channel_args_preconditioning()
297 .PreconditionChannelArgs(args);
298 if (!grpc_core::UsePromiseBasedTransport(channel_args)) {
299 return grpc_legacy_inproc_channel_create(server, args, reserved);
300 }
301 return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
302 channel_args)
303 .release()
304 ->c_ptr();
305 }
306