• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * ws protocol handler plugin for "lws-minimal-broker"
 *
 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * This implements a minimal "broker", for systems that look like this
 *
 * [ publisher  ws client ] <-> [ ws server  broker ws server ] <-> [ ws client subscriber ]
 *
 * The "publisher" role is to add data to the broker.
 *
 * The "subscriber" role is to hear about all data added to the system.
 *
 * The "broker" role is to manage incoming data from publishers and pass it out
 * to subscribers.
 *
 * Any number of publishers and subscribers are supported.
 *
 * This example implements a single ws server, using one ws protocol, that treats
 * ws connections as being in publisher or subscriber mode according to the URL
 * the ws connection was made to.  ws connections to the "/publisher" URL are
 * understood to be publishing data; connections to any other URL are
 * subscribing.
 */
27 
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif

#include <stdlib.h>
#include <string.h>
35 
/*
 * One of these is created for each message held in the ringbuffer.
 *
 * payload is a malloc'd buffer owned by the message; the code that fills it
 * in this file allocates LWS_PRE + len bytes and stores the message data
 * starting LWS_PRE bytes into the buffer.
 */

struct msg {
	void *payload;	/* malloc'd buffer, released with free() */
	size_t len;	/* length of the message data, excluding LWS_PRE */
};
42 
/*
 * One of these is created for each client connecting to us.
 *
 * Only subscriber sessions are linked into the per-vhost pss_list; publisher
 * sessions are marked with a nonzero 'publishing' flag instead.
 */

struct per_session_data__minimal {
	struct per_session_data__minimal *pss_list; /* next pss in the vhost list */
	struct lws *wsi;	/* the connection this session belongs to */
	uint32_t tail;		/* this session's read position in the ring */
	char publishing;	/* nonzero: peer is publishing to us */
};
51 
/*
 * One of these is created for each vhost our protocol is used with.
 * It is allocated and filled in while handling LWS_CALLBACK_PROTOCOL_INIT.
 */

struct per_vhost_data__minimal {
	struct lws_context *context;		/* context the vhost lives in */
	struct lws_vhost *vhost;		/* the vhost itself */
	const struct lws_protocols *protocol;	/* this protocol's table entry */

	struct per_session_data__minimal *pss_list; /* linked-list of live pss */

	struct lws_ring *ring; /* ringbuffer holding unsent messages */
};
63 
64 /* destroys the message when everyone has had a copy of it */
65 
66 static void
__minimal_destroy_message(void * _msg)67 __minimal_destroy_message(void *_msg)
68 {
69 	struct msg *msg = _msg;
70 
71 	free(msg->payload);
72 	msg->payload = NULL;
73 	msg->len = 0;
74 }
75 
76 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)77 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
78 			void *user, void *in, size_t len)
79 {
80 	struct per_session_data__minimal *pss =
81 			(struct per_session_data__minimal *)user;
82 	struct per_vhost_data__minimal *vhd =
83 			(struct per_vhost_data__minimal *)
84 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
85 					lws_get_protocol(wsi));
86 	const struct msg *pmsg;
87 	struct msg amsg;
88 	char buf[32];
89 	int n, m;
90 
91 	switch (reason) {
92 	case LWS_CALLBACK_PROTOCOL_INIT:
93 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
94 				lws_get_protocol(wsi),
95 				sizeof(struct per_vhost_data__minimal));
96 		vhd->context = lws_get_context(wsi);
97 		vhd->protocol = lws_get_protocol(wsi);
98 		vhd->vhost = lws_get_vhost(wsi);
99 
100 		vhd->ring = lws_ring_create(sizeof(struct msg), 8,
101 					    __minimal_destroy_message);
102 		if (!vhd->ring)
103 			return 1;
104 		break;
105 
106 	case LWS_CALLBACK_PROTOCOL_DESTROY:
107 		lws_ring_destroy(vhd->ring);
108 		break;
109 
110 	case LWS_CALLBACK_ESTABLISHED:
111 		pss->tail = lws_ring_get_oldest_tail(vhd->ring);
112 		pss->wsi = wsi;
113 		if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
114 			pss->publishing = !strcmp(buf, "/publisher");
115 		if (!pss->publishing)
116 			/* add subscribers to the list of live pss held in the vhd */
117 			lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
118 		break;
119 
120 	case LWS_CALLBACK_CLOSED:
121 		/* remove our closing pss from the list of live pss */
122 		lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
123 				  pss, vhd->pss_list);
124 		break;
125 
126 	case LWS_CALLBACK_SERVER_WRITEABLE:
127 
128 		if (pss->publishing)
129 			break;
130 
131 		pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
132 		if (!pmsg)
133 			break;
134 
135 		/* notice we allowed for LWS_PRE in the payload already */
136 		m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
137 			      pmsg->len, LWS_WRITE_TEXT);
138 		if (m < (int)pmsg->len) {
139 			lwsl_err("ERROR %d writing to ws socket\n", m);
140 			return -1;
141 		}
142 
143 		lws_ring_consume_and_update_oldest_tail(
144 			vhd->ring,	/* lws_ring object */
145 			struct per_session_data__minimal, /* type of objects with tails */
146 			&pss->tail,	/* tail of guy doing the consuming */
147 			1,		/* number of payload objects being consumed */
148 			vhd->pss_list,	/* head of list of objects with tails */
149 			tail,		/* member name of tail in objects with tails */
150 			pss_list	/* member name of next object in objects with tails */
151 		);
152 
153 		/* more to do? */
154 		if (lws_ring_get_element(vhd->ring, &pss->tail))
155 			/* come back as soon as we can write more */
156 			lws_callback_on_writable(pss->wsi);
157 		break;
158 
159 	case LWS_CALLBACK_RECEIVE:
160 
161 		if (!pss->publishing)
162 			break;
163 
164 		/*
165 		 * For test, our policy is ignore publishing when there are
166 		 * no subscribers connected.
167 		 */
168 		if (!vhd->pss_list)
169 			break;
170 
171 		n = (int)lws_ring_get_count_free_elements(vhd->ring);
172 		if (!n) {
173 			lwsl_user("dropping!\n");
174 			break;
175 		}
176 
177 		amsg.len = len;
178 		/* notice we over-allocate by LWS_PRE */
179 		amsg.payload = malloc(LWS_PRE + len);
180 		if (!amsg.payload) {
181 			lwsl_user("OOM: dropping\n");
182 			break;
183 		}
184 
185 		memcpy((char *)amsg.payload + LWS_PRE, in, len);
186 		if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
187 			__minimal_destroy_message(&amsg);
188 			lwsl_user("dropping 2!\n");
189 			break;
190 		}
191 
192 		/*
193 		 * let every subscriber know we want to write something
194 		 * on them as soon as they are ready
195 		 */
196 		lws_start_foreach_llp(struct per_session_data__minimal **,
197 				      ppss, vhd->pss_list) {
198 			if (!(*ppss)->publishing)
199 				lws_callback_on_writable((*ppss)->wsi);
200 		} lws_end_foreach_llp(ppss, pss_list);
201 		break;
202 
203 	default:
204 		break;
205 	}
206 
207 	return 0;
208 }
209 
/*
 * Initializer for this plugin's entry in a protocols table: the protocol
 * name, the callback above, and the size of the per-session data.
 *
 * NOTE(review): the remaining values (128, then 0, NULL, 0) presumably map
 * onto the rx buffer size, id, user pointer and tx packet size members of
 * struct lws_protocols in that order -- confirm against libwebsockets.h.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal-broker", \
		callback_minimal, \
		sizeof(struct per_session_data__minimal), \
		128, \
		0, NULL, 0 \
	}
218