• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * r2net.c
 *
 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
 *
 * Ramster_r2net provides an interface between zcache and r2net.
 *
 * FIXME: support more than two nodes
 */
10 
11 #include <linux/list.h>
12 #include "cluster/tcp.h"
13 #include "cluster/nodemanager.h"
14 #include "tmem.h"
15 #include "zcache.h"
16 #include "ramster.h"
17 
18 #define RAMSTER_TESTING
19 
20 #define RMSTR_KEY	0x77347734
21 
22 enum {
23 	RMSTR_TMEM_PUT_EPH = 100,
24 	RMSTR_TMEM_PUT_PERS,
25 	RMSTR_TMEM_ASYNC_GET_REQUEST,
26 	RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
27 	RMSTR_TMEM_ASYNC_GET_REPLY,
28 	RMSTR_TMEM_FLUSH,
29 	RMSTR_TMEM_FLOBJ,
30 	RMSTR_TMEM_DESTROY_POOL,
31 };
32 
33 #define RMSTR_R2NET_MAX_LEN \
34 		(R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35 
36 #include "cluster/tcp_internal.h"
37 
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40 
r2net_remote_target_node_set(int node_num)41 int r2net_remote_target_node_set(int node_num)
42 {
43 	int ret = -1;
44 
45 	r2net_target_node = r2nm_get_node_by_num(node_num);
46 	if (r2net_target_node != NULL) {
47 		r2net_target_nodenum = node_num;
48 		r2nm_node_put(r2net_target_node);
49 		ret = 0;
50 	}
51 	return ret;
52 }
53 
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56 
ramster_remote_async_get_request_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 				u32 len, void *data, void **ret_data)
59 {
60 	char *pdata;
61 	struct tmem_xhandle xh;
62 	int found;
63 	size_t size = RMSTR_R2NET_MAX_LEN;
64 	u16 msgtype = be16_to_cpu(msg->msg_type);
65 	bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66 	unsigned long flags;
67 
68 	xh = *(struct tmem_xhandle *)msg->buf;
69 	if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70 		BUG();
71 	pdata = ramster_async_get_buf;
72 	*(struct tmem_xhandle *)pdata = xh;
73 	pdata += sizeof(struct tmem_xhandle);
74 	local_irq_save(flags);
75 	found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 				pdata, &size, 1, get_and_free ? 1 : -1);
77 	local_irq_restore(flags);
78 	if (found < 0) {
79 		/* a zero size indicates the get failed */
80 		size = 0;
81 	}
82 	if (size > RMSTR_R2NET_MAX_LEN)
83 		BUG();
84 	*ret_data = pdata - sizeof(struct tmem_xhandle);
85 	/* now make caller (r2net_process_message) handle specially */
86 	r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 	return size + sizeof(struct tmem_xhandle);
88 }
89 
ramster_remote_async_get_reply_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91 				u32 len, void *data, void **ret_data)
92 {
93 	char *in = (char *)msg->buf;
94 	int datalen = len - sizeof(struct r2net_msg);
95 	int ret = -1;
96 	struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97 
98 	in += sizeof(struct tmem_xhandle);
99 	datalen -= sizeof(struct tmem_xhandle);
100 	BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101 	ret = zcache_localify(xh->pool_id, &xh->oid, xh->index,
102 				in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
104 	if (ret == -EEXIST)
105 		pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106 #endif
107 	return ret;
108 }
109 
ramster_remote_put_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)110 int ramster_remote_put_handler(struct r2net_msg *msg,
111 				u32 len, void *data, void **ret_data)
112 {
113 	struct tmem_xhandle *xh;
114 	char *p = (char *)msg->buf;
115 	int datalen = len - sizeof(struct r2net_msg) -
116 				sizeof(struct tmem_xhandle);
117 	u16 msgtype = be16_to_cpu(msg->msg_type);
118 	bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119 	unsigned long flags;
120 	int ret;
121 
122 	xh = (struct tmem_xhandle *)p;
123 	p += sizeof(struct tmem_xhandle);
124 	zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125 	local_irq_save(flags);
126 	ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127 				p, datalen, 1, ephemeral ? 1 : -1);
128 	local_irq_restore(flags);
129 	return ret;
130 }
131 
ramster_remote_flush_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133 				u32 len, void *data, void **ret_data)
134 {
135 	struct tmem_xhandle *xh;
136 	char *p = (char *)msg->buf;
137 
138 	xh = (struct tmem_xhandle *)p;
139 	p += sizeof(struct tmem_xhandle);
140 	(void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index);
141 	return 0;
142 }
143 
ramster_remote_flobj_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)144 int ramster_remote_flobj_handler(struct r2net_msg *msg,
145 				u32 len, void *data, void **ret_data)
146 {
147 	struct tmem_xhandle *xh;
148 	char *p = (char *)msg->buf;
149 
150 	xh = (struct tmem_xhandle *)p;
151 	p += sizeof(struct tmem_xhandle);
152 	(void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
153 	return 0;
154 }
155 
ramster_remote_async_get(struct tmem_xhandle * xh,bool free,int remotenode,size_t expect_size,uint8_t expect_cksum,void * extra)156 int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
157 				size_t expect_size, uint8_t expect_cksum,
158 				void *extra)
159 {
160 	int ret = -1, status;
161 	struct r2nm_node *node = NULL;
162 	struct kvec vec[1];
163 	size_t veclen = 1;
164 	u32 msg_type;
165 
166 	node = r2nm_get_node_by_num(remotenode);
167 	if (node == NULL)
168 		goto out;
169 	xh->client_id = r2nm_this_node(); /* which node is getting */
170 	xh->xh_data_cksum = expect_cksum;
171 	xh->xh_data_size = expect_size;
172 	xh->extra = extra;
173 	vec[0].iov_len = sizeof(*xh);
174 	vec[0].iov_base = xh;
175 	if (free)
176 		msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
177 	else
178 		msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
179 	ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
180 					vec, veclen, remotenode, &status);
181 	r2nm_node_put(node);
182 	if (ret < 0) {
183 		/* FIXME handle bad message possibilities here? */
184 		pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
185 	}
186 	ret = status;
187 out:
188 	return ret;
189 }
190 
191 #ifdef RAMSTER_TESTING
192 /* leave me here to see if it catches a weird crash */
ramster_check_irq_counts(void)193 static void ramster_check_irq_counts(void)
194 {
195 	static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
196 	int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
197 
198 	cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
199 	if (cur_hardirq_cnt > last_hardirq_cnt) {
200 		last_hardirq_cnt = cur_hardirq_cnt;
201 		if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
202 			pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
203 				last_hardirq_cnt);
204 	}
205 	cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
206 	if (cur_softirq_cnt > last_softirq_cnt) {
207 		last_softirq_cnt = cur_softirq_cnt;
208 		if (!(last_softirq_cnt&(last_softirq_cnt-1)))
209 			pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
210 				last_softirq_cnt);
211 	}
212 	cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
213 	if (cur_preempt_cnt > last_preempt_cnt) {
214 		last_preempt_cnt = cur_preempt_cnt;
215 		if (!(last_preempt_cnt&(last_preempt_cnt-1)))
216 			pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
217 				last_preempt_cnt);
218 	}
219 }
220 #endif
221 
ramster_remote_put(struct tmem_xhandle * xh,char * data,size_t size,bool ephemeral,int * remotenode)222 int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
223 				bool ephemeral, int *remotenode)
224 {
225 	int nodenum, ret = -1, status;
226 	struct r2nm_node *node = NULL;
227 	struct kvec vec[2];
228 	size_t veclen = 2;
229 	u32 msg_type;
230 #ifdef RAMSTER_TESTING
231 	struct r2net_node *nn;
232 #endif
233 
234 	BUG_ON(size > RMSTR_R2NET_MAX_LEN);
235 	xh->client_id = r2nm_this_node(); /* which node is putting */
236 	vec[0].iov_len = sizeof(*xh);
237 	vec[0].iov_base = xh;
238 	vec[1].iov_len = size;
239 	vec[1].iov_base = data;
240 	node = r2net_target_node;
241 	if (!node)
242 		goto out;
243 
244 	nodenum = r2net_target_nodenum;
245 
246 	r2nm_node_get(node);
247 
248 #ifdef RAMSTER_TESTING
249 	nn = r2net_nn_from_num(nodenum);
250 	WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid);
251 #endif
252 
253 	if (ephemeral)
254 		msg_type = RMSTR_TMEM_PUT_EPH;
255 	else
256 		msg_type = RMSTR_TMEM_PUT_PERS;
257 #ifdef RAMSTER_TESTING
258 	/* leave me here to see if it catches a weird crash */
259 	ramster_check_irq_counts();
260 #endif
261 
262 	ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
263 						nodenum, &status);
264 #ifdef RAMSTER_TESTING
265 	if (ret != 0) {
266 		static unsigned long cnt;
267 		cnt++;
268 		if (!(cnt&(cnt-1)))
269 			pr_err("ramster_remote_put: message failed, "
270 				"ret=%d, cnt=%lu\n", ret, cnt);
271 		ret = -1;
272 	}
273 #endif
274 	if (ret < 0)
275 		ret = -1;
276 	else {
277 		ret = status;
278 		*remotenode = nodenum;
279 	}
280 
281 	r2nm_node_put(node);
282 out:
283 	return ret;
284 }
285 
ramster_remote_flush(struct tmem_xhandle * xh,int remotenode)286 int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode)
287 {
288 	int ret = -1, status;
289 	struct r2nm_node *node = NULL;
290 	struct kvec vec[1];
291 	size_t veclen = 1;
292 
293 	node = r2nm_get_node_by_num(remotenode);
294 	BUG_ON(node == NULL);
295 	xh->client_id = r2nm_this_node(); /* which node is flushing */
296 	vec[0].iov_len = sizeof(*xh);
297 	vec[0].iov_base = xh;
298 	BUG_ON(irqs_disabled());
299 	BUG_ON(in_softirq());
300 	ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
301 					vec, veclen, remotenode, &status);
302 	r2nm_node_put(node);
303 	return ret;
304 }
305 
ramster_remote_flush_object(struct tmem_xhandle * xh,int remotenode)306 int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
307 {
308 	int ret = -1, status;
309 	struct r2nm_node *node = NULL;
310 	struct kvec vec[1];
311 	size_t veclen = 1;
312 
313 	node = r2nm_get_node_by_num(remotenode);
314 	BUG_ON(node == NULL);
315 	xh->client_id = r2nm_this_node(); /* which node is flobjing */
316 	vec[0].iov_len = sizeof(*xh);
317 	vec[0].iov_base = xh;
318 	ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
319 					vec, veclen, remotenode, &status);
320 	r2nm_node_put(node);
321 	return ret;
322 }
323 
324 /*
325  * Handler registration
326  */
327 
328 static LIST_HEAD(r2net_unreg_list);
329 
r2net_unregister_handlers(void)330 static void r2net_unregister_handlers(void)
331 {
332 	r2net_unregister_handler_list(&r2net_unreg_list);
333 }
334 
r2net_register_handlers(void)335 int r2net_register_handlers(void)
336 {
337 	int status;
338 
339 	status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
340 				RMSTR_R2NET_MAX_LEN,
341 				ramster_remote_put_handler,
342 				NULL, NULL, &r2net_unreg_list);
343 	if (status)
344 		goto bail;
345 
346 	status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
347 				RMSTR_R2NET_MAX_LEN,
348 				ramster_remote_put_handler,
349 				NULL, NULL, &r2net_unreg_list);
350 	if (status)
351 		goto bail;
352 
353 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
354 				RMSTR_R2NET_MAX_LEN,
355 				ramster_remote_async_get_request_handler,
356 				NULL, NULL,
357 				&r2net_unreg_list);
358 	if (status)
359 		goto bail;
360 
361 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
362 				RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
363 				ramster_remote_async_get_request_handler,
364 				NULL, NULL,
365 				&r2net_unreg_list);
366 	if (status)
367 		goto bail;
368 
369 	status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
370 				RMSTR_R2NET_MAX_LEN,
371 				ramster_remote_async_get_reply_handler,
372 				NULL, NULL,
373 				&r2net_unreg_list);
374 	if (status)
375 		goto bail;
376 
377 	status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
378 				RMSTR_R2NET_MAX_LEN,
379 				ramster_remote_flush_handler,
380 				NULL, NULL,
381 				&r2net_unreg_list);
382 	if (status)
383 		goto bail;
384 
385 	status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
386 				RMSTR_R2NET_MAX_LEN,
387 				ramster_remote_flobj_handler,
388 				NULL, NULL,
389 				&r2net_unreg_list);
390 	if (status)
391 		goto bail;
392 
393 	pr_info("ramster: r2net handlers registered\n");
394 
395 bail:
396 	if (status) {
397 		r2net_unregister_handlers();
398 		pr_err("ramster: couldn't register r2net handlers\n");
399 	}
400 	return status;
401 }
402