• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * r2net.c
3  *
4  * Copyright (c) 2011-2012, Dan Magenheimer, Oracle Corp.
5  *
6  * Ramster_r2net provides an interface between zcache and r2net.
7  *
8  * FIXME: support more than two nodes
9  */
10 
11 #include <linux/list.h>
12 #include "tcp.h"
13 #include "nodemanager.h"
14 #include "../tmem.h"
15 #include "../zcache.h"
16 #include "ramster.h"
17 
18 #define RAMSTER_TESTING
19 
20 #define RMSTR_KEY	0x77347734
21 
22 enum {
23 	RMSTR_TMEM_PUT_EPH = 100,
24 	RMSTR_TMEM_PUT_PERS,
25 	RMSTR_TMEM_ASYNC_GET_REQUEST,
26 	RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
27 	RMSTR_TMEM_ASYNC_GET_REPLY,
28 	RMSTR_TMEM_FLUSH,
29 	RMSTR_TMEM_FLOBJ,
30 	RMSTR_TMEM_DESTROY_POOL,
31 };
32 
33 #define RMSTR_R2NET_MAX_LEN \
34 		(R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35 
36 #include "tcp_internal.h"
37 
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40 
r2net_remote_target_node_set(int node_num)41 int r2net_remote_target_node_set(int node_num)
42 {
43 	int ret = -1;
44 
45 	r2net_target_node = r2nm_get_node_by_num(node_num);
46 	if (r2net_target_node != NULL) {
47 		r2net_target_nodenum = node_num;
48 		r2nm_node_put(r2net_target_node);
49 		ret = 0;
50 	}
51 	return ret;
52 }
53 
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56 
ramster_remote_async_get_request_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 				u32 len, void *data, void **ret_data)
59 {
60 	char *pdata;
61 	struct tmem_xhandle xh;
62 	int found;
63 	size_t size = RMSTR_R2NET_MAX_LEN;
64 	u16 msgtype = be16_to_cpu(msg->msg_type);
65 	bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66 	unsigned long flags;
67 
68 	xh = *(struct tmem_xhandle *)msg->buf;
69 	if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70 		BUG();
71 	pdata = ramster_async_get_buf;
72 	*(struct tmem_xhandle *)pdata = xh;
73 	pdata += sizeof(struct tmem_xhandle);
74 	local_irq_save(flags);
75 	found = zcache_get_page(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 				pdata, &size, true, get_and_free ? 1 : -1);
77 	local_irq_restore(flags);
78 	if (found < 0) {
79 		/* a zero size indicates the get failed */
80 		size = 0;
81 	}
82 	if (size > RMSTR_R2NET_MAX_LEN)
83 		BUG();
84 	*ret_data = pdata - sizeof(struct tmem_xhandle);
85 	/* now make caller (r2net_process_message) handle specially */
86 	r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 	return size + sizeof(struct tmem_xhandle);
88 }
89 
ramster_remote_async_get_reply_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91 				u32 len, void *data, void **ret_data)
92 {
93 	char *in = (char *)msg->buf;
94 	int datalen = len - sizeof(struct r2net_msg);
95 	int ret = -1;
96 	struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97 
98 	in += sizeof(struct tmem_xhandle);
99 	datalen -= sizeof(struct tmem_xhandle);
100 	BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101 	ret = ramster_localify(xh->pool_id, &xh->oid, xh->index,
102 				in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
104 	if (ret == -EEXIST)
105 		pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106 #endif
107 	return ret;
108 }
109 
ramster_remote_put_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)110 int ramster_remote_put_handler(struct r2net_msg *msg,
111 				u32 len, void *data, void **ret_data)
112 {
113 	struct tmem_xhandle *xh;
114 	char *p = (char *)msg->buf;
115 	int datalen = len - sizeof(struct r2net_msg) -
116 				sizeof(struct tmem_xhandle);
117 	u16 msgtype = be16_to_cpu(msg->msg_type);
118 	bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119 	unsigned long flags;
120 	int ret;
121 
122 	xh = (struct tmem_xhandle *)p;
123 	p += sizeof(struct tmem_xhandle);
124 	zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125 	local_irq_save(flags);
126 	ret = zcache_put_page(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127 				p, datalen, true, ephemeral);
128 	local_irq_restore(flags);
129 	return ret;
130 }
131 
ramster_remote_flush_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133 				u32 len, void *data, void **ret_data)
134 {
135 	struct tmem_xhandle *xh;
136 	char *p = (char *)msg->buf;
137 
138 	xh = (struct tmem_xhandle *)p;
139 	p += sizeof(struct tmem_xhandle);
140 	(void)zcache_flush_page(xh->client_id, xh->pool_id,
141 					&xh->oid, xh->index);
142 	return 0;
143 }
144 
ramster_remote_flobj_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)145 int ramster_remote_flobj_handler(struct r2net_msg *msg,
146 				u32 len, void *data, void **ret_data)
147 {
148 	struct tmem_xhandle *xh;
149 	char *p = (char *)msg->buf;
150 
151 	xh = (struct tmem_xhandle *)p;
152 	p += sizeof(struct tmem_xhandle);
153 	(void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
154 	return 0;
155 }
156 
r2net_remote_async_get(struct tmem_xhandle * xh,bool free,int remotenode,size_t expect_size,uint8_t expect_cksum,void * extra)157 int r2net_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
158 				size_t expect_size, uint8_t expect_cksum,
159 				void *extra)
160 {
161 	int nodenum, ret = -1, status;
162 	struct r2nm_node *node = NULL;
163 	struct kvec vec[1];
164 	size_t veclen = 1;
165 	u32 msg_type;
166 	struct r2net_node *nn;
167 
168 	node = r2nm_get_node_by_num(remotenode);
169 	if (node == NULL)
170 		goto out;
171 	xh->client_id = r2nm_this_node(); /* which node is getting */
172 	xh->xh_data_cksum = expect_cksum;
173 	xh->xh_data_size = expect_size;
174 	xh->extra = extra;
175 	vec[0].iov_len = sizeof(*xh);
176 	vec[0].iov_base = xh;
177 
178 	node = r2net_target_node;
179 	if (!node)
180 		goto out;
181 
182 	nodenum = r2net_target_nodenum;
183 
184 	r2nm_node_get(node);
185 	nn = r2net_nn_from_num(nodenum);
186 	if (nn->nn_persistent_error || !nn->nn_sc_valid) {
187 		ret = -ENOTCONN;
188 		r2nm_node_put(node);
189 		goto out;
190 	}
191 
192 	if (free)
193 		msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
194 	else
195 		msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
196 	ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
197 					vec, veclen, remotenode, &status);
198 	r2nm_node_put(node);
199 	if (ret < 0) {
200 		if (ret == -ENOTCONN || ret == -EHOSTDOWN)
201 			goto out;
202 		if (ret == -EAGAIN)
203 			goto out;
204 		/* FIXME handle bad message possibilities here? */
205 		pr_err("UNTESTED ret<0 in ramster_remote_async_get: ret=%d\n",
206 				ret);
207 	}
208 	ret = status;
209 out:
210 	return ret;
211 }
212 
213 #ifdef RAMSTER_TESTING
214 /* leave me here to see if it catches a weird crash */
ramster_check_irq_counts(void)215 static void ramster_check_irq_counts(void)
216 {
217 	static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
218 	int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
219 
220 	cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
221 	if (cur_hardirq_cnt > last_hardirq_cnt) {
222 		last_hardirq_cnt = cur_hardirq_cnt;
223 		if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
224 			pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
225 				last_hardirq_cnt);
226 	}
227 	cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
228 	if (cur_softirq_cnt > last_softirq_cnt) {
229 		last_softirq_cnt = cur_softirq_cnt;
230 		if (!(last_softirq_cnt&(last_softirq_cnt-1)))
231 			pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
232 				last_softirq_cnt);
233 	}
234 	cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
235 	if (cur_preempt_cnt > last_preempt_cnt) {
236 		last_preempt_cnt = cur_preempt_cnt;
237 		if (!(last_preempt_cnt&(last_preempt_cnt-1)))
238 			pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
239 				last_preempt_cnt);
240 	}
241 }
242 #endif
243 
r2net_remote_put(struct tmem_xhandle * xh,char * data,size_t size,bool ephemeral,int * remotenode)244 int r2net_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
245 				bool ephemeral, int *remotenode)
246 {
247 	int nodenum, ret = -1, status;
248 	struct r2nm_node *node = NULL;
249 	struct kvec vec[2];
250 	size_t veclen = 2;
251 	u32 msg_type;
252 	struct r2net_node *nn;
253 
254 	BUG_ON(size > RMSTR_R2NET_MAX_LEN);
255 	xh->client_id = r2nm_this_node(); /* which node is putting */
256 	vec[0].iov_len = sizeof(*xh);
257 	vec[0].iov_base = xh;
258 	vec[1].iov_len = size;
259 	vec[1].iov_base = data;
260 
261 	node = r2net_target_node;
262 	if (!node)
263 		goto out;
264 
265 	nodenum = r2net_target_nodenum;
266 
267 	r2nm_node_get(node);
268 
269 	nn = r2net_nn_from_num(nodenum);
270 	if (nn->nn_persistent_error || !nn->nn_sc_valid) {
271 		ret = -ENOTCONN;
272 		r2nm_node_put(node);
273 		goto out;
274 	}
275 
276 	if (ephemeral)
277 		msg_type = RMSTR_TMEM_PUT_EPH;
278 	else
279 		msg_type = RMSTR_TMEM_PUT_PERS;
280 #ifdef RAMSTER_TESTING
281 	/* leave me here to see if it catches a weird crash */
282 	ramster_check_irq_counts();
283 #endif
284 
285 	ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
286 						nodenum, &status);
287 	if (ret < 0)
288 		ret = -1;
289 	else {
290 		ret = status;
291 		*remotenode = nodenum;
292 	}
293 
294 	r2nm_node_put(node);
295 out:
296 	return ret;
297 }
298 
r2net_remote_flush(struct tmem_xhandle * xh,int remotenode)299 int r2net_remote_flush(struct tmem_xhandle *xh, int remotenode)
300 {
301 	int ret = -1, status;
302 	struct r2nm_node *node = NULL;
303 	struct kvec vec[1];
304 	size_t veclen = 1;
305 
306 	node = r2nm_get_node_by_num(remotenode);
307 	BUG_ON(node == NULL);
308 	xh->client_id = r2nm_this_node(); /* which node is flushing */
309 	vec[0].iov_len = sizeof(*xh);
310 	vec[0].iov_base = xh;
311 	BUG_ON(irqs_disabled());
312 	BUG_ON(in_softirq());
313 	ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
314 					vec, veclen, remotenode, &status);
315 	r2nm_node_put(node);
316 	return ret;
317 }
318 
r2net_remote_flush_object(struct tmem_xhandle * xh,int remotenode)319 int r2net_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
320 {
321 	int ret = -1, status;
322 	struct r2nm_node *node = NULL;
323 	struct kvec vec[1];
324 	size_t veclen = 1;
325 
326 	node = r2nm_get_node_by_num(remotenode);
327 	BUG_ON(node == NULL);
328 	xh->client_id = r2nm_this_node(); /* which node is flobjing */
329 	vec[0].iov_len = sizeof(*xh);
330 	vec[0].iov_base = xh;
331 	ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
332 					vec, veclen, remotenode, &status);
333 	r2nm_node_put(node);
334 	return ret;
335 }
336 
337 /*
338  * Handler registration
339  */
340 
341 static LIST_HEAD(r2net_unreg_list);
342 
r2net_unregister_handlers(void)343 static void r2net_unregister_handlers(void)
344 {
345 	r2net_unregister_handler_list(&r2net_unreg_list);
346 }
347 
r2net_register_handlers(void)348 int r2net_register_handlers(void)
349 {
350 	int status;
351 
352 	status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
353 				RMSTR_R2NET_MAX_LEN,
354 				ramster_remote_put_handler,
355 				NULL, NULL, &r2net_unreg_list);
356 	if (status)
357 		goto bail;
358 
359 	status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
360 				RMSTR_R2NET_MAX_LEN,
361 				ramster_remote_put_handler,
362 				NULL, NULL, &r2net_unreg_list);
363 	if (status)
364 		goto bail;
365 
366 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
367 				RMSTR_R2NET_MAX_LEN,
368 				ramster_remote_async_get_request_handler,
369 				NULL, NULL,
370 				&r2net_unreg_list);
371 	if (status)
372 		goto bail;
373 
374 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
375 				RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
376 				ramster_remote_async_get_request_handler,
377 				NULL, NULL,
378 				&r2net_unreg_list);
379 	if (status)
380 		goto bail;
381 
382 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
383 				RMSTR_R2NET_MAX_LEN,
384 				ramster_remote_async_get_reply_handler,
385 				NULL, NULL,
386 				&r2net_unreg_list);
387 	if (status)
388 		goto bail;
389 
390 	status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
391 				RMSTR_R2NET_MAX_LEN,
392 				ramster_remote_flush_handler,
393 				NULL, NULL,
394 				&r2net_unreg_list);
395 	if (status)
396 		goto bail;
397 
398 	status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
399 				RMSTR_R2NET_MAX_LEN,
400 				ramster_remote_flobj_handler,
401 				NULL, NULL,
402 				&r2net_unreg_list);
403 	if (status)
404 		goto bail;
405 
406 	pr_info("ramster: r2net handlers registered\n");
407 
408 bail:
409 	if (status) {
410 		r2net_unregister_handlers();
411 		pr_err("ramster: couldn't register r2net handlers\n");
412 	}
413 	return status;
414 }
415