1 /*
2 * r2net.c
3 *
4 * Copyright (c) 2011-2012, Dan Magenheimer, Oracle Corp.
5 *
6 * Ramster_r2net provides an interface between zcache and r2net.
7 *
8 * FIXME: support more than two nodes
9 */
10
11 #include <linux/list.h>
12 #include "tcp.h"
13 #include "nodemanager.h"
14 #include "../tmem.h"
15 #include "../zcache.h"
16 #include "ramster.h"
17
18 #define RAMSTER_TESTING
19
20 #define RMSTR_KEY 0x77347734
21
/*
 * r2net message types exchanged between ramster nodes.  The values are
 * spelled out explicitly because they travel on the wire in msg_type.
 */
enum {
	/* put a page; received by ramster_remote_put_handler() as ephemeral */
	RMSTR_TMEM_PUT_EPH			= 100,
	/* put a page; received by ramster_remote_put_handler() as persistent */
	RMSTR_TMEM_PUT_PERS			= 101,
	/* request a page back from the remote node */
	RMSTR_TMEM_ASYNC_GET_REQUEST		= 102,
	/* as above, using the "get and free" variant of the request */
	RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST	= 103,
	/* reply carrying the data for one of the two get requests */
	RMSTR_TMEM_ASYNC_GET_REPLY		= 104,
	/* flush a single page (oid + index) */
	RMSTR_TMEM_FLUSH			= 105,
	/* flush a whole object (oid) */
	RMSTR_TMEM_FLOBJ			= 106,
	/* no handler for this type is registered in this file */
	RMSTR_TMEM_DESTROY_POOL			= 107,
};
32
33 #define RMSTR_R2NET_MAX_LEN \
34 (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35
36 #include "tcp_internal.h"
37
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40
r2net_remote_target_node_set(int node_num)41 int r2net_remote_target_node_set(int node_num)
42 {
43 int ret = -1;
44
45 r2net_target_node = r2nm_get_node_by_num(node_num);
46 if (r2net_target_node != NULL) {
47 r2net_target_nodenum = node_num;
48 r2nm_node_put(r2net_target_node);
49 ret = 0;
50 }
51 return ret;
52 }
53
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56
ramster_remote_async_get_request_handler(struct r2net_msg * msg,u32 len,void * data,void ** ret_data)57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 u32 len, void *data, void **ret_data)
59 {
60 char *pdata;
61 struct tmem_xhandle xh;
62 int found;
63 size_t size = RMSTR_R2NET_MAX_LEN;
64 u16 msgtype = be16_to_cpu(msg->msg_type);
65 bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66 unsigned long flags;
67
68 xh = *(struct tmem_xhandle *)msg->buf;
69 if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70 BUG();
71 pdata = ramster_async_get_buf;
72 *(struct tmem_xhandle *)pdata = xh;
73 pdata += sizeof(struct tmem_xhandle);
74 local_irq_save(flags);
75 found = zcache_get_page(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 pdata, &size, true, get_and_free ? 1 : -1);
77 local_irq_restore(flags);
78 if (found < 0) {
79 /* a zero size indicates the get failed */
80 size = 0;
81 }
82 if (size > RMSTR_R2NET_MAX_LEN)
83 BUG();
84 *ret_data = pdata - sizeof(struct tmem_xhandle);
85 /* now make caller (r2net_process_message) handle specially */
86 r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 return size + sizeof(struct tmem_xhandle);
88 }
89
/*
 * Handle RMSTR_TMEM_ASYNC_GET_REPLY.
 *
 * The message body is a struct tmem_xhandle followed by the returned
 * data; both are handed to ramster_localify().  The data length is
 * whatever remains of len after the r2net header and the xhandle, and
 * must lie in [0, PAGE_SIZE].
 *
 * Returns the result of ramster_localify().
 */
