// // Copyright 2015-2016 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include "src/core/lib/surface/server.h" #include #include #include #include #include #include #include #include #include #include #include "absl/memory/memory.h" #include "absl/types/optional.h" #include #include #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/init.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" namespace grpc_core { TraceFlag grpc_server_channel_trace(false, "server_channel"); // // Server::RequestedCall // struct Server::RequestedCall { enum class Type { BATCH_CALL, REGISTERED_CALL }; RequestedCall(void* tag_arg, grpc_completion_queue* call_cq, grpc_call** call_arg, grpc_metadata_array* initial_md, grpc_call_details* details) : type(Type::BATCH_CALL), tag(tag_arg), cq_bound_to_call(call_cq), call(call_arg), initial_metadata(initial_md) { details->reserved = nullptr; data.batch.details = details; } RequestedCall(void* tag_arg, grpc_completion_queue* call_cq, grpc_call** call_arg, grpc_metadata_array* initial_md, RegisteredMethod* rm, gpr_timespec* deadline, grpc_byte_buffer** optional_payload) : type(Type::REGISTERED_CALL), tag(tag_arg), cq_bound_to_call(call_cq), call(call_arg), initial_metadata(initial_md) { data.registered.method = rm; data.registered.deadline = deadline; data.registered.optional_payload = optional_payload; } MultiProducerSingleConsumerQueue::Node mpscq_node; const Type type; void* const tag; grpc_completion_queue* const cq_bound_to_call; grpc_call** const call; grpc_cq_completion completion; grpc_metadata_array* const initial_metadata; union { struct { grpc_call_details* details; } batch; struct { RegisteredMethod* method; gpr_timespec* deadline; grpc_byte_buffer** optional_payload; } registered; } data; }; // // Server::RegisteredMethod // struct Server::RegisteredMethod { RegisteredMethod( const char* method_arg, const char* host_arg, grpc_server_register_method_payload_handling payload_handling_arg, uint32_t flags_arg) : method(method_arg == nullptr ? "" : method_arg), host(host_arg == nullptr ? "" : host_arg), payload_handling(payload_handling_arg), flags(flags_arg) {} ~RegisteredMethod() = default; const std::string method; const std::string host; const grpc_server_register_method_payload_handling payload_handling; const uint32_t flags; // One request matcher per method. std::unique_ptr matcher; }; // // Server::RequestMatcherInterface // // RPCs that come in from the transport must be matched against RPC requests // from the application. An incoming request from the application can be matched // to an RPC that has already arrived or can be queued up for later use. // Likewise, an RPC coming in from the transport can either be matched to a // request that already arrived from the application or can be queued up for // later use (marked pending). If there is a match, the request's tag is posted // on the request's notification CQ. // // RequestMatcherInterface is the base class to provide this functionality. class Server::RequestMatcherInterface { public: virtual ~RequestMatcherInterface() {} // Unref the calls associated with any incoming RPCs in the pending queue (not // yet matched to an application-requested RPC). virtual void ZombifyPending() = 0; // Mark all application-requested RPCs failed if they have not been matched to // an incoming RPC. The error parameter indicates why the RPCs are being // failed (always server shutdown in all current implementations). virtual void KillRequests(grpc_error* error) = 0; // How many request queues are supported by this matcher. This is an abstract // concept that essentially maps to gRPC completion queues. virtual size_t request_queue_count() const = 0; // This function is invoked when the application requests a new RPC whose // information is in the call parameter. The request_queue_index marks the // queue onto which to place this RPC, and is typically associated with a gRPC // CQ. If there are pending RPCs waiting to be matched, publish one (match it // and notify the CQ). virtual void RequestCallWithPossiblePublish(size_t request_queue_index, RequestedCall* call) = 0; // This function is invoked on an incoming RPC, represented by the calld // object. The RequestMatcher will try to match it against an // application-requested RPC if possible or will place it in the pending queue // otherwise. To enable some measure of fairness between server CQs, the match // is done starting at the start_request_queue_index parameter in a cyclic // order rather than always starting at 0. virtual void MatchOrQueue(size_t start_request_queue_index, CallData* calld) = 0; // Returns the server associated with this request matcher virtual Server* server() const = 0; }; // The RealRequestMatcher is an implementation of RequestMatcherInterface that // actually uses all the features of RequestMatcherInterface: expecting the // application to explicitly request RPCs and then matching those to incoming // RPCs, along with a slow path by which incoming RPCs are put on a locked // pending list if they aren't able to be matched to an application request. class Server::RealRequestMatcher : public RequestMatcherInterface { public: explicit RealRequestMatcher(Server* server) : server_(server), requests_per_cq_(server->cqs_.size()) {} ~RealRequestMatcher() override { for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { GPR_ASSERT(queue.Pop() == nullptr); } } void ZombifyPending() override { while (!pending_.empty()) { CallData* calld = pending_.front(); calld->SetState(CallData::CallState::ZOMBIED); calld->KillZombie(); pending_.pop(); } } void KillRequests(grpc_error* error) override { for (size_t i = 0; i < requests_per_cq_.size(); i++) { RequestedCall* rc; while ((rc = reinterpret_cast( requests_per_cq_[i].Pop())) != nullptr) { server_->FailCall(i, rc, GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); } size_t request_queue_count() const override { return requests_per_cq_.size(); } void RequestCallWithPossiblePublish(size_t request_queue_index, RequestedCall* call) override { if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { /* this was the first queued request: we need to lock and start matching calls */ struct PendingCall { RequestedCall* rc = nullptr; CallData* calld; }; auto pop_next_pending = [this, request_queue_index] { PendingCall pending_call; { MutexLock lock(&server_->mu_call_); if (!pending_.empty()) { pending_call.rc = reinterpret_cast( requests_per_cq_[request_queue_index].Pop()); if (pending_call.rc != nullptr) { pending_call.calld = pending_.front(); pending_.pop(); } } } return pending_call; }; while (true) { PendingCall next_pending = pop_next_pending(); if (next_pending.rc == nullptr) break; if (!next_pending.calld->MaybeActivate()) { // Zombied Call next_pending.calld->KillZombie(); } else { next_pending.calld->Publish(request_queue_index, next_pending.rc); } } } } void MatchOrQueue(size_t start_request_queue_index, CallData* calld) override { for (size_t i = 0; i < requests_per_cq_.size(); i++) { size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size(); RequestedCall* rc = reinterpret_cast(requests_per_cq_[cq_idx].TryPop()); if (rc != nullptr) { GRPC_STATS_INC_SERVER_CQS_CHECKED(i); calld->SetState(CallData::CallState::ACTIVATED); calld->Publish(cq_idx, rc); return; } } // No cq to take the request found; queue it on the slow list. GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(); // We need to ensure that all the queues are empty. We do this under // the server mu_call_ lock to ensure that if something is added to // an empty request queue, it will block until the call is actually // added to the pending list. RequestedCall* rc = nullptr; size_t cq_idx = 0; size_t loop_count; { MutexLock lock(&server_->mu_call_); for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { cq_idx = (start_request_queue_index + loop_count) % requests_per_cq_.size(); rc = reinterpret_cast(requests_per_cq_[cq_idx].Pop()); if (rc != nullptr) { break; } } if (rc == nullptr) { calld->SetState(CallData::CallState::PENDING); pending_.push(calld); return; } } GRPC_STATS_INC_SERVER_CQS_CHECKED(loop_count + requests_per_cq_.size()); calld->SetState(CallData::CallState::ACTIVATED); calld->Publish(cq_idx, rc); } Server* server() const override { return server_; } private: Server* const server_; std::queue pending_; std::vector requests_per_cq_; }; // AllocatingRequestMatchers don't allow the application to request an RPC in // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue // will call out to an allocation function passed in at the construction of the // object. These request matchers are designed for the C++ callback API, so they // only support 1 completion queue (passed in at the constructor). class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface { public: AllocatingRequestMatcherBase(Server* server, grpc_completion_queue* cq) : server_(server), cq_(cq) { size_t idx; for (idx = 0; idx < server->cqs_.size(); idx++) { if (server->cqs_[idx] == cq) { break; } } GPR_ASSERT(idx < server->cqs_.size()); cq_idx_ = idx; } void ZombifyPending() override {} void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); } size_t request_queue_count() const override { return 0; } void RequestCallWithPossiblePublish(size_t /*request_queue_index*/, RequestedCall* /*call*/) final { GPR_ASSERT(false); } Server* server() const override { return server_; } // Supply the completion queue related to this request matcher grpc_completion_queue* cq() const { return cq_; } // Supply the completion queue's index relative to the server. size_t cq_idx() const { return cq_idx_; } private: Server* const server_; grpc_completion_queue* const cq_; size_t cq_idx_; }; // An allocating request matcher for non-registered methods (used for generic // API and unimplemented RPCs). class Server::AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase { public: AllocatingRequestMatcherBatch(Server* server, grpc_completion_queue* cq, std::function allocator) : AllocatingRequestMatcherBase(server, cq), allocator_(std::move(allocator)) {} void MatchOrQueue(size_t /*start_request_queue_index*/, CallData* calld) override { BatchCallAllocation call_info = allocator_(); GPR_ASSERT(server()->ValidateServerRequest( cq(), static_cast(call_info.tag), nullptr, nullptr) == GRPC_CALL_OK); RequestedCall* rc = new RequestedCall( static_cast(call_info.tag), cq(), call_info.call, call_info.initial_metadata, call_info.details); calld->SetState(CallData::CallState::ACTIVATED); calld->Publish(cq_idx(), rc); } private: std::function allocator_; }; // An allocating request matcher for registered methods. class Server::AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase { public: AllocatingRequestMatcherRegistered( Server* server, grpc_completion_queue* cq, RegisteredMethod* rm, std::function allocator) : AllocatingRequestMatcherBase(server, cq), registered_method_(rm), allocator_(std::move(allocator)) {} void MatchOrQueue(size_t /*start_request_queue_index*/, CallData* calld) override { RegisteredCallAllocation call_info = allocator_(); GPR_ASSERT( server()->ValidateServerRequest(cq(), static_cast(call_info.tag), call_info.optional_payload, registered_method_) == GRPC_CALL_OK); RequestedCall* rc = new RequestedCall( static_cast(call_info.tag), cq(), call_info.call, call_info.initial_metadata, registered_method_, call_info.deadline, call_info.optional_payload); calld->SetState(CallData::CallState::ACTIVATED); calld->Publish(cq_idx(), rc); } private: RegisteredMethod* const registered_method_; std::function allocator_; }; // // ChannelBroadcaster // namespace { class ChannelBroadcaster { public: // This can have an empty constructor and destructor since we want to control // when the actual setup and shutdown broadcast take place. // Copies over the channels from the locked server. void FillChannelsLocked(std::vector channels) { GPR_DEBUG_ASSERT(channels_.empty()); channels_ = std::move(channels); } // Broadcasts a shutdown on each channel. void BroadcastShutdown(bool send_goaway, grpc_error* force_disconnect) { for (grpc_channel* channel : channels_) { SendShutdown(channel, send_goaway, GRPC_ERROR_REF(force_disconnect)); GRPC_CHANNEL_INTERNAL_UNREF(channel, "broadcast"); } channels_.clear(); // just for safety against double broadcast GRPC_ERROR_UNREF(force_disconnect); } private: struct ShutdownCleanupArgs { grpc_closure closure; grpc_slice slice; }; static void ShutdownCleanup(void* arg, grpc_error* /*error*/) { ShutdownCleanupArgs* a = static_cast(arg); grpc_slice_unref_internal(a->slice); delete a; } static void SendShutdown(grpc_channel* channel, bool send_goaway, grpc_error* send_disconnect) { ShutdownCleanupArgs* sc = new ShutdownCleanupArgs; GRPC_CLOSURE_INIT(&sc->closure, ShutdownCleanup, sc, grpc_schedule_on_exec_ctx); grpc_transport_op* op = grpc_make_transport_op(&sc->closure); grpc_channel_element* elem; op->goaway_error = send_goaway ? grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) : GRPC_ERROR_NONE; op->set_accept_stream = true; sc->slice = grpc_slice_from_copied_string("Server shutdown"); op->disconnect_with_error = send_disconnect; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem->filter->start_transport_op(elem, op); } std::vector channels_; }; } // namespace // // Server // const grpc_channel_filter Server::kServerTopFilter = { Server::CallData::StartTransportStreamOpBatch, grpc_channel_next_op, sizeof(Server::CallData), Server::CallData::InitCallElement, grpc_call_stack_ignore_set_pollset_or_pollset_set, Server::CallData::DestroyCallElement, sizeof(Server::ChannelData), Server::ChannelData::InitChannelElement, Server::ChannelData::DestroyChannelElement, grpc_channel_next_get_info, "server", }; namespace { grpc_resource_user* CreateDefaultResourceUser(const grpc_channel_args* args) { if (args != nullptr) { grpc_resource_quota* resource_quota = grpc_resource_quota_from_channel_args(args, false /* create */); if (resource_quota != nullptr) { return grpc_resource_user_create(resource_quota, "default"); } } return nullptr; } RefCountedPtr CreateChannelzNode( const grpc_channel_args* args) { RefCountedPtr channelz_node; if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT)) { size_t channel_tracer_max_memory = grpc_channel_args_find_integer( args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE, {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}); channelz_node = MakeRefCounted(channel_tracer_max_memory); channelz_node->AddTraceEvent( channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Server created")); } return channelz_node; } } // namespace Server::Server(const grpc_channel_args* args) : channel_args_(grpc_channel_args_copy(args)), default_resource_user_(CreateDefaultResourceUser(args)), channelz_node_(CreateChannelzNode(args)) {} Server::~Server() { grpc_channel_args_destroy(channel_args_); // Remove the cq pollsets from the config_fetcher. if (started_ && config_fetcher_ != nullptr && config_fetcher_->interested_parties() != nullptr) { for (grpc_pollset* pollset : pollsets_) { grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(), pollset); } } for (size_t i = 0; i < cqs_.size(); i++) { GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server"); } } void Server::AddListener(OrphanablePtr listener) { channelz::ListenSocketNode* listen_socket_node = listener->channelz_listen_socket_node(); if (listen_socket_node != nullptr && channelz_node_ != nullptr) { channelz_node_->AddChildListenSocket(listen_socket_node->Ref()); } listeners_.emplace_back(std::move(listener)); } void Server::Start() { started_ = true; for (grpc_completion_queue* cq : cqs_) { if (grpc_cq_can_listen(cq)) { pollsets_.push_back(grpc_cq_pollset(cq)); } } if (unregistered_request_matcher_ == nullptr) { unregistered_request_matcher_ = absl::make_unique(this); } for (std::unique_ptr& rm : registered_methods_) { if (rm->matcher == nullptr) { rm->matcher = absl::make_unique(this); } } { MutexLock lock(&mu_global_); starting_ = true; } // Register the interested parties from the config fetcher to the cq pollsets // before starting listeners so that config fetcher is being polled when the // listeners start watch the fetcher. if (config_fetcher_ != nullptr && config_fetcher_->interested_parties() != nullptr) { for (grpc_pollset* pollset : pollsets_) { grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(), pollset); } } for (auto& listener : listeners_) { listener.listener->Start(this, &pollsets_); } MutexLock lock(&mu_global_); starting_ = false; starting_cv_.Signal(); } grpc_error* Server::SetupTransport( grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, const RefCountedPtr& socket_node, grpc_resource_user* resource_user) { // Create channel. grpc_error* error = GRPC_ERROR_NONE; grpc_channel* channel = grpc_channel_create( nullptr, args, GRPC_SERVER_CHANNEL, transport, resource_user, &error); if (channel == nullptr) { return error; } ChannelData* chand = static_cast( grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0) ->channel_data); // Set up CQs. size_t cq_idx; for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) { if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break; } if (cq_idx == cqs_.size()) { // Completion queue not found. Pick a random one to publish new calls to. cq_idx = static_cast(rand()) % cqs_.size(); } // Set up channelz node. intptr_t channelz_socket_uuid = 0; if (socket_node != nullptr) { channelz_socket_uuid = socket_node->uuid(); channelz_node_->AddChildSocket(socket_node); } // Initialize chand. chand->InitTransport(Ref(), channel, cq_idx, transport, channelz_socket_uuid); return GRPC_ERROR_NONE; } bool Server::HasOpenConnections() { MutexLock lock(&mu_global_); return !channels_.empty(); } void Server::SetRegisteredMethodAllocator( grpc_completion_queue* cq, void* method_tag, std::function allocator) { RegisteredMethod* rm = static_cast(method_tag); rm->matcher = absl::make_unique( this, cq, rm, std::move(allocator)); } void Server::SetBatchMethodAllocator( grpc_completion_queue* cq, std::function allocator) { GPR_DEBUG_ASSERT(unregistered_request_matcher_ == nullptr); unregistered_request_matcher_ = absl::make_unique(this, cq, std::move(allocator)); } void Server::RegisterCompletionQueue(grpc_completion_queue* cq) { for (grpc_completion_queue* queue : cqs_) { if (queue == cq) return; } GRPC_CQ_INTERNAL_REF(cq, "server"); cqs_.push_back(cq); } namespace { bool streq(const std::string& a, const char* b) { return (a.empty() && b == nullptr) || ((b != nullptr) && !strcmp(a.c_str(), b)); } } // namespace Server::RegisteredMethod* Server::RegisterMethod( const char* method, const char* host, grpc_server_register_method_payload_handling payload_handling, uint32_t flags) { if (!method) { gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); return nullptr; } for (std::unique_ptr& m : registered_methods_) { if (streq(m->method, method) && streq(m->host, host)) { gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, host ? host : "*"); return nullptr; } } if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) { gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", flags); return nullptr; } registered_methods_.emplace_back(absl::make_unique( method, host, payload_handling, flags)); return registered_methods_.back().get(); } void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) { delete static_cast(req); } void Server::FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error) { *rc->call = nullptr; rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); grpc_cq_end_op(cqs_[cq_idx], rc->tag, error, DoneRequestEvent, rc, &rc->completion); } // Before calling MaybeFinishShutdown(), we must hold mu_global_ and not // hold mu_call_. void Server::MaybeFinishShutdown() { if (!shutdown_flag_.load(std::memory_order_acquire) || shutdown_published_) { return; } { MutexLock lock(&mu_call_); KillPendingWorkLocked( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); } if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) { if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_shutdown_message_time_), gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME); gpr_log(GPR_DEBUG, "Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR " listeners to be destroyed before shutting down server", channels_.size(), listeners_.size() - listeners_destroyed_, listeners_.size()); } return; } shutdown_published_ = true; for (auto& shutdown_tag : shutdown_tags_) { Ref().release(); grpc_cq_end_op(shutdown_tag.cq, shutdown_tag.tag, GRPC_ERROR_NONE, DoneShutdownEvent, this, &shutdown_tag.completion); } } void Server::KillPendingWorkLocked(grpc_error* error) { if (started_) { unregistered_request_matcher_->KillRequests(GRPC_ERROR_REF(error)); unregistered_request_matcher_->ZombifyPending(); for (std::unique_ptr& rm : registered_methods_) { rm->matcher->KillRequests(GRPC_ERROR_REF(error)); rm->matcher->ZombifyPending(); } } GRPC_ERROR_UNREF(error); } std::vector Server::GetChannelsLocked() const { std::vector channels; channels.reserve(channels_.size()); for (const ChannelData* chand : channels_) { channels.push_back(chand->channel()); GRPC_CHANNEL_INTERNAL_REF(chand->channel(), "broadcast"); } return channels; } void Server::ListenerDestroyDone(void* arg, grpc_error* /*error*/) { Server* server = static_cast(arg); MutexLock lock(&server->mu_global_); server->listeners_destroyed_++; server->MaybeFinishShutdown(); } namespace { void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) { delete storage; } } // namespace // - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made // via grpc_server_request_call() and grpc_server_request_registered_call() // will now be cancelled). See KillPendingWorkLocked(). // // - Shuts down the listeners (i.e., the server will no longer listen on the // port for new incoming channels). // // - Iterates through all channels on the server and sends shutdown msg (see // ChannelBroadcaster::BroadcastShutdown() for details) to the clients via // the transport layer. The transport layer then guarantees the following: // -- Sends shutdown to the client (e.g., HTTP2 transport sends GOAWAY). // -- If the server has outstanding calls that are in the process, the // connection is NOT closed until the server is done with all those calls. // -- Once there are no more calls in progress, the channel is closed. void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { ChannelBroadcaster broadcaster; { // Wait for startup to be finished. Locks mu_global. MutexLock lock(&mu_global_); WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; }); // Stay locked, and gather up some stuff to do. GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (shutdown_published_) { grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, DonePublishedShutdown, nullptr, new grpc_cq_completion); return; } shutdown_tags_.emplace_back(tag, cq); if (shutdown_flag_.load(std::memory_order_acquire)) { return; } last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME); broadcaster.FillChannelsLocked(GetChannelsLocked()); shutdown_flag_.store(true, std::memory_order_release); // Collect all unregistered then registered calls. { MutexLock lock(&mu_call_); KillPendingWorkLocked( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); } MaybeFinishShutdown(); } // Shutdown listeners. for (auto& listener : listeners_) { channelz::ListenSocketNode* channelz_listen_socket_node = listener.listener->channelz_listen_socket_node(); if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) { channelz_node_->RemoveChildListenSocket( channelz_listen_socket_node->uuid()); } GRPC_CLOSURE_INIT(&listener.destroy_done, ListenerDestroyDone, this, grpc_schedule_on_exec_ctx); listener.listener->SetOnDestroyDone(&listener.destroy_done); listener.listener.reset(); } broadcaster.BroadcastShutdown(/*send_goaway=*/true, GRPC_ERROR_NONE); } void Server::CancelAllCalls() { ChannelBroadcaster broadcaster; { MutexLock lock(&mu_global_); broadcaster.FillChannelsLocked(GetChannelsLocked()); } broadcaster.BroadcastShutdown( /*send_goaway=*/false, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); } void Server::Orphan() { { MutexLock lock(&mu_global_); GPR_ASSERT(shutdown_flag_.load(std::memory_order_acquire) || listeners_.empty()); GPR_ASSERT(listeners_destroyed_ == listeners_.size()); } if (default_resource_user_ != nullptr) { grpc_resource_quota_unref(grpc_resource_user_quota(default_resource_user_)); grpc_resource_user_shutdown(default_resource_user_); grpc_resource_user_unref(default_resource_user_); } Unref(); } grpc_call_error Server::ValidateServerRequest( grpc_completion_queue* cq_for_notification, void* tag, grpc_byte_buffer** optional_payload, RegisteredMethod* rm) { if ((rm == nullptr && optional_payload != nullptr) || ((rm != nullptr) && ((optional_payload == nullptr) != (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) { return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; } if (grpc_cq_begin_op(cq_for_notification, tag) == false) { return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; } return GRPC_CALL_OK; } grpc_call_error Server::ValidateServerRequestAndCq( size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag, grpc_byte_buffer** optional_payload, RegisteredMethod* rm) { size_t idx; for (idx = 0; idx < cqs_.size(); idx++) { if (cqs_[idx] == cq_for_notification) { break; } } if (idx == cqs_.size()) { return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } grpc_call_error error = ValidateServerRequest(cq_for_notification, tag, optional_payload, rm); if (error != GRPC_CALL_OK) { return error; } *cq_idx = idx; return GRPC_CALL_OK; } grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) { if (shutdown_flag_.load(std::memory_order_acquire)) { FailCall(cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } RequestMatcherInterface* rm; switch (rc->type) { case RequestedCall::Type::BATCH_CALL: rm = unregistered_request_matcher_.get(); break; case RequestedCall::Type::REGISTERED_CALL: rm = rc->data.registered.method->matcher.get(); break; } rm->RequestCallWithPossiblePublish(cq_idx, rc); return GRPC_CALL_OK; } grpc_call_error Server::RequestCall(grpc_call** call, grpc_call_details* details, grpc_metadata_array* request_metadata, grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { size_t cq_idx; grpc_call_error error = ValidateServerRequestAndCq( &cq_idx, cq_for_notification, tag, nullptr, nullptr); if (error != GRPC_CALL_OK) { return error; } RequestedCall* rc = new RequestedCall(tag, cq_bound_to_call, call, request_metadata, details); return QueueRequestedCall(cq_idx, rc); } grpc_call_error Server::RequestRegisteredCall( RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline, grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload, grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag_new) { size_t cq_idx; grpc_call_error error = ValidateServerRequestAndCq( &cq_idx, cq_for_notification, tag_new, optional_payload, rm); if (error != GRPC_CALL_OK) { return error; } RequestedCall* rc = new RequestedCall(tag_new, cq_bound_to_call, call, request_metadata, rm, deadline, optional_payload); return QueueRequestedCall(cq_idx, rc); } // // Server::ChannelData::ConnectivityWatcher // class Server::ChannelData::ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { public: explicit ConnectivityWatcher(ChannelData* chand) : chand_(chand) { GRPC_CHANNEL_INTERNAL_REF(chand_->channel_, "connectivity"); } ~ConnectivityWatcher() override { GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel_, "connectivity"); } private: void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& /*status*/) override { // Don't do anything until we are being shut down. if (new_state != GRPC_CHANNEL_SHUTDOWN) return; // Shut down channel. MutexLock lock(&chand_->server_->mu_global_); chand_->Destroy(); } ChannelData* chand_; }; // // Server::ChannelData // Server::ChannelData::~ChannelData() { if (registered_methods_ != nullptr) { for (const ChannelRegisteredMethod& crm : *registered_methods_) { grpc_slice_unref_internal(crm.method); GPR_DEBUG_ASSERT(crm.method.refcount == &kNoopRefcount || crm.method.refcount == nullptr); if (crm.has_host) { grpc_slice_unref_internal(crm.host); GPR_DEBUG_ASSERT(crm.host.refcount == &kNoopRefcount || crm.host.refcount == nullptr); } } registered_methods_.reset(); } if (server_ != nullptr) { if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) { server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_); } { MutexLock lock(&server_->mu_global_); if (list_position_.has_value()) { server_->channels_.erase(*list_position_); list_position_.reset(); } server_->MaybeFinishShutdown(); } } } void Server::ChannelData::InitTransport(RefCountedPtr server, grpc_channel* channel, size_t cq_idx, grpc_transport* transport, intptr_t channelz_socket_uuid) { server_ = std::move(server); channel_ = channel; cq_idx_ = cq_idx; channelz_socket_uuid_ = channelz_socket_uuid; // Build a lookup table phrased in terms of mdstr's in this channels context // to quickly find registered methods. size_t num_registered_methods = server_->registered_methods_.size(); if (num_registered_methods > 0) { uint32_t max_probes = 0; size_t slots = 2 * num_registered_methods; registered_methods_ = absl::make_unique>(slots); for (std::unique_ptr& rm : server_->registered_methods_) { ExternallyManagedSlice host; ExternallyManagedSlice method(rm->method.c_str()); const bool has_host = !rm->host.empty(); if (has_host) { host = ExternallyManagedSlice(rm->host.c_str()); } uint32_t hash = GRPC_MDSTR_KV_HASH(has_host ? host.Hash() : 0, method.Hash()); uint32_t probes = 0; for (probes = 0; (*registered_methods_)[(hash + probes) % slots] .server_registered_method != nullptr; probes++) { } if (probes > max_probes) max_probes = probes; ChannelRegisteredMethod* crm = &(*registered_methods_)[(hash + probes) % slots]; crm->server_registered_method = rm.get(); crm->flags = rm->flags; crm->has_host = has_host; if (has_host) { crm->host = host; } crm->method = method; } GPR_ASSERT(slots <= UINT32_MAX); registered_method_max_probes_ = max_probes; } // Publish channel. { MutexLock lock(&server_->mu_global_); server_->channels_.push_front(this); list_position_ = server_->channels_.begin(); } // Start accept_stream transport op. grpc_transport_op* op = grpc_make_transport_op(nullptr); op->set_accept_stream = true; op->set_accept_stream_fn = AcceptStream; op->set_accept_stream_user_data = this; op->start_connectivity_watch = MakeOrphanable(this); if (server_->shutdown_flag_.load(std::memory_order_acquire)) { op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); } grpc_transport_perform_op(transport, op); } Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( const grpc_slice& host, const grpc_slice& path, bool is_idempotent) { if (registered_methods_ == nullptr) return nullptr; /* TODO(ctiller): unify these two searches */ /* check for an exact match with host */ uint32_t hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(host), grpc_slice_hash_internal(path)); for (size_t i = 0; i <= registered_method_max_probes_; i++) { ChannelRegisteredMethod* rm = &(*registered_methods_)[(hash + i) % registered_methods_->size()]; if (rm->server_registered_method == nullptr) break; if (!rm->has_host) continue; if (rm->host != host) continue; if (rm->method != path) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !is_idempotent) { continue; } return rm; } /* check for a wildcard method definition (no host set) */ hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(path)); for (size_t i = 0; i <= registered_method_max_probes_; i++) { ChannelRegisteredMethod* rm = &(*registered_methods_)[(hash + i) % registered_methods_->size()]; if (rm->server_registered_method == nullptr) break; if (rm->has_host) continue; if (rm->method != path) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !is_idempotent) { continue; } return rm; } return nullptr; } void Server::ChannelData::AcceptStream(void* arg, grpc_transport* /*transport*/, const void* transport_server_data) { auto* chand = static_cast(arg); /* create a call */ grpc_call_create_args args; args.channel = chand->channel_; args.server = chand->server_.get(); args.parent = nullptr; args.propagation_mask = 0; args.cq = nullptr; args.pollset_set_alternative = nullptr; args.server_transport_data = transport_server_data; args.add_initial_metadata = nullptr; args.add_initial_metadata_count = 0; args.send_deadline = GRPC_MILLIS_INF_FUTURE; grpc_call* call; grpc_error* error = grpc_call_create(&args, &call); grpc_call_element* elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); auto* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(error); calld->FailCallCreation(); return; } calld->Start(elem); } void Server::ChannelData::FinishDestroy(void* arg, grpc_error* /*error*/) { auto* chand = static_cast(arg); Server* server = chand->server_.get(); GRPC_CHANNEL_INTERNAL_UNREF(chand->channel_, "server"); server->Unref(); } void Server::ChannelData::Destroy() { if (!list_position_.has_value()) return; GPR_ASSERT(server_ != nullptr); server_->channels_.erase(*list_position_); list_position_.reset(); server_->Ref().release(); server_->MaybeFinishShutdown(); GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this, grpc_schedule_on_exec_ctx); if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) { gpr_log(GPR_INFO, "Disconnected client"); } grpc_transport_op* op = grpc_make_transport_op(&finish_destroy_channel_closure_); op->set_accept_stream = true; grpc_channel_next_op( grpc_channel_stack_element(grpc_channel_get_channel_stack(channel_), 0), op); } grpc_error* Server::ChannelData::InitChannelElement( grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(args->is_first); GPR_ASSERT(!args->is_last); new (elem->channel_data) ChannelData(); return GRPC_ERROR_NONE; } void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) { auto* chand = static_cast(elem->channel_data); chand->~ChannelData(); } // // Server::CallData // Server::CallData::CallData(grpc_call_element* elem, const grpc_call_element_args& args, RefCountedPtr server) : server_(std::move(server)), call_(grpc_call_from_top_element(elem)), call_combiner_(args.call_combiner) { GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, elem, grpc_schedule_on_exec_ctx); } Server::CallData::~CallData() { GPR_ASSERT(state_.Load(MemoryOrder::RELAXED) != CallState::PENDING); GRPC_ERROR_UNREF(recv_initial_metadata_error_); if (host_.has_value()) { grpc_slice_unref_internal(*host_); } if (path_.has_value()) { grpc_slice_unref_internal(*path_); } grpc_metadata_array_destroy(&initial_metadata_); grpc_byte_buffer_destroy(payload_); } void Server::CallData::SetState(CallState state) { state_.Store(state, MemoryOrder::RELAXED); } bool Server::CallData::MaybeActivate() { CallState expected = CallState::PENDING; return state_.CompareExchangeStrong(&expected, CallState::ACTIVATED, MemoryOrder::ACQ_REL, MemoryOrder::RELAXED); } void Server::CallData::FailCallCreation() { CallState expected_not_started = CallState::NOT_STARTED; CallState expected_pending = CallState::PENDING; if (state_.CompareExchangeStrong(&expected_not_started, CallState::ZOMBIED, MemoryOrder::ACQ_REL, MemoryOrder::ACQUIRE)) { KillZombie(); } else if (state_.CompareExchangeStrong(&expected_pending, CallState::ZOMBIED, MemoryOrder::ACQ_REL, MemoryOrder::RELAXED)) { // Zombied call will be destroyed when it's removed from the pending // queue... later. } } void Server::CallData::Start(grpc_call_element* elem) { grpc_op op; op.op = GRPC_OP_RECV_INITIAL_METADATA; op.flags = 0; op.reserved = nullptr; op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_; GRPC_CLOSURE_INIT(&recv_initial_metadata_batch_complete_, RecvInitialMetadataBatchComplete, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(call_, &op, 1, &recv_initial_metadata_batch_complete_); } void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) { grpc_call_set_completion_queue(call_, rc->cq_bound_to_call); *rc->call = call_; cq_new_ = server_->cqs_[cq_idx]; GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, initial_metadata_); switch (rc->type) { case RequestedCall::Type::BATCH_CALL: GPR_ASSERT(host_.has_value()); GPR_ASSERT(path_.has_value()); rc->data.batch.details->host = grpc_slice_ref_internal(*host_); rc->data.batch.details->method = grpc_slice_ref_internal(*path_); rc->data.batch.details->deadline = grpc_millis_to_timespec(deadline_, GPR_CLOCK_MONOTONIC); rc->data.batch.details->flags = recv_initial_metadata_flags_; break; case RequestedCall::Type::REGISTERED_CALL: *rc->data.registered.deadline = grpc_millis_to_timespec(deadline_, GPR_CLOCK_MONOTONIC); if (rc->data.registered.optional_payload != nullptr) { *rc->data.registered.optional_payload = payload_; payload_ = nullptr; } break; default: GPR_UNREACHABLE_CODE(return ); } grpc_cq_end_op(cq_new_, rc->tag, GRPC_ERROR_NONE, Server::DoneRequestEvent, rc, &rc->completion, true); } void Server::CallData::PublishNewRpc(void* arg, grpc_error* error) { grpc_call_element* call_elem = static_cast(arg); auto* calld = static_cast(call_elem->call_data); auto* chand = static_cast(call_elem->channel_data); RequestMatcherInterface* rm = calld->matcher_; Server* server = rm->server(); if (error != GRPC_ERROR_NONE || server->shutdown_flag_.load(std::memory_order_acquire)) { calld->state_.Store(CallState::ZOMBIED, MemoryOrder::RELAXED); calld->KillZombie(); return; } rm->MatchOrQueue(chand->cq_idx(), calld); } namespace { void KillZombieClosure(void* call, grpc_error* /*error*/) { grpc_call_unref(static_cast(call)); } } // namespace void Server::CallData::KillZombie() { GRPC_CLOSURE_INIT(&kill_zombie_closure_, KillZombieClosure, call_, grpc_schedule_on_exec_ctx); ExecCtx::Run(DEBUG_LOCATION, &kill_zombie_closure_, GRPC_ERROR_NONE); } void Server::CallData::StartNewRpc(grpc_call_element* elem) { auto* chand = static_cast(elem->channel_data); if (server_->shutdown_flag_.load(std::memory_order_acquire)) { state_.Store(CallState::ZOMBIED, MemoryOrder::RELAXED); KillZombie(); return; } // Find request matcher. matcher_ = server_->unregistered_request_matcher_.get(); grpc_server_register_method_payload_handling payload_handling = GRPC_SRM_PAYLOAD_NONE; if (path_.has_value() && host_.has_value()) { ChannelRegisteredMethod* rm = chand->GetRegisteredMethod(*host_, *path_, (recv_initial_metadata_flags_ & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)); if (rm != nullptr) { matcher_ = rm->server_registered_method->matcher.get(); payload_handling = rm->server_registered_method->payload_handling; } } // Start recv_message op if needed. switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: PublishNewRpc(elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; op.op = GRPC_OP_RECV_MESSAGE; op.flags = 0; op.reserved = nullptr; op.data.recv_message.recv_message = &payload_; GRPC_CLOSURE_INIT(&publish_, PublishNewRpc, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(call_, &op, 1, &publish_); break; } } } void Server::CallData::RecvInitialMetadataBatchComplete(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); auto* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { calld->FailCallCreation(); return; } calld->StartNewRpc(elem); } void Server::CallData::StartTransportStreamOpBatchImpl( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { if (batch->recv_initial_metadata) { GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); recv_initial_metadata_ = batch->payload->recv_initial_metadata.recv_initial_metadata; original_recv_initial_metadata_ready_ = batch->payload->recv_initial_metadata.recv_initial_metadata_ready; batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &recv_initial_metadata_ready_; batch->payload->recv_initial_metadata.recv_flags = &recv_initial_metadata_flags_; } if (batch->recv_trailing_metadata) { original_recv_trailing_metadata_ready_ = batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = &recv_trailing_metadata_ready_; } grpc_call_next_op(elem, batch); } void Server::CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); grpc_millis op_deadline; if (error == GRPC_ERROR_NONE) { GPR_DEBUG_ASSERT(calld->recv_initial_metadata_->idx.named.path != nullptr); GPR_DEBUG_ASSERT(calld->recv_initial_metadata_->idx.named.authority != nullptr); calld->path_.emplace(grpc_slice_ref_internal( GRPC_MDVALUE(calld->recv_initial_metadata_->idx.named.path->md))); calld->host_.emplace(grpc_slice_ref_internal( GRPC_MDVALUE(calld->recv_initial_metadata_->idx.named.authority->md))); grpc_metadata_batch_remove(calld->recv_initial_metadata_, GRPC_BATCH_PATH); grpc_metadata_batch_remove(calld->recv_initial_metadata_, GRPC_BATCH_AUTHORITY); } else { GRPC_ERROR_REF(error); } op_deadline = calld->recv_initial_metadata_->deadline; if (op_deadline != GRPC_MILLIS_INF_FUTURE) { calld->deadline_ = op_deadline; } if (calld->host_.has_value() && calld->path_.has_value()) { /* do nothing */ } else { /* Pass the error reference to calld->recv_initial_metadata_error */ grpc_error* src_error = error; error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Missing :authority or :path", &src_error, 1); GRPC_ERROR_UNREF(src_error); calld->recv_initial_metadata_error_ = GRPC_ERROR_REF(error); } grpc_closure* closure = calld->original_recv_initial_metadata_ready_; calld->original_recv_initial_metadata_ready_ = nullptr; if (calld->seen_recv_trailing_metadata_ready_) { GRPC_CALL_COMBINER_START(calld->call_combiner_, &calld->recv_trailing_metadata_ready_, calld->recv_trailing_metadata_error_, "continue server recv_trailing_metadata_ready"); } Closure::Run(DEBUG_LOCATION, closure, error); } void Server::CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); if (calld->original_recv_initial_metadata_ready_ != nullptr) { calld->recv_trailing_metadata_error_ = GRPC_ERROR_REF(error); calld->seen_recv_trailing_metadata_ready_ = true; GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_, RecvTrailingMetadataReady, elem, grpc_schedule_on_exec_ctx); GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "deferring server recv_trailing_metadata_ready " "until after recv_initial_metadata_ready"); return; } error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->recv_initial_metadata_error_)); Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, error); } grpc_error* Server::CallData::InitCallElement( grpc_call_element* elem, const grpc_call_element_args* args) { auto* chand = static_cast(elem->channel_data); new (elem->call_data) Server::CallData(elem, *args, chand->server()); return GRPC_ERROR_NONE; } void Server::CallData::DestroyCallElement( grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, grpc_closure* /*ignored*/) { auto* calld = static_cast(elem->call_data); calld->~CallData(); } void Server::CallData::StartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { auto* calld = static_cast(elem->call_data); calld->StartTransportStreamOpBatchImpl(elem, batch); } } // namespace grpc_core // // C-core API // grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server* c_server = new grpc_server; c_server->core_server = grpc_core::MakeOrphanable(args); return c_server; } void grpc_server_register_completion_queue(grpc_server* server, grpc_completion_queue* cq, void* reserved) { GRPC_API_TRACE( "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, (server, cq, reserved)); GPR_ASSERT(!reserved); auto cq_type = grpc_get_cq_completion_type(cq); if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) { gpr_log(GPR_INFO, "Completion queue of type %d is being registered as a " "server-completion-queue", static_cast(cq_type)); /* Ideally we should log an error and abort but ruby-wrapped-language API calls grpc_completion_queue_pluck() on server completion queues */ } server->core_server->RegisterCompletionQueue(cq); } void* grpc_server_register_method( grpc_server* server, const char* method, const char* host, grpc_server_register_method_payload_handling payload_handling, uint32_t flags) { GRPC_API_TRACE( "grpc_server_register_method(server=%p, method=%s, host=%s, " "flags=0x%08x)", 4, (server, method, host, flags)); return server->core_server->RegisterMethod(method, host, payload_handling, flags); } void grpc_server_start(grpc_server* server) { grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); server->core_server->Start(); } void grpc_server_shutdown_and_notify(grpc_server* server, grpc_completion_queue* cq, void* tag) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); server->core_server->ShutdownAndNotify(cq, tag); } void grpc_server_cancel_all_calls(grpc_server* server) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server)); server->core_server->CancelAllCalls(); } void grpc_server_destroy(grpc_server* server) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server)); delete server; } grpc_call_error grpc_server_request_call( grpc_server* server, grpc_call** call, grpc_call_details* details, grpc_metadata_array* request_metadata, grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, request_metadata, cq_bound_to_call, cq_for_notification, tag)); return server->core_server->RequestCall(call, details, request_metadata, cq_bound_to_call, cq_for_notification, tag); } grpc_call_error grpc_server_request_registered_call( grpc_server* server, void* registered_method, grpc_call** call, gpr_timespec* deadline, grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload, grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag_new) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); auto* rm = static_cast(registered_method); GRPC_API_TRACE( "grpc_server_request_registered_call(" "server=%p, registered_method=%p, call=%p, deadline=%p, " "request_metadata=%p, " "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, " "tag=%p)", 9, (server, registered_method, call, deadline, request_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag_new)); return server->core_server->RequestRegisteredCall( rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag_new); } void grpc_server_set_config_fetcher( grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_set_config_fetcher(server=%p, config_fetcher=%p)", 2, (server, server_config_fetcher)); server->core_server->set_config_fetcher( std::unique_ptr(server_config_fetcher)); } void grpc_server_config_fetcher_destroy( grpc_server_config_fetcher* server_config_fetcher) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_config_fetcher_destroy(config_fetcher=%p)", 1, (server_config_fetcher)); delete server_config_fetcher; }