1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * RDMA Transport Layer
4 *
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14
15 #include "rtrs-srv.h"
16 #include "rtrs-log.h"
17 #include <rdma/ib_cm.h>
18 #include <rdma/ib_verbs.h>
19 #include "rtrs-srv-trace.h"
20
21 MODULE_DESCRIPTION("RDMA Transport Server");
22 MODULE_LICENSE("GPL");
23
24 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
25 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
26 #define DEFAULT_SESS_QUEUE_DEPTH 512
27 #define MAX_HDR_SIZE PAGE_SIZE
28
29 static struct rtrs_rdma_dev_pd dev_pd;
30 struct class *rtrs_dev_class;
31 static struct rtrs_srv_ib_ctx ib_ctx;
32
33 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
34 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
35
36 static bool always_invalidate = true;
37 module_param(always_invalidate, bool, 0444);
38 MODULE_PARM_DESC(always_invalidate,
39 "Invalidate memory registration for contiguous memory regions before accessing.");
40
41 module_param_named(max_chunk_size, max_chunk_size, int, 0444);
42 MODULE_PARM_DESC(max_chunk_size,
43 "Max size for each IO request, when change the unit is in byte (default: "
44 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
45
46 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
47 MODULE_PARM_DESC(sess_queue_depth,
48 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
49 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
50 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
51
52 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
53
54 static struct workqueue_struct *rtrs_wq;
55
56 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
57 {
58 return container_of(c, struct rtrs_srv_con, c);
59 }
60
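/*
 * Validate and apply a path state transition under state_lock.
 * Returns true if the transition to @new_state was allowed and performed.
 */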
61 static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
62 enum rtrs_srv_state new_state)
63 {
64 enum rtrs_srv_state old_state;
65 bool changed = false;
66 unsigned long flags;
67
68 spin_lock_irqsave(&srv_path->state_lock, flags);
69 old_state = srv_path->state;
70 switch (new_state) {
71 case RTRS_SRV_CONNECTED:
72 if (old_state == RTRS_SRV_CONNECTING)
73 changed = true;
74 break;
75 case RTRS_SRV_CLOSING:
76 if (old_state == RTRS_SRV_CONNECTING ||
77 old_state == RTRS_SRV_CONNECTED)
78 changed = true;
79 break;
80 case RTRS_SRV_CLOSED:
81 if (old_state == RTRS_SRV_CLOSING)
82 changed = true;
83 break;
84 default:
85 break;
86 }
87 if (changed)
88 srv_path->state = new_state;
89 spin_unlock_irqrestore(&srv_path->state_lock, flags);
90
91 return changed;
92 }
93
94 static void free_id(struct rtrs_srv_op *id)
95 {
96 if (!id)
97 return;
98 kfree(id);
99 }
100
101 static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
102 {
103 struct rtrs_srv_sess *srv = srv_path->srv;
104 int i;
105
106 if (srv_path->ops_ids) {
107 for (i = 0; i < srv->queue_depth; i++)
108 free_id(srv_path->ops_ids[i]);
109 kfree(srv_path->ops_ids);
110 srv_path->ops_ids = NULL;
111 }
112 }
113
114 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
115
116 static struct ib_cqe io_comp_cqe = {
117 .done = rtrs_srv_rdma_done
118 };
119
120 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
121 {
122 struct rtrs_srv_path *srv_path = container_of(ref,
123 struct rtrs_srv_path,
124 ids_inflight_ref);
125
126 percpu_ref_exit(&srv_path->ids_inflight_ref);
127 complete(&srv_path->complete_done);
128 }
129
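/*
 * Allocate one rtrs_srv_op per queue slot and initialize the per-path
 * inflight percpu refcount; complete_done is signalled once the last
 * inflight reference is dropped.
 */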
130 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path)
131 {
132 struct rtrs_srv_sess *srv = srv_path->srv;
133 struct rtrs_srv_op *id;
134 int i, ret;
135
136 srv_path->ops_ids = kcalloc(srv->queue_depth,
137 sizeof(*srv_path->ops_ids),
138 GFP_KERNEL);
139 if (!srv_path->ops_ids)
140 goto err;
141
142 for (i = 0; i < srv->queue_depth; ++i) {
143 id = kzalloc(sizeof(*id), GFP_KERNEL);
144 if (!id)
145 goto err;
146
147 srv_path->ops_ids[i] = id;
148 }
149
150 ret = percpu_ref_init(&srv_path->ids_inflight_ref,
151 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
152 if (ret) {
153 pr_err("Percpu reference init failed\n");
154 goto err;
155 }
156 init_completion(&srv_path->complete_done);
157
158 return 0;
159
160 err:
161 rtrs_srv_free_ops_ids(srv_path);
162 return -ENOMEM;
163 }
164
165 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path)
166 {
167 percpu_ref_get(&srv_path->ids_inflight_ref);
168 }
169
170 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path)
171 {
172 percpu_ref_put(&srv_path->ids_inflight_ref);
173 }
174
175 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
176 {
177 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
178 struct rtrs_path *s = con->c.path;
179 struct rtrs_srv_path *srv_path = to_srv_path(s);
180
181 if (wc->status != IB_WC_SUCCESS) {
182 rtrs_err(s, "REG MR failed: %s\n",
183 ib_wc_status_msg(wc->status));
184 close_path(srv_path);
185 return;
186 }
187 }
188
189 static struct ib_cqe local_reg_cqe = {
190 .done = rtrs_srv_reg_mr_done
191 };
192
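/*
 * Post the RDMA-Write that carries read response data into the client's
 * buffer, chained (depending on need_inval/always_invalidate) with an
 * optional local MR re-registration, a SEND_WITH_INV and a final WR
 * carrying the immediate data.
 */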
193 static int rdma_write_sg(struct rtrs_srv_op *id)
194 {
195 struct rtrs_path *s = id->con->c.path;
196 struct rtrs_srv_path *srv_path = to_srv_path(s);
197 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id];
198 struct rtrs_srv_mr *srv_mr;
199 struct ib_send_wr inv_wr;
200 struct ib_rdma_wr imm_wr;
201 struct ib_rdma_wr *wr = NULL;
202 enum ib_send_flags flags;
203 size_t sg_cnt;
204 int err, offset;
205 bool need_inval;
206 u32 rkey = 0;
207 struct ib_reg_wr rwr;
208 struct ib_sge *plist;
209 struct ib_sge list;
210
211 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
212 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
213 if (sg_cnt != 1)
214 return -EINVAL;
215
216 offset = 0;
217
218 wr = &id->tx_wr;
219 plist = &id->tx_sg;
220 plist->addr = dma_addr + offset;
221 plist->length = le32_to_cpu(id->rd_msg->desc[0].len);
222
223 /* WR will fail with length error
224 * if this is 0
225 */
226 if (plist->length == 0) {
227 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
228 return -EINVAL;
229 }
230
231 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
232 offset += plist->length;
233
234 wr->wr.sg_list = plist;
235 wr->wr.num_sge = 1;
236 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
237 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
238 if (rkey == 0)
239 rkey = wr->rkey;
240 else
241 /* Only one key is actually used */
242 WARN_ON_ONCE(rkey != wr->rkey);
243
244 wr->wr.opcode = IB_WR_RDMA_WRITE;
245 wr->wr.wr_cqe = &io_comp_cqe;
246 wr->wr.ex.imm_data = 0;
247 wr->wr.send_flags = 0;
248
249 if (need_inval && always_invalidate) {
250 wr->wr.next = &rwr.wr;
251 rwr.wr.next = &inv_wr;
252 inv_wr.next = &imm_wr.wr;
253 } else if (always_invalidate) {
254 wr->wr.next = &rwr.wr;
255 rwr.wr.next = &imm_wr.wr;
256 } else if (need_inval) {
257 wr->wr.next = &inv_wr;
258 inv_wr.next = &imm_wr.wr;
259 } else {
260 wr->wr.next = &imm_wr.wr;
261 }
262 /*
263 * From time to time we have to post signaled sends,
264 * or send queue will fill up and only QP reset can help.
265 */
266 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
267 0 : IB_SEND_SIGNALED;
268
269 if (need_inval) {
270 inv_wr.sg_list = NULL;
271 inv_wr.num_sge = 0;
272 inv_wr.opcode = IB_WR_SEND_WITH_INV;
273 inv_wr.wr_cqe = &io_comp_cqe;
274 inv_wr.send_flags = 0;
275 inv_wr.ex.invalidate_rkey = rkey;
276 }
277
278 imm_wr.wr.next = NULL;
279 if (always_invalidate) {
280 struct rtrs_msg_rkey_rsp *msg;
281
282 srv_mr = &srv_path->mrs[id->msg_id];
283 rwr.wr.opcode = IB_WR_REG_MR;
284 rwr.wr.wr_cqe = &local_reg_cqe;
285 rwr.wr.num_sge = 0;
286 rwr.mr = srv_mr->mr;
287 rwr.wr.send_flags = 0;
288 rwr.key = srv_mr->mr->rkey;
289 rwr.access = (IB_ACCESS_LOCAL_WRITE |
290 IB_ACCESS_REMOTE_WRITE);
291 msg = srv_mr->iu->buf;
292 msg->buf_id = cpu_to_le16(id->msg_id);
293 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
294 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
295
296 list.addr = srv_mr->iu->dma_addr;
297 list.length = sizeof(*msg);
298 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
299 imm_wr.wr.sg_list = &list;
300 imm_wr.wr.num_sge = 1;
301 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
302 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
303 srv_mr->iu->dma_addr,
304 srv_mr->iu->size, DMA_TO_DEVICE);
305 } else {
306 imm_wr.wr.sg_list = NULL;
307 imm_wr.wr.num_sge = 0;
308 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
309 }
310 imm_wr.wr.send_flags = flags;
311 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
312 0, need_inval));
313
314 imm_wr.wr.wr_cqe = &io_comp_cqe;
315 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr,
316 offset, DMA_BIDIRECTIONAL);
317
318 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
319 if (err)
320 rtrs_err(s,
321 "Posting RDMA-Write-Request to QP failed, err: %d\n",
322 err);
323
324 return err;
325 }
326
327 /**
328 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
329 * requests or on successful WRITE request.
330 * @con: the connection to send back result
331 * @id: the id associated with the IO
332 * @errno: the error number of the IO.
333 *
334 * Return 0 on success, errno otherwise.
335 */
336 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
337 int errno)
338 {
339 struct rtrs_path *s = con->c.path;
340 struct rtrs_srv_path *srv_path = to_srv_path(s);
341 struct ib_send_wr inv_wr, *wr = NULL;
342 struct ib_rdma_wr imm_wr;
343 struct ib_reg_wr rwr;
344 struct rtrs_srv_mr *srv_mr;
345 bool need_inval = false;
346 enum ib_send_flags flags;
347 u32 imm;
348 int err;
349
350 if (id->dir == READ) {
351 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
352 size_t sg_cnt;
353
354 need_inval = le16_to_cpu(rd_msg->flags) &
355 RTRS_MSG_NEED_INVAL_F;
356 sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
357
358 if (need_inval) {
359 if (sg_cnt) {
360 inv_wr.wr_cqe = &io_comp_cqe;
361 inv_wr.sg_list = NULL;
362 inv_wr.num_sge = 0;
363 inv_wr.opcode = IB_WR_SEND_WITH_INV;
364 inv_wr.send_flags = 0;
365 /* Only one key is actually used */
366 inv_wr.ex.invalidate_rkey =
367 le32_to_cpu(rd_msg->desc[0].key);
368 } else {
369 WARN_ON_ONCE(1);
370 need_inval = false;
371 }
372 }
373 }
374
375 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno);
376
377 if (need_inval && always_invalidate) {
378 wr = &inv_wr;
379 inv_wr.next = &rwr.wr;
380 rwr.wr.next = &imm_wr.wr;
381 } else if (always_invalidate) {
382 wr = &rwr.wr;
383 rwr.wr.next = &imm_wr.wr;
384 } else if (need_inval) {
385 wr = &inv_wr;
386 inv_wr.next = &imm_wr.wr;
387 } else {
388 wr = &imm_wr.wr;
389 }
390 /*
391 * From time to time we have to post signalled sends,
392 * or send queue will fill up and only QP reset can help.
393 */
394 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
395 0 : IB_SEND_SIGNALED;
396 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
397 imm_wr.wr.next = NULL;
398 if (always_invalidate) {
399 struct ib_sge list;
400 struct rtrs_msg_rkey_rsp *msg;
401
402 srv_mr = &srv_path->mrs[id->msg_id];
403 rwr.wr.next = &imm_wr.wr;
404 rwr.wr.opcode = IB_WR_REG_MR;
405 rwr.wr.wr_cqe = &local_reg_cqe;
406 rwr.wr.num_sge = 0;
407 rwr.wr.send_flags = 0;
408 rwr.mr = srv_mr->mr;
409 rwr.key = srv_mr->mr->rkey;
410 rwr.access = (IB_ACCESS_LOCAL_WRITE |
411 IB_ACCESS_REMOTE_WRITE);
412 msg = srv_mr->iu->buf;
413 msg->buf_id = cpu_to_le16(id->msg_id);
414 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
415 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
416
417 list.addr = srv_mr->iu->dma_addr;
418 list.length = sizeof(*msg);
419 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
420 imm_wr.wr.sg_list = &list;
421 imm_wr.wr.num_sge = 1;
422 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
423 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
424 srv_mr->iu->dma_addr,
425 srv_mr->iu->size, DMA_TO_DEVICE);
426 } else {
427 imm_wr.wr.sg_list = NULL;
428 imm_wr.wr.num_sge = 0;
429 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
430 }
431 imm_wr.wr.send_flags = flags;
432 imm_wr.wr.wr_cqe = &io_comp_cqe;
433
434 imm_wr.wr.ex.imm_data = cpu_to_be32(imm);
435
436 err = ib_post_send(id->con->c.qp, wr, NULL);
437 if (err)
438 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
439 err);
440
441 return err;
442 }
443
444 void close_path(struct rtrs_srv_path *srv_path)
445 {
446 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING))
447 queue_work(rtrs_wq, &srv_path->close_work);
448 WARN_ON(srv_path->state != RTRS_SRV_CLOSING);
449 }
450
451 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
452 {
453 switch (state) {
454 case RTRS_SRV_CONNECTING:
455 return "RTRS_SRV_CONNECTING";
456 case RTRS_SRV_CONNECTED:
457 return "RTRS_SRV_CONNECTED";
458 case RTRS_SRV_CLOSING:
459 return "RTRS_SRV_CLOSING";
460 case RTRS_SRV_CLOSED:
461 return "RTRS_SRV_CLOSED";
462 default:
463 return "UNKNOWN";
464 }
465 }
466
467 /**
468 * rtrs_srv_resp_rdma() - Finish an RDMA request
469 *
470 * @id: Internal RTRS operation identifier
471 * @status: Response Code sent to the other side for this operation.
472 * 0 = success, < 0 error
473 * Context: any
474 *
475 * Finish an RDMA operation. A message is sent to the client and the
476 * corresponding memory areas will be released.
477 */
478 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
479 {
480 struct rtrs_srv_path *srv_path;
481 struct rtrs_srv_con *con;
482 struct rtrs_path *s;
483 int err;
484
485 if (WARN_ON(!id))
486 return true;
487
488 con = id->con;
489 s = con->c.path;
490 srv_path = to_srv_path(s);
491
492 id->status = status;
493
494 if (srv_path->state != RTRS_SRV_CONNECTED) {
495 rtrs_err_rl(s,
496 "Sending I/O response failed, server path %s is disconnected, path state %s\n",
497 kobject_name(&srv_path->kobj),
498 rtrs_srv_state_str(srv_path->state));
499 goto out;
500 }
501 if (always_invalidate) {
502 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];
503
504 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
505 }
506 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
507 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
508 kobject_name(&srv_path->kobj),
509 con->c.cid);
510 atomic_add(1, &con->c.sq_wr_avail);
511 spin_lock(&con->rsp_wr_wait_lock);
512 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
513 spin_unlock(&con->rsp_wr_wait_lock);
514 return false;
515 }
516
517 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
518 err = send_io_resp_imm(con, id, status);
519 else
520 err = rdma_write_sg(id);
521
522 if (err) {
523 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
524 kobject_name(&srv_path->kobj));
525 close_path(srv_path);
526 }
527 out:
528 rtrs_srv_put_ops_ids(srv_path);
529 return true;
530 }
531 EXPORT_SYMBOL(rtrs_srv_resp_rdma);
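/*
 * A minimal usage sketch (hypothetical caller, not part of this file):
 * an upper layer such as rnbd-srv finishes the IO from its rdma_ev()
 * callback context and eventually calls
 *
 *	rtrs_srv_resp_rdma(id, err);
 *
 * A false return value means the response was queued on the send-queue
 * wait list and will be resent once work requests complete; the caller
 * must not resubmit it.
 */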
532
533 /**
534 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
535 * @srv: Session pointer
536 * @priv: The private pointer that is associated with the session.
537 */
538 void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
539 {
540 srv->priv = priv;
541 }
542 EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
543
544 static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
545 {
546 int i;
547
548 for (i = 0; i < srv_path->mrs_num; i++) {
549 struct rtrs_srv_mr *srv_mr;
550
551 srv_mr = &srv_path->mrs[i];
552
553 if (always_invalidate)
554 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
555
556 ib_dereg_mr(srv_mr->mr);
557 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
558 srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
559 sg_free_table(&srv_mr->sgt);
560 }
561 kfree(srv_path->mrs);
562 }
563
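/*
 * Map the per-session chunk pages into fast-registration MRs. With
 * always_invalidate each chunk gets its own MR (plus an rkey response iu),
 * otherwise several chunks are packed into one MR, limited by
 * max_fast_reg_page_list_len.
 */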
564 static int map_cont_bufs(struct rtrs_srv_path *srv_path)
565 {
566 struct rtrs_srv_sess *srv = srv_path->srv;
567 struct rtrs_path *ss = &srv_path->s;
568 int i, mri, err, mrs_num;
569 unsigned int chunk_bits;
570 int chunks_per_mr = 1;
571
572 /*
573 * Here we map queue_depth chunks to MRs. First we have to
574 * figure out how many chunks we can map per MR.
575 */
576 if (always_invalidate) {
577 /*
578 * In order to invalidate each chunk of memory, we need
579 * more memory regions.
580 */
581 mrs_num = srv->queue_depth;
582 } else {
583 chunks_per_mr =
584 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
585 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
586 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
587 }
588
589 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL);
590 if (!srv_path->mrs)
591 return -ENOMEM;
592
593 srv_path->mrs_num = mrs_num;
594
595 for (mri = 0; mri < mrs_num; mri++) {
596 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri];
597 struct sg_table *sgt = &srv_mr->sgt;
598 struct scatterlist *s;
599 struct ib_mr *mr;
600 int nr, nr_sgt, chunks;
601
602 chunks = chunks_per_mr * mri;
603 if (!always_invalidate)
604 chunks_per_mr = min_t(int, chunks_per_mr,
605 srv->queue_depth - chunks);
606
607 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
608 if (err)
609 goto err;
610
611 for_each_sg(sgt->sgl, s, chunks_per_mr, i)
612 sg_set_page(s, srv->chunks[chunks + i],
613 max_chunk_size, 0);
614
615 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl,
616 sgt->nents, DMA_BIDIRECTIONAL);
617 if (!nr_sgt) {
618 err = -EINVAL;
619 goto free_sg;
620 }
621 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
622 nr_sgt);
623 if (IS_ERR(mr)) {
624 err = PTR_ERR(mr);
625 goto unmap_sg;
626 }
627 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt,
628 NULL, max_chunk_size);
629 if (nr < 0 || nr < sgt->nents) {
630 err = nr < 0 ? nr : -EINVAL;
631 goto dereg_mr;
632 }
633
634 if (always_invalidate) {
635 srv_mr->iu = rtrs_iu_alloc(1,
636 sizeof(struct rtrs_msg_rkey_rsp),
637 GFP_KERNEL, srv_path->s.dev->ib_dev,
638 DMA_TO_DEVICE, rtrs_srv_rdma_done);
639 if (!srv_mr->iu) {
640 err = -ENOMEM;
641 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
642 goto dereg_mr;
643 }
644 }
645 /* Eventually dma addr for each chunk can be cached */
646 for_each_sg(sgt->sgl, s, nr_sgt, i)
647 srv_path->dma_addr[chunks + i] = sg_dma_address(s);
648
649 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
650 srv_mr->mr = mr;
651
652 continue;
653 err:
654 while (mri--) {
655 srv_mr = &srv_path->mrs[mri];
656 sgt = &srv_mr->sgt;
657 mr = srv_mr->mr;
658 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
659 dereg_mr:
660 ib_dereg_mr(mr);
661 unmap_sg:
662 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl,
663 sgt->nents, DMA_BIDIRECTIONAL);
664 free_sg:
665 sg_free_table(sgt);
666 }
667 kfree(srv_path->mrs);
668
669 return err;
670 }
671
672 chunk_bits = ilog2(srv->queue_depth - 1) + 1;
673 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
674
675 return 0;
676 }
677
678 static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
679 {
680 close_path(to_srv_path(c->path));
681 }
682
683 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path)
684 {
685 rtrs_init_hb(&srv_path->s, &io_comp_cqe,
686 RTRS_HB_INTERVAL_MS,
687 RTRS_HB_MISSED_MAX,
688 rtrs_srv_hb_err_handler,
689 rtrs_wq);
690 }
691
692 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path)
693 {
694 rtrs_start_hb(&srv_path->s);
695 }
696
697 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path)
698 {
699 rtrs_stop_hb(&srv_path->s);
700 }
701
702 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
703 {
704 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
705 struct rtrs_path *s = con->c.path;
706 struct rtrs_srv_path *srv_path = to_srv_path(s);
707 struct rtrs_iu *iu;
708
709 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
710 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
711
712 if (wc->status != IB_WC_SUCCESS) {
713 rtrs_err(s, "Sess info response send failed: %s\n",
714 ib_wc_status_msg(wc->status));
715 close_path(srv_path);
716 return;
717 }
718 WARN_ON(wc->opcode != IB_WC_SEND);
719 }
720
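/*
 * Notify the upper layer (link_ev) when the first path of the session
 * connects and mark this path as established.
 */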
721 static int rtrs_srv_path_up(struct rtrs_srv_path *srv_path)
722 {
723 struct rtrs_srv_sess *srv = srv_path->srv;
724 struct rtrs_srv_ctx *ctx = srv->ctx;
725 int up, ret = 0;
726
727 mutex_lock(&srv->paths_ev_mutex);
728 up = ++srv->paths_up;
729 if (up == 1)
730 ret = ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
731 mutex_unlock(&srv->paths_ev_mutex);
732
733 /* Mark session as established */
734 if (!ret)
735 srv_path->established = true;
736
737 return ret;
738 }
739
740 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path)
741 {
742 struct rtrs_srv_sess *srv = srv_path->srv;
743 struct rtrs_srv_ctx *ctx = srv->ctx;
744
745 if (!srv_path->established)
746 return;
747
748 srv_path->established = false;
749 mutex_lock(&srv->paths_ev_mutex);
750 WARN_ON(!srv->paths_up);
751 if (--srv->paths_up == 0)
752 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
753 mutex_unlock(&srv->paths_ev_mutex);
754 }
755
756 static bool exist_pathname(struct rtrs_srv_ctx *ctx,
757 const char *pathname, const uuid_t *path_uuid)
758 {
759 struct rtrs_srv_sess *srv;
760 struct rtrs_srv_path *srv_path;
761 bool found = false;
762
763 mutex_lock(&ctx->srv_mutex);
764 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
765 mutex_lock(&srv->paths_mutex);
766
767 /* a client with the same uuid may add another path with the same sessname; skip its own session */
768 if (uuid_equal(&srv->paths_uuid, path_uuid)) {
769 mutex_unlock(&srv->paths_mutex);
770 continue;
771 }
772
773 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
774 if (strlen(srv_path->s.sessname) == strlen(pathname) &&
775 !strcmp(srv_path->s.sessname, pathname)) {
776 found = true;
777 break;
778 }
779 }
780 mutex_unlock(&srv->paths_mutex);
781 if (found)
782 break;
783 }
784 mutex_unlock(&ctx->srv_mutex);
785 return found;
786 }
787
788 static int post_recv_path(struct rtrs_srv_path *srv_path);
789 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);
790
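/*
 * Handle the client's info request received on the service connection:
 * validate and adopt the pathname, create the sysfs files, switch the path
 * to RTRS_SRV_CONNECTED and send the info response (MR descriptors),
 * chained with the REG_MR work requests.
 */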
791 static int process_info_req(struct rtrs_srv_con *con,
792 struct rtrs_msg_info_req *msg)
793 {
794 struct rtrs_path *s = con->c.path;
795 struct rtrs_srv_path *srv_path = to_srv_path(s);
796 struct ib_send_wr *reg_wr = NULL;
797 struct rtrs_msg_info_rsp *rsp;
798 struct rtrs_iu *tx_iu;
799 struct ib_reg_wr *rwr;
800 int mri, err;
801 size_t tx_sz;
802
803 err = post_recv_path(srv_path);
804 if (err) {
805 rtrs_err(s, "post_recv_path(), err: %d\n", err);
806 return err;
807 }
808
809 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) {
810 rtrs_err(s, "pathname cannot contain '/' or '.'\n");
811 return -EINVAL;
812 }
813
814 if (exist_pathname(srv_path->srv->ctx,
815 msg->pathname, &srv_path->srv->paths_uuid)) {
816 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname);
817 return -EPERM;
818 }
819 strscpy(srv_path->s.sessname, msg->pathname,
820 sizeof(srv_path->s.sessname));
821
822 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL);
823 if (!rwr)
824 return -ENOMEM;
825
826 tx_sz = sizeof(*rsp);
827 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num;
828 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev,
829 DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
830 if (!tx_iu) {
831 err = -ENOMEM;
832 goto rwr_free;
833 }
834
835 rsp = tx_iu->buf;
836 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
837 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num);
838
839 for (mri = 0; mri < srv_path->mrs_num; mri++) {
840 struct ib_mr *mr = srv_path->mrs[mri].mr;
841
842 rsp->desc[mri].addr = cpu_to_le64(mr->iova);
843 rsp->desc[mri].key = cpu_to_le32(mr->rkey);
844 rsp->desc[mri].len = cpu_to_le32(mr->length);
845
846 /*
847 * Fill in reg MR request and chain them *backwards*
848 */
849 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
850 rwr[mri].wr.opcode = IB_WR_REG_MR;
851 rwr[mri].wr.wr_cqe = &local_reg_cqe;
852 rwr[mri].wr.num_sge = 0;
853 rwr[mri].wr.send_flags = 0;
854 rwr[mri].mr = mr;
855 rwr[mri].key = mr->rkey;
856 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
857 IB_ACCESS_REMOTE_WRITE);
858 reg_wr = &rwr[mri].wr;
859 }
860
861 err = rtrs_srv_create_path_files(srv_path);
862 if (err)
863 goto iu_free;
864 kobject_get(&srv_path->kobj);
865 get_device(&srv_path->srv->dev);
866 err = rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED);
867 if (!err) {
868 rtrs_err(s, "rtrs_srv_change_state(), err: %d\n", err);
869 goto iu_free;
870 }
871
872 rtrs_srv_start_hb(srv_path);
873
874 /*
875 * We do not account for the number of established connections at the
876 * current moment; we rely on the client, which sends the info request
877 * once all connections are successfully established. Thus, simply notify
878 * the listener with a proper event if we are the first path.
879 */
880 err = rtrs_srv_path_up(srv_path);
881 if (err) {
882 rtrs_err(s, "rtrs_srv_path_up(), err: %d\n", err);
883 goto iu_free;
884 }
885
886 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
887 tx_iu->dma_addr,
888 tx_iu->size, DMA_TO_DEVICE);
889
890 /* Send info response */
891 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
892 if (err) {
893 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
894 iu_free:
895 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1);
896 }
897 rwr_free:
898 kfree(rwr);
899
900 return err;
901 }
902
903 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
904 {
905 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
906 struct rtrs_path *s = con->c.path;
907 struct rtrs_srv_path *srv_path = to_srv_path(s);
908 struct rtrs_msg_info_req *msg;
909 struct rtrs_iu *iu;
910 int err;
911
912 WARN_ON(con->c.cid);
913
914 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
915 if (wc->status != IB_WC_SUCCESS) {
916 rtrs_err(s, "Sess info request receive failed: %s\n",
917 ib_wc_status_msg(wc->status));
918 goto close;
919 }
920 WARN_ON(wc->opcode != IB_WC_RECV);
921
922 if (wc->byte_len < sizeof(*msg)) {
923 rtrs_err(s, "Sess info request is malformed: size %d\n",
924 wc->byte_len);
925 goto close;
926 }
927 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr,
928 iu->size, DMA_FROM_DEVICE);
929 msg = iu->buf;
930 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
931 rtrs_err(s, "Sess info request is malformed: type %d\n",
932 le16_to_cpu(msg->type));
933 goto close;
934 }
935 err = process_info_req(con, msg);
936 if (err)
937 goto close;
938
939 out:
940 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
941 return;
942 close:
943 close_path(srv_path);
944 goto out;
945 }
946
947 static int post_recv_info_req(struct rtrs_srv_con *con)
948 {
949 struct rtrs_path *s = con->c.path;
950 struct rtrs_srv_path *srv_path = to_srv_path(s);
951 struct rtrs_iu *rx_iu;
952 int err;
953
954 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
955 GFP_KERNEL, srv_path->s.dev->ib_dev,
956 DMA_FROM_DEVICE, rtrs_srv_info_req_done);
957 if (!rx_iu)
958 return -ENOMEM;
959 /* Prepare for getting info response */
960 err = rtrs_iu_post_recv(&con->c, rx_iu);
961 if (err) {
962 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
963 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1);
964 return err;
965 }
966
967 return 0;
968 }
969
970 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
971 {
972 int i, err;
973
974 for (i = 0; i < q_size; i++) {
975 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
976 if (err)
977 return err;
978 }
979
980 return 0;
981 }
982
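/*
 * Post initial receive buffers on every connection of the path:
 * SERVICE_CON_QUEUE_DEPTH for the service connection (cid 0),
 * queue_depth for the IO connections.
 */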
983 static int post_recv_path(struct rtrs_srv_path *srv_path)
984 {
985 struct rtrs_srv_sess *srv = srv_path->srv;
986 struct rtrs_path *s = &srv_path->s;
987 size_t q_size;
988 int err, cid;
989
990 for (cid = 0; cid < srv_path->s.con_num; cid++) {
991 if (cid == 0)
992 q_size = SERVICE_CON_QUEUE_DEPTH;
993 else
994 q_size = srv->queue_depth;
995
996 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size);
997 if (err) {
998 rtrs_err(s, "post_recv_io(), err: %d\n", err);
999 return err;
1000 }
1001 }
1002
1003 return 0;
1004 }
1005
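/*
 * Hand a read request over to the upper layer via the rdma_ev() callback;
 * if the callback fails, respond to the client with an error immediate.
 */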
1006 static void process_read(struct rtrs_srv_con *con,
1007 struct rtrs_msg_rdma_read *msg,
1008 u32 buf_id, u32 off)
1009 {
1010 struct rtrs_path *s = con->c.path;
1011 struct rtrs_srv_path *srv_path = to_srv_path(s);
1012 struct rtrs_srv_sess *srv = srv_path->srv;
1013 struct rtrs_srv_ctx *ctx = srv->ctx;
1014 struct rtrs_srv_op *id;
1015
1016 size_t usr_len, data_len;
1017 void *data;
1018 int ret;
1019
1020 if (srv_path->state != RTRS_SRV_CONNECTED) {
1021 rtrs_err_rl(s,
1022 "Processing read request failed, session is disconnected, sess state %s\n",
1023 rtrs_srv_state_str(srv_path->state));
1024 return;
1025 }
1026 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
1027 rtrs_err_rl(s,
1028 "Processing read request failed, invalid message\n");
1029 return;
1030 }
1031 rtrs_srv_get_ops_ids(srv_path);
1032 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ);
1033 id = srv_path->ops_ids[buf_id];
1034 id->con = con;
1035 id->dir = READ;
1036 id->msg_id = buf_id;
1037 id->rd_msg = msg;
1038 usr_len = le16_to_cpu(msg->usr_len);
1039 data_len = off - usr_len;
1040 data = page_address(srv->chunks[buf_id]);
1041 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len,
1042 data + data_len, usr_len);
1043
1044 if (ret) {
1045 rtrs_err_rl(s,
1046 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1047 buf_id, ret);
1048 goto send_err_msg;
1049 }
1050
1051 return;
1052
1053 send_err_msg:
1054 ret = send_io_resp_imm(con, id, ret);
1055 if (ret < 0) {
1056 rtrs_err_rl(s,
1057 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1058 buf_id, ret);
1059 close_path(srv_path);
1060 }
1061 rtrs_srv_put_ops_ids(srv_path);
1062 }
1063
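/*
 * Hand a write request over to the upper layer via the rdma_ev() callback;
 * if the callback fails, respond to the client with an error immediate.
 */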
1064 static void process_write(struct rtrs_srv_con *con,
1065 struct rtrs_msg_rdma_write *req,
1066 u32 buf_id, u32 off)
1067 {
1068 struct rtrs_path *s = con->c.path;
1069 struct rtrs_srv_path *srv_path = to_srv_path(s);
1070 struct rtrs_srv_sess *srv = srv_path->srv;
1071 struct rtrs_srv_ctx *ctx = srv->ctx;
1072 struct rtrs_srv_op *id;
1073
1074 size_t data_len, usr_len;
1075 void *data;
1076 int ret;
1077
1078 if (srv_path->state != RTRS_SRV_CONNECTED) {
1079 rtrs_err_rl(s,
1080 "Processing write request failed, session is disconnected, sess state %s\n",
1081 rtrs_srv_state_str(srv_path->state));
1082 return;
1083 }
1084 rtrs_srv_get_ops_ids(srv_path);
1085 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE);
1086 id = srv_path->ops_ids[buf_id];
1087 id->con = con;
1088 id->dir = WRITE;
1089 id->msg_id = buf_id;
1090
1091 usr_len = le16_to_cpu(req->usr_len);
1092 data_len = off - usr_len;
1093 data = page_address(srv->chunks[buf_id]);
1094 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len,
1095 data + data_len, usr_len);
1096 if (ret) {
1097 rtrs_err_rl(s,
1098 "Processing write request failed, user module callback reports err: %d\n",
1099 ret);
1100 goto send_err_msg;
1101 }
1102
1103 return;
1104
1105 send_err_msg:
1106 ret = send_io_resp_imm(con, id, ret);
1107 if (ret < 0) {
1108 rtrs_err_rl(s,
1109 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1110 buf_id, ret);
1111 close_path(srv_path);
1112 }
1113 rtrs_srv_put_ops_ids(srv_path);
1114 }
1115
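/* Sync the chunk for CPU access and dispatch by message type. */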
1116 static void process_io_req(struct rtrs_srv_con *con, void *msg,
1117 u32 id, u32 off)
1118 {
1119 struct rtrs_path *s = con->c.path;
1120 struct rtrs_srv_path *srv_path = to_srv_path(s);
1121 struct rtrs_msg_rdma_hdr *hdr;
1122 unsigned int type;
1123
1124 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev,
1125 srv_path->dma_addr[id],
1126 max_chunk_size, DMA_BIDIRECTIONAL);
1127 hdr = msg;
1128 type = le16_to_cpu(hdr->type);
1129
1130 switch (type) {
1131 case RTRS_MSG_WRITE:
1132 process_write(con, msg, id, off);
1133 break;
1134 case RTRS_MSG_READ:
1135 process_read(con, msg, id, off);
1136 break;
1137 default:
1138 rtrs_err(s,
1139 "Processing I/O request failed, unknown message type received: 0x%02x\n",
1140 type);
1141 goto err;
1142 }
1143
1144 return;
1145
1146 err:
1147 close_path(srv_path);
1148 }
1149
1150 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1151 {
1152 struct rtrs_srv_mr *mr =
1153 container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1154 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1155 struct rtrs_path *s = con->c.path;
1156 struct rtrs_srv_path *srv_path = to_srv_path(s);
1157 struct rtrs_srv_sess *srv = srv_path->srv;
1158 u32 msg_id, off;
1159 void *data;
1160
1161 if (wc->status != IB_WC_SUCCESS) {
1162 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1163 ib_wc_status_msg(wc->status));
1164 close_path(srv_path);
1165 }
1166 msg_id = mr->msg_id;
1167 off = mr->msg_off;
1168 data = page_address(srv->chunks[msg_id]) + off;
1169 process_io_req(con, data, msg_id, off);
1170 }
1171
1172 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
1173 struct rtrs_srv_mr *mr)
1174 {
1175 struct ib_send_wr wr = {
1176 .opcode = IB_WR_LOCAL_INV,
1177 .wr_cqe = &mr->inv_cqe,
1178 .send_flags = IB_SEND_SIGNALED,
1179 .ex.invalidate_rkey = mr->mr->rkey,
1180 };
1181 mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1182
1183 return ib_post_send(con->c.qp, &wr, NULL);
1184 }
1185
1186 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1187 {
1188 spin_lock(&con->rsp_wr_wait_lock);
1189 while (!list_empty(&con->rsp_wr_wait_list)) {
1190 struct rtrs_srv_op *id;
1191 int ret;
1192
1193 id = list_entry(con->rsp_wr_wait_list.next,
1194 struct rtrs_srv_op, wait_list);
1195 list_del(&id->wait_list);
1196
1197 spin_unlock(&con->rsp_wr_wait_lock);
1198 ret = rtrs_srv_resp_rdma(id, id->status);
1199 spin_lock(&con->rsp_wr_wait_lock);
1200
1201 if (!ret) {
1202 list_add(&id->wait_list, &con->rsp_wr_wait_list);
1203 break;
1204 }
1205 }
1206 spin_unlock(&con->rsp_wr_wait_lock);
1207 }
1208
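/*
 * Completion handler for the IO connections: decodes the immediate data of
 * incoming RDMA writes (IO requests, heartbeats) and replenishes the send
 * queue budget on our own send/write completions.
 */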
1209 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1210 {
1211 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1212 struct rtrs_path *s = con->c.path;
1213 struct rtrs_srv_path *srv_path = to_srv_path(s);
1214 struct rtrs_srv_sess *srv = srv_path->srv;
1215 u32 imm_type, imm_payload;
1216 int err;
1217
1218 if (wc->status != IB_WC_SUCCESS) {
1219 if (wc->status != IB_WC_WR_FLUSH_ERR) {
1220 rtrs_err(s,
1221 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1222 ib_wc_status_msg(wc->status), wc->wr_cqe,
1223 wc->opcode, wc->vendor_err, wc->byte_len);
1224 close_path(srv_path);
1225 }
1226 return;
1227 }
1228
1229 switch (wc->opcode) {
1230 case IB_WC_RECV_RDMA_WITH_IMM:
1231 /*
1232 * post_recv() RDMA write completions of IO reqs (read/write)
1233 * and hb
1234 */
1235 if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1236 return;
1237 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1238 if (err) {
1239 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1240 close_path(srv_path);
1241 break;
1242 }
1243 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1244 &imm_type, &imm_payload);
1245 if (imm_type == RTRS_IO_REQ_IMM) {
1246 u32 msg_id, off;
1247 void *data;
1248
1249 msg_id = imm_payload >> srv_path->mem_bits;
1250 off = imm_payload & ((1 << srv_path->mem_bits) - 1);
1251 if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
1252 rtrs_err(s, "Wrong msg_id %u, off %u\n",
1253 msg_id, off);
1254 close_path(srv_path);
1255 return;
1256 }
1257 if (always_invalidate) {
1258 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id];
1259
1260 mr->msg_off = off;
1261 mr->msg_id = msg_id;
1262 err = rtrs_srv_inv_rkey(con, mr);
1263 if (err) {
1264 rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
1265 err);
1266 close_path(srv_path);
1267 break;
1268 }
1269 } else {
1270 data = page_address(srv->chunks[msg_id]) + off;
1271 process_io_req(con, data, msg_id, off);
1272 }
1273 } else if (imm_type == RTRS_HB_MSG_IMM) {
1274 WARN_ON(con->c.cid);
1275 rtrs_send_hb_ack(&srv_path->s);
1276 } else if (imm_type == RTRS_HB_ACK_IMM) {
1277 WARN_ON(con->c.cid);
1278 srv_path->s.hb_missed_cnt = 0;
1279 } else {
1280 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1281 }
1282 break;
1283 case IB_WC_RDMA_WRITE:
1284 case IB_WC_SEND:
1285 /*
1286 * post_send() RDMA write completions of IO reqs (read/write)
1287 * and hb.
1288 */
1289 atomic_add(s->signal_interval, &con->c.sq_wr_avail);
1290
1291 if (!list_empty_careful(&con->rsp_wr_wait_list))
1292 rtrs_rdma_process_wr_wait_list(con);
1293
1294 break;
1295 default:
1296 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1297 return;
1298 }
1299 }
1300
1301 /**
1302 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname.
1303 * @srv: Session
1304 * @pathname: Pathname buffer
1305 * @len: Length of pathname buffer
1306 */
1307 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname,
1308 size_t len)
1309 {
1310 struct rtrs_srv_path *srv_path;
1311 int err = -ENOTCONN;
1312
1313 mutex_lock(&srv->paths_mutex);
1314 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1315 if (srv_path->state != RTRS_SRV_CONNECTED)
1316 continue;
1317 strscpy(pathname, srv_path->s.sessname,
1318 min_t(size_t, sizeof(srv_path->s.sessname), len));
1319 err = 0;
1320 break;
1321 }
1322 mutex_unlock(&srv->paths_mutex);
1323
1324 return err;
1325 }
1326 EXPORT_SYMBOL(rtrs_srv_get_path_name);
1327
1328 /**
1329 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
1330 * @srv: Session
1331 */
1332 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv)
1333 {
1334 return srv->queue_depth;
1335 }
1336 EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1337
1338 static int find_next_bit_ring(struct rtrs_srv_path *srv_path)
1339 {
1340 struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
1341 int v;
1342
1343 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask);
1344 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1345 v = cpumask_first(&cq_affinity_mask);
1346 return v;
1347 }
1348
1349 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path)
1350 {
1351 srv_path->cur_cq_vector = find_next_bit_ring(srv_path);
1352
1353 return srv_path->cur_cq_vector;
1354 }
1355
1356 static void rtrs_srv_dev_release(struct device *dev)
1357 {
1358 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess,
1359 dev);
1360
1361 kfree(srv);
1362 }
1363
1364 static void free_srv(struct rtrs_srv_sess *srv)
1365 {
1366 int i;
1367
1368 WARN_ON(refcount_read(&srv->refcount));
1369 for (i = 0; i < srv->queue_depth; i++)
1370 __free_pages(srv->chunks[i], get_order(max_chunk_size));
1371 kfree(srv->chunks);
1372 mutex_destroy(&srv->paths_mutex);
1373 mutex_destroy(&srv->paths_ev_mutex);
1374 /* last put to release the srv structure */
1375 put_device(&srv->dev);
1376 }
1377
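/*
 * Look up the session by paths_uuid and take a reference, or allocate a
 * new session (including the receive chunks) for the first connection
 * request of a client.
 */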
1378 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx,
1379 const uuid_t *paths_uuid,
1380 bool first_conn)
1381 {
1382 struct rtrs_srv_sess *srv;
1383 int i;
1384
1385 mutex_lock(&ctx->srv_mutex);
1386 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1387 if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1388 refcount_inc_not_zero(&srv->refcount)) {
1389 mutex_unlock(&ctx->srv_mutex);
1390 return srv;
1391 }
1392 }
1393 mutex_unlock(&ctx->srv_mutex);
1394 /*
1395 * If this request is not the first connection request from the
1396 * client for this session then fail and return error.
1397 */
1398 if (!first_conn) {
1399 pr_err_ratelimited("Error: Not the first connection request for this session\n");
1400 return ERR_PTR(-ENXIO);
1401 }
1402
1403 /* need to allocate a new srv */
1404 srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1405 if (!srv)
1406 return ERR_PTR(-ENOMEM);
1407
1408 INIT_LIST_HEAD(&srv->paths_list);
1409 mutex_init(&srv->paths_mutex);
1410 mutex_init(&srv->paths_ev_mutex);
1411 uuid_copy(&srv->paths_uuid, paths_uuid);
1412 srv->queue_depth = sess_queue_depth;
1413 srv->ctx = ctx;
1414 device_initialize(&srv->dev);
1415 srv->dev.release = rtrs_srv_dev_release;
1416
1417 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1418 GFP_KERNEL);
1419 if (!srv->chunks)
1420 goto err_free_srv;
1421
1422 for (i = 0; i < srv->queue_depth; i++) {
1423 srv->chunks[i] = alloc_pages(GFP_KERNEL,
1424 get_order(max_chunk_size));
1425 if (!srv->chunks[i])
1426 goto err_free_chunks;
1427 }
1428 refcount_set(&srv->refcount, 1);
1429 mutex_lock(&ctx->srv_mutex);
1430 list_add(&srv->ctx_list, &ctx->srv_list);
1431 mutex_unlock(&ctx->srv_mutex);
1432
1433 return srv;
1434
1435 err_free_chunks:
1436 while (i--)
1437 __free_pages(srv->chunks[i], get_order(max_chunk_size));
1438 kfree(srv->chunks);
1439
1440 err_free_srv:
1441 kfree(srv);
1442 return ERR_PTR(-ENOMEM);
1443 }
1444
1445 static void put_srv(struct rtrs_srv_sess *srv)
1446 {
1447 if (refcount_dec_and_test(&srv->refcount)) {
1448 struct rtrs_srv_ctx *ctx = srv->ctx;
1449
1450 WARN_ON(srv->dev.kobj.state_in_sysfs);
1451
1452 mutex_lock(&ctx->srv_mutex);
1453 list_del(&srv->ctx_list);
1454 mutex_unlock(&ctx->srv_mutex);
1455 free_srv(srv);
1456 }
1457 }
1458
1459 static void __add_path_to_srv(struct rtrs_srv_sess *srv,
1460 struct rtrs_srv_path *srv_path)
1461 {
1462 list_add_tail(&srv_path->s.entry, &srv->paths_list);
1463 srv->paths_num++;
1464 WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1465 }
1466
1467 static void del_path_from_srv(struct rtrs_srv_path *srv_path)
1468 {
1469 struct rtrs_srv_sess *srv = srv_path->srv;
1470
1471 if (WARN_ON(!srv))
1472 return;
1473
1474 mutex_lock(&srv->paths_mutex);
1475 list_del(&srv_path->s.entry);
1476 WARN_ON(!srv->paths_num);
1477 srv->paths_num--;
1478 mutex_unlock(&srv->paths_mutex);
1479 }
1480
1481 /* return 0 if the addresses are the same, non-zero otherwise (-ENOENT for unknown address family) */
1482 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1483 {
1484 switch (a->sa_family) {
1485 case AF_IB:
1486 return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1487 &((struct sockaddr_ib *)b)->sib_addr,
1488 sizeof(struct ib_addr)) &&
1489 (b->sa_family == AF_IB);
1490 case AF_INET:
1491 return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1492 &((struct sockaddr_in *)b)->sin_addr,
1493 sizeof(struct in_addr)) &&
1494 (b->sa_family == AF_INET);
1495 case AF_INET6:
1496 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1497 &((struct sockaddr_in6 *)b)->sin6_addr,
1498 sizeof(struct in6_addr)) &&
1499 (b->sa_family == AF_INET6);
1500 default:
1501 return -ENOENT;
1502 }
1503 }
1504
1505 static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv,
1506 struct rdma_addr *addr)
1507 {
1508 struct rtrs_srv_path *srv_path;
1509
1510 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
1511 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
1512 (struct sockaddr *)&addr->dst_addr) &&
1513 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
1514 (struct sockaddr *)&addr->src_addr))
1515 return true;
1516
1517 return false;
1518 }
1519
1520 static void free_path(struct rtrs_srv_path *srv_path)
1521 {
1522 if (srv_path->kobj.state_in_sysfs) {
1523 kobject_del(&srv_path->kobj);
1524 kobject_put(&srv_path->kobj);
1525 } else {
1526 free_percpu(srv_path->stats->rdma_stats);
1527 kfree(srv_path->stats);
1528 kfree(srv_path);
1529 }
1530 }
1531
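/*
 * Path teardown: stop the heartbeat, disconnect and drain all connections,
 * wait for inflight operations, notify the upper layer and free resources.
 */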
1532 static void rtrs_srv_close_work(struct work_struct *work)
1533 {
1534 struct rtrs_srv_path *srv_path;
1535 struct rtrs_srv_con *con;
1536 int i;
1537
1538 srv_path = container_of(work, typeof(*srv_path), close_work);
1539
1540 rtrs_srv_stop_hb(srv_path);
1541
1542 for (i = 0; i < srv_path->s.con_num; i++) {
1543 if (!srv_path->s.con[i])
1544 continue;
1545 con = to_srv_con(srv_path->s.con[i]);
1546 rdma_disconnect(con->c.cm_id);
1547 ib_drain_qp(con->c.qp);
1548 }
1549
1550 /*
1551 * Degrade ref count to the usual model with a single shared
1552 * atomic_t counter
1553 */
1554 percpu_ref_kill(&srv_path->ids_inflight_ref);
1555
1556 /* Wait for all inflight operations to complete */
1557 wait_for_completion(&srv_path->complete_done);
1558
1559 rtrs_srv_destroy_path_files(srv_path);
1560
1561 /* Notify upper layer if we are the last path */
1562 rtrs_srv_path_down(srv_path);
1563
1564 unmap_cont_bufs(srv_path);
1565 rtrs_srv_free_ops_ids(srv_path);
1566
1567 for (i = 0; i < srv_path->s.con_num; i++) {
1568 if (!srv_path->s.con[i])
1569 continue;
1570 con = to_srv_con(srv_path->s.con[i]);
1571 rtrs_cq_qp_destroy(&con->c);
1572 rdma_destroy_id(con->c.cm_id);
1573 kfree(con);
1574 }
1575 rtrs_ib_dev_put(srv_path->s.dev);
1576
1577 del_path_from_srv(srv_path);
1578 put_srv(srv_path->srv);
1579 srv_path->srv = NULL;
1580 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);
1581
1582 kfree(srv_path->dma_addr);
1583 kfree(srv_path->s.con);
1584 free_path(srv_path);
1585 }
1586
1587 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
1588 struct rdma_cm_id *cm_id)
1589 {
1590 struct rtrs_srv_sess *srv = srv_path->srv;
1591 struct rtrs_msg_conn_rsp msg;
1592 struct rdma_conn_param param;
1593 int err;
1594
1595 param = (struct rdma_conn_param) {
1596 .rnr_retry_count = 7,
1597 .private_data = &msg,
1598 .private_data_len = sizeof(msg),
1599 };
1600
1601 msg = (struct rtrs_msg_conn_rsp) {
1602 .magic = cpu_to_le16(RTRS_MAGIC),
1603 .version = cpu_to_le16(RTRS_PROTO_VER),
1604 .queue_depth = cpu_to_le16(srv->queue_depth),
1605 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1606 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1607 };
1608
1609 if (always_invalidate)
1610 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1611
1612 err = rdma_accept(cm_id, &param);
1613 if (err)
1614 pr_err("rdma_accept(), err: %d\n", err);
1615
1616 return err;
1617 }
1618
1619 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1620 {
1621 struct rtrs_msg_conn_rsp msg;
1622 int err;
1623
1624 msg = (struct rtrs_msg_conn_rsp) {
1625 .magic = cpu_to_le16(RTRS_MAGIC),
1626 .version = cpu_to_le16(RTRS_PROTO_VER),
1627 .errno = cpu_to_le16(errno),
1628 };
1629
1630 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1631 if (err)
1632 pr_err("rdma_reject(), err: %d\n", err);
1633
1634 /* Bounce errno back */
1635 return errno;
1636 }
1637
1638 static struct rtrs_srv_path *
1639 __find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
1640 {
1641 struct rtrs_srv_path *srv_path;
1642
1643 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1644 if (uuid_equal(&srv_path->s.uuid, sess_uuid))
1645 return srv_path;
1646 }
1647
1648 return NULL;
1649 }
1650
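/*
 * Allocate a connection for @cid, size its send/receive work request
 * budgets and create the CQ/QP; the service connection (cid 0) also posts
 * the receive buffer for the info request.
 */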
1651 static int create_con(struct rtrs_srv_path *srv_path,
1652 struct rdma_cm_id *cm_id,
1653 unsigned int cid)
1654 {
1655 struct rtrs_srv_sess *srv = srv_path->srv;
1656 struct rtrs_path *s = &srv_path->s;
1657 struct rtrs_srv_con *con;
1658
1659 u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
1660 int err, cq_vector;
1661
1662 con = kzalloc(sizeof(*con), GFP_KERNEL);
1663 if (!con) {
1664 err = -ENOMEM;
1665 goto err;
1666 }
1667
1668 spin_lock_init(&con->rsp_wr_wait_lock);
1669 INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1670 con->c.cm_id = cm_id;
1671 con->c.path = &srv_path->s;
1672 con->c.cid = cid;
1673 atomic_set(&con->c.wr_cnt, 1);
1674 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;
1675
1676 if (con->c.cid == 0) {
1677 /*
1678 * All receive and all send (each requiring invalidate)
1679 * + 2 for drain and heartbeat
1680 */
1681 max_send_wr = min_t(int, wr_limit,
1682 SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1683 max_recv_wr = max_send_wr;
1684 s->signal_interval = min_not_zero(srv->queue_depth,
1685 (size_t)SERVICE_CON_QUEUE_DEPTH);
1686 } else {
1687 /* when always_invalidate is enabled, we need linv+rinv+mr+imm */
1688 if (always_invalidate)
1689 max_send_wr =
1690 min_t(int, wr_limit,
1691 srv->queue_depth * (1 + 4) + 1);
1692 else
1693 max_send_wr =
1694 min_t(int, wr_limit,
1695 srv->queue_depth * (1 + 2) + 1);
1696
1697 max_recv_wr = srv->queue_depth + 1;
1698 /*
1699 * Covers the case when all receive requests are posted,
1700 * all write requests are posted, each read request
1701 * requires an invalidate request + drain,
1702 * and the qp gets into the error state.
1703 */
1704 }
1705 cq_num = max_send_wr + max_recv_wr;
1706 atomic_set(&con->c.sq_wr_avail, max_send_wr);
1707 cq_vector = rtrs_srv_get_next_cq_vector(srv_path);
1708
1709 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1710 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
1711 max_send_wr, max_recv_wr,
1712 IB_POLL_WORKQUEUE);
1713 if (err) {
1714 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1715 goto free_con;
1716 }
1717 if (con->c.cid == 0) {
1718 err = post_recv_info_req(con);
1719 if (err)
1720 goto free_cqqp;
1721 }
1722 WARN_ON(srv_path->s.con[cid]);
1723 srv_path->s.con[cid] = &con->c;
1724
1725 /*
1726 * Change context from server to current connection. The other
1727 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1728 */
1729 cm_id->context = &con->c;
1730
1731 return 0;
1732
1733 free_cqqp:
1734 rtrs_cq_qp_destroy(&con->c);
1735 free_con:
1736 kfree(con);
1737
1738 err:
1739 return err;
1740 }
1741
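/*
 * Allocate and initialize a server path for a new client session:
 * stats, DMA address table, connection array, MRs and ops ids.
 * Called with srv->paths_mutex held.
 */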
1742 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv,
1743 struct rdma_cm_id *cm_id,
1744 unsigned int con_num,
1745 unsigned int recon_cnt,
1746 const uuid_t *uuid)
1747 {
1748 struct rtrs_srv_path *srv_path;
1749 int err = -ENOMEM;
1750 char str[NAME_MAX];
1751 struct rtrs_addr path;
1752
1753 if (srv->paths_num >= MAX_PATHS_NUM) {
1754 err = -ECONNRESET;
1755 goto err;
1756 }
1757 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1758 err = -EEXIST;
1759 pr_err("Path with same addr exists\n");
1760 goto err;
1761 }
1762 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL);
1763 if (!srv_path)
1764 goto err;
1765
1766 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL);
1767 if (!srv_path->stats)
1768 goto err_free_sess;
1769
1770 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats);
1771 if (!srv_path->stats->rdma_stats)
1772 goto err_free_stats;
1773
1774 srv_path->stats->srv_path = srv_path;
1775
1776 srv_path->dma_addr = kcalloc(srv->queue_depth,
1777 sizeof(*srv_path->dma_addr),
1778 GFP_KERNEL);
1779 if (!srv_path->dma_addr)
1780 goto err_free_percpu;
1781
1782 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con),
1783 GFP_KERNEL);
1784 if (!srv_path->s.con)
1785 goto err_free_dma_addr;
1786
1787 srv_path->state = RTRS_SRV_CONNECTING;
1788 srv_path->srv = srv;
1789 srv_path->cur_cq_vector = -1;
1790 srv_path->s.dst_addr = cm_id->route.addr.dst_addr;
1791 srv_path->s.src_addr = cm_id->route.addr.src_addr;
1792
1793 /* temporary until receiving session-name from client */
1794 path.src = &srv_path->s.src_addr;
1795 path.dst = &srv_path->s.dst_addr;
1796 rtrs_addr_to_str(&path, str, sizeof(str));
1797 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname));
1798
1799 srv_path->s.con_num = con_num;
1800 srv_path->s.irq_con_num = con_num;
1801 srv_path->s.recon_cnt = recon_cnt;
1802 uuid_copy(&srv_path->s.uuid, uuid);
1803 spin_lock_init(&srv_path->state_lock);
1804 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work);
1805 rtrs_srv_init_hb(srv_path);
1806
1807 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1808 if (!srv_path->s.dev) {
1809 err = -ENOMEM;
1810 goto err_free_con;
1811 }
1812 err = map_cont_bufs(srv_path);
1813 if (err)
1814 goto err_put_dev;
1815
1816 err = rtrs_srv_alloc_ops_ids(srv_path);
1817 if (err)
1818 goto err_unmap_bufs;
1819
1820 __add_path_to_srv(srv, srv_path);
1821
1822 return srv_path;
1823
1824 err_unmap_bufs:
1825 unmap_cont_bufs(srv_path);
1826 err_put_dev:
1827 rtrs_ib_dev_put(srv_path->s.dev);
1828 err_free_con:
1829 kfree(srv_path->s.con);
1830 err_free_dma_addr:
1831 kfree(srv_path->dma_addr);
1832 err_free_percpu:
1833 free_percpu(srv_path->stats->rdma_stats);
1834 err_free_stats:
1835 kfree(srv_path->stats);
1836 err_free_sess:
1837 kfree(srv_path);
1838 err:
1839 return ERR_PTR(err);
1840 }
1841
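/*
 * Handle an RDMA_CM connect request: validate the handshake message,
 * find or create the session and path, create the connection and either
 * accept or reject the request.
 */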
1842 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
1843 const struct rtrs_msg_conn_req *msg,
1844 size_t len)
1845 {
1846 struct rtrs_srv_ctx *ctx = cm_id->context;
1847 struct rtrs_srv_path *srv_path;
1848 struct rtrs_srv_sess *srv;
1849
1850 u16 version, con_num, cid;
1851 u16 recon_cnt;
1852 int err = -ECONNRESET;
1853
1854 if (len < sizeof(*msg)) {
1855 pr_err("Invalid RTRS connection request\n");
1856 goto reject_w_err;
1857 }
1858 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1859 pr_err("Invalid RTRS magic\n");
1860 goto reject_w_err;
1861 }
1862 version = le16_to_cpu(msg->version);
1863 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1864 pr_err("Unsupported major RTRS version: %d, expected %d\n",
1865 version >> 8, RTRS_PROTO_VER_MAJOR);
1866 goto reject_w_err;
1867 }
1868 con_num = le16_to_cpu(msg->cid_num);
1869 if (con_num > 4096) {
1870 /* Sanity check */
1871 pr_err("Too many connections requested: %d\n", con_num);
1872 goto reject_w_err;
1873 }
1874 cid = le16_to_cpu(msg->cid);
1875 if (cid >= con_num) {
1876 /* Sanity check */
1877 pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1878 goto reject_w_err;
1879 }
1880 recon_cnt = le16_to_cpu(msg->recon_cnt);
1881 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
1882 if (IS_ERR(srv)) {
1883 err = PTR_ERR(srv);
1884 pr_err("get_or_create_srv(), error %d\n", err);
1885 goto reject_w_err;
1886 }
1887 mutex_lock(&srv->paths_mutex);
1888 srv_path = __find_path(srv, &msg->sess_uuid);
1889 if (srv_path) {
1890 struct rtrs_path *s = &srv_path->s;
1891
1892 /* Session already holds a reference */
1893 put_srv(srv);
1894
1895 if (srv_path->state != RTRS_SRV_CONNECTING) {
1896 rtrs_err(s, "Session in wrong state: %s\n",
1897 rtrs_srv_state_str(srv_path->state));
1898 mutex_unlock(&srv->paths_mutex);
1899 goto reject_w_err;
1900 }
1901 /*
1902 * Sanity checks
1903 */
1904 if (con_num != s->con_num || cid >= s->con_num) {
1905 rtrs_err(s, "Incorrect request: %d, %d\n",
1906 cid, con_num);
1907 mutex_unlock(&srv->paths_mutex);
1908 goto reject_w_err;
1909 }
1910 if (s->con[cid]) {
1911 rtrs_err(s, "Connection already exists: %d\n",
1912 cid);
1913 mutex_unlock(&srv->paths_mutex);
1914 goto reject_w_err;
1915 }
1916 } else {
1917 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt,
1918 &msg->sess_uuid);
1919 if (IS_ERR(srv_path)) {
1920 mutex_unlock(&srv->paths_mutex);
1921 put_srv(srv);
1922 err = PTR_ERR(srv_path);
1923 pr_err("RTRS server session allocation failed: %d\n", err);
1924 goto reject_w_err;
1925 }
1926 }
1927 err = create_con(srv_path, cm_id, cid);
1928 if (err) {
1929 rtrs_err((&srv_path->s), "create_con(), error %d\n", err);
1930 rtrs_rdma_do_reject(cm_id, err);
1931 /*
1932 * Since the session has other connections, we follow the normal
1933 * path through the workqueue, but still return an error to tell
1934 * cma.c to call rdma_destroy_id() for the current connection.
1935 */
1936 goto close_and_return_err;
1937 }
1938 err = rtrs_rdma_do_accept(srv_path, cm_id);
1939 if (err) {
1940 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err);
1941 rtrs_rdma_do_reject(cm_id, err);
1942 /*
1943 * Since the current connection was successfully added to the
1944 * session, we follow the normal path through the workqueue to
1945 * close the session, and return 0 to tell cma.c that we call
1946 * rdma_destroy_id() ourselves.
1947 */
1948 err = 0;
1949 goto close_and_return_err;
1950 }
1951 mutex_unlock(&srv->paths_mutex);
1952
1953 return 0;
1954
1955 reject_w_err:
1956 return rtrs_rdma_do_reject(cm_id, err);
1957
1958 close_and_return_err:
1959 mutex_unlock(&srv->paths_mutex);
1960 close_path(srv_path);
1961
1962 return err;
1963 }
1964
1965 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1966 struct rdma_cm_event *ev)
1967 {
1968 struct rtrs_srv_path *srv_path = NULL;
1969 struct rtrs_path *s = NULL;
1970
1971 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
1972 struct rtrs_con *c = cm_id->context;
1973
1974 s = c->path;
1975 srv_path = to_srv_path(s);
1976 }
1977
1978 switch (ev->event) {
1979 case RDMA_CM_EVENT_CONNECT_REQUEST:
1980 /*
1981 * In case of error, cma.c will destroy the cm_id;
1982 * see cma_process_remove().
1983 */
1984 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1985 ev->param.conn.private_data_len);
1986 case RDMA_CM_EVENT_ESTABLISHED:
1987 /* Nothing here */
1988 break;
1989 case RDMA_CM_EVENT_REJECTED:
1990 case RDMA_CM_EVENT_CONNECT_ERROR:
1991 case RDMA_CM_EVENT_UNREACHABLE:
1992 rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1993 rdma_event_msg(ev->event), ev->status);
1994 fallthrough;
1995 case RDMA_CM_EVENT_DISCONNECTED:
1996 case RDMA_CM_EVENT_ADDR_CHANGE:
1997 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1998 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1999 close_path(srv_path);
2000 break;
2001 default:
2002 pr_err("Ignoring unexpected CM event %s, err %d\n",
2003 rdma_event_msg(ev->event), ev->status);
2004 break;
2005 }
2006
2007 return 0;
2008 }
2009
2010 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
2011 struct sockaddr *addr,
2012 enum rdma_ucm_port_space ps)
2013 {
2014 struct rdma_cm_id *cm_id;
2015 int ret;
2016
2017 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
2018 ctx, ps, IB_QPT_RC);
2019 if (IS_ERR(cm_id)) {
2020 ret = PTR_ERR(cm_id);
2021 pr_err("Creating id for RDMA connection failed, err: %d\n",
2022 ret);
2023 goto err_out;
2024 }
2025 ret = rdma_bind_addr(cm_id, addr);
2026 if (ret) {
2027 pr_err("Binding RDMA address failed, err: %d\n", ret);
2028 goto err_cm;
2029 }
2030 ret = rdma_listen(cm_id, 64);
2031 if (ret) {
2032 pr_err("Listening on RDMA connection failed, err: %d\n",
2033 ret);
2034 goto err_cm;
2035 }
2036
2037 return cm_id;
2038
2039 err_cm:
2040 rdma_destroy_id(cm_id);
2041 err_out:
2042
2043 return ERR_PTR(ret);
2044 }
2045
2046 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
2047 {
2048 struct sockaddr_in6 sin = {
2049 .sin6_family = AF_INET6,
2050 .sin6_addr = IN6ADDR_ANY_INIT,
2051 .sin6_port = htons(port),
2052 };
2053 struct sockaddr_ib sib = {
2054 .sib_family = AF_IB,
2055 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
2056 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
2057 .sib_pkey = cpu_to_be16(0xffff),
2058 };
2059 struct rdma_cm_id *cm_ip, *cm_ib;
2060 int ret;
2061
2062 /*
2063 * We accept both IPoIB and IB connections, so we need to keep
2064 * two cm_ids, one for each address family and port space.
2065 * If CM initialization of either of them fails, we abort
2066 * everything.
2067 */
2068 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2069 if (IS_ERR(cm_ip))
2070 return PTR_ERR(cm_ip);
2071
2072 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2073 if (IS_ERR(cm_ib)) {
2074 ret = PTR_ERR(cm_ib);
2075 goto free_cm_ip;
2076 }
2077
2078 ctx->cm_id_ip = cm_ip;
2079 ctx->cm_id_ib = cm_ib;
2080
2081 return 0;
2082
2083 free_cm_ip:
2084 rdma_destroy_id(cm_ip);
2085
2086 return ret;
2087 }
2088
2089 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2090 {
2091 struct rtrs_srv_ctx *ctx;
2092
2093 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2094 if (!ctx)
2095 return NULL;
2096
2097 ctx->ops = *ops;
2098 mutex_init(&ctx->srv_mutex);
2099 INIT_LIST_HEAD(&ctx->srv_list);
2100
2101 return ctx;
2102 }
2103
2104 static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2105 {
2106 WARN_ON(!list_empty(&ctx->srv_list));
2107 mutex_destroy(&ctx->srv_mutex);
2108 kfree(ctx);
2109 }
2110
2111 static int rtrs_srv_add_one(struct ib_device *device)
2112 {
2113 struct rtrs_srv_ctx *ctx;
2114 int ret = 0;
2115
2116 mutex_lock(&ib_ctx.ib_dev_mutex);
2117 if (ib_ctx.ib_dev_count)
2118 goto out;
2119
2120 /*
2121 * Since our CM IDs are NOT bound to any ib device, we create them
2122 * only once.
2123 */
2124 ctx = ib_ctx.srv_ctx;
2125 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2126 if (ret) {
2127 /*
2128 * We errored out here.
2129 * According to the ib code, if we encounter an error here, the
2130 * error code is ignored and no more calls to our ops are made.
2131 */
2132 pr_err("Failed to initialize RDMA connection\n");
2133 goto err_out;
2134 }
2135
2136 out:
2137 /*
2138 * Keep track of the number of ib devices added
2139 */
2140 ib_ctx.ib_dev_count++;
2141
2142 err_out:
2143 mutex_unlock(&ib_ctx.ib_dev_mutex);
2144 return ret;
2145 }
2146
2147 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2148 {
2149 struct rtrs_srv_ctx *ctx;
2150
2151 mutex_lock(&ib_ctx.ib_dev_mutex);
2152 ib_ctx.ib_dev_count--;
2153
2154 if (ib_ctx.ib_dev_count)
2155 goto out;
2156
2157 /*
2158 * Since our CM IDs are NOT bound to any ib device, we destroy them
2159 * only once, when the last device is removed
2160 */
2161 ctx = ib_ctx.srv_ctx;
2162 rdma_destroy_id(ctx->cm_id_ip);
2163 rdma_destroy_id(ctx->cm_id_ib);
2164
2165 out:
2166 mutex_unlock(&ib_ctx.ib_dev_mutex);
2167 }
2168
2169 static struct ib_client rtrs_srv_client = {
2170 .name = "rtrs_server",
2171 .add = rtrs_srv_add_one,
2172 .remove = rtrs_srv_remove_one
2173 };
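/*
 * Note: ib_register_client() invokes .add for every RDMA device that is
 * already present and for each device added later; .remove is invoked on
 * device removal and on ib_unregister_client().  That is why
 * rtrs_srv_add_one()/rtrs_srv_remove_one() above create the listening CM
 * IDs on the first add and destroy them on the last remove.
 */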
2174
2175 /**
2176 * rtrs_srv_open() - open RTRS server context
2177 * @ops: callback functions
2178 * @port: port to listen on
2179 *
2180 * Creates server context with specified callbacks.
2181 *
2182 * Return: a valid context pointer on success, an ERR_PTR() on failure.
2183 */
2184 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2185 {
2186 struct rtrs_srv_ctx *ctx;
2187 int err;
2188
2189 ctx = alloc_srv_ctx(ops);
2190 if (!ctx)
2191 return ERR_PTR(-ENOMEM);
2192
2193 mutex_init(&ib_ctx.ib_dev_mutex);
2194 ib_ctx.srv_ctx = ctx;
2195 ib_ctx.port = port;
2196
2197 err = ib_register_client(&rtrs_srv_client);
2198 if (err) {
2199 free_srv_ctx(ctx);
2200 return ERR_PTR(err);
2201 }
2202
2203 return ctx;
2204 }
2205 EXPORT_SYMBOL(rtrs_srv_open);
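/*
 * Minimal usage sketch (illustrative only, not part of this file): how a
 * ULP is expected to pair rtrs_srv_open() with rtrs_srv_close().  The
 * callback members of struct rtrs_srv_ops are declared in rtrs.h and must
 * be filled in by a real user; the helper names example_srv_start() and
 * example_srv_stop() are hypothetical.
 *
 *	static struct rtrs_srv_ctx *example_srv_ctx;
 *
 *	static int example_srv_start(u16 port)
 *	{
 *		static struct rtrs_srv_ops ops = {
 *			// set the event callbacks declared in rtrs.h here
 *		};
 *
 *		example_srv_ctx = rtrs_srv_open(&ops, port);
 *		if (IS_ERR(example_srv_ctx))
 *			return PTR_ERR(example_srv_ctx);
 *		return 0;
 *	}
 *
 *	static void example_srv_stop(void)
 *	{
 *		rtrs_srv_close(example_srv_ctx);
 *		example_srv_ctx = NULL;
 *	}
 */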
2206
2207 static void close_paths(struct rtrs_srv_sess *srv)
2208 {
2209 struct rtrs_srv_path *srv_path;
2210
2211 mutex_lock(&srv->paths_mutex);
2212 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
2213 close_path(srv_path);
2214 mutex_unlock(&srv->paths_mutex);
2215 }
2216
2217 static void close_ctx(struct rtrs_srv_ctx *ctx)
2218 {
2219 struct rtrs_srv_sess *srv;
2220
2221 mutex_lock(&ctx->srv_mutex);
2222 list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2223 close_paths(srv);
2224 mutex_unlock(&ctx->srv_mutex);
2225 flush_workqueue(rtrs_wq);
2226 }
2227
2228 /**
2229 * rtrs_srv_close() - close RTRS server context
2230 * @ctx: pointer to server context
2231 *
2232 * Closes the RTRS server context together with all client sessions.
2233 */
2234 void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2235 {
2236 ib_unregister_client(&rtrs_srv_client);
2237 mutex_destroy(&ib_ctx.ib_dev_mutex);
2238 close_ctx(ctx);
2239 free_srv_ctx(ctx);
2240 }
2241 EXPORT_SYMBOL(rtrs_srv_close);
2242
2243 static int check_module_params(void)
2244 {
2245 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2246 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
2247 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2248 return -EINVAL;
2249 }
2250 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
2251 pr_err("Invalid max_chunk_size value %d, has to be >= %d and must be a power of two.\n",
2252 max_chunk_size, MIN_CHUNK_SIZE);
2253 return -EINVAL;
2254 }
2255
2256 /*
2257 * Check if IB immediate data size is enough to hold the mem_id and the
2258 * offset inside the memory chunk
2259 */
2260 if ((ilog2(sess_queue_depth - 1) + 1) +
2261 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2262 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2263 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2264 return -EINVAL;
2265 }
2266
2267 return 0;
2268 }
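/*
 * Worked example with the module defaults (informational only; the exact
 * MAX_IMM_PAYL_BITS value comes from the rtrs private header):
 * sess_queue_depth = 512   -> ilog2(511) + 1    =  9 bits for the buffer id,
 * max_chunk_size = 128 KiB -> ilog2(131071) + 1 = 17 bits for the offset,
 * i.e. 26 bits in total, which must not exceed MAX_IMM_PAYL_BITS.
 */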
2269
2270 static int __init rtrs_server_init(void)
2271 {
2272 int err;
2273
2274 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
2275 KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2276 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2277 sess_queue_depth, always_invalidate);
2278
2279 rtrs_rdma_dev_pd_init(0, &dev_pd);
2280
2281 err = check_module_params();
2282 if (err) {
2283 pr_err("Failed to load module, invalid module parameters, err: %d\n",
2284 err);
2285 return err;
2286 }
2287 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
2288 if (IS_ERR(rtrs_dev_class)) {
2289 err = PTR_ERR(rtrs_dev_class);
2290 goto out_err;
2291 }
2292 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2293 if (!rtrs_wq) {
2294 err = -ENOMEM;
2295 goto out_dev_class;
2296 }
2297
2298 return 0;
2299
2300 out_dev_class:
2301 class_destroy(rtrs_dev_class);
2302 out_err:
2303 return err;
2304 }
2305
2306 static void __exit rtrs_server_exit(void)
2307 {
2308 destroy_workqueue(rtrs_wq);
2309 class_destroy(rtrs_dev_class);
2310 rtrs_rdma_dev_pd_deinit(&dev_pd);
2311 }
2312
2313 module_init(rtrs_server_init);
2314 module_exit(rtrs_server_exit);
2315