• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * lws-minimal-secure-streams-binance
 *
 * Written in 2010-2021 by Andy Green <andy@warmcat.com>
 *                         Kutoga <kutoga@user.github.invalid>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * This demonstrates a Secure Streams implementation of a client that connects
 * to the Binance websocket server efficiently.
 *
 * Build lws with -DLWS_WITH_SECURE_STREAMS=1 -DLWS_WITHOUT_EXTENSIONS=0
 *
 * "policy.json" contains all the information about endpoints, protocols and
 * connection validation, tagged by streamtype name.
 *
 * The example tries to load it from the cwd.  It lives in the
 * ./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so
 * either run it from there, or copy the policy.json to your cwd.  It's also
 * possible to put the policy JSON in the code as a string and pass that at
 * context creation time.
 */
24 
25 #include <libwebsockets.h>
26 #include <string.h>
27 #include <signal.h>
28 #include <ctype.h>
29 
/*
 * Set from the SIGINT handler and polled by the main service loop.
 * volatile sig_atomic_t is the only object type the C standard guarantees
 * can be safely written from an asynchronous signal handler.
 */
static volatile sig_atomic_t interrupted;
31 
/*
 * Running statistics over a series of unsigned samples: the total, the
 * extremes and how many samples were accumulated since the last reset.
 */
typedef struct range {
	uint64_t		sum;		/* total of all samples */
	uint64_t		lowest;		/* smallest sample seen */
	uint64_t		highest;	/* largest sample seen */

	unsigned int		samples;	/* count of samples in sum */
} range_t;
39 
40 typedef struct binance {
41 	struct lws_ss_handle 	*ss;
42 	void			*opaque_data;
43 
44 	lws_sorted_usec_list_t	sul_hz;	     /* 1hz summary dump */
45 
46 	range_t			e_lat_range;
47 	range_t			price_range;
48 } binance_t;
49 
50 /****** Part 1 / 3: application data processing */
51 
52 static void
range_reset(range_t * r)53 range_reset(range_t *r)
54 {
55 	r->sum = r->highest = 0;
56 	r->lowest = 999999999999ull;
57 	r->samples = 0;
58 }
59 
60 static uint64_t
get_us_timeofday(void)61 get_us_timeofday(void)
62 {
63 	struct timeval tv;
64 
65 	gettimeofday(&tv, NULL);
66 
67 	return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) +
68 			  (uint64_t)tv.tv_usec;
69 }
70 
/*
 * Convert a decimal price string such as "43210.98765432" into an integer
 * count of hundredths ("pennies").
 *
 * The integer part is scaled by 100 and up to two fractional digits are
 * added; any further fractional digits are truncated, not rounded.  A single
 * fractional digit is treated as tenths, so "12.5" yields 1250.
 *
 * s must be a NUL-terminated string.
 */