static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
				u32 len, void *data, void **ret_data)
{
	struct tmem_xhandle *xh = (struct tmem_xhandle *)msg->buf;
	char *payload = (char *)msg->buf + sizeof(struct tmem_xhandle);
	int payload_len = len - sizeof(struct r2net_msg) -
				sizeof(struct tmem_xhandle);
	int err;

	BUG_ON(payload_len < 0 || payload_len > PAGE_SIZE);

	err = ramster_localify(xh->pool_id, &xh->oid, xh->index,
				payload, payload_len, xh->extra);
#ifdef RAMSTER_TESTING
	if (err == -EEXIST)
		pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
#endif
	return err;
}
109
/*
 * Handle RMSTR_TMEM_PUT_EPH and RMSTR_TMEM_PUT_PERS.
 *
 * The message body is a struct tmem_xhandle followed by the data to
 * store.  The pool named by the xhandle is auto-created first, then the
 * data is stored with zcache_put_page() with local interrupts disabled.
 *
 * Returns the result of zcache_put_page().
 */
int ramster_remote_put_handler(struct r2net_msg *msg,
				u32 len, void *data, void **ret_data)
{
	struct tmem_xhandle *xh = (struct tmem_xhandle *)msg->buf;
	char *payload = (char *)msg->buf + sizeof(struct tmem_xhandle);
	int payload_len = len - sizeof(struct r2net_msg) -
				sizeof(struct tmem_xhandle);
	/* anything that is not PUT_EPH is treated as a persistent put */
	bool ephemeral = (be16_to_cpu(msg->msg_type) == RMSTR_TMEM_PUT_EPH);
	unsigned long flags;
	int rc;

	zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);

	local_irq_save(flags);
	rc = zcache_put_page(xh->client_id, xh->pool_id, &xh->oid, xh->index,
				payload, payload_len, true, ephemeral);
	local_irq_restore(flags);

	return rc;
}
131
/*
 * Handle RMSTR_TMEM_FLUSH: flush the single page named by the
 * struct tmem_xhandle at the start of the message body.
 *
 * Always returns 0; the result of zcache_flush_page() is deliberately
 * ignored.
 */
int ramster_remote_flush_handler(struct r2net_msg *msg,
				u32 len, void *data, void **ret_data)
{
	struct tmem_xhandle *xh = (struct tmem_xhandle *)msg->buf;

	(void)zcache_flush_page(xh->client_id, xh->pool_id,
				&xh->oid, xh->index);
	return 0;
}
144
/*
 * Handle RMSTR_TMEM_FLOBJ: flush the whole object named by the
 * struct tmem_xhandle at the start of the message body.
 *
 * Always returns 0; the result of zcache_flush_object() is deliberately
 * ignored.
 */
int ramster_remote_flobj_handler(struct r2net_msg *msg,
				u32 len, void *data, void **ret_data)
{
	struct tmem_xhandle *xh = (struct tmem_xhandle *)msg->buf;

	(void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
	return 0;
}
156
/*
 * Send an asynchronous get request for the page described by xh.
 *
 * @xh:           handle of the page; client_id, xh_data_cksum,
 *                xh_data_size and extra are filled in here before sending
 * @free:         send RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST instead of
 *                RMSTR_TMEM_ASYNC_GET_REQUEST
 * @remotenode:   number of the node the request is sent to
 * @expect_size:  stored in xh->xh_data_size
 * @expect_cksum: stored in xh->xh_data_cksum
 * @extra:        stored in xh->extra
 *
 * Returns -1 if remotenode is unknown or no target node has been set,
 * -ENOTCONN if the target node's connection is not usable, the negative
 * error from r2net_send_message_vec() if the send fails, and otherwise
 * the status reported back through r2net_send_message_vec().
 */
int r2net_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
				size_t expect_size, uint8_t expect_cksum,
				void *extra)
{
	int nodenum, ret = -1, status;
	struct r2nm_node *remote;
	struct r2nm_node *target;
	struct kvec vec[1];
	size_t veclen = 1;
	u32 msg_type;
	struct r2net_node *nn;

	/*
	 * The lookup takes a reference on the remote node.  Keep it in its
	 * own variable so it is released on every exit path; it used to be
	 * overwritten by the target node pointer and leaked.
	 */
	remote = r2nm_get_node_by_num(remotenode);
	if (remote == NULL)
		goto out;
	xh->client_id = r2nm_this_node(); /* which node is getting */
	xh->xh_data_cksum = expect_cksum;
	xh->xh_data_size = expect_size;
	xh->extra = extra;
	vec[0].iov_len = sizeof(*xh);
	vec[0].iov_base = xh;

	target = r2net_target_node;
	if (!target)
		goto out_put_remote;

	nodenum = r2net_target_nodenum;

	r2nm_node_get(target);
	/*
	 * NOTE(review): connectivity is checked on the target node while the
	 * message goes to remotenode; with only two nodes supported these
	 * are presumably the same node -- confirm.
	 */
	nn = r2net_nn_from_num(nodenum);
	if (nn->nn_persistent_error || !nn->nn_sc_valid) {
		ret = -ENOTCONN;
		goto out_put_target;
	}

	if (free)
		msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
	else
		msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
	ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
					vec, veclen, remotenode, &status);
	if (ret < 0) {
		/*
		 * The send itself failed, so return its error.  status is
		 * presumably not written in this case (r2net_remote_put()
		 * likewise only uses it on success), so it must not replace
		 * the error code.
		 */
		if (ret != -ENOTCONN && ret != -EHOSTDOWN && ret != -EAGAIN) {
			/* FIXME handle bad message possibilities here? */
			pr_err("UNTESTED ret<0 in ramster_remote_async_get: ret=%d\n",
				ret);
		}
	} else {
		ret = status;
	}

