• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
21 
22 #include <grpc/support/port_platform.h>
23 #include <grpcpp/alarm.h>
24 #include <grpcpp/grpcpp.h>
25 #include <grpcpp/support/async_stream.h>
26 #include <grpcpp/support/interceptor.h>
27 #include <stdint.h>
28 
29 #include <atomic>
30 #include <functional>
31 #include <memory>
32 #include <string>
33 #include <utility>
34 
35 #include "absl/log/check.h"
36 #include "src/core/util/sync.h"
37 #include "src/core/util/thd.h"
38 #include "src/cpp/server/load_reporter/load_reporter.h"
39 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
40 
41 namespace grpc {
42 namespace load_reporter {
43 
44 // Async load reporting service. It's mainly responsible for controlling the
45 // procedure of incoming requests. The real business logic is handed off to the
46 // LoadReporter. There should be at most one instance of this service on a
47 // server to avoid spreading the load data into multiple places.
48 class LoadReporterAsyncServiceImpl
49     : public grpc::lb::v1::LoadReporter::AsyncService {
50  public:
51   explicit LoadReporterAsyncServiceImpl(
52       std::unique_ptr<ServerCompletionQueue> cq);
53   ~LoadReporterAsyncServiceImpl() override;
54 
55   // Starts the working thread.
56   void StartThread();
57 
58   // Not copyable nor movable.
59   LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
60   LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
61       delete;
62 
63  private:
64   class ReportLoadHandler;
65 
66   // A tag that can be called with a bool argument. It's tailored for
67   // ReportLoadHandler's use. Before being used, it should be constructed with a
68   // method of ReportLoadHandler and a shared pointer to the handler. The
69   // shared pointer will be moved to the invoked function and the function can
70   // only be invoked once. That makes ref counting of the handler easier,
71   // because the shared pointer is not bound to the function and can be gone
72   // once the invoked function returns (if not used any more).
73   class CallableTag {
74    public:
75     using HandlerFunction =
76         std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
77 
CallableTag()78     CallableTag() {}
79 
CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)80     CallableTag(HandlerFunction func,
81                 std::shared_ptr<ReportLoadHandler> handler)
82         : handler_function_(std::move(func)), handler_(std::move(handler)) {
83       CHECK(handler_function_ != nullptr);
84       CHECK_NE(handler_, nullptr);
85     }
86 
87     // Runs the tag. This should be called only once. The handler is no longer
88     // owned by this tag after this method is invoked.
89     void Run(bool ok);
90 
91     // Releases and returns the shared pointer to the handler.
ReleaseHandler()92     std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
93       return std::move(handler_);
94     }
95 
96    private:
97     HandlerFunction handler_function_ = nullptr;
98     std::shared_ptr<ReportLoadHandler> handler_;
99   };
100 
101   // Each handler takes care of one load reporting stream. It contains
102   // per-stream data and it will access the members of the parent class (i.e.,
103   // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
104   class ReportLoadHandler {
105    public:
106     // Instantiates a ReportLoadHandler and requests the next load reporting
107     // call. The handler object will manage its own lifetime, so no action is
108     // needed from the caller any more regarding that object.
109     static void CreateAndStart(ServerCompletionQueue* cq,
110                                LoadReporterAsyncServiceImpl* service,
111                                LoadReporter* load_reporter);
112 
113     // This ctor is public because we want to use std::make_shared<> in
114     // CreateAndStart(). This ctor shouldn't be used elsewhere.
115     ReportLoadHandler(ServerCompletionQueue* cq,
116                       LoadReporterAsyncServiceImpl* service,
117                       LoadReporter* load_reporter);
118 
119    private:
120     // After the handler has a call request delivered, it starts reading the
121     // initial request. Also, a new handler is spawned so that we can keep
122     // servicing future calls.
123     void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
124 
125     // The first Read() is expected to succeed, after which the handler starts
126     // sending load reports back to the balancer. The second Read() is
127     // expected to fail, which happens when the balancer half-closes the
128     // stream to signal that it's no longer interested in the load reports. For
129     // the latter case, the handler will then close the stream.
130     void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
131 
132     // The report sending operations are sequential as: send report -> send
133     // done, schedule the next send -> waiting for the alarm to fire -> alarm
134     // fires, send report -> ...
135     void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
136     void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
137 
138     // Called when Finish() is done.
139     void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
140 
141     // Called when AsyncNotifyWhenDone() notifies us.
142     void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
143 
144     void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
145 
146     // The key fields of the stream.
147     std::string lb_id_;
148     std::string load_balanced_hostname_;
149     std::string load_key_;
150     uint64_t load_report_interval_ms_;
151 
152     // The data for RPC communication with the load reportee.
153     ServerContext ctx_;
154     grpc::lb::v1::LoadReportRequest request_;
155 
156     // The members passed down from LoadReporterAsyncServiceImpl.
157     ServerCompletionQueue* cq_;
158     LoadReporterAsyncServiceImpl* service_;
159     LoadReporter* load_reporter_;
160     ServerAsyncReaderWriter<grpc::lb::v1::LoadReportResponse,
161                             grpc::lb::v1::LoadReportRequest>
162         stream_;
163 
164     // The status of the RPC progress.
165     enum CallStatus {
166       WAITING_FOR_DELIVERY,
167       DELIVERED,
168       INITIAL_REQUEST_RECEIVED,
169       INITIAL_RESPONSE_SENT,
170       FINISH_CALLED
171     } call_status_;
172     bool shutdown_{false};
173     bool done_notified_{false};
174     bool is_cancelled_{false};
175     CallableTag on_done_notified_;
176     CallableTag on_finish_done_;
177     CallableTag next_inbound_;
178     CallableTag next_outbound_;
179     std::unique_ptr<Alarm> next_report_alarm_;
180   };
181 
182   // Handles the incoming requests and drives the completion queue in a loop.
183   static void Work(void* arg);
184 
185   // Schedules the next data fetching from Census and LB feedback sampling.
186   void ScheduleNextFetchAndSample();
187 
188   // Fetches data from Census and samples LB feedback.
189   void FetchAndSample(bool ok);
190 
191   std::unique_ptr<ServerCompletionQueue> cq_;
192   // To synchronize the operations related to shutdown state of cq_, so that we
193   // don't enqueue new tags into cq_ after it is already shut down.
194   grpc_core::Mutex cq_shutdown_mu_;
195   std::atomic_bool shutdown_{false};
196   std::unique_ptr<grpc_core::Thread> thread_;
197   std::unique_ptr<LoadReporter> load_reporter_;
198   std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
199 };
200 
201 }  // namespace load_reporter
202 }  // namespace grpc
203 
204 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
205