• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
22 #include "src/core/lib/gpr/useful.h"
23 
24 #include "google/protobuf/duration.upb.h"
25 #include "google/protobuf/timestamp.upb.h"
26 
27 #include <grpc/support/alloc.h>
28 
29 namespace grpc_core {
30 
operator ==(const GrpcLbServer & other) const31 bool GrpcLbServer::operator==(const GrpcLbServer& other) const {
32   if (ip_size != other.ip_size) return false;
33   int r = memcmp(ip_addr, other.ip_addr, ip_size);
34   if (r != 0) return false;
35   if (port != other.port) return false;
36   r = strncmp(load_balance_token, other.load_balance_token,
37               sizeof(load_balance_token));
38   if (r != 0) return false;
39   return drop == other.drop;
40 }
41 
42 namespace {
43 
grpc_grpclb_request_encode(const grpc_lb_v1_LoadBalanceRequest * request,upb_arena * arena)44 grpc_slice grpc_grpclb_request_encode(
45     const grpc_lb_v1_LoadBalanceRequest* request, upb_arena* arena) {
46   size_t buf_length;
47   char* buf =
48       grpc_lb_v1_LoadBalanceRequest_serialize(request, arena, &buf_length);
49   return grpc_slice_from_copied_buffer(buf, buf_length);
50 }
51 
52 }  // namespace
53 
GrpcLbRequestCreate(const char * lb_service_name,upb_arena * arena)54 grpc_slice GrpcLbRequestCreate(const char* lb_service_name, upb_arena* arena) {
55   grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
56   grpc_lb_v1_InitialLoadBalanceRequest* initial_request =
57       grpc_lb_v1_LoadBalanceRequest_mutable_initial_request(req, arena);
58   size_t name_len =
59       GPR_MIN(strlen(lb_service_name), GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH);
60   grpc_lb_v1_InitialLoadBalanceRequest_set_name(
61       initial_request, upb_strview_make(lb_service_name, name_len));
62   return grpc_grpclb_request_encode(req, arena);
63 }
64 
65 namespace {
66 
google_protobuf_Timestamp_assign(google_protobuf_Timestamp * timestamp,const gpr_timespec & value)67 void google_protobuf_Timestamp_assign(google_protobuf_Timestamp* timestamp,
68                                       const gpr_timespec& value) {
69   google_protobuf_Timestamp_set_seconds(timestamp, value.tv_sec);
70   google_protobuf_Timestamp_set_nanos(timestamp, value.tv_nsec);
71 }
72 
73 }  // namespace
74 
GrpcLbLoadReportRequestCreate(int64_t num_calls_started,int64_t num_calls_finished,int64_t num_calls_finished_with_client_failed_to_send,int64_t num_calls_finished_known_received,const GrpcLbClientStats::DroppedCallCounts * drop_token_counts,upb_arena * arena)75 grpc_slice GrpcLbLoadReportRequestCreate(
76     int64_t num_calls_started, int64_t num_calls_finished,
77     int64_t num_calls_finished_with_client_failed_to_send,
78     int64_t num_calls_finished_known_received,
79     const GrpcLbClientStats::DroppedCallCounts* drop_token_counts,
80     upb_arena* arena) {
81   grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
82   grpc_lb_v1_ClientStats* req_stats =
83       grpc_lb_v1_LoadBalanceRequest_mutable_client_stats(req, arena);
84   google_protobuf_Timestamp_assign(
85       grpc_lb_v1_ClientStats_mutable_timestamp(req_stats, arena),
86       gpr_now(GPR_CLOCK_REALTIME));
87   grpc_lb_v1_ClientStats_set_num_calls_started(req_stats, num_calls_started);
88   grpc_lb_v1_ClientStats_set_num_calls_finished(req_stats, num_calls_finished);
89   grpc_lb_v1_ClientStats_set_num_calls_finished_with_client_failed_to_send(
90       req_stats, num_calls_finished_with_client_failed_to_send);
91   grpc_lb_v1_ClientStats_set_num_calls_finished_known_received(
92       req_stats, num_calls_finished_known_received);
93   if (drop_token_counts != nullptr) {
94     for (size_t i = 0; i < drop_token_counts->size(); ++i) {
95       const GrpcLbClientStats::DropTokenCount& cur = (*drop_token_counts)[i];
96       grpc_lb_v1_ClientStatsPerToken* cur_msg =
97           grpc_lb_v1_ClientStats_add_calls_finished_with_drop(req_stats, arena);
98       const size_t token_len = strlen(cur.token.get());
99       char* token = reinterpret_cast<char*>(upb_arena_malloc(arena, token_len));
100       memcpy(token, cur.token.get(), token_len);
101       grpc_lb_v1_ClientStatsPerToken_set_load_balance_token(
102           cur_msg, upb_strview_make(token, token_len));
103       grpc_lb_v1_ClientStatsPerToken_set_num_calls(cur_msg, cur.count);
104     }
105   }
106   return grpc_grpclb_request_encode(req, arena);
107 }
108 
109 namespace {
110 
ParseServerList(const grpc_lb_v1_LoadBalanceResponse & response,std::vector<GrpcLbServer> * server_list)111 bool ParseServerList(const grpc_lb_v1_LoadBalanceResponse& response,
112                      std::vector<GrpcLbServer>* server_list) {
113   // Determine the number of servers.
114   const grpc_lb_v1_ServerList* server_list_msg =
115       grpc_lb_v1_LoadBalanceResponse_server_list(&response);
116   if (server_list_msg == nullptr) return false;
117   size_t server_count = 0;
118   const grpc_lb_v1_Server* const* servers =
119       grpc_lb_v1_ServerList_servers(server_list_msg, &server_count);
120   // Populate servers.
121   if (server_count > 0) {
122     server_list->reserve(server_count);
123     for (size_t i = 0; i < server_count; ++i) {
124       GrpcLbServer& cur = *server_list->emplace(server_list->end());
125       upb_strview address = grpc_lb_v1_Server_ip_address(servers[i]);
126       if (address.size == 0) {
127         ;  // Nothing to do because cur->ip_address is an empty string.
128       } else if (address.size <= GRPC_GRPCLB_SERVER_IP_ADDRESS_MAX_SIZE) {
129         cur.ip_size = static_cast<int32_t>(address.size);
130         memcpy(cur.ip_addr, address.data, address.size);
131       }
132       cur.port = grpc_lb_v1_Server_port(servers[i]);
133       upb_strview token = grpc_lb_v1_Server_load_balance_token(servers[i]);
134       if (token.size == 0) {
135         ;  // Nothing to do because cur->load_balance_token is an empty string.
136       } else if (token.size <= GRPC_GRPCLB_SERVER_LOAD_BALANCE_TOKEN_MAX_SIZE) {
137         memcpy(cur.load_balance_token, token.data, token.size);
138       } else {
139         gpr_log(GPR_ERROR,
140                 "grpc_lb_v1_LoadBalanceResponse has too long token. len=%zu",
141                 token.size);
142       }
143       cur.drop = grpc_lb_v1_Server_drop(servers[i]);
144     }
145   }
146   return true;
147 }
148 
grpc_grpclb_duration_to_millis(const google_protobuf_Duration * duration_pb)149 grpc_millis grpc_grpclb_duration_to_millis(
150     const google_protobuf_Duration* duration_pb) {
151   return static_cast<grpc_millis>(
152       (google_protobuf_Duration_seconds(duration_pb) * GPR_MS_PER_SEC) +
153       (google_protobuf_Duration_nanos(duration_pb) / GPR_NS_PER_MS));
154 }
155 
156 }  // namespace
157 
GrpcLbResponseParse(const grpc_slice & serialized_response,upb_arena * arena,GrpcLbResponse * result)158 bool GrpcLbResponseParse(const grpc_slice& serialized_response,
159                          upb_arena* arena, GrpcLbResponse* result) {
160   grpc_lb_v1_LoadBalanceResponse* response =
161       grpc_lb_v1_LoadBalanceResponse_parse(
162           reinterpret_cast<const char*>(
163               GRPC_SLICE_START_PTR(serialized_response)),
164           GRPC_SLICE_LENGTH(serialized_response), arena);
165   // Handle serverlist responses.
166   if (ParseServerList(*response, &result->serverlist)) {
167     result->type = result->SERVERLIST;
168     return true;
169   }
170   // Handle initial responses.
171   auto* initial_response =
172       grpc_lb_v1_LoadBalanceResponse_initial_response(response);
173   if (initial_response != nullptr) {
174     result->type = result->INITIAL;
175     const google_protobuf_Duration* client_stats_report_interval =
176         grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
177             initial_response);
178     if (client_stats_report_interval != nullptr) {
179       result->client_stats_report_interval =
180           grpc_grpclb_duration_to_millis(client_stats_report_interval);
181     }
182     return true;
183   }
184   // Handle fallback.
185   if (grpc_lb_v1_LoadBalanceResponse_has_fallback_response(response)) {
186     result->type = result->FALLBACK;
187     return true;
188   }
189   // Unknown response type.
190   return false;
191 }
192 
193 }  // namespace grpc_core
194