// SPDX-License-Identifier: GPL-2.0
/*
 * fs/hmdfs/comm/socket_adapter.c
 *
 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
 */

#include "socket_adapter.h"

#include <linux/file.h>
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/net.h>
#include <linux/pagemap.h>
#include <net/sock.h>

#include "authority/authentication.h"
#include "comm/device_node.h"
#include "hmdfs_client.h"
#include "hmdfs_server.h"
#include "hmdfs_trace.h"
#include "message_verify.h"

#define ACQUIRE_WFIRED_INTVAL_USEC_MIN 10
#define ACQUIRE_WFIRED_INTVAL_USEC_MAX 30

typedef void (*request_callback)(struct hmdfs_peer *, struct hmdfs_head_cmd *,
				 void *);
typedef void (*response_callback)(struct hmdfs_peer *,
				  struct sendmsg_wait_queue *, void *, size_t);

static const request_callback s_recv_callbacks[F_SIZE] = {
	[F_OPEN] = hmdfs_server_open,
	[F_READPAGE] = hmdfs_server_readpage,
	[F_RELEASE] = hmdfs_server_release,
	[F_WRITEPAGE] = hmdfs_server_writepage,
	[F_ITERATE] = hmdfs_server_readdir,
	[F_MKDIR] = hmdfs_server_mkdir,
	[F_CREATE] = hmdfs_server_create,
	[F_RMDIR] = hmdfs_server_rmdir,
	[F_UNLINK] = hmdfs_server_unlink,
	[F_RENAME] = hmdfs_server_rename,
	[F_SETATTR] = hmdfs_server_setattr,
	[F_STATFS] = hmdfs_server_statfs,
	[F_DROP_PUSH] = hmdfs_server_get_drop_push,
	[F_GETATTR] = hmdfs_server_getattr,
	[F_FSYNC] = hmdfs_server_fsync,
	[F_SYNCFS] = hmdfs_server_syncfs,
	[F_GETXATTR] = hmdfs_server_getxattr,
	[F_SETXATTR] = hmdfs_server_setxattr,
	[F_LISTXATTR] = hmdfs_server_listxattr,
	[F_READPAGES] = hmdfs_server_readpages,
	[F_READPAGES_OPEN] = hmdfs_server_readpages_open,
	[F_ATOMIC_OPEN] = hmdfs_server_atomic_open,
};

typedef void (*file_request_callback)(struct hmdfs_peer *,
				      struct hmdfs_send_command *);

struct async_req_callbacks {
	void (*on_wakeup)(struct hmdfs_peer *peer, const struct hmdfs_req *req,
			  const struct hmdfs_resp *resp);
};

static const struct async_req_callbacks g_async_req_callbacks[F_SIZE] = {
	[F_SYNCFS] = { .on_wakeup = hmdfs_recv_syncfs_cb },
	[F_WRITEPAGE] = { .on_wakeup = hmdfs_writepage_cb },
};

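/*
 * kref release callback for a synchronous message.
 *
 * Invoked via kref_put_lock() with peer->idr_lock held, so it must remove
 * the message from the idr and drop the lock itself before freeing.
 */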
static void msg_release(struct kref *kref)
{
	struct sendmsg_wait_queue *msg_wq;
	struct hmdfs_peer *con;

	msg_wq = (struct sendmsg_wait_queue *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	con = msg_wq->head.peer;
	idr_remove(&con->msg_idr, msg_wq->head.msg_id);
	spin_unlock(&con->idr_lock);

	kfree(msg_wq->buf);
	if (msg_wq->recv_info.local_filp)
		fput(msg_wq->recv_info.local_filp);
	kfree(msg_wq);
}

// Always remember to find before put, and make sure con is available
void msg_put(struct sendmsg_wait_queue *msg_wq)
{
	kref_put_lock(&msg_wq->head.ref, msg_release,
		      &msg_wq->head.peer->idr_lock);
}

static void recv_info_init(struct file_recv_info *recv_info)
{
	memset(recv_info, 0, sizeof(struct file_recv_info));
	atomic_set(&recv_info->local_fslices, 0);
	atomic_set(&recv_info->state, FILE_RECV_PROCESS);
}

static int msg_init(struct hmdfs_peer *con, struct sendmsg_wait_queue *msg_wq)
{
	int ret = 0;
	struct file_recv_info *recv_info = &msg_wq->recv_info;

	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_MESSAGE_SYNC, msg_wq);
	if (unlikely(ret))
		return ret;

	atomic_set(&msg_wq->valid, MSG_Q_SEND);
	init_waitqueue_head(&msg_wq->response_q);
	recv_info_init(recv_info);
	msg_wq->start = jiffies;
	return 0;
}

static inline void statistic_con_sb_dirty(struct hmdfs_peer *con,
					  const struct hmdfs_cmd *op)
{
	if (op->command == F_WRITEPAGE && op->cmd_flag == C_REQUEST)
		atomic64_inc(&con->sb_dirty_count);
}

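/*
 * Send a fully built message to @node over its TCP connection.
 *
 * If no connection is currently available, an offline notification is raised
 * and the caller is asked to retry (-EAGAIN); on -ESHUTDOWN the stale
 * connection is stopped, a reconnect work is queued, and the send is retried
 * until the node goes offline.
 */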
int hmdfs_sendmessage(struct hmdfs_peer *node, struct hmdfs_send_data *msg)
{
	int ret = 0;
	struct connection *connect = NULL;
	struct tcp_handle *tcp = NULL;
	struct hmdfs_head_cmd *head = msg->head;
	const struct cred *old_cred;

	if (!node) {
		hmdfs_err("node NULL when send cmd %d",
			  head->operations.command);
		ret = -EAGAIN;
		goto out_err;
	} else if (node->status != NODE_STAT_ONLINE) {
		hmdfs_err("device %llu OFFLINE %d when send cmd %d",
			  node->device_id, node->status,
			  head->operations.command);
		ret = -EAGAIN;
		goto out;
	}

	old_cred = hmdfs_override_creds(node->sbi->system_cred);

	do {
		connect = get_conn_impl(node, CONNECT_TYPE_TCP);
		if (!connect) {
			hmdfs_info_ratelimited(
				"device %llu no connection available when send cmd %d, get new session",
				node->device_id, head->operations.command);
			if (node->status != NODE_STAT_OFFLINE) {
				struct notify_param param;

				memcpy(param.remote_cid, node->cid,
				       HMDFS_CID_SIZE);
				param.notify = NOTIFY_OFFLINE;
				param.fd = INVALID_SOCKET_FD;
				notify(node, &param);
			}
			ret = -EAGAIN;
			goto revert_cred;
		}

		ret = connect->send_message(connect, msg);
		if (ret == -ESHUTDOWN) {
			hmdfs_info("device %llu send cmd %d message fail, connection stop",
				   node->device_id, head->operations.command);
			connect->status = CONNECT_STAT_STOP;
			tcp = connect->connect_handle;
			if (node->status != NODE_STAT_OFFLINE) {
				connection_get(connect);
				if (!queue_work(node->reget_conn_wq,
						&connect->reget_work))
					connection_put(connect);
			}
			connection_put(connect);
			/*
			 * node->status being OFFLINE does not ensure that
			 * node_seq will be increased before
			 * hmdfs_sendmessage() returns.
			 */
			hmdfs_node_inc_evt_seq(node);
		} else {
			connection_put(connect);
			goto revert_cred;
		}
	} while (node->status != NODE_STAT_OFFLINE);
revert_cred:
	hmdfs_revert_creds(old_cred);

	if (!ret)
		statistic_con_sb_dirty(node, &head->operations);
out:
	if (node->version == DFS_2_0 &&
	    head->operations.cmd_flag == C_REQUEST)
		hmdfs_client_snd_statis(node->sbi,
					head->operations.command, ret);
	else if (node->version == DFS_2_0 &&
		 head->operations.cmd_flag == C_RESPONSE)
		hmdfs_server_snd_statis(node->sbi,
					head->operations.command, ret);
out_err:
	return ret;
}

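/*
 * Reply to @cmd: echo its header back with C_RESPONSE set, attaching
 * @ret_code and the optional payload in @buf.
 */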
int hmdfs_sendmessage_response(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *cmd, __u32 data_len,
			       void *buf, __u32 ret_code)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;

	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(data_len + sizeof(struct hmdfs_head_cmd));
	head.ret_code = cpu_to_le32(ret_code);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;
	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = buf;
	msg.len = data_len;
	msg.sdesc = NULL;
	msg.sdesc_len = 0;

	ret = hmdfs_sendmessage(con, &msg);
	return ret;
}

static void mp_release(struct kref *kref)
{
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_peer *peer = NULL;

	mp = (struct hmdfs_msg_parasite *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = mp->head.peer;
	idr_remove(&peer->msg_idr, mp->head.msg_id);
	spin_unlock(&peer->idr_lock);

	peer_put(peer);
	kfree(mp->resp.out_buf);
	kfree(mp);
}

void mp_put(struct hmdfs_msg_parasite *mp)
{
	kref_put_lock(&mp->head.ref, mp_release, &mp->head.peer->idr_lock);
}

static void async_request_cb_on_wakeup_fn(struct work_struct *w)
{
	struct hmdfs_msg_parasite *mp =
		container_of(w, struct hmdfs_msg_parasite, d_work.work);
	struct async_req_callbacks cbs;
	const struct cred *old_cred =
		hmdfs_override_creds(mp->head.peer->sbi->cred);

	if (mp->resp.ret_code == -ETIME)
		hmdfs_client_resp_statis(mp->head.peer->sbi,
					 mp->req.operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);

	cbs = g_async_req_callbacks[mp->req.operations.command];
	if (cbs.on_wakeup)
		(*cbs.on_wakeup)(mp->head.peer, &mp->req, &mp->resp);
	mp_put(mp);
	hmdfs_revert_creds(old_cred);
}

static struct hmdfs_msg_parasite *mp_alloc(struct hmdfs_peer *peer,
					   const struct hmdfs_req *req)
{
	struct hmdfs_msg_parasite *mp = kzalloc(sizeof(*mp), GFP_KERNEL);
	int ret;

	if (unlikely(!mp))
		return ERR_PTR(-ENOMEM);

	ret = hmdfs_alloc_msg_idr(peer, MSG_IDR_MESSAGE_ASYNC, mp);
	if (unlikely(ret)) {
		kfree(mp);
		return ERR_PTR(ret);
	}

	mp->start = jiffies;
	peer_get(mp->head.peer);
	mp->resp.ret_code = -ETIME;
	INIT_DELAYED_WORK(&mp->d_work, async_request_cb_on_wakeup_fn);
	mp->wfired = false;
	mp->req = *req;
	return mp;
}

/**
 * hmdfs_send_async_request - send out an async request
 * @peer: target device node
 * @req: request descriptor + necessary contexts
 *
 * Send out a request synchronously and wait for its response asynchronously.
 * Return -ESHUTDOWN when the device node is unreachable.
 * Return -EAGAIN if the network is recovering.
 * Return -ENOMEM if out of memory.
 *
 * Register g_async_req_callbacks to receive the response.
 */
int hmdfs_send_async_request(struct hmdfs_peer *peer,
			     const struct hmdfs_req *req)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	struct hmdfs_msg_parasite *mp = NULL;
	size_t msg_len = req->data_len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout;

	if (req->timeout == TIMEOUT_CONFIG)
		timeout = get_cmd_timeout(peer->sbi, req->operations.command);
	else
		timeout = req->timeout;
	if (timeout == TIMEOUT_UNINIT || timeout == TIMEOUT_NONE) {
		hmdfs_err("send msg %d with uninitialized/invalid timeout",
			  req->operations.command);
		return -EINVAL;
	}

	if (!hmdfs_is_node_online(peer))
		return -EAGAIN;

	mp = mp_alloc(peer, req);
	if (IS_ERR(mp))
		return PTR_ERR(mp);
	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.data_len = cpu_to_le32(msg_len);
	head.operations = mp->req.operations;
	head.msg_id = cpu_to_le32(mp->head.msg_id);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(head);
	msg.data = mp->req.data;
	msg.len = mp->req.data_len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	ret = hmdfs_sendmessage(peer, &msg);
	if (unlikely(ret)) {
		mp_put(mp);
		goto out;
	}

	queue_delayed_work(peer->async_wq, &mp->d_work, timeout * HZ);
	/*
	 * The work may not have been queued yet when its response arrives,
	 * which would result in meaningless waiting. So we use the memory
	 * barrier to tell the recv thread whether the work has been queued.
	 */
	smp_store_release(&mp->wfired, true);
out:
	hmdfs_dec_msg_idr_process(peer);
	return ret;
}

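/*
 * Track an in-flight readdir request on the superblock-wide list; fail with
 * -EINTR if new async readdir messages are currently prohibited.
 */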
static int hmdfs_record_async_readdir(struct hmdfs_peer *con,
				      struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	if (sbi->async_readdir_prohibit) {
		spin_unlock(&sbi->async_readdir_msg_lock);
		return -EINTR;
	}

	list_add(&msg_wq->async_msg, &sbi->async_readdir_msg_list);
	spin_unlock(&sbi->async_readdir_msg_lock);

	return 0;
}

static void hmdfs_untrack_async_readdir(struct hmdfs_peer *con,
					struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	list_del(&msg_wq->async_msg);
	spin_unlock(&sbi->async_readdir_msg_lock);
}

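/*
 * Send a request and, unless the command uses TIMEOUT_NONE, block until the
 * response arrives or the timeout expires. On success the response payload is
 * handed to the caller through sm->out_buf / sm->out_len.
 */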
int hmdfs_sendmessage_request(struct hmdfs_peer *con,
			      struct hmdfs_send_command *sm)
{
	int time_left;
	int ret = 0;
	struct sendmsg_wait_queue *msg_wq = NULL;
	struct hmdfs_send_data msg;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout =
		get_cmd_timeout(con->sbi, sm->operations.command);
	struct hmdfs_head_cmd *head = NULL;
	bool dec = false;

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto free_filp;
	}

	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err_ratelimited("send msg %d with uninitialized timeout",
				      sm->operations.command);
		ret = -EINVAL;
		goto free_filp;
	}

	head = kzalloc(sizeof(struct hmdfs_head_cmd), GFP_KERNEL);
	if (!head) {
		ret = -ENOMEM;
		goto free_filp;
	}

	sm->out_buf = NULL;
	head->magic = HMDFS_MSG_MAGIC;
	head->version = DFS_2_0;
	head->operations = sm->operations;
	head->data_len = cpu_to_le32(outlen);
	head->ret_code = cpu_to_le32(sm->ret_code);
	head->reserved = 0;
	head->reserved1 = 0;
	if (timeout != TIMEOUT_NONE) {
		msg_wq = kzalloc(sizeof(*msg_wq), GFP_KERNEL);
		if (!msg_wq) {
			ret = -ENOMEM;
			goto free_filp;
		}
		ret = msg_init(con, msg_wq);
		if (ret) {
			kfree(msg_wq);
			msg_wq = NULL;
			goto free_filp;
		}
		dec = true;
		head->msg_id = cpu_to_le32(msg_wq->head.msg_id);
		if (sm->operations.command == F_ITERATE)
			msg_wq->recv_info.local_filp = sm->local_filp;
	}
	msg.head = head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err_ratelimited("send err sm->device_id, %lld, msg_id %u",
				      con->device_id, head->msg_id);
		goto free;
	}

	if (timeout == TIMEOUT_NONE)
		goto free;

	hmdfs_dec_msg_idr_process(con);
	dec = false;

	if (sm->operations.command == F_ITERATE) {
		ret = hmdfs_record_async_readdir(con, msg_wq);
		if (ret) {
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_SPC);
			goto free;
		}
	}

	time_left = wait_event_interruptible_timeout(
		msg_wq->response_q,
		(atomic_read(&msg_wq->valid) == MSG_Q_END_RECV), timeout * HZ);

	if (sm->operations.command == F_ITERATE)
		hmdfs_untrack_async_readdir(con, msg_wq);

	if (time_left == -ERESTARTSYS || time_left == 0) {
		hmdfs_err("timeout err sm->device_id %lld, msg_id %d cmd %d",
			  con->device_id, head->msg_id,
			  head->operations.command);
		if (sm->operations.command == F_ITERATE)
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_NET);
		ret = -ETIME;
		hmdfs_client_resp_statis(con->sbi, sm->operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);
		goto free;
	}
	sm->out_buf = msg_wq->buf;
	msg_wq->buf = NULL;
	sm->out_len = msg_wq->size - sizeof(struct hmdfs_head_cmd);
	ret = msg_wq->ret;

free:
	if (msg_wq)
		msg_put(msg_wq);
	if (dec)
		hmdfs_dec_msg_idr_process(con);
	kfree(head);
	return ret;

free_filp:
	if (sm->local_filp)
		fput(sm->local_filp);
	kfree(head);
	return ret;
}

static int hmdfs_send_slice(struct hmdfs_peer *con, struct hmdfs_head_cmd *cmd,
			    struct slice_descriptor *sdesc, void *slice_buf)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	int content_size = le32_to_cpu(sdesc->content_size);
	int msg_len = sizeof(struct hmdfs_head_cmd) + content_size +
		      sizeof(struct slice_descriptor);

	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(msg_len);
	head.ret_code = cpu_to_le32(0);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.sdesc = sdesc;
	msg.sdesc_len = le32_to_cpu(sizeof(struct slice_descriptor));
	msg.data = slice_buf;
	msg.len = content_size;

	ret = hmdfs_sendmessage(con, &msg);

	return ret;
}

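/*
 * Stream a local file back to the requesting peer as a sequence of PAGE_SIZE
 * slices; a zero-length response is sent when there is no file, and an error
 * response is sent if any slice fails.
 */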
int hmdfs_readfile_response(struct hmdfs_peer *con, struct hmdfs_head_cmd *head,
			    struct file *filp)
{
	int ret;
	const unsigned int slice_size = PAGE_SIZE;
	char *slice_buf = NULL;
	loff_t file_offset = 0, file_size;
	ssize_t size;
	struct slice_descriptor sdesc;
	unsigned int slice_sn = 0;

	if (!filp)
		return hmdfs_sendmessage_response(con, head, 0, NULL, 0);

	sdesc.slice_size = cpu_to_le32(slice_size);
	file_size = i_size_read(file_inode(filp));
	file_size = round_up(file_size, slice_size);
	sdesc.num_slices = cpu_to_le32(file_size / slice_size);

	slice_buf = kmalloc(slice_size, GFP_KERNEL);
	if (!slice_buf) {
		ret = -ENOMEM;
		goto out;
	}

	while (1) {
		sdesc.slice_sn = cpu_to_le32(slice_sn++);
		size = kernel_read(filp, slice_buf, (size_t)slice_size,
				   &file_offset);
		if (IS_ERR_VALUE(size)) {
			ret = (int)size;
			goto out;
		}
		sdesc.content_size = cpu_to_le32(size);
		ret = hmdfs_send_slice(con, head, &sdesc, slice_buf);
		if (ret) {
			hmdfs_info("Cannot send file slice %d ",
				   le32_to_cpu(sdesc.slice_sn));
			break;
		}
		if (file_offset >= i_size_read(file_inode(filp)))
			break;
	}

out:
	kfree(slice_buf);
	if (ret)
		hmdfs_sendmessage_response(con, head, 0, NULL, ret);
	return ret;
}

static void asw_release(struct kref *kref)
{
	struct hmdfs_async_work *asw = NULL;
	struct hmdfs_peer *peer = NULL;

	asw = (struct hmdfs_async_work *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = asw->head.peer;
	idr_remove(&peer->msg_idr, asw->head.msg_id);
	spin_unlock(&peer->idr_lock);
	kfree(asw);
}

void asw_put(struct hmdfs_async_work *asw)
{
	kref_put_lock(&asw->head.ref, asw_release, &asw->head.peer->idr_lock);
}

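/*
 * Delayed-work handler that fires when a readpage response did not arrive in
 * time: record the timeout statistics and complete the pending page work.
 */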
void hmdfs_recv_page_work_fn(struct work_struct *ptr)
{
	struct hmdfs_async_work *async_work =
		container_of(ptr, struct hmdfs_async_work, d_work.work);

	if (async_work->head.peer->version >= DFS_2_0)
		hmdfs_client_resp_statis(async_work->head.peer->sbi,
					 F_READPAGE, HMDFS_RESP_TIMEOUT, 0, 0);
	hmdfs_err_ratelimited("timeout and release page, msg_id:%u",
			      async_work->head.msg_id);
	asw_done(async_work);
}

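/*
 * Send a page request asynchronously: the target page is attached to an
 * hmdfs_async_work tracked in the msg idr, and a timeout work is armed before
 * the message goes out. The page is unlocked on any failure path.
 */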
int hmdfs_sendpage_request(struct hmdfs_peer *con,
			   struct hmdfs_send_command *sm)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_async_work *async_work = NULL;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	struct hmdfs_head_cmd head;
	unsigned int timeout;
	unsigned long start = jiffies;

	WARN_ON(!sm->out_buf);

	timeout = get_cmd_timeout(con->sbi, sm->operations.command);
	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err("send msg %d with uninitialized timeout",
			  sm->operations.command);
		ret = -EINVAL;
		goto unlock;
	}

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto unlock;
	}

	memset(&head, 0, sizeof(head));
	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = sm->operations;
	head.data_len = cpu_to_le32(outlen);
	head.ret_code = cpu_to_le32(sm->ret_code);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	async_work = kzalloc(sizeof(*async_work), GFP_KERNEL);
	if (!async_work) {
		ret = -ENOMEM;
		goto unlock;
	}
	async_work->start = start;
	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_PAGE, async_work);
	if (ret) {
		hmdfs_err("alloc msg_id failed, err %d", ret);
		goto unlock;
	}
	head.msg_id = cpu_to_le32(async_work->head.msg_id);
	async_work->page = sm->out_buf;
	asw_get(async_work);
	INIT_DELAYED_WORK(&async_work->d_work, hmdfs_recv_page_work_fn);
	ret = queue_delayed_work(con->async_wq, &async_work->d_work,
				 timeout * HZ);
	if (!ret) {
		hmdfs_err("queue_delayed_work failed, msg_id %u", head.msg_id);
		goto fail_and_unlock_page;
	}
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err("send err sm->device_id, %lld, msg_id %u",
			  con->device_id, head.msg_id);
		if (!cancel_delayed_work(&async_work->d_work)) {
			hmdfs_err("cancel async work err");
			asw_put(async_work);
			hmdfs_dec_msg_idr_process(con);
			goto out;
		}
		goto fail_and_unlock_page;
	}

	asw_put(async_work);
	hmdfs_dec_msg_idr_process(con);
	return 0;

fail_and_unlock_page:
	asw_put(async_work);
	asw_done(async_work);
	hmdfs_dec_msg_idr_process(con);
	return ret;
unlock:
	kfree(async_work);
	unlock_page(sm->out_buf);
out:
	return ret;
}

static void hmdfs_request_handle_sync(struct hmdfs_peer *con,
				      struct hmdfs_head_cmd *head, void *buf)
{
	unsigned long start = jiffies;
	const struct cred *saved_cred = hmdfs_override_fsids(true);

	if (!saved_cred) {
		hmdfs_err("prepare cred failed!");
		kfree(buf);
		return;
	}

	s_recv_callbacks[head->operations.command](con, head, buf);
	hmdfs_statistic(con->sbi, head->operations.command, jiffies - start);

	kfree(buf);

	hmdfs_revert_fsids(saved_cred);
}

static void hmdfs_msg_handle_sync(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf)
{
	const struct cred *old_cred = hmdfs_override_creds(con->sbi->cred);

	/*
	 * Reuse PF_NPROC_EXCEEDED as an indication of hmdfs server context:
	 * 1. PF_NPROC_EXCEEDED will be set by setreuid()/setuid()/setresuid();
	 *    we assume kworkers will not call these syscalls.
	 * 2. PF_NPROC_EXCEEDED will be cleared by execve(), and kworkers
	 *    will not call it.
	 */
	current->flags |= PF_NPROC_EXCEEDED;
	hmdfs_request_handle_sync(con, head, buf);
	current->flags &= ~PF_NPROC_EXCEEDED;

	hmdfs_revert_creds(old_cred);
}

static void hmdfs_request_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);

	hmdfs_msg_handle_sync(desp->peer, desp->head, desp->buf);
	peer_put(desp->peer);
	kfree(desp->head);
	kfree(desp);
}

static int hmdfs_msg_handle_async(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf,
				  struct workqueue_struct *wq,
				  void (*work_fn)(struct work_struct *ptr))
{
	struct work_handler_desp *desp = NULL;
	struct hmdfs_head_cmd *dup_head = NULL;
	int ret;

	desp = kzalloc(sizeof(*desp), GFP_KERNEL);
	if (!desp) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	dup_head = kzalloc(sizeof(*dup_head), GFP_KERNEL);
	if (!dup_head) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	*dup_head = *head;
	desp->peer = con;
	desp->head = dup_head;
	desp->buf = buf;
	INIT_WORK(&desp->work, work_fn);

	peer_get(con);
	queue_work(wq, &desp->work);

	ret = 0;
	return ret;

exit_desp:
	kfree(desp);
	return ret;
}

static int hmdfs_request_recv(struct hmdfs_peer *con,
			      struct hmdfs_head_cmd *head, void *buf)
{
	int ret;

	if (head->operations.command >= F_SIZE ||
	    !s_recv_callbacks[head->operations.command]) {
		ret = -EINVAL;
		hmdfs_err("NULL callback, command %d",
			  head->operations.command);
		goto out;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_ITERATE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
	case F_READPAGES_OPEN:
	case F_ATOMIC_OPEN:
		ret = hmdfs_msg_handle_async(con, head, buf, con->req_handle_wq,
					     hmdfs_request_work_fn);
		break;
	case F_WRITEPAGE:
	case F_READPAGE:
	case F_READPAGES:
		hmdfs_msg_handle_sync(con, head, buf);
		ret = 0;
		break;
	default:
		hmdfs_err("Fatal! Unexpected request command %d",
			  head->operations.command);
		ret = -EINVAL;
	}

out:
	return ret;
}

void hmdfs_response_wakeup(struct sendmsg_wait_queue *msg_info,
			   __u32 ret_code, __u32 data_len, void *buf)
{
	msg_info->ret = ret_code;
	msg_info->size = data_len;
	msg_info->buf = buf;
	atomic_set(&msg_info->valid, MSG_Q_END_RECV);
	wake_up_interruptible(&msg_info->response_q);
}

static int hmdfs_readfile_slice(struct sendmsg_wait_queue *msg_info,
				struct work_handler_desp *desp)
{
	struct slice_descriptor *sdesc = desp->buf;
	void *slice_buf = sdesc + 1;
	struct file_recv_info *recv_info = &msg_info->recv_info;
	struct file *filp = recv_info->local_filp;
	loff_t offset;
	ssize_t written_size;

	if (atomic_read(&recv_info->state) != FILE_RECV_PROCESS)
		return -EBUSY;

	offset = le32_to_cpu(sdesc->slice_size) * le32_to_cpu(sdesc->slice_sn);

	written_size = kernel_write(filp, slice_buf,
				    le32_to_cpu(sdesc->content_size), &offset);
	if (IS_ERR_VALUE(written_size)) {
		atomic_set(&recv_info->state, FILE_RECV_ERR_SPC);
		hmdfs_info("Fatal! Cannot store a file slice %d/%d, ret = %d",
			   le32_to_cpu(sdesc->slice_sn),
			   le32_to_cpu(sdesc->num_slices), (int)written_size);
		return (int)written_size;
	}

	if (atomic_inc_return(&recv_info->local_fslices) >=
	    le32_to_cpu(sdesc->num_slices))
		atomic_set(&recv_info->state, FILE_RECV_SUCC);
	return 0;
}

static void hmdfs_file_response_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);
	struct sendmsg_wait_queue *msg_info = NULL;
	int ret;
	atomic_t *pstate = NULL;
	u8 cmd = desp->head->operations.command;
	const struct cred *old_cred =
		hmdfs_override_creds(desp->peer->sbi->cred);

	msg_info = (struct sendmsg_wait_queue *)hmdfs_find_msg_head(desp->peer,
					le32_to_cpu(desp->head->msg_id));
	if (!msg_info || atomic_read(&msg_info->valid) != MSG_Q_SEND) {
		hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_DELAY,
					 0, 0);
		hmdfs_info("cannot find msg(id %d)",
			   le32_to_cpu(desp->head->msg_id));
		goto free;
	}

	ret = le32_to_cpu(desp->head->ret_code);
	if (ret || le32_to_cpu(desp->head->data_len) == sizeof(*desp->head))
		goto wakeup;
	ret = hmdfs_readfile_slice(msg_info, desp);
	pstate = &msg_info->recv_info.state;
	if (ret || atomic_read(pstate) != FILE_RECV_PROCESS)
		goto wakeup;
	goto free;

wakeup:
	hmdfs_response_wakeup(msg_info, ret, sizeof(struct hmdfs_head_cmd),
			      NULL);
	hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_NORMAL,
				 msg_info->start, jiffies);
free:
	if (msg_info)
		msg_put(msg_info);
	peer_put(desp->peer);
	hmdfs_revert_creds(old_cred);

	kfree(desp->buf);
	kfree(desp->head);
	kfree(desp);
}

static void hmdfs_wait_mp_wfired(struct hmdfs_msg_parasite *mp)
{
	/*
	 * Wait until the sender has marked the delayed work as queued
	 * (see hmdfs_send_async_request()), so that we only ever cancel
	 * works that were actually queued.
	 */
	while (unlikely(!smp_load_acquire(&mp->wfired)))
		usleep_range(ACQUIRE_WFIRED_INTVAL_USEC_MIN,
			     ACQUIRE_WFIRED_INTVAL_USEC_MAX);
}

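/*
 * Dispatch a response to its waiter based on the idr entry type: synchronous
 * waiters are woken up directly, while async parasites have their timeout
 * work cancelled and re-queued with zero delay to run the callback.
 */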
int hmdfs_response_handle_sync(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	struct sendmsg_wait_queue *msg_info = NULL;
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_msg_idr_head *msg_head = NULL;
	u32 msg_id = le32_to_cpu(head->msg_id);
	bool woke = false;
	u8 cmd = head->operations.command;

	msg_head = hmdfs_find_msg_head(con, msg_id);
	if (!msg_head)
		goto out;

	switch (msg_head->type) {
	case MSG_IDR_MESSAGE_SYNC:
		msg_info = (struct sendmsg_wait_queue *)msg_head;
		if (atomic_read(&msg_info->valid) == MSG_Q_SEND) {
			hmdfs_response_wakeup(msg_info,
					      le32_to_cpu(head->ret_code),
					      le32_to_cpu(head->data_len), buf);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL,
						 msg_info->start, jiffies);
			woke = true;
		}

		msg_put(msg_info);
		break;
	case MSG_IDR_MESSAGE_ASYNC:
		mp = (struct hmdfs_msg_parasite *)msg_head;

		hmdfs_wait_mp_wfired(mp);
		if (cancel_delayed_work(&mp->d_work)) {
			mp->resp.out_buf = buf;
			mp->resp.out_len =
				le32_to_cpu(head->data_len) - sizeof(*head);
			mp->resp.ret_code = le32_to_cpu(head->ret_code);
			queue_delayed_work(con->async_wq, &mp->d_work, 0);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL, mp->start,
						 jiffies);
			woke = true;
		}
		mp_put(mp);
		break;
	default:
		hmdfs_err("receive incorrect msg type %d msg_id %d cmd %d",
			  msg_head->type, msg_id, cmd);
		break;
	}

	if (likely(woke))
		return 0;
out:
	hmdfs_client_resp_statis(con->sbi, cmd, HMDFS_RESP_DELAY, 0, 0);
	hmdfs_info("cannot find msg_id %d cmd %d", msg_id, cmd);
	return -EINVAL;
}

static int hmdfs_response_recv(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	__u16 command = head->operations.command;
	int ret;

	if (command >= F_SIZE) {
		ret = -EINVAL;
		return ret;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_READPAGE:
	case F_WRITEPAGE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
		ret = hmdfs_response_handle_sync(con, head, buf);
		return ret;

	case F_ITERATE:
		ret = hmdfs_msg_handle_async(con, head, buf, con->async_wq,
					     hmdfs_file_response_work_fn);
		return ret;

	default:
		hmdfs_err("Fatal! Unexpected response command %d",
			  head->operations.command);
		ret = -EINVAL;
		return ret;
	}
}

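/*
 * Entry point for every message received from the transport layer: verify it
 * first, then dispatch by cmd_flag. On any error the payload buffer is freed
 * here; otherwise ownership of @buf passes to the request/response handlers.
 */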
static void hmdfs_recv_mesg_callback(struct hmdfs_peer *con, void *head,
				     void *buf)
{
	struct hmdfs_head_cmd *hmdfs_head = (struct hmdfs_head_cmd *)head;

	trace_hmdfs_recv_mesg_callback(hmdfs_head);

	if (hmdfs_message_verify(con, hmdfs_head, buf) < 0) {
		hmdfs_info("Message %d has been abandoned", hmdfs_head->msg_id);
		goto out_err;
	}

	switch (hmdfs_head->operations.cmd_flag) {
	case C_REQUEST:
		if (hmdfs_request_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	case C_RESPONSE:
		if (hmdfs_response_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	default:
		hmdfs_err("Fatal! Unexpected msg cmd %d",
			  hmdfs_head->operations.cmd_flag);
		goto out_err;
	}
	return;

out_err:
	kfree(buf);
}

static inline void hmdfs_recv_page_callback(struct hmdfs_peer *con,
					    struct hmdfs_head_cmd *head,
					    int err, void *data)
{
	if (head->operations.command == F_READPAGE)
		hmdfs_client_recv_readpage(head, err, data);
}

static const struct connection_operations conn_operations[] = {
	[PROTOCOL_VERSION] = {
		.recvmsg = hmdfs_recv_mesg_callback,
		.recvpage = hmdfs_recv_page_callback,
		/* remote device operations */
		.remote_file_fops = &hmdfs_dev_file_fops_remote,
		.remote_file_iops = &hmdfs_dev_file_iops_remote,
		.remote_file_aops = &hmdfs_dev_file_aops_remote,
		.remote_unlink = hmdfs_dev_unlink_from_con,
		.remote_readdir = hmdfs_dev_readdir_from_con,
	}
};

const struct connection_operations *hmdfs_get_peer_operation(__u8 version)
{
	if (version <= INVALID_VERSION || version >= MAX_VERSION)
		return NULL;

	if (version <= USERSPACE_MAX_VER)
		return &(conn_operations[USERDFS_VERSION]);
	else
		return &(conn_operations[PROTOCOL_VERSION]);
}

void hmdfs_wakeup_parasite(struct hmdfs_msg_parasite *mp)
{
	hmdfs_wait_mp_wfired(mp);
	if (!cancel_delayed_work(&mp->d_work))
		hmdfs_err("cancel parasite work err msg_id=%d cmd=%d",
			  mp->head.msg_id, mp->req.operations.command);
	else
		async_request_cb_on_wakeup_fn(&mp->d_work.work);
}

void hmdfs_wakeup_async_work(struct hmdfs_async_work *async_work)
{
	if (!cancel_delayed_work(&async_work->d_work))
		hmdfs_err("cancel async work err msg_id=%d",
			  async_work->head.msg_id);
	else
		hmdfs_recv_page_work_fn(&async_work->d_work.work);
}