out_put_target:
	r2nm_node_put(target);
out_put_remote:
	r2nm_node_put(remote);
out:
	return ret;
}
212
#ifdef RAMSTER_TESTING
/*
 * Debug aid, left in to see if it catches a weird crash.
 *
 * Tracks the highest hardirq, softirq and preempt counts seen so far by
 * this function and logs a new maximum whenever it is a power of two.
 */
static void ramster_check_irq_counts(void)
{
	static int max_hardirq, max_softirq, max_preempt;
	int now;

	now = hardirq_count() >> HARDIRQ_SHIFT;
	if (now > max_hardirq) {
		max_hardirq = now;
		/* x & (x - 1) is zero only when x has a single bit set */
		if ((now & (now - 1)) == 0)
			pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
				now);
	}

	now = softirq_count() >> SOFTIRQ_SHIFT;
	if (now > max_softirq) {
		max_softirq = now;
		if ((now & (now - 1)) == 0)
			pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
				now);
	}

	now = preempt_count() & PREEMPT_MASK;
	if (now > max_preempt) {
		max_preempt = now;
		if ((now & (now - 1)) == 0)
			pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
				now);
	}
}
#endif
243
/*
 * Send the data for the page described by xh to the current target node.
 *
 * @xh:         handle of the page; client_id is set to this node
 * @data:       the bytes to send
 * @size:       number of bytes at data, at most RMSTR_R2NET_MAX_LEN
 * @ephemeral:  send RMSTR_TMEM_PUT_EPH rather than RMSTR_TMEM_PUT_PERS
 * @remotenode: on a successful send, receives the target node number
 *
 * Returns -1 if no target node is set or the send fails, -ENOTCONN if
 * the target's connection is not usable, and otherwise the status
 * reported back through r2net_send_message_vec().
 */
