1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * RDMA Transport Layer
4 *
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/mempool.h>
15
16 #include "rtrs-srv.h"
17 #include "rtrs-log.h"
18 #include <rdma/ib_cm.h>
19 #include <rdma/ib_verbs.h>
20
21 MODULE_DESCRIPTION("RDMA Transport Server");
22 MODULE_LICENSE("GPL");
23
24 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
25 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
26 #define DEFAULT_SESS_QUEUE_DEPTH 512
27 #define MAX_HDR_SIZE PAGE_SIZE
28
29 /* We guarantee to serve at least 10 paths */
30 #define CHUNK_POOL_SZ 10
31
32 static struct rtrs_rdma_dev_pd dev_pd;
33 static mempool_t *chunk_pool;
34 struct class *rtrs_dev_class;
35 static struct rtrs_srv_ib_ctx ib_ctx;
36
37 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
38 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
39
40 static bool always_invalidate = true;
41 module_param(always_invalidate, bool, 0444);
42 MODULE_PARM_DESC(always_invalidate,
43 "Invalidate memory registration for contiguous memory regions before accessing.");
44
45 module_param_named(max_chunk_size, max_chunk_size, int, 0444);
46 MODULE_PARM_DESC(max_chunk_size,
47 "Max size for each IO request in bytes (default: "
48 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");
49
50 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
51 MODULE_PARM_DESC(sess_queue_depth,
52 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
53 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
54 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
55
56 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
57
58 static struct workqueue_struct *rtrs_wq;
59
60 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
61 {
62 return container_of(c, struct rtrs_srv_con, c);
63 }
64
65 static inline struct rtrs_srv_path *to_srv_path(struct rtrs_path *s)
66 {
67 return container_of(s, struct rtrs_srv_path, s);
68 }
69
70 static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
71 enum rtrs_srv_state new_state)
72 {
73 enum rtrs_srv_state old_state;
74 bool changed = false;
75 unsigned long flags;
76
77 spin_lock_irqsave(&srv_path->state_lock, flags);
78 old_state = srv_path->state;
79 switch (new_state) {
80 case RTRS_SRV_CONNECTED:
81 if (old_state == RTRS_SRV_CONNECTING)
82 changed = true;
83 break;
84 case RTRS_SRV_CLOSING:
85 if (old_state == RTRS_SRV_CONNECTING ||
86 old_state == RTRS_SRV_CONNECTED)
87 changed = true;
88 break;
89 case RTRS_SRV_CLOSED:
90 if (old_state == RTRS_SRV_CLOSING)
91 changed = true;
92 break;
93 default:
94 break;
95 }
96 if (changed)
97 srv_path->state = new_state;
98 spin_unlock_irqrestore(&srv_path->state_lock, flags);
99
100 return changed;
101 }
102
103 static void free_id(struct rtrs_srv_op *id)
104 {
105 if (!id)
106 return;
107 kfree(id);
108 }
109
110 static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
111 {
112 struct rtrs_srv *srv = srv_path->srv;
113 int i;
114
115 if (srv_path->ops_ids) {
116 for (i = 0; i < srv->queue_depth; i++)
117 free_id(srv_path->ops_ids[i]);
118 kfree(srv_path->ops_ids);
119 srv_path->ops_ids = NULL;
120 }
121 }
122
123 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
124
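/*
 * Shared CQE for the data path: RDMA writes, IMM sends, posted empty
 * receives and heartbeats all point their wr_cqe here, so every such
 * completion is dispatched to rtrs_srv_rdma_done().
 */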
125 static struct ib_cqe io_comp_cqe = {
126 .done = rtrs_srv_rdma_done
127 };
128
129 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
130 {
131 struct rtrs_srv_path *srv_path = container_of(ref,
132 struct rtrs_srv_path,
133 ids_inflight_ref);
134
135 percpu_ref_exit(&srv_path->ids_inflight_ref);
136 complete(&srv_path->complete_done);
137 }
138
139 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path)
140 {
141 struct rtrs_srv *srv = srv_path->srv;
142 struct rtrs_srv_op *id;
143 int i, ret;
144
145 srv_path->ops_ids = kcalloc(srv->queue_depth,
146 sizeof(*srv_path->ops_ids),
147 GFP_KERNEL);
148 if (!srv_path->ops_ids)
149 goto err;
150
151 for (i = 0; i < srv->queue_depth; ++i) {
152 id = kzalloc(sizeof(*id), GFP_KERNEL);
153 if (!id)
154 goto err;
155
156 srv_path->ops_ids[i] = id;
157 }
158
159 ret = percpu_ref_init(&srv_path->ids_inflight_ref,
160 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
161 if (ret) {
162 pr_err("Percpu reference init failed\n");
163 goto err;
164 }
165 init_completion(&srv_path->complete_done);
166
167 return 0;
168
169 err:
170 rtrs_srv_free_ops_ids(srv_path);
171 return -ENOMEM;
172 }
173
174 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path)
175 {
176 percpu_ref_get(&srv_path->ids_inflight_ref);
177 }
178
179 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path)
180 {
181 percpu_ref_put(&srv_path->ids_inflight_ref);
182 }
183
184 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
185 {
186 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
187 struct rtrs_path *s = con->c.path;
188 struct rtrs_srv_path *srv_path = to_srv_path(s);
189
190 if (wc->status != IB_WC_SUCCESS) {
191 rtrs_err(s, "REG MR failed: %s\n",
192 ib_wc_status_msg(wc->status));
193 close_path(srv_path);
194 return;
195 }
196 }
197
198 static struct ib_cqe local_reg_cqe = {
199 .done = rtrs_srv_reg_mr_done
200 };
201
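/*
 * Reply to a read request by RDMA-writing the chunk data into the client
 * buffer described in the request, then notifying the client with an IMM
 * message, optionally preceded by rkey invalidation and/or re-registration
 * (see the work request chaining below).
 */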
202 static int rdma_write_sg(struct rtrs_srv_op *id)
203 {
204 struct rtrs_path *s = id->con->c.path;
205 struct rtrs_srv_path *srv_path = to_srv_path(s);
206 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id];
207 struct rtrs_srv_mr *srv_mr;
208 struct ib_send_wr inv_wr;
209 struct ib_rdma_wr imm_wr;
210 struct ib_rdma_wr *wr = NULL;
211 enum ib_send_flags flags;
212 size_t sg_cnt;
213 int err, offset;
214 bool need_inval;
215 u32 rkey = 0;
216 struct ib_reg_wr rwr;
217 struct ib_sge *plist;
218 struct ib_sge list;
219
220 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
221 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
222 if (sg_cnt != 1)
223 return -EINVAL;
224
225 offset = 0;
226
227 wr = &id->tx_wr;
228 plist = &id->tx_sg;
229 plist->addr = dma_addr + offset;
230 plist->length = le32_to_cpu(id->rd_msg->desc[0].len);
231
232 /* WR will fail with length error
233 * if this is 0
234 */
235 if (plist->length == 0) {
236 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
237 return -EINVAL;
238 }
239
240 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
241 offset += plist->length;
242
243 wr->wr.sg_list = plist;
244 wr->wr.num_sge = 1;
245 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
246 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
247 if (rkey == 0)
248 rkey = wr->rkey;
249 else
250 /* Only one key is actually used */
251 WARN_ON_ONCE(rkey != wr->rkey);
252
253 wr->wr.opcode = IB_WR_RDMA_WRITE;
254 wr->wr.wr_cqe = &io_comp_cqe;
255 wr->wr.ex.imm_data = 0;
256 wr->wr.send_flags = 0;
257
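/*
 * Chain the work requests: the RDMA-Write goes first, optionally followed
 * by a fresh MR registration and/or a SEND_WITH_INV for the client rkey,
 * and always terminated by the IMM message that completes the IO.
 */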
258 if (need_inval && always_invalidate) {
259 wr->wr.next = &rwr.wr;
260 rwr.wr.next = &inv_wr;
261 inv_wr.next = &imm_wr.wr;
262 } else if (always_invalidate) {
263 wr->wr.next = &rwr.wr;
264 rwr.wr.next = &imm_wr.wr;
265 } else if (need_inval) {
266 wr->wr.next = &inv_wr;
267 inv_wr.next = &imm_wr.wr;
268 } else {
269 wr->wr.next = &imm_wr.wr;
270 }
271 /*
272 * From time to time we have to post signaled sends,
273 * or send queue will fill up and only QP reset can help.
274 */
275 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
276 0 : IB_SEND_SIGNALED;
277
278 if (need_inval) {
279 inv_wr.sg_list = NULL;
280 inv_wr.num_sge = 0;
281 inv_wr.opcode = IB_WR_SEND_WITH_INV;
282 inv_wr.wr_cqe = &io_comp_cqe;
283 inv_wr.send_flags = 0;
284 inv_wr.ex.invalidate_rkey = rkey;
285 }
286
287 imm_wr.wr.next = NULL;
288 if (always_invalidate) {
289 struct rtrs_msg_rkey_rsp *msg;
290
291 srv_mr = &srv_path->mrs[id->msg_id];
292 rwr.wr.opcode = IB_WR_REG_MR;
293 rwr.wr.wr_cqe = &local_reg_cqe;
294 rwr.wr.num_sge = 0;
295 rwr.mr = srv_mr->mr;
296 rwr.wr.send_flags = 0;
297 rwr.key = srv_mr->mr->rkey;
298 rwr.access = (IB_ACCESS_LOCAL_WRITE |
299 IB_ACCESS_REMOTE_WRITE);
300 msg = srv_mr->iu->buf;
301 msg->buf_id = cpu_to_le16(id->msg_id);
302 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
303 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
304
305 list.addr = srv_mr->iu->dma_addr;
306 list.length = sizeof(*msg);
307 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
308 imm_wr.wr.sg_list = &list;
309 imm_wr.wr.num_sge = 1;
310 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
311 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
312 srv_mr->iu->dma_addr,
313 srv_mr->iu->size, DMA_TO_DEVICE);
314 } else {
315 imm_wr.wr.sg_list = NULL;
316 imm_wr.wr.num_sge = 0;
317 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
318 }
319 imm_wr.wr.send_flags = flags;
320 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
321 0, need_inval));
322
323 imm_wr.wr.wr_cqe = &io_comp_cqe;
324 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr,
325 offset, DMA_BIDIRECTIONAL);
326
327 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
328 if (err)
329 rtrs_err(s,
330 "Posting RDMA-Write-Request to QP failed, err: %d\n",
331 err);
332
333 return err;
334 }
335
336 /**
337 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
338 * requests or on successful WRITE request.
339 * @con: the connection to send back result
340 * @id: the id associated with the IO
341 * @errno: the error number of the IO.
342 *
343 * Return 0 on success, errno otherwise.
344 */
345 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
346 int errno)
347 {
348 struct rtrs_path *s = con->c.path;
349 struct rtrs_srv_path *srv_path = to_srv_path(s);
350 struct ib_send_wr inv_wr, *wr = NULL;
351 struct ib_rdma_wr imm_wr;
352 struct ib_reg_wr rwr;
353 struct rtrs_srv_mr *srv_mr;
354 bool need_inval = false;
355 enum ib_send_flags flags;
356 u32 imm;
357 int err;
358
359 if (id->dir == READ) {
360 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
361 size_t sg_cnt;
362
363 need_inval = le16_to_cpu(rd_msg->flags) &
364 RTRS_MSG_NEED_INVAL_F;
365 sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
366
367 if (need_inval) {
368 if (sg_cnt) {
369 inv_wr.wr_cqe = &io_comp_cqe;
370 inv_wr.sg_list = NULL;
371 inv_wr.num_sge = 0;
372 inv_wr.opcode = IB_WR_SEND_WITH_INV;
373 inv_wr.send_flags = 0;
374 /* Only one key is actually used */
375 inv_wr.ex.invalidate_rkey =
376 le32_to_cpu(rd_msg->desc[0].key);
377 } else {
378 WARN_ON_ONCE(1);
379 need_inval = false;
380 }
381 }
382 }
383
384 if (need_inval && always_invalidate) {
385 wr = &inv_wr;
386 inv_wr.next = &rwr.wr;
387 rwr.wr.next = &imm_wr.wr;
388 } else if (always_invalidate) {
389 wr = &rwr.wr;
390 rwr.wr.next = &imm_wr.wr;
391 } else if (need_inval) {
392 wr = &inv_wr;
393 inv_wr.next = &imm_wr.wr;
394 } else {
395 wr = &imm_wr.wr;
396 }
397 /*
398 * From time to time we have to post signalled sends,
399 * or send queue will fill up and only QP reset can help.
400 */
401 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
402 0 : IB_SEND_SIGNALED;
403 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
404 imm_wr.wr.next = NULL;
405 if (always_invalidate) {
406 struct ib_sge list;
407 struct rtrs_msg_rkey_rsp *msg;
408
409 srv_mr = &srv_path->mrs[id->msg_id];
410 rwr.wr.next = &imm_wr.wr;
411 rwr.wr.opcode = IB_WR_REG_MR;
412 rwr.wr.wr_cqe = &local_reg_cqe;
413 rwr.wr.num_sge = 0;
414 rwr.wr.send_flags = 0;
415 rwr.mr = srv_mr->mr;
416 rwr.key = srv_mr->mr->rkey;
417 rwr.access = (IB_ACCESS_LOCAL_WRITE |
418 IB_ACCESS_REMOTE_WRITE);
419 msg = srv_mr->iu->buf;
420 msg->buf_id = cpu_to_le16(id->msg_id);
421 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
422 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
423
424 list.addr = srv_mr->iu->dma_addr;
425 list.length = sizeof(*msg);
426 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
427 imm_wr.wr.sg_list = &list;
428 imm_wr.wr.num_sge = 1;
429 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
430 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
431 srv_mr->iu->dma_addr,
432 srv_mr->iu->size, DMA_TO_DEVICE);
433 } else {
434 imm_wr.wr.sg_list = NULL;
435 imm_wr.wr.num_sge = 0;
436 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
437 }
438 imm_wr.wr.send_flags = flags;
439 imm_wr.wr.wr_cqe = &io_comp_cqe;
440
441 imm_wr.wr.ex.imm_data = cpu_to_be32(imm);
442
443 err = ib_post_send(id->con->c.qp, wr, NULL);
444 if (err)
445 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
446 err);
447
448 return err;
449 }
450
451 void close_path(struct rtrs_srv_path *srv_path)
452 {
453 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING))
454 queue_work(rtrs_wq, &srv_path->close_work);
455 WARN_ON(srv_path->state != RTRS_SRV_CLOSING);
456 }
457
458 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
459 {
460 switch (state) {
461 case RTRS_SRV_CONNECTING:
462 return "RTRS_SRV_CONNECTING";
463 case RTRS_SRV_CONNECTED:
464 return "RTRS_SRV_CONNECTED";
465 case RTRS_SRV_CLOSING:
466 return "RTRS_SRV_CLOSING";
467 case RTRS_SRV_CLOSED:
468 return "RTRS_SRV_CLOSED";
469 default:
470 return "UNKNOWN";
471 }
472 }
473
474 /**
475 * rtrs_srv_resp_rdma() - Finish an RDMA request
476 *
477 * @id: Internal RTRS operation identifier
478 * @status: Response Code sent to the other side for this operation.
479 * 0 = success, <0 error
480 * Context: any
481 *
482 * Finish an RDMA operation. A message is sent to the client and the
483 * corresponding memory areas will be released.
484 */
485 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
486 {
487 struct rtrs_srv_path *srv_path;
488 struct rtrs_srv_con *con;
489 struct rtrs_path *s;
490 int err;
491
492 if (WARN_ON(!id))
493 return true;
494
495 con = id->con;
496 s = con->c.path;
497 srv_path = to_srv_path(s);
498
499 id->status = status;
500
501 if (srv_path->state != RTRS_SRV_CONNECTED) {
502 rtrs_err_rl(s,
503 "Sending I/O response failed, server path %s is disconnected, path state %s\n",
504 kobject_name(&srv_path->kobj),
505 rtrs_srv_state_str(srv_path->state));
506 goto out;
507 }
508 if (always_invalidate) {
509 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];
510
511 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
512 }
513 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
514 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
515 kobject_name(&srv_path->kobj),
516 con->c.cid);
517 atomic_add(1, &con->c.sq_wr_avail);
518 spin_lock(&con->rsp_wr_wait_lock);
519 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
520 spin_unlock(&con->rsp_wr_wait_lock);
521 return false;
522 }
523
524 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
525 err = send_io_resp_imm(con, id, status);
526 else
527 err = rdma_write_sg(id);
528
529 if (err) {
530 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
531 kobject_name(&srv_path->kobj));
532 close_path(srv_path);
533 }
534 out:
535 rtrs_srv_put_ops_ids(srv_path);
536 return true;
537 }
538 EXPORT_SYMBOL(rtrs_srv_resp_rdma);
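
/*
 * Minimal usage sketch from an upper layer (the callback name below is
 * hypothetical): the module that received the IO through its rdma_ev()
 * handler later completes it with something like
 *
 *	static void my_io_done(struct rtrs_srv_op *id, int errno)
 *	{
 *		rtrs_srv_resp_rdma(id, errno);
 *	}
 *
 * A "false" return means the send queue was full; the response has been
 * queued and is retried internally once send credits become available.
 */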
539
540 /**
541 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
542 * @srv: Session pointer
543 * @priv: The private pointer that is associated with the session.
544 */
545 void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
546 {
547 srv->priv = priv;
548 }
549 EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
550
551 static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
552 {
553 int i;
554
555 for (i = 0; i < srv_path->mrs_num; i++) {
556 struct rtrs_srv_mr *srv_mr;
557
558 srv_mr = &srv_path->mrs[i];
559
560 if (always_invalidate)
561 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
562
563 ib_dereg_mr(srv_mr->mr);
564 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
565 srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
566 sg_free_table(&srv_mr->sgt);
567 }
568 kfree(srv_path->mrs);
569 }
570
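/*
 * Allocate and register the MRs backing the session chunk buffers. With
 * always_invalidate every chunk gets its own MR, so its rkey can be
 * invalidated and re-registered per IO; otherwise chunks are packed into
 * as few MRs as the device's fast-reg page list length permits.
 */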
571 static int map_cont_bufs(struct rtrs_srv_path *srv_path)
572 {
573 struct rtrs_srv *srv = srv_path->srv;
574 struct rtrs_path *ss = &srv_path->s;
575 int i, mri, err, mrs_num;
576 unsigned int chunk_bits;
577 int chunks_per_mr = 1;
578
579 /*
580 * Here we map queue_depth chunks to MRs. First we have to
581 * figure out how many chunks we can map per MR.
582 */
583 if (always_invalidate) {
584 /*
585 * In order to invalidate each chunk of memory separately, we need
586 * one memory region per chunk.
587 */
588 mrs_num = srv->queue_depth;
589 } else {
590 chunks_per_mr =
591 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
592 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
593 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
594 }
595
596 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL);
597 if (!srv_path->mrs)
598 return -ENOMEM;
599
600 srv_path->mrs_num = mrs_num;
601
602 for (mri = 0; mri < mrs_num; mri++) {
603 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri];
604 struct sg_table *sgt = &srv_mr->sgt;
605 struct scatterlist *s;
606 struct ib_mr *mr;
607 int nr, nr_sgt, chunks;
608
609 chunks = chunks_per_mr * mri;
610 if (!always_invalidate)
611 chunks_per_mr = min_t(int, chunks_per_mr,
612 srv->queue_depth - chunks);
613
614 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
615 if (err)
616 goto err;
617
618 for_each_sg(sgt->sgl, s, chunks_per_mr, i)
619 sg_set_page(s, srv->chunks[chunks + i],
620 max_chunk_size, 0);
621
622 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl,
623 sgt->nents, DMA_BIDIRECTIONAL);
624 if (!nr_sgt) {
625 err = -EINVAL;
626 goto free_sg;
627 }
628 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
629 nr_sgt);
630 if (IS_ERR(mr)) {
631 err = PTR_ERR(mr);
632 goto unmap_sg;
633 }
634 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt,
635 NULL, max_chunk_size);
636 if (nr < 0 || nr < sgt->nents) {
637 err = nr < 0 ? nr : -EINVAL;
638 goto dereg_mr;
639 }
640
641 if (always_invalidate) {
642 srv_mr->iu = rtrs_iu_alloc(1,
643 sizeof(struct rtrs_msg_rkey_rsp),
644 GFP_KERNEL, srv_path->s.dev->ib_dev,
645 DMA_TO_DEVICE, rtrs_srv_rdma_done);
646 if (!srv_mr->iu) {
647 err = -ENOMEM;
648 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
649 goto dereg_mr;
650 }
651 }
652 /* Eventually dma addr for each chunk can be cached */
653 for_each_sg(sgt->sgl, s, nr_sgt, i)
654 srv_path->dma_addr[chunks + i] = sg_dma_address(s);
655
656 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
657 srv_mr->mr = mr;
658
659 continue;
660 err:
661 while (mri--) {
662 srv_mr = &srv_path->mrs[mri];
663 sgt = &srv_mr->sgt;
664 mr = srv_mr->mr;
665 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
666 dereg_mr:
667 ib_dereg_mr(mr);
668 unmap_sg:
669 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl,
670 sgt->nents, DMA_BIDIRECTIONAL);
671 free_sg:
672 sg_free_table(sgt);
673 }
674 kfree(srv_path->mrs);
675
676 return err;
677 }
678
679 chunk_bits = ilog2(srv->queue_depth - 1) + 1;
680 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
681
682 return 0;
683 }
684
685 static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
686 {
687 close_path(to_srv_path(c->path));
688 }
689
690 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path)
691 {
692 rtrs_init_hb(&srv_path->s, &io_comp_cqe,
693 RTRS_HB_INTERVAL_MS,
694 RTRS_HB_MISSED_MAX,
695 rtrs_srv_hb_err_handler,
696 rtrs_wq);
697 }
698
699 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path)
700 {
701 rtrs_start_hb(&srv_path->s);
702 }
703
704 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path)
705 {
706 rtrs_stop_hb(&srv_path->s);
707 }
708
709 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
710 {
711 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
712 struct rtrs_path *s = con->c.path;
713 struct rtrs_srv_path *srv_path = to_srv_path(s);
714 struct rtrs_iu *iu;
715
716 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
717 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
718
719 if (wc->status != IB_WC_SUCCESS) {
720 rtrs_err(s, "Sess info response send failed: %s\n",
721 ib_wc_status_msg(wc->status));
722 close_path(srv_path);
723 return;
724 }
725 WARN_ON(wc->opcode != IB_WC_SEND);
726 }
727
728 static int rtrs_srv_path_up(struct rtrs_srv_path *srv_path)
729 {
730 struct rtrs_srv *srv = srv_path->srv;
731 struct rtrs_srv_ctx *ctx = srv->ctx;
732 int up, ret = 0;
733
734 mutex_lock(&srv->paths_ev_mutex);
735 up = ++srv->paths_up;
736 if (up == 1)
737 ret = ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
738 mutex_unlock(&srv->paths_ev_mutex);
739
740 /* Mark session as established */
741 if (!ret)
742 srv_path->established = true;
743
744 return ret;
745 }
746
747 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path)
748 {
749 struct rtrs_srv *srv = srv_path->srv;
750 struct rtrs_srv_ctx *ctx = srv->ctx;
751
752 if (!srv_path->established)
753 return;
754
755 srv_path->established = false;
756 mutex_lock(&srv->paths_ev_mutex);
757 WARN_ON(!srv->paths_up);
758 if (--srv->paths_up == 0)
759 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
760 mutex_unlock(&srv->paths_ev_mutex);
761 }
762
763 static bool exist_pathname(struct rtrs_srv_ctx *ctx,
764 const char *pathname, const uuid_t *path_uuid)
765 {
766 struct rtrs_srv *srv;
767 struct rtrs_srv_path *srv_path;
768 bool found = false;
769
770 mutex_lock(&ctx->srv_mutex);
771 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
772 mutex_lock(&srv->paths_mutex);
773
774 /* A client may reuse its own sessname when adding another path to its own session (same uuid), so skip that srv here */
775 if (uuid_equal(&srv->paths_uuid, path_uuid)) {
776 mutex_unlock(&srv->paths_mutex);
777 continue;
778 }
779
780 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
781 if (strlen(srv_path->s.sessname) == strlen(pathname) &&
782 !strcmp(srv_path->s.sessname, pathname)) {
783 found = true;
784 break;
785 }
786 }
787 mutex_unlock(&srv->paths_mutex);
788 if (found)
789 break;
790 }
791 mutex_unlock(&ctx->srv_mutex);
792 return found;
793 }
794
795 static int post_recv_path(struct rtrs_srv_path *srv_path);
796 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);
797
798 static int process_info_req(struct rtrs_srv_con *con,
799 struct rtrs_msg_info_req *msg)
800 {
801 struct rtrs_path *s = con->c.path;
802 struct rtrs_srv_path *srv_path = to_srv_path(s);
803 struct ib_send_wr *reg_wr = NULL;
804 struct rtrs_msg_info_rsp *rsp;
805 struct rtrs_iu *tx_iu;
806 struct ib_reg_wr *rwr;
807 int mri, err;
808 size_t tx_sz;
809
810 err = post_recv_path(srv_path);
811 if (err) {
812 rtrs_err(s, "post_recv_path(), err: %d\n", err);
813 return err;
814 }
815
816 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) {
817 rtrs_err(s, "pathname cannot contain '/' or '.'\n");
818 return -EINVAL;
819 }
820
821 if (exist_pathname(srv_path->srv->ctx,
822 msg->pathname, &srv_path->srv->paths_uuid)) {
823 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname);
824 return -EPERM;
825 }
826 strscpy(srv_path->s.sessname, msg->pathname,
827 sizeof(srv_path->s.sessname));
828
829 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL);
830 if (!rwr)
831 return -ENOMEM;
832
833 tx_sz = sizeof(*rsp);
834 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num;
835 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev,
836 DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
837 if (!tx_iu) {
838 err = -ENOMEM;
839 goto rwr_free;
840 }
841
842 rsp = tx_iu->buf;
843 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
844 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num);
845
846 for (mri = 0; mri < srv_path->mrs_num; mri++) {
847 struct ib_mr *mr = srv_path->mrs[mri].mr;
848
849 rsp->desc[mri].addr = cpu_to_le64(mr->iova);
850 rsp->desc[mri].key = cpu_to_le32(mr->rkey);
851 rsp->desc[mri].len = cpu_to_le32(mr->length);
852
853 /*
854 * Fill in reg MR request and chain them *backwards*
855 */
856 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
857 rwr[mri].wr.opcode = IB_WR_REG_MR;
858 rwr[mri].wr.wr_cqe = &local_reg_cqe;
859 rwr[mri].wr.num_sge = 0;
860 rwr[mri].wr.send_flags = 0;
861 rwr[mri].mr = mr;
862 rwr[mri].key = mr->rkey;
863 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
864 IB_ACCESS_REMOTE_WRITE);
865 reg_wr = &rwr[mri].wr;
866 }
867
868 err = rtrs_srv_create_path_files(srv_path);
869 if (err)
870 goto iu_free;
871 kobject_get(&srv_path->kobj);
872 get_device(&srv_path->srv->dev);
873 err = rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED);
874 if (!err) {
875 rtrs_err(s, "rtrs_srv_change_state(), err: %d\n", err);
876 goto iu_free;
877 }
878
879 rtrs_srv_start_hb(srv_path);
880
881 /*
882 * We do not account for the number of established connections at the
883 * moment; we rely on the client, which sends the info request only when
884 * all connections have been successfully established. Thus, simply notify
885 * the listener with the proper event if we are the first path.
886 */
887 err = rtrs_srv_path_up(srv_path);
888 if (err) {
889 rtrs_err(s, "rtrs_srv_path_up(), err: %d\n", err);
890 goto iu_free;
891 }
892
893 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
894 tx_iu->dma_addr,
895 tx_iu->size, DMA_TO_DEVICE);
896
897 /* Send info response */
898 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
899 if (err) {
900 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
901 iu_free:
902 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1);
903 }
904 rwr_free:
905 kfree(rwr);
906
907 return err;
908 }
909
910 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
911 {
912 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
913 struct rtrs_path *s = con->c.path;
914 struct rtrs_srv_path *srv_path = to_srv_path(s);
915 struct rtrs_msg_info_req *msg;
916 struct rtrs_iu *iu;
917 int err;
918
919 WARN_ON(con->c.cid);
920
921 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
922 if (wc->status != IB_WC_SUCCESS) {
923 rtrs_err(s, "Sess info request receive failed: %s\n",
924 ib_wc_status_msg(wc->status));
925 goto close;
926 }
927 WARN_ON(wc->opcode != IB_WC_RECV);
928
929 if (wc->byte_len < sizeof(*msg)) {
930 rtrs_err(s, "Sess info request is malformed: size %d\n",
931 wc->byte_len);
932 goto close;
933 }
934 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr,
935 iu->size, DMA_FROM_DEVICE);
936 msg = iu->buf;
937 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
938 rtrs_err(s, "Sess info request is malformed: type %d\n",
939 le16_to_cpu(msg->type));
940 goto close;
941 }
942 err = process_info_req(con, msg);
943 if (err)
944 goto close;
945
946 out:
947 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
948 return;
949 close:
950 close_path(srv_path);
951 goto out;
952 }
953
954 static int post_recv_info_req(struct rtrs_srv_con *con)
955 {
956 struct rtrs_path *s = con->c.path;
957 struct rtrs_srv_path *srv_path = to_srv_path(s);
958 struct rtrs_iu *rx_iu;
959 int err;
960
961 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
962 GFP_KERNEL, srv_path->s.dev->ib_dev,
963 DMA_FROM_DEVICE, rtrs_srv_info_req_done);
964 if (!rx_iu)
965 return -ENOMEM;
966 /* Prepare for getting info response */
967 err = rtrs_iu_post_recv(&con->c, rx_iu);
968 if (err) {
969 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
970 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1);
971 return err;
972 }
973
974 return 0;
975 }
976
977 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
978 {
979 int i, err;
980
981 for (i = 0; i < q_size; i++) {
982 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
983 if (err)
984 return err;
985 }
986
987 return 0;
988 }
989
990 static int post_recv_path(struct rtrs_srv_path *srv_path)
991 {
992 struct rtrs_srv *srv = srv_path->srv;
993 struct rtrs_path *s = &srv_path->s;
994 size_t q_size;
995 int err, cid;
996
997 for (cid = 0; cid < srv_path->s.con_num; cid++) {
998 if (cid == 0)
999 q_size = SERVICE_CON_QUEUE_DEPTH;
1000 else
1001 q_size = srv->queue_depth;
1002
1003 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size);
1004 if (err) {
1005 rtrs_err(s, "post_recv_io(), err: %d\n", err);
1006 return err;
1007 }
1008 }
1009
1010 return 0;
1011 }
1012
1013 static void process_read(struct rtrs_srv_con *con,
1014 struct rtrs_msg_rdma_read *msg,
1015 u32 buf_id, u32 off)
1016 {
1017 struct rtrs_path *s = con->c.path;
1018 struct rtrs_srv_path *srv_path = to_srv_path(s);
1019 struct rtrs_srv *srv = srv_path->srv;
1020 struct rtrs_srv_ctx *ctx = srv->ctx;
1021 struct rtrs_srv_op *id;
1022
1023 size_t usr_len, data_len;
1024 void *data;
1025 int ret;
1026
1027 if (srv_path->state != RTRS_SRV_CONNECTED) {
1028 rtrs_err_rl(s,
1029 "Processing read request failed, session is disconnected, sess state %s\n",
1030 rtrs_srv_state_str(srv_path->state));
1031 return;
1032 }
1033 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
1034 rtrs_err_rl(s,
1035 "Processing read request failed, invalid message\n");
1036 return;
1037 }
1038 rtrs_srv_get_ops_ids(srv_path);
1039 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ);
1040 id = srv_path->ops_ids[buf_id];
1041 id->con = con;
1042 id->dir = READ;
1043 id->msg_id = buf_id;
1044 id->rd_msg = msg;
1045 usr_len = le16_to_cpu(msg->usr_len);
1046 data_len = off - usr_len;
1047 data = page_address(srv->chunks[buf_id]);
1048 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
1049 data + data_len, usr_len);
1050
1051 if (ret) {
1052 rtrs_err_rl(s,
1053 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1054 buf_id, ret);
1055 goto send_err_msg;
1056 }
1057
1058 return;
1059
1060 send_err_msg:
1061 ret = send_io_resp_imm(con, id, ret);
1062 if (ret < 0) {
1063 rtrs_err_rl(s,
1064 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1065 buf_id, ret);
1066 close_path(srv_path);
1067 }
1068 rtrs_srv_put_ops_ids(srv_path);
1069 }
1070
1071 static void process_write(struct rtrs_srv_con *con,
1072 struct rtrs_msg_rdma_write *req,
1073 u32 buf_id, u32 off)
1074 {
1075 struct rtrs_path *s = con->c.path;
1076 struct rtrs_srv_path *srv_path = to_srv_path(s);
1077 struct rtrs_srv *srv = srv_path->srv;
1078 struct rtrs_srv_ctx *ctx = srv->ctx;
1079 struct rtrs_srv_op *id;
1080
1081 size_t data_len, usr_len;
1082 void *data;
1083 int ret;
1084
1085 if (srv_path->state != RTRS_SRV_CONNECTED) {
1086 rtrs_err_rl(s,
1087 "Processing write request failed, session is disconnected, sess state %s\n",
1088 rtrs_srv_state_str(srv_path->state));
1089 return;
1090 }
1091 rtrs_srv_get_ops_ids(srv_path);
1092 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE);
1093 id = srv_path->ops_ids[buf_id];
1094 id->con = con;
1095 id->dir = WRITE;
1096 id->msg_id = buf_id;
1097
1098 usr_len = le16_to_cpu(req->usr_len);
1099 data_len = off - usr_len;
1100 data = page_address(srv->chunks[buf_id]);
1101 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
1102 data + data_len, usr_len);
1103 if (ret) {
1104 rtrs_err_rl(s,
1105 "Processing write request failed, user module callback reports err: %d\n",
1106 ret);
1107 goto send_err_msg;
1108 }
1109
1110 return;
1111
1112 send_err_msg:
1113 ret = send_io_resp_imm(con, id, ret);
1114 if (ret < 0) {
1115 rtrs_err_rl(s,
1116 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1117 buf_id, ret);
1118 close_path(srv_path);
1119 }
1120 rtrs_srv_put_ops_ids(srv_path);
1121 }
1122
1123 static void process_io_req(struct rtrs_srv_con *con, void *msg,
1124 u32 id, u32 off)
1125 {
1126 struct rtrs_path *s = con->c.path;
1127 struct rtrs_srv_path *srv_path = to_srv_path(s);
1128 struct rtrs_msg_rdma_hdr *hdr;
1129 unsigned int type;
1130
1131 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev,
1132 srv_path->dma_addr[id],
1133 max_chunk_size, DMA_BIDIRECTIONAL);
1134 hdr = msg;
1135 type = le16_to_cpu(hdr->type);
1136
1137 switch (type) {
1138 case RTRS_MSG_WRITE:
1139 process_write(con, msg, id, off);
1140 break;
1141 case RTRS_MSG_READ:
1142 process_read(con, msg, id, off);
1143 break;
1144 default:
1145 rtrs_err(s,
1146 "Processing I/O request failed, unknown message type received: 0x%02x\n",
1147 type);
1148 goto err;
1149 }
1150
1151 return;
1152
1153 err:
1154 close_path(srv_path);
1155 }
1156
1157 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1158 {
1159 struct rtrs_srv_mr *mr =
1160 container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1161 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1162 struct rtrs_path *s = con->c.path;
1163 struct rtrs_srv_path *srv_path = to_srv_path(s);
1164 struct rtrs_srv *srv = srv_path->srv;
1165 u32 msg_id, off;
1166 void *data;
1167
1168 if (wc->status != IB_WC_SUCCESS) {
1169 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1170 ib_wc_status_msg(wc->status));
1171 close_path(srv_path);
1172 }
1173 msg_id = mr->msg_id;
1174 off = mr->msg_off;
1175 data = page_address(srv->chunks[msg_id]) + off;
1176 process_io_req(con, data, msg_id, off);
1177 }
1178
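/*
 * Post a local invalidate for the chunk's MR so the rkey previously
 * advertised to the client is retired before the request is handed to the
 * upper layer; processing continues in rtrs_srv_inv_rkey_done().
 */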
1179 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
1180 struct rtrs_srv_mr *mr)
1181 {
1182 struct ib_send_wr wr = {
1183 .opcode = IB_WR_LOCAL_INV,
1184 .wr_cqe = &mr->inv_cqe,
1185 .send_flags = IB_SEND_SIGNALED,
1186 .ex.invalidate_rkey = mr->mr->rkey,
1187 };
1188 mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1189
1190 return ib_post_send(con->c.qp, &wr, NULL);
1191 }
1192
1193 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1194 {
1195 spin_lock(&con->rsp_wr_wait_lock);
1196 while (!list_empty(&con->rsp_wr_wait_list)) {
1197 struct rtrs_srv_op *id;
1198 int ret;
1199
1200 id = list_entry(con->rsp_wr_wait_list.next,
1201 struct rtrs_srv_op, wait_list);
1202 list_del(&id->wait_list);
1203
1204 spin_unlock(&con->rsp_wr_wait_lock);
1205 ret = rtrs_srv_resp_rdma(id, id->status);
1206 spin_lock(&con->rsp_wr_wait_lock);
1207
1208 if (!ret) {
1209 list_add(&id->wait_list, &con->rsp_wr_wait_list);
1210 break;
1211 }
1212 }
1213 spin_unlock(&con->rsp_wr_wait_lock);
1214 }
1215
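/*
 * Completion handler shared by all io_comp_cqe users: for incoming
 * RDMA-write-with-IMM completions it decodes the 32-bit immediate (IO
 * requests and heartbeats, with msg_id and offset unpacked via mem_bits);
 * for send completions it replenishes the send-queue credits and kicks the
 * response wait list.
 */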
1216 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1217 {
1218 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1219 struct rtrs_path *s = con->c.path;
1220 struct rtrs_srv_path *srv_path = to_srv_path(s);
1221 struct rtrs_srv *srv = srv_path->srv;
1222 u32 imm_type, imm_payload;
1223 int err;
1224
1225 if (wc->status != IB_WC_SUCCESS) {
1226 if (wc->status != IB_WC_WR_FLUSH_ERR) {
1227 rtrs_err(s,
1228 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1229 ib_wc_status_msg(wc->status), wc->wr_cqe,
1230 wc->opcode, wc->vendor_err, wc->byte_len);
1231 close_path(srv_path);
1232 }
1233 return;
1234 }
1235
1236 switch (wc->opcode) {
1237 case IB_WC_RECV_RDMA_WITH_IMM:
1238 /*
1239 * post_recv() RDMA write completions of IO reqs (read/write)
1240 * and hb
1241 */
1242 if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1243 return;
1244 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1245 if (err) {
1246 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1247 close_path(srv_path);
1248 break;
1249 }
1250 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1251 &imm_type, &imm_payload);
1252 if (imm_type == RTRS_IO_REQ_IMM) {
1253 u32 msg_id, off;
1254 void *data;
1255
1256 msg_id = imm_payload >> srv_path->mem_bits;
1257 off = imm_payload & ((1 << srv_path->mem_bits) - 1);
1258 if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
1259 rtrs_err(s, "Wrong msg_id %u, off %u\n",
1260 msg_id, off);
1261 close_path(srv_path);
1262 return;
1263 }
1264 if (always_invalidate) {
1265 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id];
1266
1267 mr->msg_off = off;
1268 mr->msg_id = msg_id;
1269 err = rtrs_srv_inv_rkey(con, mr);
1270 if (err) {
1271 rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
1272 err);
1273 close_path(srv_path);
1274 break;
1275 }
1276 } else {
1277 data = page_address(srv->chunks[msg_id]) + off;
1278 process_io_req(con, data, msg_id, off);
1279 }
1280 } else if (imm_type == RTRS_HB_MSG_IMM) {
1281 WARN_ON(con->c.cid);
1282 rtrs_send_hb_ack(&srv_path->s);
1283 } else if (imm_type == RTRS_HB_ACK_IMM) {
1284 WARN_ON(con->c.cid);
1285 srv_path->s.hb_missed_cnt = 0;
1286 } else {
1287 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1288 }
1289 break;
1290 case IB_WC_RDMA_WRITE:
1291 case IB_WC_SEND:
1292 /*
1293 * post_send() RDMA write completions of IO reqs (read/write)
1294 * and hb.
1295 */
1296 atomic_add(s->signal_interval, &con->c.sq_wr_avail);
1297
1298 if (!list_empty_careful(&con->rsp_wr_wait_list))
1299 rtrs_rdma_process_wr_wait_list(con);
1300
1301 break;
1302 default:
1303 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1304 return;
1305 }
1306 }
1307
1308 /**
1309 * rtrs_srv_get_path_name() - Get the session name of a connected path.
1310 * @srv: Session
1311 * @pathname: Buffer the path (session) name is copied into
1312 * @len: Length of the pathname buffer
1313 */
1314 int rtrs_srv_get_path_name(struct rtrs_srv *srv, char *pathname,
1315 size_t len)
1316 {
1317 struct rtrs_srv_path *srv_path;
1318 int err = -ENOTCONN;
1319
1320 mutex_lock(&srv->paths_mutex);
1321 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1322 if (srv_path->state != RTRS_SRV_CONNECTED)
1323 continue;
1324 strscpy(pathname, srv_path->s.sessname,
1325 min_t(size_t, sizeof(srv_path->s.sessname), len));
1326 err = 0;
1327 break;
1328 }
1329 mutex_unlock(&srv->paths_mutex);
1330
1331 return err;
1332 }
1333 EXPORT_SYMBOL(rtrs_srv_get_path_name);
1334
1335 /**
1336 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
1337 * @srv: Session
1338 */
1339 int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
1340 {
1341 return srv->queue_depth;
1342 }
1343 EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1344
1345 static int find_next_bit_ring(struct rtrs_srv_path *srv_path)
1346 {
1347 struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
1348 int v;
1349
1350 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask);
1351 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1352 v = cpumask_first(&cq_affinity_mask);
1353 return v;
1354 }
1355
1356 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path)
1357 {
1358 srv_path->cur_cq_vector = find_next_bit_ring(srv_path);
1359
1360 return srv_path->cur_cq_vector;
1361 }
1362
1363 static void rtrs_srv_dev_release(struct device *dev)
1364 {
1365 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);
1366
1367 kfree(srv);
1368 }
1369
1370 static void free_srv(struct rtrs_srv *srv)
1371 {
1372 int i;
1373
1374 WARN_ON(refcount_read(&srv->refcount));
1375 for (i = 0; i < srv->queue_depth; i++)
1376 mempool_free(srv->chunks[i], chunk_pool);
1377 kfree(srv->chunks);
1378 mutex_destroy(&srv->paths_mutex);
1379 mutex_destroy(&srv->paths_ev_mutex);
1380 /* last put to release the srv structure */
1381 put_device(&srv->dev);
1382 }
1383
1384 static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
1385 const uuid_t *paths_uuid,
1386 bool first_conn)
1387 {
1388 struct rtrs_srv *srv;
1389 int i;
1390
1391 mutex_lock(&ctx->srv_mutex);
1392 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1393 if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1394 refcount_inc_not_zero(&srv->refcount)) {
1395 mutex_unlock(&ctx->srv_mutex);
1396 return srv;
1397 }
1398 }
1399 mutex_unlock(&ctx->srv_mutex);
1400 /*
1401 * If this request is not the first connection request from the
1402 * client for this session then fail and return error.
1403 */
1404 if (!first_conn) {
1405 pr_err_ratelimited("Error: Not the first connection request for this session\n");
1406 return ERR_PTR(-ENXIO);
1407 }
1408
1409 /* need to allocate a new srv */
1410 srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1411 if (!srv)
1412 return ERR_PTR(-ENOMEM);
1413
1414 INIT_LIST_HEAD(&srv->paths_list);
1415 mutex_init(&srv->paths_mutex);
1416 mutex_init(&srv->paths_ev_mutex);
1417 uuid_copy(&srv->paths_uuid, paths_uuid);
1418 srv->queue_depth = sess_queue_depth;
1419 srv->ctx = ctx;
1420 device_initialize(&srv->dev);
1421 srv->dev.release = rtrs_srv_dev_release;
1422
1423 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1424 GFP_KERNEL);
1425 if (!srv->chunks)
1426 goto err_free_srv;
1427
1428 for (i = 0; i < srv->queue_depth; i++) {
1429 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
1430 if (!srv->chunks[i])
1431 goto err_free_chunks;
1432 }
1433 refcount_set(&srv->refcount, 1);
1434 mutex_lock(&ctx->srv_mutex);
1435 list_add(&srv->ctx_list, &ctx->srv_list);
1436 mutex_unlock(&ctx->srv_mutex);
1437
1438 return srv;
1439
1440 err_free_chunks:
1441 while (i--)
1442 mempool_free(srv->chunks[i], chunk_pool);
1443 kfree(srv->chunks);
1444
1445 err_free_srv:
1446 kfree(srv);
1447 return ERR_PTR(-ENOMEM);
1448 }
1449
1450 static void put_srv(struct rtrs_srv *srv)
1451 {
1452 if (refcount_dec_and_test(&srv->refcount)) {
1453 struct rtrs_srv_ctx *ctx = srv->ctx;
1454
1455 WARN_ON(srv->dev.kobj.state_in_sysfs);
1456
1457 mutex_lock(&ctx->srv_mutex);
1458 list_del(&srv->ctx_list);
1459 mutex_unlock(&ctx->srv_mutex);
1460 free_srv(srv);
1461 }
1462 }
1463
1464 static void __add_path_to_srv(struct rtrs_srv *srv,
1465 struct rtrs_srv_path *srv_path)
1466 {
1467 list_add_tail(&srv_path->s.entry, &srv->paths_list);
1468 srv->paths_num++;
1469 WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1470 }
1471
1472 static void del_path_from_srv(struct rtrs_srv_path *srv_path)
1473 {
1474 struct rtrs_srv *srv = srv_path->srv;
1475
1476 if (WARN_ON(!srv))
1477 return;
1478
1479 mutex_lock(&srv->paths_mutex);
1480 list_del(&srv_path->s.entry);
1481 WARN_ON(!srv->paths_num);
1482 srv->paths_num--;
1483 mutex_unlock(&srv->paths_mutex);
1484 }
1485
1486 /* Return 0 if the addresses are the same, non-zero otherwise, -ENOENT for an unknown address family */
1487 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1488 {
1489 switch (a->sa_family) {
1490 case AF_IB:
1491 return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1492 &((struct sockaddr_ib *)b)->sib_addr,
1493 sizeof(struct ib_addr)) &&
1494 (b->sa_family == AF_IB);
1495 case AF_INET:
1496 return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1497 &((struct sockaddr_in *)b)->sin_addr,
1498 sizeof(struct in_addr)) &&
1499 (b->sa_family == AF_INET);
1500 case AF_INET6:
1501 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1502 &((struct sockaddr_in6 *)b)->sin6_addr,
1503 sizeof(struct in6_addr)) &&
1504 (b->sa_family == AF_INET6);
1505 default:
1506 return -ENOENT;
1507 }
1508 }
1509
1510 static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
1511 struct rdma_addr *addr)
1512 {
1513 struct rtrs_srv_path *srv_path;
1514
1515 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
1516 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
1517 (struct sockaddr *)&addr->dst_addr) &&
1518 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
1519 (struct sockaddr *)&addr->src_addr))
1520 return true;
1521
1522 return false;
1523 }
1524
1525 static void free_path(struct rtrs_srv_path *srv_path)
1526 {
1527 if (srv_path->kobj.state_in_sysfs) {
1528 kobject_del(&srv_path->kobj);
1529 kobject_put(&srv_path->kobj);
1530 } else {
1531 kfree(srv_path->stats);
1532 kfree(srv_path);
1533 }
1534 }
1535
1536 static void rtrs_srv_close_work(struct work_struct *work)
1537 {
1538 struct rtrs_srv_path *srv_path;
1539 struct rtrs_srv_con *con;
1540 int i;
1541
1542 srv_path = container_of(work, typeof(*srv_path), close_work);
1543
1544 rtrs_srv_stop_hb(srv_path);
1545
1546 for (i = 0; i < srv_path->s.con_num; i++) {
1547 if (!srv_path->s.con[i])
1548 continue;
1549 con = to_srv_con(srv_path->s.con[i]);
1550 rdma_disconnect(con->c.cm_id);
1551 ib_drain_qp(con->c.qp);
1552 }
1553
1554 /*
1555 * Degrade ref count to the usual model with a single shared
1556 * atomic_t counter
1557 */
1558 percpu_ref_kill(&srv_path->ids_inflight_ref);
1559
1560 /* Wait for all inflight requests to complete */
1561 wait_for_completion(&srv_path->complete_done);
1562
1563 rtrs_srv_destroy_path_files(srv_path);
1564
1565 /* Notify upper layer if we are the last path */
1566 rtrs_srv_path_down(srv_path);
1567
1568 unmap_cont_bufs(srv_path);
1569 rtrs_srv_free_ops_ids(srv_path);
1570
1571 for (i = 0; i < srv_path->s.con_num; i++) {
1572 if (!srv_path->s.con[i])
1573 continue;
1574 con = to_srv_con(srv_path->s.con[i]);
1575 rtrs_cq_qp_destroy(&con->c);
1576 rdma_destroy_id(con->c.cm_id);
1577 kfree(con);
1578 }
1579 rtrs_ib_dev_put(srv_path->s.dev);
1580
1581 del_path_from_srv(srv_path);
1582 put_srv(srv_path->srv);
1583 srv_path->srv = NULL;
1584 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);
1585
1586 kfree(srv_path->dma_addr);
1587 kfree(srv_path->s.con);
1588 free_path(srv_path);
1589 }
1590
1591 static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
1592 struct rdma_cm_id *cm_id)
1593 {
1594 struct rtrs_srv *srv = srv_path->srv;
1595 struct rtrs_msg_conn_rsp msg;
1596 struct rdma_conn_param param;
1597 int err;
1598
1599 param = (struct rdma_conn_param) {
1600 .rnr_retry_count = 7,
1601 .private_data = &msg,
1602 .private_data_len = sizeof(msg),
1603 };
1604
1605 msg = (struct rtrs_msg_conn_rsp) {
1606 .magic = cpu_to_le16(RTRS_MAGIC),
1607 .version = cpu_to_le16(RTRS_PROTO_VER),
1608 .queue_depth = cpu_to_le16(srv->queue_depth),
1609 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1610 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1611 };
1612
1613 if (always_invalidate)
1614 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1615
1616 err = rdma_accept(cm_id, &param);
1617 if (err)
1618 pr_err("rdma_accept(), err: %d\n", err);
1619
1620 return err;
1621 }
1622
1623 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1624 {
1625 struct rtrs_msg_conn_rsp msg;
1626 int err;
1627
1628 msg = (struct rtrs_msg_conn_rsp) {
1629 .magic = cpu_to_le16(RTRS_MAGIC),
1630 .version = cpu_to_le16(RTRS_PROTO_VER),
1631 .errno = cpu_to_le16(errno),
1632 };
1633
1634 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1635 if (err)
1636 pr_err("rdma_reject(), err: %d\n", err);
1637
1638 /* Bounce errno back */
1639 return errno;
1640 }
1641
1642 static struct rtrs_srv_path *
1643 __find_path(struct rtrs_srv *srv, const uuid_t *sess_uuid)
1644 {
1645 struct rtrs_srv_path *srv_path;
1646
1647 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1648 if (uuid_equal(&srv_path->s.uuid, sess_uuid))
1649 return srv_path;
1650 }
1651
1652 return NULL;
1653 }
1654
1655 static int create_con(struct rtrs_srv_path *srv_path,
1656 struct rdma_cm_id *cm_id,
1657 unsigned int cid)
1658 {
1659 struct rtrs_srv *srv = srv_path->srv;
1660 struct rtrs_path *s = &srv_path->s;
1661 struct rtrs_srv_con *con;
1662
1663 u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
1664 int err, cq_vector;
1665
1666 con = kzalloc(sizeof(*con), GFP_KERNEL);
1667 if (!con) {
1668 err = -ENOMEM;
1669 goto err;
1670 }
1671
1672 spin_lock_init(&con->rsp_wr_wait_lock);
1673 INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1674 con->c.cm_id = cm_id;
1675 con->c.path = &srv_path->s;
1676 con->c.cid = cid;
1677 atomic_set(&con->c.wr_cnt, 1);
1678 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;
1679
1680 if (con->c.cid == 0) {
1681 /*
1682 * All receive and all send (each requiring invalidate)
1683 * + 2 for drain and heartbeat
1684 */
1685 max_send_wr = min_t(int, wr_limit,
1686 SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1687 max_recv_wr = max_send_wr;
1688 s->signal_interval = min_not_zero(srv->queue_depth,
1689 (size_t)SERVICE_CON_QUEUE_DEPTH);
1690 } else {
1691 /* when always_invalidate is enabled, we need linv+rinv+mr+imm */
1692 if (always_invalidate)
1693 max_send_wr =
1694 min_t(int, wr_limit,
1695 srv->queue_depth * (1 + 4) + 1);
1696 else
1697 max_send_wr =
1698 min_t(int, wr_limit,
1699 srv->queue_depth * (1 + 2) + 1);
1700
1701 max_recv_wr = srv->queue_depth + 1;
1702 /*
1703 * Budget for the worst case: all receive requests posted,
1704 * all write requests posted and each read request
1705 * requiring an invalidate request, plus one for drain in
1706 * case the QP gets into the error state.
1707 */
1708 }
1709 cq_num = max_send_wr + max_recv_wr;
1710 atomic_set(&con->c.sq_wr_avail, max_send_wr);
1711 cq_vector = rtrs_srv_get_next_cq_vector(srv_path);
1712
1713 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1714 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
1715 max_send_wr, max_recv_wr,
1716 IB_POLL_WORKQUEUE);
1717 if (err) {
1718 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1719 goto free_con;
1720 }
1721 if (con->c.cid == 0) {
1722 err = post_recv_info_req(con);
1723 if (err)
1724 goto free_cqqp;
1725 }
1726 WARN_ON(srv_path->s.con[cid]);
1727 srv_path->s.con[cid] = &con->c;
1728
1729 /*
1730 * Change context from server to current connection. The other
1731 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1732 */
1733 cm_id->context = &con->c;
1734
1735 return 0;
1736
1737 free_cqqp:
1738 rtrs_cq_qp_destroy(&con->c);
1739 free_con:
1740 kfree(con);
1741
1742 err:
1743 return err;
1744 }
1745
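/*
 * Allocate a new server path for a session: set up the per-chunk DMA
 * address and connection arrays, map and register the chunk MRs, allocate
 * the per-request ids and add the path to the srv list (the caller holds
 * srv->paths_mutex).
 */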
1746 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv *srv,
1747 struct rdma_cm_id *cm_id,
1748 unsigned int con_num,
1749 unsigned int recon_cnt,
1750 const uuid_t *uuid)
1751 {
1752 struct rtrs_srv_path *srv_path;
1753 int err = -ENOMEM;
1754 char str[NAME_MAX];
1755 struct rtrs_addr path;
1756
1757 if (srv->paths_num >= MAX_PATHS_NUM) {
1758 err = -ECONNRESET;
1759 goto err;
1760 }
1761 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1762 err = -EEXIST;
1763 pr_err("Path with same addr exists\n");
1764 goto err;
1765 }
1766 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL);
1767 if (!srv_path)
1768 goto err;
1769
1770 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL);
1771 if (!srv_path->stats)
1772 goto err_free_sess;
1773
1774 srv_path->stats->srv_path = srv_path;
1775
1776 srv_path->dma_addr = kcalloc(srv->queue_depth,
1777 sizeof(*srv_path->dma_addr),
1778 GFP_KERNEL);
1779 if (!srv_path->dma_addr)
1780 goto err_free_stats;
1781
1782 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con),
1783 GFP_KERNEL);
1784 if (!srv_path->s.con)
1785 goto err_free_dma_addr;
1786
1787 srv_path->state = RTRS_SRV_CONNECTING;
1788 srv_path->srv = srv;
1789 srv_path->cur_cq_vector = -1;
1790 srv_path->s.dst_addr = cm_id->route.addr.dst_addr;
1791 srv_path->s.src_addr = cm_id->route.addr.src_addr;
1792
1793 /* temporary until receiving session-name from client */
1794 path.src = &srv_path->s.src_addr;
1795 path.dst = &srv_path->s.dst_addr;
1796 rtrs_addr_to_str(&path, str, sizeof(str));
1797 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname));
1798
1799 srv_path->s.con_num = con_num;
1800 srv_path->s.irq_con_num = con_num;
1801 srv_path->s.recon_cnt = recon_cnt;
1802 uuid_copy(&srv_path->s.uuid, uuid);
1803 spin_lock_init(&srv_path->state_lock);
1804 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work);
1805 rtrs_srv_init_hb(srv_path);
1806
1807 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1808 if (!srv_path->s.dev) {
1809 err = -ENOMEM;
1810 goto err_free_con;
1811 }
1812 err = map_cont_bufs(srv_path);
1813 if (err)
1814 goto err_put_dev;
1815
1816 err = rtrs_srv_alloc_ops_ids(srv_path);
1817 if (err)
1818 goto err_unmap_bufs;
1819
1820 __add_path_to_srv(srv, srv_path);
1821
1822 return srv_path;
1823
1824 err_unmap_bufs:
1825 unmap_cont_bufs(srv_path);
1826 err_put_dev:
1827 rtrs_ib_dev_put(srv_path->s.dev);
1828 err_free_con:
1829 kfree(srv_path->s.con);
1830 err_free_dma_addr:
1831 kfree(srv_path->dma_addr);
1832 err_free_stats:
1833 kfree(srv_path->stats);
1834 err_free_sess:
1835 kfree(srv_path);
1836 err:
1837 return ERR_PTR(err);
1838 }
1839
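/*
 * Handle an RDMA CM connect request: validate the rtrs_msg_conn_req, look
 * up or create the rtrs_srv for the paths_uuid, find or allocate the path
 * for the session uuid, create the per-cid connection and accept or reject
 * the CM id accordingly.
 */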
1840 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
1841 const struct rtrs_msg_conn_req *msg,
1842 size_t len)
1843 {
1844 struct rtrs_srv_ctx *ctx = cm_id->context;
1845 struct rtrs_srv_path *srv_path;
1846 struct rtrs_srv *srv;
1847
1848 u16 version, con_num, cid;
1849 u16 recon_cnt;
1850 int err = -ECONNRESET;
1851
1852 if (len < sizeof(*msg)) {
1853 pr_err("Invalid RTRS connection request\n");
1854 goto reject_w_err;
1855 }
1856 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1857 pr_err("Invalid RTRS magic\n");
1858 goto reject_w_err;
1859 }
1860 version = le16_to_cpu(msg->version);
1861 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1862 pr_err("Unsupported major RTRS version: %d, expected %d\n",
1863 version >> 8, RTRS_PROTO_VER_MAJOR);
1864 goto reject_w_err;
1865 }
1866 con_num = le16_to_cpu(msg->cid_num);
1867 if (con_num > 4096) {
1868 /* Sanity check */
1869 pr_err("Too many connections requested: %d\n", con_num);
1870 goto reject_w_err;
1871 }
1872 cid = le16_to_cpu(msg->cid);
1873 if (cid >= con_num) {
1874 /* Sanity check */
1875 pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1876 goto reject_w_err;
1877 }
1878 recon_cnt = le16_to_cpu(msg->recon_cnt);
1879 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
1880 if (IS_ERR(srv)) {
1881 err = PTR_ERR(srv);
1882 pr_err("get_or_create_srv(), error %d\n", err);
1883 goto reject_w_err;
1884 }
1885 mutex_lock(&srv->paths_mutex);
1886 srv_path = __find_path(srv, &msg->sess_uuid);
1887 if (srv_path) {
1888 struct rtrs_path *s = &srv_path->s;
1889
1890 /* Session already holds a reference */
1891 put_srv(srv);
1892
1893 if (srv_path->state != RTRS_SRV_CONNECTING) {
1894 rtrs_err(s, "Session in wrong state: %s\n",
1895 rtrs_srv_state_str(srv_path->state));
1896 mutex_unlock(&srv->paths_mutex);
1897 goto reject_w_err;
1898 }
1899 /*
1900 * Sanity checks
1901 */
1902 if (con_num != s->con_num || cid >= s->con_num) {
1903 rtrs_err(s, "Incorrect request: %d, %d\n",
1904 cid, con_num);
1905 mutex_unlock(&srv->paths_mutex);
1906 goto reject_w_err;
1907 }
1908 if (s->con[cid]) {
1909 rtrs_err(s, "Connection already exists: %d\n",
1910 cid);
1911 mutex_unlock(&srv->paths_mutex);
1912 goto reject_w_err;
1913 }
1914 } else {
1915 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt,
1916 &msg->sess_uuid);
1917 if (IS_ERR(srv_path)) {
1918 mutex_unlock(&srv->paths_mutex);
1919 put_srv(srv);
1920 err = PTR_ERR(srv_path);
1921 pr_err("RTRS server session allocation failed: %d\n", err);
1922 goto reject_w_err;
1923 }
1924 }
1925 err = create_con(srv_path, cm_id, cid);
1926 if (err) {
1927 rtrs_err((&srv_path->s), "create_con(), error %d\n", err);
1928 rtrs_rdma_do_reject(cm_id, err);
1929 /*
1930 * Since the session has other connections we follow the normal
1931 * path through the workqueue, but still return an error to tell
1932 * cma.c to call rdma_destroy_id() for the current connection.
1933 */
1934 goto close_and_return_err;
1935 }
1936 err = rtrs_rdma_do_accept(srv_path, cm_id);
1937 if (err) {
1938 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err);
1939 rtrs_rdma_do_reject(cm_id, err);
1940 /*
1941 * Since the current connection was successfully added to the
1942 * session, we follow the normal path through the workqueue to
1943 * close the session and return 0 to tell cma.c that we call
1944 * rdma_destroy_id() ourselves.
1945 */
1946 err = 0;
1947 goto close_and_return_err;
1948 }
1949 mutex_unlock(&srv->paths_mutex);
1950
1951 return 0;
1952
1953 reject_w_err:
1954 return rtrs_rdma_do_reject(cm_id, err);
1955
1956 close_and_return_err:
1957 mutex_unlock(&srv->paths_mutex);
1958 close_path(srv_path);
1959
1960 return err;
1961 }
1962
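#if 0
/*
 * Illustrative sketch only, not part of the driver: the approximate shape
 * of the private_data parsed by rtrs_rdma_connect() above.  The
 * authoritative definition of struct rtrs_msg_conn_req lives in
 * rtrs-pri.h; the field order, exact types, padding and reserved fields
 * there take precedence over this sketch.
 */
struct rtrs_msg_conn_req_sketch {
	__le16	magic;		/* must equal RTRS_MAGIC */
	__le16	version;	/* major version in the high byte */
	__le16	cid;		/* index of this connection, < cid_num */
	__le16	cid_num;	/* total number of connections per session */
	__le16	recon_cnt;	/* client reconnect counter */
	uuid_t	sess_uuid;	/* identifies this path (session) */
	uuid_t	paths_uuid;	/* identifies the multipath session */
	u8	first_conn;	/* non-zero on the very first connection */
};
#endif
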
1963 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1964 struct rdma_cm_event *ev)
1965 {
1966 struct rtrs_srv_path *srv_path = NULL;
1967 struct rtrs_path *s = NULL;
1968
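/*
 * For the initial CONNECT_REQUEST the cm_id context is still the
 * listening rtrs_srv_ctx set up in rtrs_srv_cm_init(); for all later
 * events it is the per-connection rtrs_con, which lets us resolve the
 * path here.
 */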
1969 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
1970 struct rtrs_con *c = cm_id->context;
1971
1972 s = c->path;
1973 srv_path = to_srv_path(s);
1974 }
1975
1976 switch (ev->event) {
1977 case RDMA_CM_EVENT_CONNECT_REQUEST:
1978 /*
1979 * In case of error cma.c will destroy cm_id,
1980 * see cma_process_remove()
1981 */
1982 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1983 ev->param.conn.private_data_len);
1984 case RDMA_CM_EVENT_ESTABLISHED:
1985 /* Nothing here */
1986 break;
1987 case RDMA_CM_EVENT_REJECTED:
1988 case RDMA_CM_EVENT_CONNECT_ERROR:
1989 case RDMA_CM_EVENT_UNREACHABLE:
1990 rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1991 rdma_event_msg(ev->event), ev->status);
1992 fallthrough;
1993 case RDMA_CM_EVENT_DISCONNECTED:
1994 case RDMA_CM_EVENT_ADDR_CHANGE:
1995 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1996 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1997 close_path(srv_path);
1998 break;
1999 default:
2000 pr_err("Ignoring unexpected CM event %s, err %d\n",
2001 rdma_event_msg(ev->event), ev->status);
2002 break;
2003 }
2004
2005 return 0;
2006 }
2007
2008 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
2009 struct sockaddr *addr,
2010 enum rdma_ucm_port_space ps)
2011 {
2012 struct rdma_cm_id *cm_id;
2013 int ret;
2014
2015 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
2016 ctx, ps, IB_QPT_RC);
2017 if (IS_ERR(cm_id)) {
2018 ret = PTR_ERR(cm_id);
2019 pr_err("Creating id for RDMA connection failed, err: %d\n",
2020 ret);
2021 goto err_out;
2022 }
2023 ret = rdma_bind_addr(cm_id, addr);
2024 if (ret) {
2025 pr_err("Binding RDMA address failed, err: %d\n", ret);
2026 goto err_cm;
2027 }
2028 ret = rdma_listen(cm_id, 64);
2029 if (ret) {
2030 pr_err("Listening on RDMA connection failed, err: %d\n",
2031 ret);
2032 goto err_cm;
2033 }
2034
2035 return cm_id;
2036
2037 err_cm:
2038 rdma_destroy_id(cm_id);
2039 err_out:
2040
2041 return ERR_PTR(ret);
2042 }
2043
2044 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
2045 {
2046 struct sockaddr_in6 sin = {
2047 .sin6_family = AF_INET6,
2048 .sin6_addr = IN6ADDR_ANY_INIT,
2049 .sin6_port = htons(port),
2050 };
2051 struct sockaddr_ib sib = {
2052 .sib_family = AF_IB,
2053 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
2054 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
2055 .sib_pkey = cpu_to_be16(0xffff),
2056 };
2057 struct rdma_cm_id *cm_ip, *cm_ib;
2058 int ret;
2059
2060 /*
2061 * We accept both IPoIB and IB connections, so we need to keep
2062 * two CM IDs, one for each socket type and port space.
2063 * If the CM initialization of one of the IDs fails, we abort
2064 * everything.
2065 */
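/*
 * For the AF_IB listener the service ID is the IB port space OR-ed with
 * the port number; with an all-ones sid_mask it has to match exactly
 * (e.g. for port 1234 a client must target RDMA_IB_IP_PS_IB | 1234).
 */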
2066 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2067 if (IS_ERR(cm_ip))
2068 return PTR_ERR(cm_ip);
2069
2070 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2071 if (IS_ERR(cm_ib)) {
2072 ret = PTR_ERR(cm_ib);
2073 goto free_cm_ip;
2074 }
2075
2076 ctx->cm_id_ip = cm_ip;
2077 ctx->cm_id_ib = cm_ib;
2078
2079 return 0;
2080
2081 free_cm_ip:
2082 rdma_destroy_id(cm_ip);
2083
2084 return ret;
2085 }
2086
2087 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2088 {
2089 struct rtrs_srv_ctx *ctx;
2090
2091 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2092 if (!ctx)
2093 return NULL;
2094
2095 ctx->ops = *ops;
2096 mutex_init(&ctx->srv_mutex);
2097 INIT_LIST_HEAD(&ctx->srv_list);
2098
2099 return ctx;
2100 }
2101
2102 static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2103 {
2104 WARN_ON(!list_empty(&ctx->srv_list));
2105 mutex_destroy(&ctx->srv_mutex);
2106 kfree(ctx);
2107 }
2108
2109 static int rtrs_srv_add_one(struct ib_device *device)
2110 {
2111 struct rtrs_srv_ctx *ctx;
2112 int ret = 0;
2113
2114 mutex_lock(&ib_ctx.ib_dev_mutex);
2115 if (ib_ctx.ib_dev_count)
2116 goto out;
2117
2118 /*
2119 * Since our CM IDs are NOT bound to any ib device we will create them
2120 * only once
2121 */
2122 ctx = ib_ctx.srv_ctx;
2123 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2124 if (ret) {
2125 /*
2126 * We errored out here.
2127 * According to the ib code, if we encounter an error here then the
2128 * error code is ignored, and no more calls to our ops are made.
2129 */
2130 pr_err("Failed to initialize RDMA connection");
2131 goto err_out;
2132 }
2133
2134 out:
2135 /*
2136 * Keep track of the number of ib devices added
2137 */
2138 ib_ctx.ib_dev_count++;
2139
2140 err_out:
2141 mutex_unlock(&ib_ctx.ib_dev_mutex);
2142 return ret;
2143 }
2144
2145 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2146 {
2147 struct rtrs_srv_ctx *ctx;
2148
2149 mutex_lock(&ib_ctx.ib_dev_mutex);
2150 ib_ctx.ib_dev_count--;
2151
2152 if (ib_ctx.ib_dev_count)
2153 goto out;
2154
2155 /*
2156 * Since our CM IDs are NOT bound to any ib device we will remove them
2157 * only once, when the last device is removed
2158 */
2159 ctx = ib_ctx.srv_ctx;
2160 rdma_destroy_id(ctx->cm_id_ip);
2161 rdma_destroy_id(ctx->cm_id_ib);
2162
2163 out:
2164 mutex_unlock(&ib_ctx.ib_dev_mutex);
2165 }
2166
2167 static struct ib_client rtrs_srv_client = {
2168 .name = "rtrs_server",
2169 .add = rtrs_srv_add_one,
2170 .remove = rtrs_srv_remove_one
2171 };
2172
2173 /**
2174 * rtrs_srv_open() - open RTRS server context
2175 * @ops: callback functions
2176 * @port: port to listen on
2177 *
2178 * Creates a server context with the specified callbacks.
2179 *
2180 * Return: a valid pointer on success, otherwise an ERR_PTR().
2181 */
2182 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2183 {
2184 struct rtrs_srv_ctx *ctx;
2185 int err;
2186
2187 ctx = alloc_srv_ctx(ops);
2188 if (!ctx)
2189 return ERR_PTR(-ENOMEM);
2190
2191 mutex_init(&ib_ctx.ib_dev_mutex);
2192 ib_ctx.srv_ctx = ctx;
2193 ib_ctx.port = port;
2194
2195 err = ib_register_client(&rtrs_srv_client);
2196 if (err) {
2197 free_srv_ctx(ctx);
2198 return ERR_PTR(err);
2199 }
2200
2201 return ctx;
2202 }
2203 EXPORT_SYMBOL(rtrs_srv_open);
2204
2205 static void close_paths(struct rtrs_srv *srv)
2206 {
2207 struct rtrs_srv_path *srv_path;
2208
2209 mutex_lock(&srv->paths_mutex);
2210 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
2211 close_path(srv_path);
2212 mutex_unlock(&srv->paths_mutex);
2213 }
2214
2215 static void close_ctx(struct rtrs_srv_ctx *ctx)
2216 {
2217 struct rtrs_srv *srv;
2218
2219 mutex_lock(&ctx->srv_mutex);
2220 list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2221 close_paths(srv);
2222 mutex_unlock(&ctx->srv_mutex);
2223 flush_workqueue(rtrs_wq);
2224 }
2225
2226 /**
2227 * rtrs_srv_close() - close RTRS server context
2228 * @ctx: pointer to server context
2229 *
2230 * Closes the RTRS server context along with all client sessions.
2231 */
2232 void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2233 {
2234 ib_unregister_client(&rtrs_srv_client);
2235 mutex_destroy(&ib_ctx.ib_dev_mutex);
2236 close_ctx(ctx);
2237 free_srv_ctx(ctx);
2238 }
2239 EXPORT_SYMBOL(rtrs_srv_close);
2240
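#if 0
/*
 * Minimal usage sketch, not part of this driver: how an upper layer (an
 * rnbd-style block server, for example) might open and close an RTRS
 * server context.  The example_* function names, the port number and the
 * global context pointer are assumptions made for this example only; the
 * rtrs_srv_ops callbacks themselves are declared in rtrs.h.
 */
static struct rtrs_srv_ctx *example_srv_ctx;

static int example_start_server(void)
{
	static struct rtrs_srv_ops example_ops = {
		.rdma_ev = example_rdma_ev,	/* handle incoming I/O */
		.link_ev = example_link_ev,	/* session up/down events */
	};

	example_srv_ctx = rtrs_srv_open(&example_ops, 1234);
	if (IS_ERR(example_srv_ctx))
		return PTR_ERR(example_srv_ctx);

	return 0;
}

static void example_stop_server(void)
{
	/* Closes the context together with all client sessions */
	rtrs_srv_close(example_srv_ctx);
}
#endif
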
2241 static int check_module_params(void)
2242 {
2243 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2244 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
2245 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2246 return -EINVAL;
2247 }
2248 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
2249 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
2250 max_chunk_size, MIN_CHUNK_SIZE);
2251 return -EINVAL;
2252 }
2253
2254 /*
2255 * Check if IB immediate data size is enough to hold the mem_id and the
2256 * offset inside the memory chunk
2257 */
2258 if ((ilog2(sess_queue_depth - 1) + 1) +
2259 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2260 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2261 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2262 return -EINVAL;
2263 }
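/*
 * Worked example with the defaults: sess_queue_depth = 512 needs
 * ilog2(511) + 1 = 9 bits and max_chunk_size = 128 KiB needs
 * ilog2(131071) + 1 = 17 bits, 26 bits in total, which has to fit into
 * the MAX_IMM_PAYL_BITS of immediate data (see rtrs-pri.h for the
 * current value).
 */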
2264
2265 return 0;
2266 }
2267
2268 static int __init rtrs_server_init(void)
2269 {
2270 int err;
2271
2272 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n",
2273 KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2274 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2275 sess_queue_depth, always_invalidate);
2276
2277 rtrs_rdma_dev_pd_init(0, &dev_pd);
2278
2279 err = check_module_params();
2280 if (err) {
2281 pr_err("Failed to load module, invalid module parameters, err: %d\n",
2282 err);
2283 return err;
2284 }
2285 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
2286 get_order(max_chunk_size));
2287 if (!chunk_pool)
2288 return -ENOMEM;
2289 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
2290 if (IS_ERR(rtrs_dev_class)) {
2291 err = PTR_ERR(rtrs_dev_class);
2292 goto out_chunk_pool;
2293 }
2294 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2295 if (!rtrs_wq) {
2296 err = -ENOMEM;
2297 goto out_dev_class;
2298 }
2299
2300 return 0;
2301
2302 out_dev_class:
2303 class_destroy(rtrs_dev_class);
2304 out_chunk_pool:
2305 mempool_destroy(chunk_pool);
2306
2307 return err;
2308 }
2309
2310 static void __exit rtrs_server_exit(void)
2311 {
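/* Tear down in the reverse order of rtrs_server_init() */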
2312 destroy_workqueue(rtrs_wq);
2313 class_destroy(rtrs_dev_class);
2314 mempool_destroy(chunk_pool);
2315 rtrs_rdma_dev_pd_deinit(&dev_pd);
2316 }
2317
2318 module_init(rtrs_server_init);
2319 module_exit(rtrs_server_exit);
2320