1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
19
20 #include <grpc/support/port_platform.h>
21
22 #include <stddef.h>
23 #include <stdint.h>
24
25 #include <algorithm>
26 #include <atomic>
27 #include <functional>
28 #include <list>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33
34 #include "absl/base/thread_annotations.h"
35 #include "absl/container/flat_hash_map.h"
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/hash/hash.h"
38 #include "absl/random/random.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42
43 #include <grpc/grpc.h>
44 #include <grpc/slice.h>
45 #include <grpc/support/time.h>
46
47 #include "src/core/lib/backoff/random_early_detection.h"
48 #include "src/core/lib/channel/call_tracer.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channel_fwd.h"
51 #include "src/core/lib/channel/channel_stack.h"
52 #include "src/core/lib/channel/channelz.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/gprpp/cpp_impl_of.h"
55 #include "src/core/lib/gprpp/dual_ref_counted.h"
56 #include "src/core/lib/gprpp/orphanable.h"
57 #include "src/core/lib/gprpp/ref_counted_ptr.h"
58 #include "src/core/lib/gprpp/sync.h"
59 #include "src/core/lib/gprpp/time.h"
60 #include "src/core/lib/iomgr/call_combiner.h"
61 #include "src/core/lib/iomgr/closure.h"
62 #include "src/core/lib/iomgr/endpoint.h"
63 #include "src/core/lib/iomgr/error.h"
64 #include "src/core/lib/iomgr/iomgr_fwd.h"
65 #include "src/core/lib/promise/arena_promise.h"
66 #include "src/core/lib/slice/slice.h"
67 #include "src/core/lib/surface/channel.h"
68 #include "src/core/lib/surface/completion_queue.h"
69 #include "src/core/lib/surface/server_interface.h"
70 #include "src/core/lib/transport/metadata_batch.h"
71 #include "src/core/lib/transport/transport.h"
72
73 #define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests"
74 #define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \
75 "grpc.server.max_pending_requests_hard_limit"
76
77 namespace grpc_core {
78
79 extern TraceFlag grpc_server_channel_trace;
80
81 class Server : public ServerInterface,
82 public InternallyRefCounted<Server>,
83 public CppImplOf<Server, grpc_server> {
84 public:
85 // Filter vtable.
86 static const grpc_channel_filter kServerTopFilter;
87
88 // Opaque type used for registered methods.
89 struct RegisteredMethod;
90
91 // An object to represent the most relevant characteristics of a
92 // newly-allocated call object when using an AllocatingRequestMatcherBatch.
93 struct BatchCallAllocation {
94 void* tag;
95 grpc_call** call;
96 grpc_metadata_array* initial_metadata;
97 grpc_call_details* details;
98 grpc_completion_queue* cq;
99 };
100
101 // An object to represent the most relevant characteristics of a
102 // newly-allocated call object when using an
103 // AllocatingRequestMatcherRegistered.
104 struct RegisteredCallAllocation {
105 void* tag;
106 grpc_call** call;
107 grpc_metadata_array* initial_metadata;
108 gpr_timespec* deadline;
109 grpc_byte_buffer** optional_payload;
110 grpc_completion_queue* cq;
111 };
112
113 /// Interface for listeners.
114 /// Implementations must override the Orphan() method, which should stop
115 /// listening and initiate destruction of the listener.
116 class ListenerInterface : public Orphanable {
117 public:
118 ~ListenerInterface() override = default;
119
120 /// Starts listening. This listener may refer to the pollset object beyond
121 /// this call, so it is a pointer rather than a reference.
122 virtual void Start(Server* server,
123 const std::vector<grpc_pollset*>* pollsets) = 0;
124
125 /// Returns the channelz node for the listen socket, or null if not
126 /// supported.
127 virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
128
129 /// Sets a closure to be invoked by the listener when its destruction
130 /// is complete.
131 virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
132 };
133
134 explicit Server(const ChannelArgs& args);
135 ~Server() override;
136
137 void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;
138
channel_args()139 const ChannelArgs& channel_args() const override { return channel_args_; }
channelz_node()140 channelz::ServerNode* channelz_node() const override {
141 return channelz_node_.get();
142 }
143
144 // Do not call this before Start(). Returns the pollsets. The
145 // vector itself is immutable, but the pollsets inside are mutable. The
146 // result is valid for the lifetime of the server.
pollsets()147 const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
148
config_fetcher()149 grpc_server_config_fetcher* config_fetcher() const {
150 return config_fetcher_.get();
151 }
152
server_call_tracer_factory()153 ServerCallTracerFactory* server_call_tracer_factory() const override {
154 return server_call_tracer_factory_;
155 }
156
157 void set_config_fetcher(
158 std::unique_ptr<grpc_server_config_fetcher> config_fetcher);
159
160 bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_);
161
162 // Adds a listener to the server. When the server starts, it will call
163 // the listener's Start() method, and when it shuts down, it will orphan
164 // the listener.
165 void AddListener(OrphanablePtr<ListenerInterface> listener);
166
167 // Starts listening for connections.
168 void Start() ABSL_LOCKS_EXCLUDED(mu_global_);
169
170 // Sets up a transport. Creates a channel stack and binds the transport to
171 // the server. Called from the listener when a new connection is accepted.
172 // Takes ownership of a ref on resource_user from the caller.
173 grpc_error_handle SetupTransport(
174 Transport* transport, grpc_pollset* accepting_pollset,
175 const ChannelArgs& args,
176 const RefCountedPtr<channelz::SocketNode>& socket_node);
177
178 void RegisterCompletionQueue(grpc_completion_queue* cq);
179
180 // Functions to specify that a specific registered method or the unregistered
181 // collection should use a specific allocator for request matching.
182 void SetRegisteredMethodAllocator(
183 grpc_completion_queue* cq, void* method_tag,
184 std::function<RegisteredCallAllocation()> allocator);
185 void SetBatchMethodAllocator(grpc_completion_queue* cq,
186 std::function<BatchCallAllocation()> allocator);
187
188 RegisteredMethod* RegisterMethod(
189 const char* method, const char* host,
190 grpc_server_register_method_payload_handling payload_handling,
191 uint32_t flags);
192
193 grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
194 grpc_metadata_array* request_metadata,
195 grpc_completion_queue* cq_bound_to_call,
196 grpc_completion_queue* cq_for_notification,
197 void* tag);
198
199 grpc_call_error RequestRegisteredCall(
200 RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
201 grpc_metadata_array* request_metadata,
202 grpc_byte_buffer** optional_payload,
203 grpc_completion_queue* cq_bound_to_call,
204 grpc_completion_queue* cq_for_notification, void* tag_new);
205
206 void ShutdownAndNotify(grpc_completion_queue* cq, void* tag)
207 ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
208
209 void StopListening();
210
211 void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);
212
213 void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
214
215 private:
216 struct RequestedCall;
217
218 class RequestMatcherInterface;
219 class RealRequestMatcherFilterStack;
220 class RealRequestMatcherPromises;
221 class AllocatingRequestMatcherBase;
222 class AllocatingRequestMatcherBatch;
223 class AllocatingRequestMatcherRegistered;
224
225 class ChannelData final : public ServerTransport::Acceptor {
226 public:
227 ChannelData() = default;
228 ~ChannelData();
229
230 void InitTransport(RefCountedPtr<Server> server,
231 OrphanablePtr<Channel> channel, size_t cq_idx,
232 Transport* transport, intptr_t channelz_socket_uuid);
233
server()234 RefCountedPtr<Server> server() const { return server_; }
channel()235 Channel* channel() const { return channel_.get(); }
cq_idx()236 size_t cq_idx() const { return cq_idx_; }
237
238 RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
239 const absl::string_view& path);
240 // Filter vtable functions.
241 static grpc_error_handle InitChannelElement(
242 grpc_channel_element* elem, grpc_channel_element_args* args);
243 static void DestroyChannelElement(grpc_channel_element* elem);
244 static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
245 grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory);
246 void InitCall(RefCountedPtr<CallSpineInterface> call);
247
248 Arena* CreateArena() override;
249 absl::StatusOr<CallInitiator> CreateCall(
250 ClientMetadata& client_initial_metadata, Arena* arena) override;
251
252 private:
253 class ConnectivityWatcher;
254
255 static void AcceptStream(void* arg, Transport* /*transport*/,
256 const void* transport_server_data);
257 void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
258
259 void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);
260
261 static void FinishDestroy(void* arg, grpc_error_handle error);
262
263 RefCountedPtr<Server> server_;
264 OrphanablePtr<Channel> channel_;
265 // The index into Server::cqs_ of the CQ used as a starting point for
266 // where to publish new incoming calls.
267 size_t cq_idx_;
268 absl::optional<std::list<ChannelData*>::iterator> list_position_;
269 grpc_closure finish_destroy_channel_closure_;
270 intptr_t channelz_socket_uuid_;
271 };
272
273 class CallData {
274 public:
275 enum class CallState {
276 NOT_STARTED, // Waiting for metadata.
277 PENDING, // Initial metadata read, not flow controlled in yet.
278 ACTIVATED, // Flow controlled in, on completion queue.
279 ZOMBIED, // Cancelled before being queued.
280 };
281
282 CallData(grpc_call_element* elem, const grpc_call_element_args& args,
283 RefCountedPtr<Server> server);
284 ~CallData();
285
286 // Starts the recv_initial_metadata batch on the call.
287 // Invoked from ChannelData::AcceptStream().
288 void Start(grpc_call_element* elem);
289
290 void SetState(CallState state);
291
292 // Attempts to move from PENDING to ACTIVATED state. Returns true
293 // on success.
294 bool MaybeActivate();
295
296 // Publishes an incoming call to the application after it has been
297 // matched.
298 void Publish(size_t cq_idx, RequestedCall* rc);
299
300 void KillZombie();
301
302 void FailCallCreation();
303
304 // Filter vtable functions.
305 static grpc_error_handle InitCallElement(
306 grpc_call_element* elem, const grpc_call_element_args* args);
307 static void DestroyCallElement(grpc_call_element* elem,
308 const grpc_call_final_info* /*final_info*/,
309 grpc_closure* /*ignored*/);
310 static void StartTransportStreamOpBatch(
311 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
312
313 private:
314 // Helper functions for handling calls at the top of the call stack.
315 static void RecvInitialMetadataBatchComplete(void* arg,
316 grpc_error_handle error);
317 void StartNewRpc(grpc_call_element* elem);
318 static void PublishNewRpc(void* arg, grpc_error_handle error);
319
320 // Functions used inside the call stack.
321 void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
322 grpc_transport_stream_op_batch* batch);
323 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
324 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
325
326 RefCountedPtr<Server> server_;
327
328 grpc_call* call_;
329
330 std::atomic<CallState> state_{CallState::NOT_STARTED};
331
332 absl::optional<Slice> path_;
333 absl::optional<Slice> host_;
334 Timestamp deadline_ = Timestamp::InfFuture();
335
336 grpc_completion_queue* cq_new_ = nullptr;
337
338 RequestMatcherInterface* matcher_ = nullptr;
339 grpc_byte_buffer* payload_ = nullptr;
340
341 grpc_closure kill_zombie_closure_;
342
343 grpc_metadata_array initial_metadata_ =
344 grpc_metadata_array(); // Zero-initialize the C struct.
345 grpc_closure recv_initial_metadata_batch_complete_;
346
347 grpc_metadata_batch* recv_initial_metadata_ = nullptr;
348 grpc_closure recv_initial_metadata_ready_;
349 grpc_closure* original_recv_initial_metadata_ready_;
350 grpc_error_handle recv_initial_metadata_error_;
351
352 bool seen_recv_trailing_metadata_ready_ = false;
353 grpc_closure recv_trailing_metadata_ready_;
354 grpc_closure* original_recv_trailing_metadata_ready_;
355 grpc_error_handle recv_trailing_metadata_error_;
356
357 grpc_closure publish_;
358
359 CallCombiner* call_combiner_;
360 };
361
362 struct Listener {
ListenerListener363 explicit Listener(OrphanablePtr<ListenerInterface> l)
364 : listener(std::move(l)) {}
365 OrphanablePtr<ListenerInterface> listener;
366 grpc_closure destroy_done;
367 };
368
369 struct ShutdownTag {
ShutdownTagShutdownTag370 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
371 : tag(tag_arg), cq(cq_arg) {}
372 void* const tag;
373 grpc_completion_queue* const cq;
374 grpc_cq_completion completion;
375 };
376
377 struct StringViewStringViewPairHash
378 : absl::flat_hash_set<
379 std::pair<absl::string_view, absl::string_view>>::hasher {
380 using is_transparent = void;
381 };
382
383 struct StringViewStringViewPairEq
384 : std::equal_to<std::pair<absl::string_view, absl::string_view>> {
385 using is_transparent = void;
386 };
387
388 static void ListenerDestroyDone(void* arg, grpc_error_handle error);
389
DoneShutdownEvent(void * server,grpc_cq_completion *)390 static void DoneShutdownEvent(void* server,
391 grpc_cq_completion* /*completion*/) {
392 static_cast<Server*>(server)->Unref();
393 }
394
395 static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
396
397 void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error);
398 grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
399
400 void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_)
401 ABSL_LOCKS_EXCLUDED(mu_call_);
402
403 void KillPendingWorkLocked(grpc_error_handle error)
404 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_);
405
406 static grpc_call_error ValidateServerRequest(
407 grpc_completion_queue* cq_for_notification, void* tag,
408 grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
409 grpc_call_error ValidateServerRequestAndCq(
410 size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
411 grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
412
413 std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const;
414
415 // Take a shutdown ref for a request (increment by 2) and return if shutdown
416 // has not been called.
ShutdownRefOnRequest()417 bool ShutdownRefOnRequest() {
418 int old_value = shutdown_refs_.fetch_add(2, std::memory_order_acq_rel);
419 return (old_value & 1) != 0;
420 }
421
422 // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2
423 // (for in-flight request) and possibly call MaybeFinishShutdown if
424 // appropriate.
ShutdownUnrefOnRequest()425 void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
426 if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) {
427 MutexLock lock(&mu_global_);
428 MaybeFinishShutdown();
429 }
430 }
ShutdownUnrefOnShutdownCall()431 void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
432 if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
433 // There is no request in-flight.
434 MaybeFinishShutdown();
435 }
436 }
437
ShutdownCalled()438 bool ShutdownCalled() const {
439 return (shutdown_refs_.load(std::memory_order_acquire) & 1) == 0;
440 }
441
442 // Returns whether there are no more shutdown refs, which means that shutdown
443 // has been called and all accepted requests have been published if using an
444 // AllocatingRequestMatcher.
ShutdownReady()445 bool ShutdownReady() const {
446 return shutdown_refs_.load(std::memory_order_acquire) == 0;
447 }
448
449 ChannelArgs const channel_args_;
450 RefCountedPtr<channelz::ServerNode> channelz_node_;
451 std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
452 ServerCallTracerFactory* const server_call_tracer_factory_;
453
454 std::vector<grpc_completion_queue*> cqs_;
455 std::vector<grpc_pollset*> pollsets_;
456 bool started_ = false;
457
458 // The two following mutexes control access to server-state.
459 // mu_global_ controls access to non-call-related state (e.g., channel state).
460 // mu_call_ controls access to call-related state (e.g., the call lists).
461 //
462 // If they are ever required to be nested, you must lock mu_global_
463 // before mu_call_. This is currently used in shutdown processing
464 // (ShutdownAndNotify() and MaybeFinishShutdown()).
465 Mutex mu_global_; // mutex for server and channel state
466 Mutex mu_call_; // mutex for call-specific state
467
468 // startup synchronization: flag, signals whether we are doing the listener
469 // start routine or not.
470 bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
471 CondVar starting_cv_;
472
473 // Map of registered methods.
474 absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
475 std::unique_ptr<RegisteredMethod>,
476 StringViewStringViewPairHash, StringViewStringViewPairEq>
477 registered_methods_;
478
479 // Request matcher for unregistered methods.
480 std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
481
482 // The shutdown refs counter tracks whether or not shutdown has been called
483 // and whether there are any AllocatingRequestMatcher requests that have been
484 // accepted but not yet started (+2 on each one). If shutdown has been called,
485 // the lowest bit will be 0 (defaults to 1) and the counter will be even. The
486 // server should not notify on shutdown until the counter is 0 (shutdown is
487 // called and there are no requests that are accepted but not started).
488 std::atomic<int> shutdown_refs_{1};
489 bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
490 std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
491
ABSL_GUARDED_BY(mu_call_)492 RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){
493 static_cast<uint64_t>(
494 std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS)
495 .value_or(1000))),
496 static_cast<uint64_t>(std::max(
497 0,
498 channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT)
499 .value_or(3000)))};
500 const Duration max_time_in_pending_queue_;
501 absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
502
503 std::list<ChannelData*> channels_;
504
505 std::list<Listener> listeners_;
506 size_t listeners_destroyed_ = 0;
507
508 // The last time we printed a shutdown progress message.
509 gpr_timespec last_shutdown_message_time_;
510 };
511
512 } // namespace grpc_core
513
514 struct grpc_server_config_fetcher {
515 public:
516 class ConnectionManager
517 : public grpc_core::DualRefCounted<ConnectionManager> {
518 public:
519 // Ownership of \a args is transfered.
520 virtual absl::StatusOr<grpc_core::ChannelArgs>
521 UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args,
522 grpc_endpoint* tcp) = 0;
523 };
524
525 class WatcherInterface {
526 public:
527 virtual ~WatcherInterface() = default;
528 // UpdateConnectionManager() is invoked by the config fetcher when a new
529 // config is available. Implementations should update the connection manager
530 // and start serving if not already serving.
531 virtual void UpdateConnectionManager(
532 grpc_core::RefCountedPtr<ConnectionManager> manager) = 0;
533 // Implementations should stop serving when this is called. Serving should
534 // only resume when UpdateConfig() is invoked.
535 virtual void StopServing() = 0;
536 };
537
538 virtual ~grpc_server_config_fetcher() = default;
539
540 virtual void StartWatch(std::string listening_address,
541 std::unique_ptr<WatcherInterface> watcher) = 0;
542 virtual void CancelWatch(WatcherInterface* watcher) = 0;
543 virtual grpc_pollset_set* interested_parties() = 0;
544 };
545
546 namespace grpc_core {
547
set_config_fetcher(std::unique_ptr<grpc_server_config_fetcher> config_fetcher)548 inline void Server::set_config_fetcher(
549 std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
550 config_fetcher_ = std::move(config_fetcher);
551 }
552
553 } // namespace grpc_core
554
555 #endif // GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
556