1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "pb_decode.h"
22 #include "pb_encode.h"
23 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
24
25 #include <grpc/support/alloc.h>
26
27 /* invoked once for every Server in ServerList */
count_serverlist(pb_istream_t * stream,const pb_field_t * field,void ** arg)28 static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
29 void** arg) {
30 grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(*arg);
31 grpc_grpclb_server server;
32 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
33 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
34 return false;
35 }
36 ++sl->num_servers;
37 return true;
38 }
39
40 typedef struct decode_serverlist_arg {
41 /* The decoding callback is invoked once per server in serverlist. Remember
42 * which index of the serverlist are we currently decoding */
43 size_t decoding_idx;
44 /* The decoded serverlist */
45 grpc_grpclb_serverlist* serverlist;
46 } decode_serverlist_arg;
47
48 /* invoked once for every Server in ServerList */
decode_serverlist(pb_istream_t * stream,const pb_field_t * field,void ** arg)49 static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
50 void** arg) {
51 decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg);
52 GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
53 grpc_grpclb_server* server =
54 static_cast<grpc_grpclb_server*>(gpr_zalloc(sizeof(grpc_grpclb_server)));
55 if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
56 gpr_free(server);
57 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
58 return false;
59 }
60 dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
61 return true;
62 }
63
grpc_grpclb_request_create(const char * lb_service_name)64 grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name) {
65 grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
66 gpr_malloc(sizeof(grpc_grpclb_request)));
67 req->has_client_stats = false;
68 req->has_initial_request = true;
69 req->initial_request.has_name = true;
70 strncpy(req->initial_request.name, lb_service_name,
71 GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH);
72 return req;
73 }
74
populate_timestamp(gpr_timespec timestamp,grpc_grpclb_timestamp * timestamp_pb)75 static void populate_timestamp(gpr_timespec timestamp,
76 grpc_grpclb_timestamp* timestamp_pb) {
77 timestamp_pb->has_seconds = true;
78 timestamp_pb->seconds = timestamp.tv_sec;
79 timestamp_pb->has_nanos = true;
80 timestamp_pb->nanos = timestamp.tv_nsec;
81 }
82
encode_string(pb_ostream_t * stream,const pb_field_t * field,void * const * arg)83 static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
84 void* const* arg) {
85 char* str = static_cast<char*>(*arg);
86 if (!pb_encode_tag_for_field(stream, field)) return false;
87 return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str));
88 }
89
encode_drops(pb_ostream_t * stream,const pb_field_t * field,void * const * arg)90 static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
91 void* const* arg) {
92 grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
93 static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg);
94 if (drop_entries == nullptr) return true;
95 for (size_t i = 0; i < drop_entries->size(); ++i) {
96 if (!pb_encode_tag_for_field(stream, field)) return false;
97 grpc_lb_v1_ClientStatsPerToken drop_message;
98 drop_message.load_balance_token.funcs.encode = encode_string;
99 drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
100 drop_message.has_num_calls = true;
101 drop_message.num_calls = (*drop_entries)[i].count;
102 if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
103 &drop_message)) {
104 return false;
105 }
106 }
107 return true;
108 }
109
grpc_grpclb_load_report_request_create_locked(grpc_core::GrpcLbClientStats * client_stats)110 grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
111 grpc_core::GrpcLbClientStats* client_stats) {
112 grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
113 gpr_zalloc(sizeof(grpc_grpclb_request)));
114 req->has_client_stats = true;
115 req->client_stats.has_timestamp = true;
116 populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
117 req->client_stats.has_num_calls_started = true;
118 req->client_stats.has_num_calls_finished = true;
119 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
120 req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
121 req->client_stats.has_num_calls_finished_known_received = true;
122 req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
123 grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
124 drop_counts;
125 client_stats->GetLocked(
126 &req->client_stats.num_calls_started,
127 &req->client_stats.num_calls_finished,
128 &req->client_stats.num_calls_finished_with_client_failed_to_send,
129 &req->client_stats.num_calls_finished_known_received, &drop_counts);
130 // Will be deleted in grpc_grpclb_request_destroy().
131 req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
132 return req;
133 }
134
grpc_grpclb_request_encode(const grpc_grpclb_request * request)135 grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) {
136 size_t encoded_length;
137 pb_ostream_t sizestream;
138 pb_ostream_t outputstream;
139 grpc_slice slice;
140 memset(&sizestream, 0, sizeof(pb_ostream_t));
141 pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
142 encoded_length = sizestream.bytes_written;
143
144 slice = GRPC_SLICE_MALLOC(encoded_length);
145 outputstream =
146 pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length);
147 GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
148 request) != 0);
149 return slice;
150 }
151
grpc_grpclb_request_destroy(grpc_grpclb_request * request)152 void grpc_grpclb_request_destroy(grpc_grpclb_request* request) {
153 if (request->has_client_stats) {
154 grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
155 static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(
156 request->client_stats.calls_finished_with_drop.arg);
157 grpc_core::Delete(drop_entries);
158 }
159 gpr_free(request);
160 }
161
162 typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
grpc_grpclb_initial_response_parse(grpc_slice encoded_grpc_grpclb_response)163 grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
164 grpc_slice encoded_grpc_grpclb_response) {
165 pb_istream_t stream =
166 pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
167 GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
168 grpc_grpclb_response res;
169 memset(&res, 0, sizeof(grpc_grpclb_response));
170 if (GPR_UNLIKELY(
171 !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
172 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
173 return nullptr;
174 }
175
176 if (!res.has_initial_response) return nullptr;
177
178 grpc_grpclb_initial_response* initial_res =
179 static_cast<grpc_grpclb_initial_response*>(
180 gpr_malloc(sizeof(grpc_grpclb_initial_response)));
181 memcpy(initial_res, &res.initial_response,
182 sizeof(grpc_grpclb_initial_response));
183
184 return initial_res;
185 }
186
grpc_grpclb_response_parse_serverlist(grpc_slice encoded_grpc_grpclb_response)187 grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
188 grpc_slice encoded_grpc_grpclb_response) {
189 pb_istream_t stream =
190 pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
191 GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
192 pb_istream_t stream_at_start = stream;
193 grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(
194 gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
195 grpc_grpclb_response res;
196 memset(&res, 0, sizeof(grpc_grpclb_response));
197 // First pass: count number of servers.
198 res.server_list.servers.funcs.decode = count_serverlist;
199 res.server_list.servers.arg = sl;
200 bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
201 if (GPR_UNLIKELY(!status)) {
202 gpr_free(sl);
203 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
204 return nullptr;
205 }
206 // Second pass: populate servers.
207 if (sl->num_servers > 0) {
208 sl->servers = static_cast<grpc_grpclb_server**>(
209 gpr_zalloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
210 decode_serverlist_arg decode_arg;
211 memset(&decode_arg, 0, sizeof(decode_arg));
212 decode_arg.serverlist = sl;
213 res.server_list.servers.funcs.decode = decode_serverlist;
214 res.server_list.servers.arg = &decode_arg;
215 status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
216 &res);
217 if (GPR_UNLIKELY(!status)) {
218 grpc_grpclb_destroy_serverlist(sl);
219 gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
220 return nullptr;
221 }
222 }
223 return sl;
224 }
225
grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist * serverlist)226 void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist) {
227 if (serverlist == nullptr) {
228 return;
229 }
230 for (size_t i = 0; i < serverlist->num_servers; i++) {
231 gpr_free(serverlist->servers[i]);
232 }
233 gpr_free(serverlist->servers);
234 gpr_free(serverlist);
235 }
236
grpc_grpclb_serverlist_copy(const grpc_grpclb_serverlist * sl)237 grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
238 const grpc_grpclb_serverlist* sl) {
239 grpc_grpclb_serverlist* copy = static_cast<grpc_grpclb_serverlist*>(
240 gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
241 copy->num_servers = sl->num_servers;
242 copy->servers = static_cast<grpc_grpclb_server**>(
243 gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers));
244 for (size_t i = 0; i < sl->num_servers; i++) {
245 copy->servers[i] = static_cast<grpc_grpclb_server*>(
246 gpr_malloc(sizeof(grpc_grpclb_server)));
247 memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server));
248 }
249 return copy;
250 }
251
grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist * lhs,const grpc_grpclb_serverlist * rhs)252 bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
253 const grpc_grpclb_serverlist* rhs) {
254 if (lhs == nullptr || rhs == nullptr) {
255 return false;
256 }
257 if (lhs->num_servers != rhs->num_servers) {
258 return false;
259 }
260 for (size_t i = 0; i < lhs->num_servers; i++) {
261 if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
262 return false;
263 }
264 }
265 return true;
266 }
267
grpc_grpclb_server_equals(const grpc_grpclb_server * lhs,const grpc_grpclb_server * rhs)268 bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs,
269 const grpc_grpclb_server* rhs) {
270 return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0;
271 }
272
grpc_grpclb_duration_compare(const grpc_grpclb_duration * lhs,const grpc_grpclb_duration * rhs)273 int grpc_grpclb_duration_compare(const grpc_grpclb_duration* lhs,
274 const grpc_grpclb_duration* rhs) {
275 GPR_ASSERT(lhs && rhs);
276 if (lhs->has_seconds && rhs->has_seconds) {
277 if (lhs->seconds < rhs->seconds) return -1;
278 if (lhs->seconds > rhs->seconds) return 1;
279 } else if (lhs->has_seconds) {
280 return 1;
281 } else if (rhs->has_seconds) {
282 return -1;
283 }
284
285 GPR_ASSERT(lhs->seconds == rhs->seconds);
286 if (lhs->has_nanos && rhs->has_nanos) {
287 if (lhs->nanos < rhs->nanos) return -1;
288 if (lhs->nanos > rhs->nanos) return 1;
289 } else if (lhs->has_nanos) {
290 return 1;
291 } else if (rhs->has_nanos) {
292 return -1;
293 }
294
295 return 0;
296 }
297
grpc_grpclb_duration_to_millis(grpc_grpclb_duration * duration_pb)298 grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration* duration_pb) {
299 return static_cast<grpc_millis>(
300 (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC +
301 (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS);
302 }
303
grpc_grpclb_initial_response_destroy(grpc_grpclb_initial_response * response)304 void grpc_grpclb_initial_response_destroy(
305 grpc_grpclb_initial_response* response) {
306 gpr_free(response);
307 }
308