1 /*
2 * ws protocol handler plugin for "lws-minimal-broker"
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This implements a minimal "broker", for systems that look like this
10 *
11 * [ publisher ws client ] <-> [ ws server broker ws server ] <-> [ ws client subscriber ]
12 *
13 * The "publisher" role is to add data to the broker.
14 *
15 * The "subscriber" role is to hear about all data added to the system.
16 *
17 * The "broker" role is to manage incoming data from publishers and pass it out
18 * to subscribers.
19 *
20 * Any number of publishers and subscribers are supported.
21 *
22 * This example implements a single ws server, using one ws protocol, that treats ws
23 * connections as being in publisher or subscriber mode according to the URL the ws
24 * connection was made to. ws connections to "/publisher" URL are understood to be
25 * publishing data and to any other URL, subscribing.
26 */
27
28 #if !defined (LWS_PLUGIN_STATIC)
29 #define LWS_DLL
30 #define LWS_INTERNAL
31 #include <libwebsockets.h>
32 #endif
33
34 #include <string.h>
35
/*
 * One queued message.  An instance is built for each payload received from
 * a publisher and copied by value into the vhost's ring buffer.
 */

struct msg {
	void *payload;	/* malloc'd; the data starts LWS_PRE bytes in */
	size_t len;	/* payload length in bytes, not counting LWS_PRE */
};
42
/*
 * Per-connection state; one of these is created for each client
 * connecting to us.
 */

struct per_session_data__minimal {
	/* next entry in the vhost's singly-linked list of live subscribers */
	struct per_session_data__minimal *pss_list;
	/* the connection this state belongs to */
	struct lws *wsi;
	/* this connection's read position ("tail") in the vhost ring buffer */
	uint32_t tail;
	/* nonzero: peer is publishing to us; zero: peer is a subscriber */
	char publishing;
};
51
/*
 * Per-vhost state; one of these is created for each vhost our protocol is
 * used with.
 */

struct per_vhost_data__minimal {
	struct lws_context *context;		/* context the vhost lives in */
	struct lws_vhost *vhost;		/* the vhost itself */
	const struct lws_protocols *protocol;	/* this protocol's entry */

	/* head of the linked list of live subscriber pss */
	struct per_session_data__minimal *pss_list;

	/* ring buffer holding messages not yet sent to every subscriber */
	struct lws_ring *ring;
};
63
64 /* destroys the message when everyone has had a copy of it */
65
66 static void
__minimal_destroy_message(void * _msg)67 __minimal_destroy_message(void *_msg)
68 {
69 struct msg *msg = _msg;
70
71 free(msg->payload);
72 msg->payload = NULL;
73 msg->len = 0;
74 }
75
76 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)77 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
78 void *user, void *in, size_t len)
79 {
80 struct per_session_data__minimal *pss =
81 (struct per_session_data__minimal *)user;
82 struct per_vhost_data__minimal *vhd =
83 (struct per_vhost_data__minimal *)
84 lws_protocol_vh_priv_get(lws_get_vhost(wsi),
85 lws_get_protocol(wsi));
86 const struct msg *pmsg;
87 struct msg amsg;
88 char buf[32];
89 int n, m;
90
91 switch (reason) {
92 case LWS_CALLBACK_PROTOCOL_INIT:
93 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
94 lws_get_protocol(wsi),
95 sizeof(struct per_vhost_data__minimal));
96 vhd->context = lws_get_context(wsi);
97 vhd->protocol = lws_get_protocol(wsi);
98 vhd->vhost = lws_get_vhost(wsi);
99
100 vhd->ring = lws_ring_create(sizeof(struct msg), 8,
101 __minimal_destroy_message);
102 if (!vhd->ring)
103 return 1;
104 break;
105
106 case LWS_CALLBACK_PROTOCOL_DESTROY:
107 lws_ring_destroy(vhd->ring);
108 break;
109
110 case LWS_CALLBACK_ESTABLISHED:
111 pss->tail = lws_ring_get_oldest_tail(vhd->ring);
112 pss->wsi = wsi;
113 if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
114 pss->publishing = !strcmp(buf, "/publisher");
115 if (!pss->publishing)
116 /* add subscribers to the list of live pss held in the vhd */
117 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
118 break;
119
120 case LWS_CALLBACK_CLOSED:
121 /* remove our closing pss from the list of live pss */
122 lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
123 pss, vhd->pss_list);
124 break;
125
126 case LWS_CALLBACK_SERVER_WRITEABLE:
127
128 if (pss->publishing)
129 break;
130
131 pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
132 if (!pmsg)
133 break;
134
135 /* notice we allowed for LWS_PRE in the payload already */
136 m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
137 pmsg->len, LWS_WRITE_TEXT);
138 if (m < (int)pmsg->len) {
139 lwsl_err("ERROR %d writing to ws socket\n", m);
140 return -1;
141 }
142
143 lws_ring_consume_and_update_oldest_tail(
144 vhd->ring, /* lws_ring object */
145 struct per_session_data__minimal, /* type of objects with tails */
146 &pss->tail, /* tail of guy doing the consuming */
147 1, /* number of payload objects being consumed */
148 vhd->pss_list, /* head of list of objects with tails */
149 tail, /* member name of tail in objects with tails */
150 pss_list /* member name of next object in objects with tails */
151 );
152
153 /* more to do? */
154 if (lws_ring_get_element(vhd->ring, &pss->tail))
155 /* come back as soon as we can write more */
156 lws_callback_on_writable(pss->wsi);
157 break;
158
159 case LWS_CALLBACK_RECEIVE:
160
161 if (!pss->publishing)
162 break;
163
164 /*
165 * For test, our policy is ignore publishing when there are
166 * no subscribers connected.
167 */
168 if (!vhd->pss_list)
169 break;
170
171 n = (int)lws_ring_get_count_free_elements(vhd->ring);
172 if (!n) {
173 lwsl_user("dropping!\n");
174 break;
175 }
176
177 amsg.len = len;
178 /* notice we over-allocate by LWS_PRE */
179 amsg.payload = malloc(LWS_PRE + len);
180 if (!amsg.payload) {
181 lwsl_user("OOM: dropping\n");
182 break;
183 }
184
185 memcpy((char *)amsg.payload + LWS_PRE, in, len);
186 if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
187 __minimal_destroy_message(&amsg);
188 lwsl_user("dropping 2!\n");
189 break;
190 }
191
192 /*
193 * let every subscriber know we want to write something
194 * on them as soon as they are ready
195 */
196 lws_start_foreach_llp(struct per_session_data__minimal **,
197 ppss, vhd->pss_list) {
198 if (!(*ppss)->publishing)
199 lws_callback_on_writable((*ppss)->wsi);
200 } lws_end_foreach_llp(ppss, pss_list);
201 break;
202
203 default:
204 break;
205 }
206
207 return 0;
208 }
209
/*
 * Positional initializer for this protocol's table entry.  The field order is
 * assumed to follow libwebsockets' struct lws_protocols declaration; confirm
 * against the header in use.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal-broker",	/* protocol name */ \
		callback_minimal,	/* protocol callback */ \
		sizeof(struct per_session_data__minimal), /* per-session size */ \
		128,			/* rx buffer size */ \
		0, NULL, 0		/* id, user pointer, tx packet size */ \
	}
218