int r2net_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
				bool ephemeral, int *remotenode)
{
	struct kvec vec[2] = {
		{ .iov_base = xh,   .iov_len = sizeof(*xh) },
		{ .iov_base = data, .iov_len = size },
	};
	struct r2nm_node *target;
	struct r2net_node *nn;
	int target_num;
	int status;
	int ret = -1;

	BUG_ON(size > RMSTR_R2NET_MAX_LEN);
	xh->client_id = r2nm_this_node(); /* which node is putting */

	target = r2net_target_node;
	if (!target)
		return ret;
	target_num = r2net_target_nodenum;

	/* hold the target node for the duration of the send */
	r2nm_node_get(target);

	nn = r2net_nn_from_num(target_num);
	if (nn->nn_persistent_error || !nn->nn_sc_valid) {
		ret = -ENOTCONN;
		goto out_put;
	}

#ifdef RAMSTER_TESTING
	/* leave me here to see if it catches a weird crash */
	ramster_check_irq_counts();
#endif

	ret = r2net_send_message_vec(ephemeral ? RMSTR_TMEM_PUT_EPH :
						 RMSTR_TMEM_PUT_PERS,
					RMSTR_KEY, vec, 2,
					target_num, &status);
	if (ret >= 0) {
		/* the send succeeded: report the remote status and node */
		ret = status;
		*remotenode = target_num;
	} else {
		ret = -1;
	}

out_put:
	r2nm_node_put(target);
	return ret;
}
298
r2net_remote_flush(struct tmem_xhandle * xh,int remotenode)299 int r2net_remote_flush(struct tmem_xhandle *xh, int remotenode)
300 {
301 int ret = -1, status;
302 struct r2nm_node *node = NULL;
303 struct kvec vec[1];
304 size_t veclen = 1;
305
306 node = r2nm_get_node_by_num(remotenode);
307 BUG_ON(node == NULL);
308 xh->client_id = r2nm_this_node(); /* which node is flushing */
309 vec[0].iov_len = sizeof(*xh);
310 vec[0].iov_base = xh;
311 BUG_ON(irqs_disabled());
312 BUG_ON(in_softirq());
313 ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
314 vec, veclen, remotenode, &status);
315 r2nm_node_put(node);
316 return ret;
317 }
318
r2net_remote_flush_object(struct tmem_xhandle * xh,int remotenode)319 int r2net_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
320 {
321 int ret = -1, status;
322 struct r2nm_node *node = NULL;
323 struct kvec vec[1];
324 size_t veclen = 1;
325
326 node = r2nm_get_node_by_num(remotenode);
327 BUG_ON(node == NULL);
328 xh->client_id = r2nm_this_node(); /* which node is flobjing */
329 vec[0].iov_len = sizeof(*xh);
330 vec[0].iov_base = xh;
331 ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
332 vec, veclen, remotenode, &status);
333 r2nm_node_put(node);
334 return ret;
335 }
336
337 /*
338 * Handler registration
339 */
340
341 static LIST_HEAD(r2net_unreg_list);
342
r2net_unregister_handlers(void)343 static void r2net_unregister_handlers(void)
344 {
345 r2net_unregister_handler_list(&r2net_unreg_list);
346 }
347
r2net_register_handlers(void)348 int r2net_register_handlers(void)
349 {
350 int status;
351
352 status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
353 RMSTR_R2NET_MAX_LEN,
354 ramster_remote_put_handler,
355 NULL, NULL, &r2net_unreg_list);
356 if (status)
357 goto bail;
358
359 status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
360 RMSTR_R2NET_MAX_LEN,
361 ramster_remote_put_handler,
362 NULL, NULL, &r2net_unreg_list);
363 if (status)
364 goto bail;
365
366 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
367 RMSTR_R2NET_MAX_LEN,
368 ramster_remote_async_get_request_handler,
369 NULL, NULL,
370 &r2net_unreg_list);
371 if (status)
372 goto bail;
373
374 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
375 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
376 ramster_remote_async_get_request_handler,
377 NULL, NULL,
378 &r2net_unreg_list);
379 if (status)
380 goto bail;
381
382 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
383 RMSTR_R2NET_MAX_LEN,
384 ramster_remote_async_get_reply_handler,
385 NULL, NULL,
386 &r2net_unreg_list);
387 if (status)
388 goto bail;
389
390 status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
391 RMSTR_R2NET_MAX_LEN,
392 ramster_remote_flush_handler,
393 NULL, NULL,
394 &r2net_unreg_list);
395 if (status)
396 goto bail;
397
398 status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
399 RMSTR_R2NET_MAX_LEN,
400 ramster_remote_flobj_handler,
401 NULL, NULL,
402 &r2net_unreg_list);
403 if (status)
404 goto bail;
405
406 pr_info("ramster: r2net handlers registered\n");
407
408 bail:
409 if (status) {
410 r2net_unregister_handlers();
411 pr_err("ramster: couldn't register r2net handlers\n");
412 }
413 return status;
414 }
415