1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H 18 #define GRPC_CORE_LIB_SURFACE_SERVER_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <list> 23 #include <vector> 24 25 #include "absl/status/statusor.h" 26 #include "absl/types/optional.h" 27 28 #include <grpc/grpc.h> 29 30 #include "src/core/lib/channel/channel_args.h" 31 #include "src/core/lib/channel/channel_stack.h" 32 #include "src/core/lib/channel/channelz.h" 33 #include "src/core/lib/debug/trace.h" 34 #include "src/core/lib/gprpp/atomic.h" 35 #include "src/core/lib/iomgr/resolve_address.h" 36 #include "src/core/lib/surface/completion_queue.h" 37 #include "src/core/lib/transport/transport.h" 38 39 namespace grpc_core { 40 41 extern TraceFlag grpc_server_channel_trace; 42 43 class Server : public InternallyRefCounted<Server> { 44 public: 45 // Filter vtable. 46 static const grpc_channel_filter kServerTopFilter; 47 48 // Opaque type used for registered methods. 49 struct RegisteredMethod; 50 51 // An object to represent the most relevant characteristics of a 52 // newly-allocated call object when using an AllocatingRequestMatcherBatch. 53 struct BatchCallAllocation { 54 void* tag; 55 grpc_call** call; 56 grpc_metadata_array* initial_metadata; 57 grpc_call_details* details; 58 grpc_completion_queue* cq; 59 }; 60 61 // An object to represent the most relevant characteristics of a 62 // newly-allocated call object when using an 63 // AllocatingRequestMatcherRegistered. 64 struct RegisteredCallAllocation { 65 void* tag; 66 grpc_call** call; 67 grpc_metadata_array* initial_metadata; 68 gpr_timespec* deadline; 69 grpc_byte_buffer** optional_payload; 70 grpc_completion_queue* cq; 71 }; 72 73 /// Interface for listeners. 74 /// Implementations must override the Orphan() method, which should stop 75 /// listening and initiate destruction of the listener. 76 class ListenerInterface : public Orphanable { 77 public: 78 ~ListenerInterface() override = default; 79 80 /// Starts listening. This listener may refer to the pollset object beyond 81 /// this call, so it is a pointer rather than a reference. 82 virtual void Start(Server* server, 83 const std::vector<grpc_pollset*>* pollsets) = 0; 84 85 /// Returns the channelz node for the listen socket, or null if not 86 /// supported. 87 virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0; 88 89 /// Sets a closure to be invoked by the listener when its destruction 90 /// is complete. 91 virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0; 92 }; 93 94 explicit Server(const grpc_channel_args* args); 95 ~Server() override; 96 97 void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override; 98 channel_args()99 const grpc_channel_args* channel_args() const { return channel_args_; } default_resource_user()100 grpc_resource_user* default_resource_user() const { 101 return default_resource_user_; 102 } channelz_node()103 channelz::ServerNode* channelz_node() const { return channelz_node_.get(); } 104 105 // Do not call this before Start(). Returns the pollsets. The 106 // vector itself is immutable, but the pollsets inside are mutable. The 107 // result is valid for the lifetime of the server. pollsets()108 const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; } 109 config_fetcher()110 grpc_server_config_fetcher* config_fetcher() const { 111 return config_fetcher_.get(); 112 } 113 set_config_fetcher(std::unique_ptr<grpc_server_config_fetcher> config_fetcher)114 void set_config_fetcher( 115 std::unique_ptr<grpc_server_config_fetcher> config_fetcher) { 116 config_fetcher_ = std::move(config_fetcher); 117 } 118 119 bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_); 120 121 // Adds a listener to the server. When the server starts, it will call 122 // the listener's Start() method, and when it shuts down, it will orphan 123 // the listener. 124 void AddListener(OrphanablePtr<ListenerInterface> listener); 125 126 // Starts listening for connections. 127 void Start() ABSL_LOCKS_EXCLUDED(mu_global_); 128 129 // Sets up a transport. Creates a channel stack and binds the transport to 130 // the server. Called from the listener when a new connection is accepted. 131 grpc_error_handle SetupTransport( 132 grpc_transport* transport, grpc_pollset* accepting_pollset, 133 const grpc_channel_args* args, 134 const RefCountedPtr<channelz::SocketNode>& socket_node, 135 grpc_resource_user* resource_user = nullptr); 136 137 void RegisterCompletionQueue(grpc_completion_queue* cq); 138 139 // Functions to specify that a specific registered method or the unregistered 140 // collection should use a specific allocator for request matching. 141 void SetRegisteredMethodAllocator( 142 grpc_completion_queue* cq, void* method_tag, 143 std::function<RegisteredCallAllocation()> allocator); 144 void SetBatchMethodAllocator(grpc_completion_queue* cq, 145 std::function<BatchCallAllocation()> allocator); 146 147 RegisteredMethod* RegisterMethod( 148 const char* method, const char* host, 149 grpc_server_register_method_payload_handling payload_handling, 150 uint32_t flags); 151 152 grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details, 153 grpc_metadata_array* request_metadata, 154 grpc_completion_queue* cq_bound_to_call, 155 grpc_completion_queue* cq_for_notification, 156 void* tag); 157 158 grpc_call_error RequestRegisteredCall( 159 RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline, 160 grpc_metadata_array* request_metadata, 161 grpc_byte_buffer** optional_payload, 162 grpc_completion_queue* cq_bound_to_call, 163 grpc_completion_queue* cq_for_notification, void* tag_new); 164 165 void ShutdownAndNotify(grpc_completion_queue* cq, void* tag) 166 ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); 167 168 void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_); 169 170 private: 171 struct RequestedCall; 172 173 struct ChannelRegisteredMethod { 174 RegisteredMethod* server_registered_method = nullptr; 175 uint32_t flags; 176 bool has_host; 177 ExternallyManagedSlice method; 178 ExternallyManagedSlice host; 179 }; 180 181 class RequestMatcherInterface; 182 class RealRequestMatcher; 183 class AllocatingRequestMatcherBase; 184 class AllocatingRequestMatcherBatch; 185 class AllocatingRequestMatcherRegistered; 186 187 class ChannelData { 188 public: 189 ChannelData() = default; 190 ~ChannelData(); 191 192 void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel, 193 size_t cq_idx, grpc_transport* transport, 194 intptr_t channelz_socket_uuid); 195 server()196 RefCountedPtr<Server> server() const { return server_; } channel()197 grpc_channel* channel() const { return channel_; } cq_idx()198 size_t cq_idx() const { return cq_idx_; } 199 200 ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host, 201 const grpc_slice& path, 202 bool is_idempotent); 203 204 // Filter vtable functions. 205 static grpc_error_handle InitChannelElement( 206 grpc_channel_element* elem, grpc_channel_element_args* args); 207 static void DestroyChannelElement(grpc_channel_element* elem); 208 209 private: 210 class ConnectivityWatcher; 211 212 static void AcceptStream(void* arg, grpc_transport* /*transport*/, 213 const void* transport_server_data); 214 215 void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_); 216 217 static void FinishDestroy(void* arg, grpc_error_handle error); 218 219 RefCountedPtr<Server> server_; 220 grpc_channel* channel_; 221 // The index into Server::cqs_ of the CQ used as a starting point for 222 // where to publish new incoming calls. 223 size_t cq_idx_; 224 absl::optional<std::list<ChannelData*>::iterator> list_position_; 225 // A hash-table of the methods and hosts of the registered methods. 226 // TODO(vjpai): Convert this to an STL map type as opposed to a direct 227 // bucket implementation. (Consider performance impact, hash function to 228 // use, etc.) 229 std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_; 230 uint32_t registered_method_max_probes_; 231 grpc_closure finish_destroy_channel_closure_; 232 intptr_t channelz_socket_uuid_; 233 }; 234 235 class CallData { 236 public: 237 enum class CallState { 238 NOT_STARTED, // Waiting for metadata. 239 PENDING, // Initial metadata read, not flow controlled in yet. 240 ACTIVATED, // Flow controlled in, on completion queue. 241 ZOMBIED, // Cancelled before being queued. 242 }; 243 244 CallData(grpc_call_element* elem, const grpc_call_element_args& args, 245 RefCountedPtr<Server> server); 246 ~CallData(); 247 248 // Starts the recv_initial_metadata batch on the call. 249 // Invoked from ChannelData::AcceptStream(). 250 void Start(grpc_call_element* elem); 251 252 void SetState(CallState state); 253 254 // Attempts to move from PENDING to ACTIVATED state. Returns true 255 // on success. 256 bool MaybeActivate(); 257 258 // Publishes an incoming call to the application after it has been 259 // matched. 260 void Publish(size_t cq_idx, RequestedCall* rc); 261 262 void KillZombie(); 263 264 void FailCallCreation(); 265 266 // Filter vtable functions. 267 static grpc_error_handle InitCallElement( 268 grpc_call_element* elem, const grpc_call_element_args* args); 269 static void DestroyCallElement(grpc_call_element* elem, 270 const grpc_call_final_info* /*final_info*/, 271 grpc_closure* /*ignored*/); 272 static void StartTransportStreamOpBatch( 273 grpc_call_element* elem, grpc_transport_stream_op_batch* batch); 274 275 private: 276 // Helper functions for handling calls at the top of the call stack. 277 static void RecvInitialMetadataBatchComplete(void* arg, 278 grpc_error_handle error); 279 void StartNewRpc(grpc_call_element* elem); 280 static void PublishNewRpc(void* arg, grpc_error_handle error); 281 282 // Functions used inside the call stack. 283 void StartTransportStreamOpBatchImpl(grpc_call_element* elem, 284 grpc_transport_stream_op_batch* batch); 285 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); 286 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 287 288 RefCountedPtr<Server> server_; 289 290 grpc_call* call_; 291 292 Atomic<CallState> state_{CallState::NOT_STARTED}; 293 294 absl::optional<grpc_slice> path_; 295 absl::optional<grpc_slice> host_; 296 grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE; 297 298 grpc_completion_queue* cq_new_ = nullptr; 299 300 RequestMatcherInterface* matcher_ = nullptr; 301 grpc_byte_buffer* payload_ = nullptr; 302 303 grpc_closure kill_zombie_closure_; 304 305 grpc_metadata_array initial_metadata_ = 306 grpc_metadata_array(); // Zero-initialize the C struct. 307 grpc_closure recv_initial_metadata_batch_complete_; 308 309 grpc_metadata_batch* recv_initial_metadata_ = nullptr; 310 uint32_t recv_initial_metadata_flags_ = 0; 311 grpc_closure recv_initial_metadata_ready_; 312 grpc_closure* original_recv_initial_metadata_ready_; 313 grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE; 314 315 bool seen_recv_trailing_metadata_ready_ = false; 316 grpc_closure recv_trailing_metadata_ready_; 317 grpc_closure* original_recv_trailing_metadata_ready_; 318 grpc_error_handle recv_trailing_metadata_error_ = GRPC_ERROR_NONE; 319 320 grpc_closure publish_; 321 322 CallCombiner* call_combiner_; 323 }; 324 325 struct Listener { ListenerListener326 explicit Listener(OrphanablePtr<ListenerInterface> l) 327 : listener(std::move(l)) {} 328 OrphanablePtr<ListenerInterface> listener; 329 grpc_closure destroy_done; 330 }; 331 332 struct ShutdownTag { ShutdownTagShutdownTag333 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg) 334 : tag(tag_arg), cq(cq_arg) {} 335 void* const tag; 336 grpc_completion_queue* const cq; 337 grpc_cq_completion completion; 338 }; 339 340 static void ListenerDestroyDone(void* arg, grpc_error_handle error); 341 DoneShutdownEvent(void * server,grpc_cq_completion *)342 static void DoneShutdownEvent(void* server, 343 grpc_cq_completion* /*completion*/) { 344 static_cast<Server*>(server)->Unref(); 345 } 346 347 static void DoneRequestEvent(void* req, grpc_cq_completion* completion); 348 349 void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error); 350 grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc); 351 352 void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) 353 ABSL_LOCKS_EXCLUDED(mu_call_); 354 355 void KillPendingWorkLocked(grpc_error_handle error) 356 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_); 357 358 static grpc_call_error ValidateServerRequest( 359 grpc_completion_queue* cq_for_notification, void* tag, 360 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 361 grpc_call_error ValidateServerRequestAndCq( 362 size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag, 363 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 364 365 std::vector<grpc_channel*> GetChannelsLocked() const; 366 367 // Take a shutdown ref for a request (increment by 2) and return if shutdown 368 // has already been called. ShutdownRefOnRequest()369 bool ShutdownRefOnRequest() { 370 int old_value = shutdown_refs_.FetchAdd(2, MemoryOrder::ACQ_REL); 371 return (old_value & 1) != 0; 372 } 373 374 // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2 375 // (for in-flight request) and possibly call MaybeFinishShutdown if 376 // appropriate. ShutdownUnrefOnRequest()377 void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) { 378 if (shutdown_refs_.FetchSub(2, MemoryOrder::ACQ_REL) == 2) { 379 MutexLock lock(&mu_global_); 380 MaybeFinishShutdown(); 381 } 382 } ShutdownUnrefOnShutdownCall()383 void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) { 384 if (shutdown_refs_.FetchSub(1, MemoryOrder::ACQ_REL) == 1) { 385 MaybeFinishShutdown(); 386 } 387 } 388 ShutdownCalled()389 bool ShutdownCalled() const { 390 return (shutdown_refs_.Load(MemoryOrder::ACQUIRE) & 1) == 0; 391 } 392 393 // Returns whether there are no more shutdown refs, which means that shutdown 394 // has been called and all accepted requests have been published if using an 395 // AllocatingRequestMatcher. ShutdownReady()396 bool ShutdownReady() const { 397 return shutdown_refs_.Load(MemoryOrder::ACQUIRE) == 0; 398 } 399 400 grpc_channel_args* const channel_args_; 401 grpc_resource_user* default_resource_user_ = nullptr; 402 RefCountedPtr<channelz::ServerNode> channelz_node_; 403 std::unique_ptr<grpc_server_config_fetcher> config_fetcher_; 404 405 std::vector<grpc_completion_queue*> cqs_; 406 std::vector<grpc_pollset*> pollsets_; 407 bool started_ = false; 408 409 // The two following mutexes control access to server-state. 410 // mu_global_ controls access to non-call-related state (e.g., channel state). 411 // mu_call_ controls access to call-related state (e.g., the call lists). 412 // 413 // If they are ever required to be nested, you must lock mu_global_ 414 // before mu_call_. This is currently used in shutdown processing 415 // (ShutdownAndNotify() and MaybeFinishShutdown()). 416 Mutex mu_global_; // mutex for server and channel state 417 Mutex mu_call_; // mutex for call-specific state 418 419 // startup synchronization: flag is protected by mu_global_, signals whether 420 // we are doing the listener start routine or not. 421 bool starting_ = false; 422 CondVar starting_cv_; 423 424 std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_; 425 426 // Request matcher for unregistered methods. 427 std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_; 428 429 // The shutdown refs counter tracks whether or not shutdown has been called 430 // and whether there are any AllocatingRequestMatcher requests that have been 431 // accepted but not yet started (+2 on each one). If shutdown has been called, 432 // the lowest bit will be 0 (defaults to 1) and the counter will be even. The 433 // server should not notify on shutdown until the counter is 0 (shutdown is 434 // called and there are no requests that are accepted but not started). 435 Atomic<int> shutdown_refs_{1}; 436 bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false; 437 std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_); 438 439 std::list<ChannelData*> channels_; 440 441 std::list<Listener> listeners_; 442 size_t listeners_destroyed_ = 0; 443 444 // The last time we printed a shutdown progress message. 445 gpr_timespec last_shutdown_message_time_; 446 }; 447 448 } // namespace grpc_core 449 450 struct grpc_server { 451 grpc_core::OrphanablePtr<grpc_core::Server> core_server; 452 }; 453 454 // TODO(roth): Eventually, will need a way to modify configuration even after 455 // a connection is established (e.g., to change things like L7 rate 456 // limiting, RBAC, and fault injection configs). One possible option 457 // would be to do something like ServiceConfig and ConfigSelector, but 458 // that might add unnecessary per-call overhead. Need to consider other 459 // approaches here. 460 struct grpc_server_config_fetcher { 461 public: 462 class ConnectionManager : public grpc_core::RefCounted<ConnectionManager> { 463 public: 464 // Ownership of \a args is transfered. 465 virtual absl::StatusOr<grpc_channel_args*> UpdateChannelArgsForConnection( 466 grpc_channel_args* args, grpc_endpoint* tcp) = 0; 467 }; 468 469 class WatcherInterface { 470 public: 471 virtual ~WatcherInterface() = default; 472 // UpdateConnectionManager() is invoked by the config fetcher when a new 473 // config is available. Implementations should update the connection manager 474 // and start serving if not already serving. 475 virtual void UpdateConnectionManager( 476 grpc_core::RefCountedPtr<ConnectionManager> manager) = 0; 477 // Implementations should stop serving when this is called. Serving should 478 // only resume when UpdateConfig() is invoked. 479 virtual void StopServing() = 0; 480 }; 481 482 virtual ~grpc_server_config_fetcher() = default; 483 484 // Ownership of \a args is transferred. 485 virtual void StartWatch(std::string listening_address, 486 grpc_channel_args* args, 487 std::unique_ptr<WatcherInterface> watcher) = 0; 488 virtual void CancelWatch(WatcherInterface* watcher) = 0; 489 virtual grpc_pollset_set* interested_parties() = 0; 490 }; 491 492 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */ 493