1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <grpc/support/log.h> 25 #include <grpcpp/alarm.h> 26 #include <grpcpp/grpcpp.h> 27 28 #include "src/core/lib/gprpp/sync.h" 29 #include "src/core/lib/gprpp/thd.h" 30 #include "src/cpp/server/load_reporter/load_reporter.h" 31 32 namespace grpc { 33 namespace load_reporter { 34 35 // Async load reporting service. It's mainly responsible for controlling the 36 // procedure of incoming requests. The real business logic is handed off to the 37 // LoadReporter. There should be at most one instance of this service on a 38 // server to avoid spreading the load data into multiple places. 39 class LoadReporterAsyncServiceImpl 40 : public grpc::lb::v1::LoadReporter::AsyncService { 41 public: 42 explicit LoadReporterAsyncServiceImpl( 43 std::unique_ptr<ServerCompletionQueue> cq); 44 ~LoadReporterAsyncServiceImpl() override; 45 46 // Starts the working thread. 47 void StartThread(); 48 49 // Not copyable nor movable. 50 LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete; 51 LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) = 52 delete; 53 54 private: 55 class ReportLoadHandler; 56 57 // A tag that can be called with a bool argument. It's tailored for 58 // ReportLoadHandler's use. Before being used, it should be constructed with a 59 // method of ReportLoadHandler and a shared pointer to the handler. The 60 // shared pointer will be moved to the invoked function and the function can 61 // only be invoked once. That makes ref counting of the handler easier, 62 // because the shared pointer is not bound to the function and can be gone 63 // once the invoked function returns (if not used any more). 64 class CallableTag { 65 public: 66 using HandlerFunction = 67 std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>; 68 CallableTag()69 CallableTag() {} 70 CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)71 CallableTag(HandlerFunction func, 72 std::shared_ptr<ReportLoadHandler> handler) 73 : handler_function_(std::move(func)), handler_(std::move(handler)) { 74 GPR_ASSERT(handler_function_ != nullptr); 75 GPR_ASSERT(handler_ != nullptr); 76 } 77 78 // Runs the tag. This should be called only once. The handler is no longer 79 // owned by this tag after this method is invoked. 80 void Run(bool ok); 81 82 // Releases and returns the shared pointer to the handler. ReleaseHandler()83 std::shared_ptr<ReportLoadHandler> ReleaseHandler() { 84 return std::move(handler_); 85 } 86 87 private: 88 HandlerFunction handler_function_ = nullptr; 89 std::shared_ptr<ReportLoadHandler> handler_; 90 }; 91 92 // Each handler takes care of one load reporting stream. It contains 93 // per-stream data and it will access the members of the parent class (i.e., 94 // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data). 95 class ReportLoadHandler { 96 public: 97 // Instantiates a ReportLoadHandler and requests the next load reporting 98 // call. The handler object will manage its own lifetime, so no action is 99 // needed from the caller any more regarding that object. 100 static void CreateAndStart(ServerCompletionQueue* cq, 101 LoadReporterAsyncServiceImpl* service, 102 LoadReporter* load_reporter); 103 104 // This ctor is public because we want to use std::make_shared<> in 105 // CreateAndStart(). This ctor shouldn't be used elsewhere. 106 ReportLoadHandler(ServerCompletionQueue* cq, 107 LoadReporterAsyncServiceImpl* service, 108 LoadReporter* load_reporter); 109 110 private: 111 // After the handler has a call request delivered, it starts reading the 112 // initial request. Also, a new handler is spawned so that we can keep 113 // servicing future calls. 114 void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok); 115 116 // The first Read() is expected to succeed, after which the handler starts 117 // sending load reports back to the balancer. The second Read() is 118 // expected to fail, which happens when the balancer half-closes the 119 // stream to signal that it's no longer interested in the load reports. For 120 // the latter case, the handler will then close the stream. 121 void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 122 123 // The report sending operations are sequential as: send report -> send 124 // done, schedule the next send -> waiting for the alarm to fire -> alarm 125 // fires, send report -> ... 126 void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 127 void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 128 129 // Called when Finish() is done. 130 void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 131 132 // Called when AsyncNotifyWhenDone() notifies us. 133 void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok); 134 135 void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason); 136 137 // The key fields of the stream. 138 std::string lb_id_; 139 std::string load_balanced_hostname_; 140 std::string load_key_; 141 uint64_t load_report_interval_ms_; 142 143 // The data for RPC communication with the load reportee. 144 ServerContext ctx_; 145 ::grpc::lb::v1::LoadReportRequest request_; 146 147 // The members passed down from LoadReporterAsyncServiceImpl. 148 ServerCompletionQueue* cq_; 149 LoadReporterAsyncServiceImpl* service_; 150 LoadReporter* load_reporter_; 151 ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse, 152 ::grpc::lb::v1::LoadReportRequest> 153 stream_; 154 155 // The status of the RPC progress. 156 enum CallStatus { 157 WAITING_FOR_DELIVERY, 158 DELIVERED, 159 INITIAL_REQUEST_RECEIVED, 160 INITIAL_RESPONSE_SENT, 161 FINISH_CALLED 162 } call_status_; 163 bool shutdown_{false}; 164 bool done_notified_{false}; 165 bool is_cancelled_{false}; 166 CallableTag on_done_notified_; 167 CallableTag on_finish_done_; 168 CallableTag next_inbound_; 169 CallableTag next_outbound_; 170 std::unique_ptr<Alarm> next_report_alarm_; 171 }; 172 173 // Handles the incoming requests and drives the completion queue in a loop. 174 static void Work(void* arg); 175 176 // Schedules the next data fetching from Census and LB feedback sampling. 177 void ScheduleNextFetchAndSample(); 178 179 // Fetches data from Census and samples LB feedback. 180 void FetchAndSample(bool ok); 181 182 std::unique_ptr<ServerCompletionQueue> cq_; 183 // To synchronize the operations related to shutdown state of cq_, so that we 184 // don't enqueue new tags into cq_ after it is already shut down. 185 grpc_core::Mutex cq_shutdown_mu_; 186 std::atomic_bool shutdown_{false}; 187 std::unique_ptr<::grpc_core::Thread> thread_; 188 std::unique_ptr<LoadReporter> load_reporter_; 189 std::unique_ptr<Alarm> next_fetch_and_sample_alarm_; 190 }; 191 192 } // namespace load_reporter 193 } // namespace grpc 194 195 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 196