• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * lws-minimal-ws-client-binance
3  *
4  * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5  *                         Kutoga <kutoga@user.github.invalid>
6  *
7  * This file is made available under the Creative Commons CC0 1.0
8  * Universal Public Domain Dedication.
9  *
10  * This demonstrates a ws client that connects to binance ws server efficiently
11  */
12 
13 #include <libwebsockets.h>
14 #include <string.h>
15 #include <signal.h>
16 #include <ctype.h>
17 
typedef struct range {
	uint64_t		sum;		/* total of every sample added */
	uint64_t		lowest;		/* smallest sample seen so far */
	uint64_t		highest;	/* largest sample seen so far */

	unsigned int		samples;	/* how many samples were added */
} range_t;
25 
26 /*
27  * This represents your object that "contains" the client connection and has
28  * the client connection bound to it
29  */
30 
31 static struct my_conn {
32 	lws_sorted_usec_list_t	sul;	     /* schedule connection retry */
33 	lws_sorted_usec_list_t	sul_hz;	     /* 1hz summary */
34 
35 	range_t			e_lat_range;
36 	range_t			price_range;
37 
38 	struct lws		*wsi;	     /* related wsi if any */
39 	uint16_t		retry_count; /* count of consequetive retries */
40 } mco;
41 
/* the single lws context used by the whole program */
static struct lws_context *context;

/*
 * Nonzero requests that the service loop exits.  It is written from the
 * SIGINT handler as well as from normal code, so it has to be
 * volatile sig_atomic_t for that access to be well-defined.
 */
static volatile sig_atomic_t interrupted;
44 
#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
/*
 * With OpenSSL the system trust store is used.  For mbedTLS / WolfSSL builds
 * the CA to trust must be supplied explicitly, so the PEM is carried here.
 */
static const char * const ca_pem_digicert_global_root =
	"-----BEGIN CERTIFICATE-----\n"
	"MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBh\n"
	"MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3\n"
	"d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBD\n"
	"QTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVT\n"
	"MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j\n"
	"b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG\n"
	"9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsB\n"
	"CSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97\n"
	"nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt\n"
	"43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7P\n"
	"T19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4\n"
	"gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAO\n"
	"BgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbR\n"
	"TLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUw\n"
	"DQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/Esr\n"
	"hMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg\n"
	"06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJF\n"
	"PnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0ls\n"
	"YSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQk\n"
	"CAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4=\n"
	"-----END CERTIFICATE-----\n";
#endif
74 
75 /*
76  * The retry and backoff policy we want to use for our client connections
77  */
78 
79 static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
80 
81 static const lws_retry_bo_t retry = {
82 	.retry_ms_table			= backoff_ms,
83 	.retry_ms_table_count		= LWS_ARRAY_SIZE(backoff_ms),
84 	.conceal_count			= LWS_ARRAY_SIZE(backoff_ms),
85 
86 	.secs_since_valid_ping		= 400,  /* force PINGs after secs idle */
87 	.secs_since_valid_hangup	= 400, /* hangup after secs idle */
88 
89 	.jitter_percent			= 0,
90 };
91 
/*
 * If we don't enable the permessage-deflate ws extension, during times when
 * there are many ws messages per second the server coalesces them inside a
 * smaller number of larger ssl records, for >100 mps typically >2048 records.
 *
 * This is a problem, because the coalesced record cannot be sent nor decrypted
 * until the last part of the record is received, meaning additional latency
 * for the earlier members of the coalesced record that have just been sitting
 * there waiting for the last one to go out and be decrypted.
 *
 * permessage-deflate reduces the data size before the tls layer, for >100mps
 * reducing the coalesced records to ~1.2KB.
 */
