1 /*
2 * lws-minimal-ws-client-tx
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This demonstrates a ws "publisher" to go with the minimal-ws-broker
10 * example.
11 *
12 * Two threads are spawned that produce messages to be sent to the broker,
13 * via a local ringbuffer. Locking is provided to make ringbuffer access
14 * threadsafe.
15 *
16 * When a nailed-up client connection to the broker is established, the
17 * ringbuffer is sent to the broker, which distributes the events to all
18 * connected clients.
19 */
20
21 #include <libwebsockets.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <pthread.h>
25
26 static int interrupted;
27
28 /* one of these created for each message */
29
30 struct msg {
31 void *payload; /* is malloc'd */
32 size_t len;
33 };
34
35 struct per_vhost_data__minimal {
36 struct lws_context *context;
37 struct lws_vhost *vhost;
38 const struct lws_protocols *protocol;
39 pthread_t pthread_spam[2];
40
41 pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
42 struct lws_ring *ring; /* ringbuffer holding unsent messages */
43 uint32_t tail;
44
45 struct lws_client_connect_info i;
46 struct lws *client_wsi;
47
48 int counter;
49 char finished;
50 char established;
51 };
52
53 #if defined(WIN32)
usleep(unsigned long l)54 static void usleep(unsigned long l) { Sleep(l / 1000); }
55 #endif
56
57 static void
__minimal_destroy_message(void * _msg)58 __minimal_destroy_message(void *_msg)
59 {
60 struct msg *msg = _msg;
61
62 free(msg->payload);
63 msg->payload = NULL;
64 msg->len = 0;
65 }
66
67 static void *
thread_spam(void * d)68 thread_spam(void *d)
69 {
70 struct per_vhost_data__minimal *vhd =
71 (struct per_vhost_data__minimal *)d;
72 struct msg amsg;
73 int len = 128, index = 1, n;
74
75 do {
76 /* don't generate output if client not connected */
77 if (!vhd->established)
78 goto wait;
79
80 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
81
82 /* only create if space in ringbuffer */
83 n = (int)lws_ring_get_count_free_elements(vhd->ring);
84 if (!n) {
85 lwsl_user("dropping!\n");
86 goto wait_unlock;
87 }
88
89 amsg.payload = malloc(LWS_PRE + len);
90 if (!amsg.payload) {
91 lwsl_user("OOM: dropping\n");
92 goto wait_unlock;
93 }
94 n = lws_snprintf((char *)amsg.payload + LWS_PRE, len,
95 "tid: %p, msg: %d",
96 (void *)pthread_self(), index++);
97 amsg.len = n;
98 n = lws_ring_insert(vhd->ring, &amsg, 1);
99 if (n != 1) {
100 __minimal_destroy_message(&amsg);
101 lwsl_user("dropping!\n");
102 } else
103 /*
104 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
105 * in the lws service thread context.
106 */
107 lws_cancel_service(vhd->context);
108
109 wait_unlock:
110 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
111
112 wait:
113 usleep(100000);
114
115 } while (!vhd->finished);
116
117 lwsl_notice("thread_spam %p exiting\n", (void *)pthread_self());
118
119 pthread_exit(NULL);
120
121 return NULL;
122 }
123
124 static int
connect_client(struct per_vhost_data__minimal * vhd)125 connect_client(struct per_vhost_data__minimal *vhd)
126 {
127 vhd->i.context = vhd->context;
128 vhd->i.port = 7681;
129 vhd->i.address = "localhost";
130 vhd->i.path = "/publisher";
131 vhd->i.host = vhd->i.address;
132 vhd->i.origin = vhd->i.address;
133 vhd->i.ssl_connection = 0;
134
135 vhd->i.protocol = "lws-minimal-broker";
136 vhd->i.pwsi = &vhd->client_wsi;
137
138 return !lws_client_connect_via_info(&vhd->i);
139 }
140
141 static int
callback_minimal_broker(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)142 callback_minimal_broker(struct lws *wsi, enum lws_callback_reasons reason,
143 void *user, void *in, size_t len)
144 {
145 struct per_vhost_data__minimal *vhd =
146 (struct per_vhost_data__minimal *)
147 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
148 lws_get_protocol(wsi));
149 const struct msg *pmsg;
150 void *retval;
151 int n, m, r = 0;
152
153 switch (reason) {
154
155 /* --- protocol lifecycle callbacks --- */
156
157 case LWS_CALLBACK_PROTOCOL_INIT:
158 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
159 lws_get_protocol(wsi),
160 sizeof(struct per_vhost_data__minimal));
161 vhd->context = lws_get_context(wsi);
162 vhd->protocol = lws_get_protocol(wsi);
163 vhd->vhost = lws_get_vhost(wsi);
164
165 vhd->ring = lws_ring_create(sizeof(struct msg), 8,
166 __minimal_destroy_message);
167 if (!vhd->ring)
168 return 1;
169
170 pthread_mutex_init(&vhd->lock_ring, NULL);
171
172 /* start the content-creating threads */
173
174 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
175 if (pthread_create(&vhd->pthread_spam[n], NULL,
176 thread_spam, vhd)) {
177 lwsl_err("thread creation failed\n");
178 r = 1;
179 goto init_fail;
180 }
181
182 if (connect_client(vhd))
183 lws_timed_callback_vh_protocol(vhd->vhost,
184 vhd->protocol, LWS_CALLBACK_USER, 1);
185 break;
186
187 case LWS_CALLBACK_PROTOCOL_DESTROY:
188 init_fail:
189 vhd->finished = 1;
190 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
191 if (vhd->pthread_spam[n])
192 pthread_join(vhd->pthread_spam[n], &retval);
193
194 if (vhd->ring)
195 lws_ring_destroy(vhd->ring);
196
197 pthread_mutex_destroy(&vhd->lock_ring);
198
199 return r;
200
201 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
202 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
203 in ? (char *)in : "(null)");
204 vhd->client_wsi = NULL;
205 lws_timed_callback_vh_protocol(vhd->vhost,
206 vhd->protocol, LWS_CALLBACK_USER, 1);
207 break;
208
209 /* --- client callbacks --- */
210
211 case LWS_CALLBACK_CLIENT_ESTABLISHED:
212 lwsl_user("%s: established\n", __func__);
213 vhd->established = 1;
214 break;
215
216 case LWS_CALLBACK_CLIENT_WRITEABLE:
217 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
218 pmsg = lws_ring_get_element(vhd->ring, &vhd->tail);
219 if (!pmsg)
220 goto skip;
221
222 /* notice we allowed for LWS_PRE in the payload already */
223 m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
224 pmsg->len, LWS_WRITE_TEXT);
225 if (m < (int)pmsg->len) {
226 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock */
227 lwsl_err("ERROR %d writing to ws socket\n", m);
228 return -1;
229 }
230
231 lws_ring_consume_single_tail(vhd->ring, &vhd->tail, 1);
232
233 /* more to do for us? */
234 if (lws_ring_get_element(vhd->ring, &vhd->tail))
235 /* come back as soon as we can write more */
236 lws_callback_on_writable(wsi);
237
238 skip:
239 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
240 break;
241
242 case LWS_CALLBACK_CLIENT_CLOSED:
243 vhd->client_wsi = NULL;
244 vhd->established = 0;
245 lws_timed_callback_vh_protocol(vhd->vhost, vhd->protocol,
246 LWS_CALLBACK_USER, 1);
247 break;
248
249 case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
250 /*
251 * When the "spam" threads add a message to the ringbuffer,
252 * they create this event in the lws service thread context
253 * using lws_cancel_service().
254 *
255 * We respond by scheduling a writable callback for the
256 * connected client, if any.
257 */
258 if (vhd && vhd->client_wsi && vhd->established)
259 lws_callback_on_writable(vhd->client_wsi);
260 break;
261
262 /* rate-limited client connect retries */
263
264 case LWS_CALLBACK_USER:
265 lwsl_notice("%s: LWS_CALLBACK_USER\n", __func__);
266 if (connect_client(vhd))
267 lws_timed_callback_vh_protocol(vhd->vhost,
268 vhd->protocol,
269 LWS_CALLBACK_USER, 1);
270 break;
271
272 default:
273 break;
274 }
275
276 return lws_callback_http_dummy(wsi, reason, user, in, len);
277 }
278
279 static const struct lws_protocols protocols[] = {
280 {
281 "lws-minimal-broker",
282 callback_minimal_broker,
283 0,
284 0,
285 },
286 { NULL, NULL, 0, 0 }
287 };
288
289 static void
sigint_handler(int sig)290 sigint_handler(int sig)
291 {
292 interrupted = 1;
293 }
294
main(int argc,const char ** argv)295 int main(int argc, const char **argv)
296 {
297 struct lws_context_creation_info info;
298 struct lws_context *context;
299 const char *p;
300 int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
301 /* for LLL_ verbosity above NOTICE to be built into lws,
302 * lws must have been configured and built with
303 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
304 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
305 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
306 /* | LLL_DEBUG */;
307
308 signal(SIGINT, sigint_handler);
309
310 if ((p = lws_cmdline_option(argc, argv, "-d")))
311 logs = atoi(p);
312
313 lws_set_log_level(logs, NULL);
314 lwsl_user("LWS minimal ws client tx\n");
315 lwsl_user(" Run minimal-ws-broker and browse to that\n");
316
317 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
318 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
319 info.protocols = protocols;
320 /*
321 * since we know this lws context is only ever going to be used with
322 * one client wsis / fds / sockets at a time, let lws know it doesn't
323 * have to use the default allocations for fd tables up to ulimit -n.
324 * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
325 * will use.
326 */
327 info.fd_limit_per_thread = 1 + 1 + 1;
328
329 context = lws_create_context(&info);
330 if (!context) {
331 lwsl_err("lws init failed\n");
332 return 1;
333 }
334
335 while (n >= 0 && !interrupted)
336 n = lws_service(context, 0);
337
338 lws_context_destroy(context);
339 lwsl_user("Completed\n");
340
341 return 0;
342 }
343