1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H 18 #define GRPC_CORE_LIB_SURFACE_SERVER_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <list> 23 #include <vector> 24 25 #include "absl/types/optional.h" 26 27 #include <grpc/grpc.h> 28 29 #include "src/core/lib/channel/channel_args.h" 30 #include "src/core/lib/channel/channel_stack.h" 31 #include "src/core/lib/channel/channelz.h" 32 #include "src/core/lib/debug/trace.h" 33 #include "src/core/lib/gprpp/atomic.h" 34 #include "src/core/lib/surface/completion_queue.h" 35 #include "src/core/lib/transport/transport.h" 36 37 namespace grpc_core { 38 39 extern TraceFlag grpc_server_channel_trace; 40 41 class Server : public InternallyRefCounted<Server> { 42 public: 43 // Filter vtable. 44 static const grpc_channel_filter kServerTopFilter; 45 46 // Opaque type used for registered methods. 47 struct RegisteredMethod; 48 49 // An object to represent the most relevant characteristics of a 50 // newly-allocated call object when using an AllocatingRequestMatcherBatch. 51 struct BatchCallAllocation { 52 grpc_experimental_completion_queue_functor* tag; 53 grpc_call** call; 54 grpc_metadata_array* initial_metadata; 55 grpc_call_details* details; 56 }; 57 58 // An object to represent the most relevant characteristics of a 59 // newly-allocated call object when using an 60 // AllocatingRequestMatcherRegistered. 61 struct RegisteredCallAllocation { 62 grpc_experimental_completion_queue_functor* tag; 63 grpc_call** call; 64 grpc_metadata_array* initial_metadata; 65 gpr_timespec* deadline; 66 grpc_byte_buffer** optional_payload; 67 }; 68 69 /// Interface for listeners. 70 /// Implementations must override the Orphan() method, which should stop 71 /// listening and initiate destruction of the listener. 72 class ListenerInterface : public Orphanable { 73 public: 74 ~ListenerInterface() override = default; 75 76 /// Starts listening. This listener may refer to the pollset object beyond 77 /// this call, so it is a pointer rather than a reference. 78 virtual void Start(Server* server, 79 const std::vector<grpc_pollset*>* pollsets) = 0; 80 81 /// Returns the channelz node for the listen socket, or null if not 82 /// supported. 83 virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0; 84 85 /// Sets a closure to be invoked by the listener when its destruction 86 /// is complete. 87 virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0; 88 }; 89 90 explicit Server(const grpc_channel_args* args); 91 ~Server() override; 92 93 void Orphan() override; 94 channel_args()95 const grpc_channel_args* channel_args() const { return channel_args_; } default_resource_user()96 grpc_resource_user* default_resource_user() const { 97 return default_resource_user_; 98 } channelz_node()99 channelz::ServerNode* channelz_node() const { return channelz_node_.get(); } 100 101 // Do not call this before Start(). Returns the pollsets. The 102 // vector itself is immutable, but the pollsets inside are mutable. The 103 // result is valid for the lifetime of the server. pollsets()104 const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; } 105 config_fetcher()106 grpc_server_config_fetcher* config_fetcher() const { 107 return config_fetcher_.get(); 108 } 109 set_config_fetcher(std::unique_ptr<grpc_server_config_fetcher> config_fetcher)110 void set_config_fetcher( 111 std::unique_ptr<grpc_server_config_fetcher> config_fetcher) { 112 config_fetcher_ = std::move(config_fetcher); 113 } 114 115 bool HasOpenConnections(); 116 117 // Adds a listener to the server. When the server starts, it will call 118 // the listener's Start() method, and when it shuts down, it will orphan 119 // the listener. 120 void AddListener(OrphanablePtr<ListenerInterface> listener); 121 122 // Starts listening for connections. 123 void Start(); 124 125 // Sets up a transport. Creates a channel stack and binds the transport to 126 // the server. Called from the listener when a new connection is accepted. 127 grpc_error* SetupTransport( 128 grpc_transport* transport, grpc_pollset* accepting_pollset, 129 const grpc_channel_args* args, 130 const RefCountedPtr<channelz::SocketNode>& socket_node, 131 grpc_resource_user* resource_user = nullptr); 132 133 void RegisterCompletionQueue(grpc_completion_queue* cq); 134 135 // Functions to specify that a specific registered method or the unregistered 136 // collection should use a specific allocator for request matching. 137 void SetRegisteredMethodAllocator( 138 grpc_completion_queue* cq, void* method_tag, 139 std::function<RegisteredCallAllocation()> allocator); 140 void SetBatchMethodAllocator(grpc_completion_queue* cq, 141 std::function<BatchCallAllocation()> allocator); 142 143 RegisteredMethod* RegisterMethod( 144 const char* method, const char* host, 145 grpc_server_register_method_payload_handling payload_handling, 146 uint32_t flags); 147 148 grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details, 149 grpc_metadata_array* request_metadata, 150 grpc_completion_queue* cq_bound_to_call, 151 grpc_completion_queue* cq_for_notification, 152 void* tag); 153 154 grpc_call_error RequestRegisteredCall( 155 RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline, 156 grpc_metadata_array* request_metadata, 157 grpc_byte_buffer** optional_payload, 158 grpc_completion_queue* cq_bound_to_call, 159 grpc_completion_queue* cq_for_notification, void* tag_new); 160 161 void ShutdownAndNotify(grpc_completion_queue* cq, void* tag); 162 163 void CancelAllCalls(); 164 165 private: 166 struct RequestedCall; 167 168 struct ChannelRegisteredMethod { 169 RegisteredMethod* server_registered_method = nullptr; 170 uint32_t flags; 171 bool has_host; 172 ExternallyManagedSlice method; 173 ExternallyManagedSlice host; 174 }; 175 176 class RequestMatcherInterface; 177 class RealRequestMatcher; 178 class AllocatingRequestMatcherBase; 179 class AllocatingRequestMatcherBatch; 180 class AllocatingRequestMatcherRegistered; 181 182 class ChannelData { 183 public: 184 ChannelData() = default; 185 ~ChannelData(); 186 187 void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel, 188 size_t cq_idx, grpc_transport* transport, 189 intptr_t channelz_socket_uuid); 190 server()191 RefCountedPtr<Server> server() const { return server_; } channel()192 grpc_channel* channel() const { return channel_; } cq_idx()193 size_t cq_idx() const { return cq_idx_; } 194 195 ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host, 196 const grpc_slice& path, 197 bool is_idempotent); 198 199 // Filter vtable functions. 200 static grpc_error* InitChannelElement(grpc_channel_element* elem, 201 grpc_channel_element_args* args); 202 static void DestroyChannelElement(grpc_channel_element* elem); 203 204 private: 205 class ConnectivityWatcher; 206 207 static void AcceptStream(void* arg, grpc_transport* /*transport*/, 208 const void* transport_server_data); 209 210 void Destroy(); 211 212 static void FinishDestroy(void* arg, grpc_error* error); 213 214 RefCountedPtr<Server> server_; 215 grpc_channel* channel_; 216 // The index into Server::cqs_ of the CQ used as a starting point for 217 // where to publish new incoming calls. 218 size_t cq_idx_; 219 absl::optional<std::list<ChannelData*>::iterator> list_position_; 220 // A hash-table of the methods and hosts of the registered methods. 221 // TODO(vjpai): Convert this to an STL map type as opposed to a direct 222 // bucket implementation. (Consider performance impact, hash function to 223 // use, etc.) 224 std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_; 225 uint32_t registered_method_max_probes_; 226 grpc_closure finish_destroy_channel_closure_; 227 intptr_t channelz_socket_uuid_; 228 }; 229 230 class CallData { 231 public: 232 enum class CallState { 233 NOT_STARTED, // Waiting for metadata. 234 PENDING, // Initial metadata read, not flow controlled in yet. 235 ACTIVATED, // Flow controlled in, on completion queue. 236 ZOMBIED, // Cancelled before being queued. 237 }; 238 239 CallData(grpc_call_element* elem, const grpc_call_element_args& args, 240 RefCountedPtr<Server> server); 241 ~CallData(); 242 243 // Starts the recv_initial_metadata batch on the call. 244 // Invoked from ChannelData::AcceptStream(). 245 void Start(grpc_call_element* elem); 246 247 void SetState(CallState state); 248 249 // Attempts to move from PENDING to ACTIVATED state. Returns true 250 // on success. 251 bool MaybeActivate(); 252 253 // Publishes an incoming call to the application after it has been 254 // matched. 255 void Publish(size_t cq_idx, RequestedCall* rc); 256 257 void KillZombie(); 258 259 void FailCallCreation(); 260 261 // Filter vtable functions. 262 static grpc_error* InitCallElement(grpc_call_element* elem, 263 const grpc_call_element_args* args); 264 static void DestroyCallElement(grpc_call_element* elem, 265 const grpc_call_final_info* /*final_info*/, 266 grpc_closure* /*ignored*/); 267 static void StartTransportStreamOpBatch( 268 grpc_call_element* elem, grpc_transport_stream_op_batch* batch); 269 270 private: 271 // Helper functions for handling calls at the top of the call stack. 272 static void RecvInitialMetadataBatchComplete(void* arg, grpc_error* error); 273 void StartNewRpc(grpc_call_element* elem); 274 static void PublishNewRpc(void* arg, grpc_error* error); 275 276 // Functions used inside the call stack. 277 void StartTransportStreamOpBatchImpl(grpc_call_element* elem, 278 grpc_transport_stream_op_batch* batch); 279 static void RecvInitialMetadataReady(void* arg, grpc_error* error); 280 static void RecvTrailingMetadataReady(void* arg, grpc_error* error); 281 282 RefCountedPtr<Server> server_; 283 284 grpc_call* call_; 285 286 Atomic<CallState> state_{CallState::NOT_STARTED}; 287 288 absl::optional<grpc_slice> path_; 289 absl::optional<grpc_slice> host_; 290 grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE; 291 292 grpc_completion_queue* cq_new_ = nullptr; 293 294 RequestMatcherInterface* matcher_ = nullptr; 295 grpc_byte_buffer* payload_ = nullptr; 296 297 grpc_closure kill_zombie_closure_; 298 299 grpc_metadata_array initial_metadata_ = 300 grpc_metadata_array(); // Zero-initialize the C struct. 301 grpc_closure recv_initial_metadata_batch_complete_; 302 303 grpc_metadata_batch* recv_initial_metadata_ = nullptr; 304 uint32_t recv_initial_metadata_flags_ = 0; 305 grpc_closure recv_initial_metadata_ready_; 306 grpc_closure* original_recv_initial_metadata_ready_; 307 grpc_error* recv_initial_metadata_error_ = GRPC_ERROR_NONE; 308 309 bool seen_recv_trailing_metadata_ready_ = false; 310 grpc_closure recv_trailing_metadata_ready_; 311 grpc_closure* original_recv_trailing_metadata_ready_; 312 grpc_error* recv_trailing_metadata_error_ = GRPC_ERROR_NONE; 313 314 grpc_closure publish_; 315 316 CallCombiner* call_combiner_; 317 }; 318 319 struct Listener { ListenerListener320 explicit Listener(OrphanablePtr<ListenerInterface> l) 321 : listener(std::move(l)) {} 322 OrphanablePtr<ListenerInterface> listener; 323 grpc_closure destroy_done; 324 }; 325 326 struct ShutdownTag { ShutdownTagShutdownTag327 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg) 328 : tag(tag_arg), cq(cq_arg) {} 329 void* const tag; 330 grpc_completion_queue* const cq; 331 grpc_cq_completion completion; 332 }; 333 334 static void ListenerDestroyDone(void* arg, grpc_error* error); 335 DoneShutdownEvent(void * server,grpc_cq_completion *)336 static void DoneShutdownEvent(void* server, 337 grpc_cq_completion* /*completion*/) { 338 static_cast<Server*>(server)->Unref(); 339 } 340 341 static void DoneRequestEvent(void* req, grpc_cq_completion* completion); 342 343 void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error); 344 grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc); 345 346 void MaybeFinishShutdown(); 347 348 void KillPendingWorkLocked(grpc_error* error); 349 350 static grpc_call_error ValidateServerRequest( 351 grpc_completion_queue* cq_for_notification, void* tag, 352 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 353 grpc_call_error ValidateServerRequestAndCq( 354 size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag, 355 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 356 357 std::vector<grpc_channel*> GetChannelsLocked() const; 358 359 grpc_channel_args* const channel_args_; 360 grpc_resource_user* default_resource_user_ = nullptr; 361 RefCountedPtr<channelz::ServerNode> channelz_node_; 362 std::unique_ptr<grpc_server_config_fetcher> config_fetcher_; 363 364 std::vector<grpc_completion_queue*> cqs_; 365 std::vector<grpc_pollset*> pollsets_; 366 bool started_ = false; 367 368 // The two following mutexes control access to server-state. 369 // mu_global_ controls access to non-call-related state (e.g., channel state). 370 // mu_call_ controls access to call-related state (e.g., the call lists). 371 // 372 // If they are ever required to be nested, you must lock mu_global_ 373 // before mu_call_. This is currently used in shutdown processing 374 // (ShutdownAndNotify() and MaybeFinishShutdown()). 375 Mutex mu_global_; // mutex for server and channel state 376 Mutex mu_call_; // mutex for call-specific state 377 378 // startup synchronization: flag is protected by mu_global_, signals whether 379 // we are doing the listener start routine or not. 380 bool starting_ = false; 381 CondVar starting_cv_; 382 383 std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_; 384 385 // Request matcher for unregistered methods. 386 std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_; 387 388 std::atomic_bool shutdown_flag_{false}; 389 bool shutdown_published_ = false; 390 std::vector<ShutdownTag> shutdown_tags_; 391 392 std::list<ChannelData*> channels_; 393 394 std::list<Listener> listeners_; 395 size_t listeners_destroyed_ = 0; 396 397 // The last time we printed a shutdown progress message. 398 gpr_timespec last_shutdown_message_time_; 399 }; 400 401 } // namespace grpc_core 402 403 struct grpc_server { 404 grpc_core::OrphanablePtr<grpc_core::Server> core_server; 405 }; 406 407 // TODO(roth): Eventually, will need a way to modify configuration even after 408 // a connection is established (e.g., to change things like L7 rate 409 // limiting, RBAC, and fault injection configs). One possible option 410 // would be to do something like ServiceConfig and ConfigSelector, but 411 // that might add unnecessary per-call overhead. Need to consider other 412 // approaches here. 413 struct grpc_server_config_fetcher { 414 public: 415 class WatcherInterface { 416 public: 417 virtual ~WatcherInterface() = default; 418 // Ownership of \a args is transferred. 419 virtual void UpdateConfig(grpc_channel_args* args) = 0; 420 }; 421 422 virtual ~grpc_server_config_fetcher() = default; 423 424 // Ownership of \a args is transferred. 425 virtual void StartWatch(std::string listening_address, 426 grpc_channel_args* args, 427 std::unique_ptr<WatcherInterface> watcher) = 0; 428 virtual void CancelWatch(WatcherInterface* watcher) = 0; 429 virtual grpc_pollset_set* interested_parties() = 0; 430 }; 431 432 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */ 433