105 
106 static const struct lws_extension extensions[] = {
107 	{
108 		"permessage-deflate",
109 		lws_extension_callback_pm_deflate,
110 		"permessage-deflate"
111 		 "; client_no_context_takeover"
112 		 "; client_max_window_bits"
113 	},
114 	{ NULL, NULL, NULL /* terminator */ }
115 };
116 /*
117  * Scheduled sul callback that starts the connection attempt
118  */
119 
120 static void
connect_client(lws_sorted_usec_list_t * sul)121 connect_client(lws_sorted_usec_list_t *sul)
122 {
123 	struct my_conn *mco = lws_container_of(sul, struct my_conn, sul);
124 	struct lws_client_connect_info i;
125 
126 	memset(&i, 0, sizeof(i));
127 
128 	i.context = context;
129 	i.port = 443;
130 	i.address = "fstream.binance.com";
131 	i.path = "/stream?"
132 		 "streams=btcusdt@depth@0ms/btcusdt@bookTicker/btcusdt@aggTrade";
133 	i.host = i.address;
134 	i.origin = i.address;
135 	i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_PRIORITIZE_READS;
136 	i.protocol = NULL;
137 	i.local_protocol_name = "lws-minimal-client";
138 	i.pwsi = &mco->wsi;
139 	i.retry_and_idle_policy = &retry;
140 	i.userdata = mco;
141 
142 	if (!lws_client_connect_via_info(&i))
143 		/*
144 		 * Failed... schedule a retry... we can't use the _retry_wsi()
145 		 * convenience wrapper api here because no valid wsi at this
146 		 * point.
147 		 */
148 		if (lws_retry_sul_schedule(context, 0, sul, &retry,
149 					   connect_client, &mco->retry_count)) {
150 			lwsl_err("%s: connection attempts exhausted\n", __func__);
151 			interrupted = 1;
152 		}
153 }
154 
155 static void
range_reset(range_t * r)156 range_reset(range_t *r)
157 {
158 	r->sum = r->highest = 0;
159 	r->lowest = 999999999999ull;
160 	r->samples = 0;
161 }
162 
163 static uint64_t
get_us_timeofday(void)164 get_us_timeofday(void)
165 {
166 	struct timeval tv;
167 
168 	gettimeofday(&tv, NULL);
169 
170 	return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) + (uint64_t)tv.tv_usec;
171 }
172 
173 static void
sul_hz_cb(lws_sorted_usec_list_t * sul)174 sul_hz_cb(lws_sorted_usec_list_t *sul)
175 {
176 	struct my_conn *mco = lws_container_of(sul, struct my_conn, sul_hz);
177 
178 	/*
179 	 * We are called once a second to dump statistics on the connection
180 	 */
181 
182 	lws_sul_schedule(lws_get_context(mco->wsi), 0, &mco->sul_hz,
183 			 sul_hz_cb, LWS_US_PER_SEC);
184 
185 	if (mco->price_range.samples)
186 		lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, "
187 			    "(%d prices/s)\n",
188 			    __func__,
189 			    (unsigned long long)mco->price_range.lowest,
190 			    (unsigned long long)mco->price_range.highest,
191 			    (unsigned long long)(mco->price_range.sum / mco->price_range.samples),
192 			    mco->price_range.samples);
193 	if (mco->e_lat_range.samples)
194 		lwsl_notice("%s: elatency: min: %llums, max: %llums, "
195 			    "avg: %llums, (%d msg/s)\n", __func__,
196 			    (unsigned long long)mco->e_lat_range.lowest / 1000,
197 			    (unsigned long long)mco->e_lat_range.highest / 1000,
198 			    (unsigned long long)(mco->e_lat_range.sum /
199 					   mco->e_lat_range.samples) / 1000,
200 			    mco->e_lat_range.samples);
201 
202 	range_reset(&mco->e_lat_range);
203 	range_reset(&mco->price_range);
204 }
205 
/*
 * pennies() - convert a decimal price string into an integer count of cents
 * @s: NUL-terminated decimal number, eg "9123.45"
 *
 * The integer part is scaled by 100 and up to two digits following the first
 * '.' are added as tenths and hundredths.  A single fractional digit counts
 * as tenths ("12.5" is 1250), and digits after the second are ignored
 * (truncation, not rounding).
 *
 * Returns the price in hundredths of the unit.
 */
