1 // 2 // 3 // Copyright 2018 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 21 22 #include <grpc/support/port_platform.h> 23 #include <grpcpp/alarm.h> 24 #include <grpcpp/grpcpp.h> 25 #include <grpcpp/support/async_stream.h> 26 #include <grpcpp/support/interceptor.h> 27 #include <stdint.h> 28 29 #include <atomic> 30 #include <functional> 31 #include <memory> 32 #include <string> 33 #include <utility> 34 35 #include "absl/log/check.h" 36 #include "src/core/util/sync.h" 37 #include "src/core/util/thd.h" 38 #include "src/cpp/server/load_reporter/load_reporter.h" 39 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" 40 41 namespace grpc { 42 namespace load_reporter { 43 44 // Async load reporting service. It's mainly responsible for controlling the 45 // procedure of incoming requests. The real business logic is handed off to the 46 // LoadReporter. There should be at most one instance of this service on a 47 // server to avoid spreading the load data into multiple places. 48 class LoadReporterAsyncServiceImpl 49 : public grpc::lb::v1::LoadReporter::AsyncService { 50 public: 51 explicit LoadReporterAsyncServiceImpl( 52 std::unique_ptr<ServerCompletionQueue> cq); 53 ~LoadReporterAsyncServiceImpl() override; 54 55 // Starts the working thread. 56 void StartThread(); 57 58 // Not copyable nor movable. 59 LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete; 60 LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) = 61 delete; 62 63 private: 64 class ReportLoadHandler; 65 66 // A tag that can be called with a bool argument. It's tailored for 67 // ReportLoadHandler's use. Before being used, it should be constructed with a 68 // method of ReportLoadHandler and a shared pointer to the handler. The 69 // shared pointer will be moved to the invoked function and the function can 70 // only be invoked once. That makes ref counting of the handler easier, 71 // because the shared pointer is not bound to the function and can be gone 72 // once the invoked function returns (if not used any more). 73 class CallableTag { 74 public: 75 using HandlerFunction = 76 std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>; 77 CallableTag()78 CallableTag() {} 79 CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)80 CallableTag(HandlerFunction func, 81 std::shared_ptr<ReportLoadHandler> handler) 82 : handler_function_(std::move(func)), handler_(std::move(handler)) { 83 CHECK(handler_function_ != nullptr); 84 CHECK_NE(handler_, nullptr); 85 } 86 87 // Runs the tag. This should be called only once. The handler is no longer 88 // owned by this tag after this method is invoked. 89 void Run(bool ok); 90 91 // Releases and returns the shared pointer to the handler. ReleaseHandler()92 std::shared_ptr<ReportLoadHandler> ReleaseHandler() { 93 return std::move(handler_); 94 } 95 96 private: 97 HandlerFunction handler_function_ = nullptr; 98 std::shared_ptr<ReportLoadHandler> handler_; 99 }; 100 101 // Each handler takes care of one load reporting stream. It contains 102 // per-stream data and it will access the members of the parent class (i.e., 103 // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data). 104 class ReportLoadHandler { 105 public: 106 // Instantiates a ReportLoadHandler and requests the next load reporting 107 // call. The handler object will manage its own lifetime, so no action is 108 // needed from the caller any more regarding that object. 109 static void CreateAndStart(ServerCompletionQueue* cq, 110 LoadReporterAsyncServiceImpl* service, 111 LoadReporter* load_reporter); 112 113 // This ctor is public because we want to use std::make_shared<> in 114 // CreateAndStart(). This ctor shouldn't be used elsewhere. 115 ReportLoadHandler(ServerCompletionQueue* cq, 116 LoadReporterAsyncServiceImpl* service, 117 LoadReporter* load_reporter); 118 119 private: 120 // After the handler has a call request delivered, it starts reading the 121 // initial request. Also, a new handler is spawned so that we can keep 122 // servicing future calls. 123 void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok); 124 125 // The first Read() is expected to succeed, after which the handler starts 126 // sending load reports back to the balancer. The second Read() is 127 // expected to fail, which happens when the balancer half-closes the 128 // stream to signal that it's no longer interested in the load reports. For 129 // the latter case, the handler will then close the stream. 130 void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 131 132 // The report sending operations are sequential as: send report -> send 133 // done, schedule the next send -> waiting for the alarm to fire -> alarm 134 // fires, send report -> ... 135 void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 136 void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 137 138 // Called when Finish() is done. 139 void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 140 141 // Called when AsyncNotifyWhenDone() notifies us. 142 void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok); 143 144 void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason); 145 146 // The key fields of the stream. 147 std::string lb_id_; 148 std::string load_balanced_hostname_; 149 std::string load_key_; 150 uint64_t load_report_interval_ms_; 151 152 // The data for RPC communication with the load reportee. 153 ServerContext ctx_; 154 grpc::lb::v1::LoadReportRequest request_; 155 156 // The members passed down from LoadReporterAsyncServiceImpl. 157 ServerCompletionQueue* cq_; 158 LoadReporterAsyncServiceImpl* service_; 159 LoadReporter* load_reporter_; 160 ServerAsyncReaderWriter<grpc::lb::v1::LoadReportResponse, 161 grpc::lb::v1::LoadReportRequest> 162 stream_; 163 164 // The status of the RPC progress. 165 enum CallStatus { 166 WAITING_FOR_DELIVERY, 167 DELIVERED, 168 INITIAL_REQUEST_RECEIVED, 169 INITIAL_RESPONSE_SENT, 170 FINISH_CALLED 171 } call_status_; 172 bool shutdown_{false}; 173 bool done_notified_{false}; 174 bool is_cancelled_{false}; 175 CallableTag on_done_notified_; 176 CallableTag on_finish_done_; 177 CallableTag next_inbound_; 178 CallableTag next_outbound_; 179 std::unique_ptr<Alarm> next_report_alarm_; 180 }; 181 182 // Handles the incoming requests and drives the completion queue in a loop. 183 static void Work(void* arg); 184 185 // Schedules the next data fetching from Census and LB feedback sampling. 186 void ScheduleNextFetchAndSample(); 187 188 // Fetches data from Census and samples LB feedback. 189 void FetchAndSample(bool ok); 190 191 std::unique_ptr<ServerCompletionQueue> cq_; 192 // To synchronize the operations related to shutdown state of cq_, so that we 193 // don't enqueue new tags into cq_ after it is already shut down. 194 grpc_core::Mutex cq_shutdown_mu_; 195 std::atomic_bool shutdown_{false}; 196 std::unique_ptr<grpc_core::Thread> thread_; 197 std::unique_ptr<LoadReporter> load_reporter_; 198 std::unique_ptr<Alarm> next_fetch_and_sample_alarm_; 199 }; 200 201 } // namespace load_reporter 202 } // namespace grpc 203 204 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 205