static uint64_t
pennies(const char *s)
{
	uint64_t price = (uint64_t)atoll(s) * 100;
	const char *dot = strchr(s, '.');

	/* <ctype.h> functions require an unsigned char value */
	if (!dot || !isdigit((unsigned char)dot[1]))
		return price;

	/* first fractional digit: tenths */
	price += (uint64_t)(10 * (dot[1] - '0'));

	/* second fractional digit, if present: hundredths */
	if (isdigit((unsigned char)dot[2]))
		price += (uint64_t)(dot[2] - '0');

	return price;
}
83 
84 static void
sul_hz_cb(lws_sorted_usec_list_t * sul)85 sul_hz_cb(lws_sorted_usec_list_t *sul)
86 {
87 	binance_t *bin = lws_container_of(sul, binance_t, sul_hz);
88 
89 	/*
90 	 * We are called once a second to dump statistics on the connection
91 	 */
92 
93 	lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
94 			 sul_hz_cb, LWS_US_PER_SEC);
95 
96 	if (bin->price_range.samples)
97 		lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, "
98 			    "(%d prices/s)\n", __func__,
99 			    (unsigned long long)bin->price_range.lowest,
100 			    (unsigned long long)bin->price_range.highest,
101 			    (unsigned long long)(bin->price_range.sum /
102 						    bin->price_range.samples),
103 			    bin->price_range.samples);
104 	if (bin->e_lat_range.samples)
105 		lwsl_notice("%s: elatency: min: %llums, max: %llums, "
106 			    "avg: %llums, (%d msg/s)\n", __func__,
107 			    (unsigned long long)bin->e_lat_range.lowest / 1000,
108 			    (unsigned long long)bin->e_lat_range.highest / 1000,
109 			    (unsigned long long)(bin->e_lat_range.sum /
110 					   bin->e_lat_range.samples) / 1000,
111 			    bin->e_lat_range.samples);
112 
113 	range_reset(&bin->e_lat_range);
114 	range_reset(&bin->price_range);
115 }
116 
117 /****** Part 2 / 3: communication */
118 
119 static lws_ss_state_return_t
binance_rx(void * userobj,const uint8_t * in,size_t len,int flags)120 binance_rx(void *userobj, const uint8_t *in, size_t len, int flags)
121 {
122 	binance_t *bin = (binance_t *)userobj;
123 	uint64_t latency_us, now_us;
124 	char numbuf[16];
125 	uint64_t price;
126 	const char *p;
127 	size_t alen;
128 
129 	now_us = (uint64_t)get_us_timeofday();
130 
131 	p = lws_json_simple_find((const char *)in, len, "\"depthUpdate\"",
132 				 &alen);
133 	if (!p)
134 		return LWSSSSRET_OK;
135 
136 	p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen);
137 	if (!p) {
138 		lwsl_err("%s: no E JSON\n", __func__);
139 		return LWSSSSRET_OK;
140 	}
141 
142 	lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
143 	latency_us = now_us - ((uint64_t)atoll(numbuf) * LWS_US_PER_MS);
144 
145 	if (latency_us < bin->e_lat_range.lowest)
146 		bin->e_lat_range.lowest = latency_us;
147 	if (latency_us > bin->e_lat_range.highest)
148 		bin->e_lat_range.highest = latency_us;
149 
150 	bin->e_lat_range.sum += latency_us;
151 	bin->e_lat_range.samples++;
152 
153 	p = lws_json_simple_find((const char *)in, len, "\"a\":[[\"", &alen);
154 	if (!p)
155 		return LWSSSSRET_OK;
156 
157 	lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
158 	price = pennies(numbuf);
159 
160 	if (price < bin->price_range.lowest)
161 		bin->price_range.lowest = price;
162 	if (price > bin->price_range.highest)
163 		bin->price_range.highest = price;
164 
165 	bin->price_range.sum += price;
166 	bin->price_range.samples++;
167 
168 	return LWSSSSRET_OK;
169 }
170 
171 static lws_ss_state_return_t
binance_state(void * userobj,void * h_src,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack)172 binance_state(void *userobj, void *h_src, lws_ss_constate_t state,
173 	      lws_ss_tx_ordinal_t ack)
174 {
175 	binance_t *bin = (binance_t *)userobj;
176 
177 	lwsl_ss_info(bin->ss, "%s (%d), ord 0x%x",
178 		     lws_ss_state_name((int)state), state, (unsigned int)ack);
179 
180 	switch (state) {
181 
182 	case LWSSSCS_CONNECTED:
183 		lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
184 				 sul_hz_cb, LWS_US_PER_SEC);
185 		range_reset(&bin->e_lat_range);
186 		range_reset(&bin->price_range);
187 
188 		return LWSSSSRET_OK;
189 
190 	case LWSSSCS_DISCONNECTED:
191 		lws_sul_cancel(&bin->sul_hz);
192 		break;
193 
194 	default:
195 		break;
196 	}
197 
198 	return LWSSSSRET_OK;
199 }
200 
201 static const lws_ss_info_t ssi_binance = {
202 	.handle_offset		  = offsetof(binance_t, ss),
203 	.opaque_user_data_offset  = offsetof(binance_t, opaque_data),
204 	.rx			  = binance_rx,
205 	.state			  = binance_state,
206 	.user_alloc		  = sizeof(binance_t),
207 	.streamtype		  = "binance", /* bind to corresponding policy */
208 };
209 
210 /****** Part 3 / 3: init and event loop */
211 
212 static const struct lws_extension extensions[] = {
213 	{
214 		"permessage-deflate", lws_extension_callback_pm_deflate,
215 		"permessage-deflate" "; client_no_context_takeover"
216 		 "; client_max_window_bits"
217 	},
218 	{ NULL, NULL, NULL /* terminator */ }
219 };
220 
221 static void
sigint_handler(int sig)222 sigint_handler(int sig)
223 {
224 	interrupted = 1;
225 }
226 
main(int argc,const char ** argv)227 int main(int argc, const char **argv)
228 {
229 	struct lws_context_creation_info info;
230 	struct lws_context *cx;
231 	int n = 0;
232 
233 	signal(SIGINT, sigint_handler);
234 
235 	memset(&info, 0, sizeof info);
236 	lws_cmdline_option_handle_builtin(argc, argv, &info);
237 
238 	lwsl_user("LWS minimal Secure Streams binance client\n");
239 
240 	info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
241 		       LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
242 	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
243 	info.fd_limit_per_thread = 1 + 1 + 1;
244 	info.extensions = extensions;
245 	info.pss_policies_json = "policy.json"; /* literal JSON, or path */
246 
247 	cx = lws_create_context(&info);
248 	if (!cx) {
249 		lwsl_err("lws init failed\n");
250 		return 1;
251 	}
252 
253 	if (lws_ss_create(cx, 0, &ssi_binance, NULL, NULL, NULL, NULL)) {
254 		lwsl_cx_err(cx, "failed to create secure stream");
255 		interrupted = 1;
256 	}
257 
258 	while (n >= 0 && !interrupted)
259 		n = lws_service(cx, 0);
260 
261 	lws_context_destroy(cx);
262 
263 	lwsl_user("Completed\n");
264 
265 	return 0;
266 }
267