static uint64_t
pennies(const char *s)
{
	uint64_t price = (uint64_t)atoll(s) * 100;
	const char *dot = strchr(s, '.');

	/* ctype functions need an unsigned char value, plain char may be < 0 */
	if (!dot || !isdigit((unsigned char)dot[1]))
		return price;

	/* tenths */
	price += (uint64_t)(10 * (dot[1] - '0'));

	/* hundredths are optional: "12.5" must still count its 50 cents */
	if (isdigit((unsigned char)dot[2]))
		price += (uint64_t)(dot[2] - '0');

	return price;
}
218 
219 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)220 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
221 		 void *user, void *in, size_t len)
222 {
223 	struct my_conn *mco = (struct my_conn *)user;
224 	uint64_t latency_us, now_us;
225 	uint64_t price;
226 	char numbuf[16];
227 	const char *p;
228 	size_t alen;
229 
230 	switch (reason) {
231 
232 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
233 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
234 			 in ? (char *)in : "(null)");
235 		goto do_retry;
236 		break;
237 
238 	case LWS_CALLBACK_CLIENT_RECEIVE:
239 		/*
240 		 * The messages are a few 100 bytes of JSON each
241 		 */
242 
243 		// lwsl_hexdump_notice(in, len);
244 
245 		now_us = (uint64_t)get_us_timeofday();
246 
247 		p = lws_json_simple_find((const char *)in, len,
248 					 "\"depthUpdate\"", &alen);
249 		/*
250 		 * Only the JSON with depthUpdate init has the numbers we care
251 		 * about as well
252 		 */
253 		if (!p)
254 			break;
255 
256 		p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen);
257 		if (!p) {
258 			lwsl_err("%s: no E JSON\n", __func__);
259 			break;
260 		}
261 		lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
262 		latency_us = now_us -
263 				((uint64_t)atoll(numbuf) * LWS_US_PER_MS);
264 
265 		if (latency_us < mco->e_lat_range.lowest)
266 			mco->e_lat_range.lowest = latency_us;
267 		if (latency_us > mco->e_lat_range.highest)
268 			mco->e_lat_range.highest = latency_us;
269 
270 		mco->e_lat_range.sum += latency_us;
271 		mco->e_lat_range.samples++;
272 
273 		p = lws_json_simple_find((const char *)in, len,
274 					 "\"a\":[[\"", &alen);
275 		if (p) {
276 			lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
277 			price = pennies(numbuf);
278 
279 			if (price < mco->price_range.lowest)
280 				mco->price_range.lowest = price;
281 			if (price > mco->price_range.highest)
282 				mco->price_range.highest = price;
283 
284 			mco->price_range.sum += price;
285 			mco->price_range.samples++;
286 		}
287 		break;
288 
289 	case LWS_CALLBACK_CLIENT_ESTABLISHED:
290 		lwsl_user("%s: established\n", __func__);
291 		lws_sul_schedule(lws_get_context(wsi), 0, &mco->sul_hz,
292 				 sul_hz_cb, LWS_US_PER_SEC);
293 		mco->wsi = wsi;
294 		range_reset(&mco->e_lat_range);
295 		range_reset(&mco->price_range);
296 		break;
297 
298 	case LWS_CALLBACK_CLIENT_CLOSED:
299 		lws_sul_cancel(&mco->sul_hz);
300 		goto do_retry;
301 
302 	default:
303 		break;
304 	}
305 
306 	return lws_callback_http_dummy(wsi, reason, user, in, len);
307 
308 do_retry:
309 	/*
310 	 * retry the connection to keep it nailed up
311 	 *
312 	 * For this example, we try to conceal any problem for one set of
313 	 * backoff retries and then exit the app.
314 	 *
315 	 * If you set retry.conceal_count to be larger than the number of
316 	 * elements in the backoff table, it will never give up and keep
317 	 * retrying at the last backoff delay plus the random jitter amount.
318 	 */
319 	if (lws_retry_sul_schedule_retry_wsi(wsi, &mco->sul, connect_client,
320 					     &mco->retry_count)) {
321 		lwsl_err("%s: connection attempts exhausted\n", __func__);
322 		interrupted = 1;
323 	}
324 
325 	return 0;
326 }
327 
328 static const struct lws_protocols protocols[] = {
329 	{ "lws-minimal-client", callback_minimal, 0, 0, 0, NULL, 0 },
330 	LWS_PROTOCOL_LIST_TERM
331 };
332 
333 static void
sigint_handler(int sig)334 sigint_handler(int sig)
335 {
336 	interrupted = 1;
337 }
338 
main(int argc,const char ** argv)339 int main(int argc, const char **argv)
340 {
341 	struct lws_context_creation_info info;
342 	int n = 0;
343 
344 	signal(SIGINT, sigint_handler);
345 	memset(&info, 0, sizeof info);
346 	lws_cmdline_option_handle_builtin(argc, argv, &info);
347 
348 	lwsl_user("LWS minimal binance client\n");
349 
350 	info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
351 	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
352 	info.protocols = protocols;
353 	info.fd_limit_per_thread = 1 + 1 + 1;
354 	info.extensions = extensions;
355 
356 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
357 	/*
358 	 * OpenSSL uses the system trust store.  mbedTLS / WolfSSL have to be
359 	 * told which CA to trust explicitly.
360 	 */
361 	info.client_ssl_ca_mem = ca_pem_digicert_global_root;
362 	info.client_ssl_ca_mem_len = (unsigned int)strlen(ca_pem_digicert_global_root);
363 #endif
364 
365 	context = lws_create_context(&info);
366 	if (!context) {
367 		lwsl_err("lws init failed\n");
368 		return 1;
369 	}
370 
371 	/* schedule the first client connection attempt to happen immediately */
372 	lws_sul_schedule(context, 0, &mco.sul, connect_client, 1);
373 
374 	while (n >= 0 && !interrupted)
375 		n = lws_service(context, 0);
376 
377 	lws_context_destroy(context);
378 	lwsl_user("Completed\n");
379 
380 	return 0;
381 }
382 
383