• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/ext/transport/inproc/inproc_transport.h"
16 
17 #include <grpc/grpc.h>
18 #include <grpc/support/port_platform.h>
19 
20 #include <atomic>
21 #include <memory>
22 
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/status/status.h"
26 #include "src/core/config/core_configuration.h"
27 #include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
28 #include "src/core/lib/event_engine/event_engine_context.h"
29 #include "src/core/lib/experiments/experiments.h"
30 #include "src/core/lib/promise/promise.h"
31 #include "src/core/lib/promise/try_seq.h"
32 #include "src/core/lib/resource_quota/resource_quota.h"
33 #include "src/core/lib/surface/channel_create.h"
34 #include "src/core/lib/transport/metadata.h"
35 #include "src/core/lib/transport/transport.h"
36 #include "src/core/server/server.h"
37 #include "src/core/util/crash.h"
38 #include "src/core/util/debug_location.h"
39 
40 namespace grpc_core {
41 
42 namespace {
43 class InprocClientTransport;
44 
45 class InprocServerTransport final : public ServerTransport {
46  public:
InprocServerTransport(const ChannelArgs & args)47   explicit InprocServerTransport(const ChannelArgs& args)
48       : event_engine_(
49             args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
50         call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
51             args.GetObject<ResourceQuota>()
52                 ->memory_quota()
53                 ->CreateMemoryAllocator("inproc_server"),
54             1024)) {}
55 
SetCallDestination(RefCountedPtr<UnstartedCallDestination> unstarted_call_handler)56   void SetCallDestination(
57       RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) override {
58     unstarted_call_handler_ = unstarted_call_handler;
59     ConnectionState expect = ConnectionState::kInitial;
60     state_.compare_exchange_strong(expect, ConnectionState::kReady,
61                                    std::memory_order_acq_rel,
62                                    std::memory_order_acquire);
63     connected_state()->SetReady();
64   }
65 
Orphan()66   void Orphan() override {
67     GRPC_TRACE_LOG(inproc, INFO) << "InprocServerTransport::Orphan(): " << this;
68     Disconnect(absl::UnavailableError("Server transport closed"));
69     Unref();
70   }
71 
filter_stack_transport()72   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()73   ClientTransport* client_transport() override { return nullptr; }
server_transport()74   ServerTransport* server_transport() override { return this; }
GetTransportName() const75   absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)76   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)77   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op * op)78   void PerformOp(grpc_transport_op* op) override {
79     GRPC_TRACE_LOG(inproc, INFO)
80         << "inproc server op: " << grpc_transport_op_string(op);
81     if (op->start_connectivity_watch != nullptr) {
82       connected_state()->AddWatcher(op->start_connectivity_watch_state,
83                                     std::move(op->start_connectivity_watch));
84     }
85     if (op->stop_connectivity_watch != nullptr) {
86       connected_state()->RemoveWatcher(op->stop_connectivity_watch);
87     }
88     if (op->set_accept_stream) {
89       Crash("set_accept_stream not supported on inproc transport");
90     }
91     ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
92   }
93 
Disconnect(absl::Status error)94   void Disconnect(absl::Status error) {
95     RefCountedPtr<ConnectedState> connected_state;
96     {
97       MutexLock lock(&connected_state_mu_);
98       connected_state = std::move(connected_state_);
99     }
100     if (connected_state == nullptr) return;
101     connected_state->Disconnect(std::move(error));
102     state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
103   }
104 
AcceptCall(ClientMetadataHandle md)105   absl::StatusOr<CallInitiator> AcceptCall(ClientMetadataHandle md) {
106     switch (state_.load(std::memory_order_acquire)) {
107       case ConnectionState::kInitial:
108         return absl::InternalError(
109             "inproc transport hasn't started accepting calls");
110       case ConnectionState::kDisconnected:
111         return absl::UnavailableError("inproc transport is disconnected");
112       case ConnectionState::kReady:
113         break;
114     }
115     auto arena = call_arena_allocator_->MakeArena();
116     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
117         event_engine_.get());
118     auto server_call = MakeCallPair(std::move(md), std::move(arena));
119     unstarted_call_handler_->StartCall(std::move(server_call.handler));
120     return std::move(server_call.initiator);
121   }
122 
123   OrphanablePtr<InprocClientTransport> MakeClientTransport();
124 
125   class ConnectedState : public RefCounted<ConnectedState> {
126    public:
~ConnectedState()127     ~ConnectedState() override {
128       state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
129                               "inproc transport disconnected");
130     }
131 
SetReady()132     void SetReady() {
133       MutexLock lock(&state_tracker_mu_);
134       state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
135                               "accept function set");
136     }
137 
Disconnect(absl::Status error)138     void Disconnect(absl::Status error) {
139       disconnect_error_ = std::move(error);
140     }
141 
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)142     void AddWatcher(grpc_connectivity_state initial_state,
143                     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
144       MutexLock lock(&state_tracker_mu_);
145       state_tracker_.AddWatcher(initial_state, std::move(watcher));
146     }
147 
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)148     void RemoveWatcher(ConnectivityStateWatcherInterface* watcher) {
149       MutexLock lock(&state_tracker_mu_);
150       state_tracker_.RemoveWatcher(watcher);
151     }
152 
153    private:
154     absl::Status disconnect_error_;
155     Mutex state_tracker_mu_;
ABSL_GUARDED_BY(state_tracker_mu_)156     ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
157         "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
158   };
159 
connected_state()160   RefCountedPtr<ConnectedState> connected_state() {
161     MutexLock lock(&connected_state_mu_);
162     return connected_state_;
163   }
164 
165  private:
166   enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
167 
168   std::atomic<ConnectionState> state_{ConnectionState::kInitial};
169   RefCountedPtr<UnstartedCallDestination> unstarted_call_handler_;
170   Mutex connected_state_mu_;
171   RefCountedPtr<ConnectedState> connected_state_
172       ABSL_GUARDED_BY(connected_state_mu_) = MakeRefCounted<ConnectedState>();
173   const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
174       event_engine_;
175   const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
176 };
177 
178 class InprocClientTransport final : public ClientTransport {
179  public:
InprocClientTransport(RefCountedPtr<InprocServerTransport> server_transport)180   explicit InprocClientTransport(
181       RefCountedPtr<InprocServerTransport> server_transport)
182       : server_transport_(std::move(server_transport)) {}
183 
StartCall(CallHandler child_call_handler)184   void StartCall(CallHandler child_call_handler) override {
185     child_call_handler.SpawnGuarded(
186         "pull_initial_metadata",
187         TrySeq(child_call_handler.PullClientInitialMetadata(),
188                [server_transport = server_transport_,
189                 connected_state = server_transport_->connected_state(),
190                 child_call_handler](ClientMetadataHandle md) mutable {
191                  auto server_call_initiator =
192                      server_transport->AcceptCall(std::move(md));
193                  if (!server_call_initiator.ok()) {
194                    return server_call_initiator.status();
195                  }
196                  ForwardCall(
197                      child_call_handler, std::move(*server_call_initiator),
198                      [connected_state =
199                           std::move(connected_state)](ServerMetadata& md) {
200                        md.Set(GrpcStatusFromWire(), true);
201                      });
202                  return absl::OkStatus();
203                }));
204   }
205 
Orphan()206   void Orphan() override {
207     GRPC_TRACE_LOG(inproc, INFO) << "InprocClientTransport::Orphan(): " << this;
208     Unref();
209   }
210 
filter_stack_transport()211   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()212   ClientTransport* client_transport() override { return this; }
server_transport()213   ServerTransport* server_transport() override { return nullptr; }
GetTransportName() const214   absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)215   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)216   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op *)217   void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
218 
219  private:
~InprocClientTransport()220   ~InprocClientTransport() override {
221     server_transport_->Disconnect(
222         absl::UnavailableError("Client transport closed"));
223   }
224 
225   const RefCountedPtr<InprocServerTransport> server_transport_;
226 };
227 
UsePromiseBasedTransport(const ChannelArgs & channel_args)228 bool UsePromiseBasedTransport(const ChannelArgs& channel_args) {
229   return channel_args
230       .GetBool("grpc.experimental.promise_based_inproc_transport")
231       .value_or(IsPromiseBasedInprocTransportEnabled());
232 }
233 
234 OrphanablePtr<InprocClientTransport>
MakeClientTransport()235 InprocServerTransport::MakeClientTransport() {
236   return MakeOrphanable<InprocClientTransport>(
237       RefAsSubclass<InprocServerTransport>());
238 }
239 
MakeLameChannel(absl::string_view why,absl::Status error)240 RefCountedPtr<Channel> MakeLameChannel(absl::string_view why,
241                                        absl::Status error) {
242   LOG(ERROR) << why << ": " << error.message();
243   intptr_t integer;
244   grpc_status_code status = GRPC_STATUS_INTERNAL;
245   if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
246     status = static_cast<grpc_status_code>(integer);
247   }
248   return RefCountedPtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
249       nullptr, status, std::string(why).c_str())));
250 }
251 
MakeInprocChannel(Server * server,ChannelArgs client_channel_args)252 RefCountedPtr<Channel> MakeInprocChannel(Server* server,
253                                          ChannelArgs client_channel_args) {
254   auto transports = MakeInProcessTransportPair(server->channel_args());
255   auto client_transport = std::move(transports.first);
256   auto server_transport = std::move(transports.second);
257   auto error =
258       server->SetupTransport(server_transport.get(), nullptr,
259                              server->channel_args()
260                                  .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
261                                  .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS),
262                              nullptr);
263   if (!error.ok()) {
264     return MakeLameChannel("Failed to create server channel", std::move(error));
265   }
266   std::ignore = server_transport.release();  // consumed by SetupTransport
267   auto channel = ChannelCreate(
268       "inproc",
269       client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority")
270           .Set(GRPC_ARG_USE_V3_STACK, true),
271       GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
272   if (!channel.ok()) {
273     return MakeLameChannel("Failed to create client channel", channel.status());
274   }
275   return std::move(*channel);
276 }
277 }  // namespace
278 
279 std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair(const ChannelArgs & server_channel_args)280 MakeInProcessTransportPair(const ChannelArgs& server_channel_args) {
281   auto server_transport =
282       MakeOrphanable<InprocServerTransport>(server_channel_args);
283   auto client_transport = server_transport->MakeClientTransport();
284   return std::make_pair(std::move(client_transport),
285                         std::move(server_transport));
286 }
287 
288 }  // namespace grpc_core
289 
grpc_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void * reserved)290 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
291                                          const grpc_channel_args* args,
292                                          void* reserved) {
293   grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
294   grpc_core::ExecCtx exec_ctx;
295   const auto channel_args = grpc_core::CoreConfiguration::Get()
296                                 .channel_args_preconditioning()
297                                 .PreconditionChannelArgs(args);
298   if (!grpc_core::UsePromiseBasedTransport(channel_args)) {
299     return grpc_legacy_inproc_channel_create(server, args, reserved);
300   }
301   return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
302                                       channel_args)
303       .release()
304       ->c_ptr();
305 }
306