// SPDX-License-Identifier: GPL-2.0
/*
 * fs/hmdfs/comm/socket_adapter.c
 *
 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
 */

#include "socket_adapter.h"

#include <linux/file.h>
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/net.h>
#include <linux/pagemap.h>
#include <net/sock.h>

#include "authority/authentication.h"
#include "comm/device_node.h"
#include "hmdfs_client.h"
#include "hmdfs_server.h"
#include "hmdfs_trace.h"
#include "message_verify.h"

#define ACQUIRE_WFIRED_INTVAL_USEC_MIN 10
#define ACQUIRE_WFIRED_INTVAL_USEC_MAX 30

typedef void (*request_callback)(struct hmdfs_peer *, struct hmdfs_head_cmd *,
				 void *);
typedef void (*response_callback)(struct hmdfs_peer *,
				  struct sendmsg_wait_queue *, void *, size_t);

static const request_callback s_recv_callbacks[F_SIZE] = {
	[F_OPEN] = hmdfs_server_open,
	[F_READPAGE] = hmdfs_server_readpage,
	[F_RELEASE] = hmdfs_server_release,
	[F_WRITEPAGE] = hmdfs_server_writepage,
	[F_ITERATE] = hmdfs_server_readdir,
	[F_MKDIR] = hmdfs_server_mkdir,
	[F_CREATE] = hmdfs_server_create,
	[F_RMDIR] = hmdfs_server_rmdir,
	[F_UNLINK] = hmdfs_server_unlink,
	[F_RENAME] = hmdfs_server_rename,
	[F_SETATTR] = hmdfs_server_setattr,
	[F_STATFS] = hmdfs_server_statfs,
	[F_DROP_PUSH] = hmdfs_server_get_drop_push,
	[F_GETATTR] = hmdfs_server_getattr,
	[F_FSYNC] = hmdfs_server_fsync,
	[F_SYNCFS] = hmdfs_server_syncfs,
	[F_GETXATTR] = hmdfs_server_getxattr,
	[F_SETXATTR] = hmdfs_server_setxattr,
	[F_LISTXATTR] = hmdfs_server_listxattr,
	[F_ATOMIC_OPEN] = hmdfs_server_atomic_open,
};

typedef void (*file_request_callback)(struct hmdfs_peer *,
				      struct hmdfs_send_command *);

struct async_req_callbacks {
	void (*on_wakeup)(struct hmdfs_peer *peer, const struct hmdfs_req *req,
			  const struct hmdfs_resp *resp);
};

static const struct async_req_callbacks g_async_req_callbacks[F_SIZE] = {
	[F_SYNCFS] = { .on_wakeup = hmdfs_recv_syncfs_cb },
	[F_WRITEPAGE] = { .on_wakeup = hmdfs_writepage_cb },
};

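/*
 * Final kref release for a synchronous message: invoked by kref_put_lock()
 * with peer->idr_lock held, so it must remove the idr entry and drop the
 * lock itself before freeing the message.
 */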
static void msg_release(struct kref *kref)
{
	struct sendmsg_wait_queue *msg_wq;
	struct hmdfs_peer *con;

	msg_wq = (struct sendmsg_wait_queue *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	con = msg_wq->head.peer;
	idr_remove(&con->msg_idr, msg_wq->head.msg_id);
	spin_unlock(&con->idr_lock);

	kfree(msg_wq->buf);
	if (msg_wq->recv_info.local_filp)
		fput(msg_wq->recv_info.local_filp);
	kfree(msg_wq);
}

/* Always remember to find before put, and make sure con is available */
void msg_put(struct sendmsg_wait_queue *msg_wq)
{
	kref_put_lock(&msg_wq->head.ref, msg_release,
		      &msg_wq->head.peer->idr_lock);
}

static void recv_info_init(struct file_recv_info *recv_info)
{
	memset(recv_info, 0, sizeof(struct file_recv_info));
	atomic_set(&recv_info->local_fslices, 0);
	atomic_set(&recv_info->state, FILE_RECV_PROCESS);
}

static int msg_init(struct hmdfs_peer *con, struct sendmsg_wait_queue *msg_wq)
{
	int ret = 0;
	struct file_recv_info *recv_info = &msg_wq->recv_info;

	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_MESSAGE_SYNC, msg_wq);
	if (unlikely(ret))
		return ret;

	atomic_set(&msg_wq->valid, MSG_Q_SEND);
	init_waitqueue_head(&msg_wq->response_q);
	recv_info_init(recv_info);
	msg_wq->start = jiffies;
	return 0;
}

static inline void statistic_con_sb_dirty(struct hmdfs_peer *con,
					  const struct hmdfs_cmd *op)
{
	if (op->command == F_WRITEPAGE && op->cmd_flag == C_REQUEST)
		atomic64_inc(&con->sb_dirty_count);
}

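/*
 * Send an assembled message to a peer. The sender's creds are overridden
 * with the superblock's system_cred for the duration of the send; on
 * -ESHUTDOWN the stale connection is stopped, a session reget is queued,
 * and the send is retried until the node goes offline.
 */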
int hmdfs_sendmessage(struct hmdfs_peer *node, struct hmdfs_send_data *msg)
{
	int ret = 0;
	struct connection *connect = NULL;
	struct tcp_handle *tcp = NULL;
	struct hmdfs_head_cmd *head = msg->head;
	const struct cred *old_cred;

	if (!node) {
		hmdfs_err("node NULL when send cmd %d",
			  head->operations.command);
		ret = -EAGAIN;
		goto out_err;
	} else if (node->status != NODE_STAT_ONLINE) {
		hmdfs_err("device %llu OFFLINE %d when send cmd %d",
			  node->device_id, node->status,
			  head->operations.command);
		ret = -EAGAIN;
		goto out;
	}

	old_cred = hmdfs_override_creds(node->sbi->system_cred);

	do {
		connect = get_conn_impl(node, CONNECT_TYPE_TCP);
		if (!connect) {
			hmdfs_info_ratelimited(
				"device %llu no connection available when send cmd %d, get new session",
				node->device_id, head->operations.command);
			if (node->status != NODE_STAT_OFFLINE) {
				struct notify_param param;

				memcpy(param.remote_cid, node->cid,
				       HMDFS_CID_SIZE);
				param.notify = NOTIFY_OFFLINE;
				param.fd = INVALID_SOCKET_FD;
				notify(node, &param);
			}
			ret = -EAGAIN;
			goto revert_cred;
		}

		ret = connect->send_message(connect, msg);
		if (ret == -ESHUTDOWN) {
			hmdfs_info("device %llu send cmd %d message fail, connection stop",
				   node->device_id, head->operations.command);
			connect->status = CONNECT_STAT_STOP;
			tcp = connect->connect_handle;
			if (node->status != NODE_STAT_OFFLINE) {
				connection_get(connect);
				if (!queue_work(node->reget_conn_wq,
						&connect->reget_work))
					connection_put(connect);
			}
			connection_put(connect);
			/*
			 * node->status being OFFLINE cannot ensure that
			 * node_seq will be increased before
			 * hmdfs_sendmessage() returns.
			 */
			hmdfs_node_inc_evt_seq(node);
		} else {
			connection_put(connect);
			goto revert_cred;
		}
	} while (node->status != NODE_STAT_OFFLINE);
revert_cred:
	hmdfs_revert_creds(old_cred);

	if (!ret)
		statistic_con_sb_dirty(node, &head->operations);
out:
	if (head->operations.cmd_flag == C_REQUEST)
		hmdfs_client_snd_statis(node->sbi,
					head->operations.command, ret);
	else if (head->operations.cmd_flag == C_RESPONSE)
		hmdfs_server_snd_statis(node->sbi,
					head->operations.command, ret);
out_err:
	return ret;
}

int hmdfs_sendmessage_response(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *cmd, __u32 data_len,
			       void *buf, __u32 ret_code)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(data_len + sizeof(struct hmdfs_head_cmd));
	head.ret_code = cpu_to_le32(ret_code);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;
	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = buf;
	msg.len = data_len;
	msg.sdesc = NULL;
	msg.sdesc_len = 0;

	ret = hmdfs_sendmessage(con, &msg);
	return ret;
}

static void mp_release(struct kref *kref)
{
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_peer *peer = NULL;

	mp = (struct hmdfs_msg_parasite *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = mp->head.peer;
	idr_remove(&peer->msg_idr, mp->head.msg_id);
	spin_unlock(&peer->idr_lock);

	peer_put(peer);
	kfree(mp->resp.out_buf);
	kfree(mp);
}

void mp_put(struct hmdfs_msg_parasite *mp)
{
	kref_put_lock(&mp->head.ref, mp_release, &mp->head.peer->idr_lock);
}

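/*
 * Delayed work run when an async request completes or times out. If no
 * response arrived in time, resp.ret_code still holds the -ETIME set at
 * allocation and a timeout is accounted before the registered on_wakeup
 * callback (if any) is invoked.
 */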
static void async_request_cb_on_wakeup_fn(struct work_struct *w)
{
	struct hmdfs_msg_parasite *mp =
		container_of(w, struct hmdfs_msg_parasite, d_work.work);
	struct async_req_callbacks cbs;
	const struct cred *old_cred =
		hmdfs_override_creds(mp->head.peer->sbi->cred);

	if (mp->resp.ret_code == -ETIME)
		hmdfs_client_resp_statis(mp->head.peer->sbi,
					 mp->req.operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);

	cbs = g_async_req_callbacks[mp->req.operations.command];
	if (cbs.on_wakeup)
		(*cbs.on_wakeup)(mp->head.peer, &mp->req, &mp->resp);
	mp_put(mp);
	hmdfs_revert_creds(old_cred);
}

static struct hmdfs_msg_parasite *mp_alloc(struct hmdfs_peer *peer,
					   const struct hmdfs_req *req)
{
	struct hmdfs_msg_parasite *mp = kzalloc(sizeof(*mp), GFP_KERNEL);
	int ret;

	if (unlikely(!mp))
		return ERR_PTR(-ENOMEM);

	ret = hmdfs_alloc_msg_idr(peer, MSG_IDR_MESSAGE_ASYNC, mp);
	if (unlikely(ret)) {
		kfree(mp);
		return ERR_PTR(ret);
	}

	mp->start = jiffies;
	peer_get(mp->head.peer);
	mp->resp.ret_code = -ETIME;
	INIT_DELAYED_WORK(&mp->d_work, async_request_cb_on_wakeup_fn);
	mp->wfired = false;
	mp->req = *req;
	return mp;
}

/**
 * hmdfs_send_async_request - send out an async request
 * @peer: target device node
 * @req: request descriptor + necessary contexts
 *
 * Send out a request synchronously and wait for its response asynchronously.
 * Return -ESHUTDOWN when the device node is unreachable
 * Return -EAGAIN if the network is recovering
 * Return -ENOMEM if out of memory
 *
 * Register g_async_req_callbacks to receive the response
 */
int hmdfs_send_async_request(struct hmdfs_peer *peer,
			     const struct hmdfs_req *req)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	struct hmdfs_msg_parasite *mp = NULL;
	size_t msg_len = req->data_len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout;

	if (req->timeout == TIMEOUT_CONFIG)
		timeout = get_cmd_timeout(peer->sbi, req->operations.command);
	else
		timeout = req->timeout;
	if (timeout == TIMEOUT_UNINIT || timeout == TIMEOUT_NONE) {
		hmdfs_err("send msg %d with uninitialized/invalid timeout",
			  req->operations.command);
		return -EINVAL;
	}

	if (!hmdfs_is_node_online(peer))
		return -EAGAIN;

	mp = mp_alloc(peer, req);
	if (IS_ERR(mp))
		return PTR_ERR(mp);
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.data_len = cpu_to_le32(msg_len);
	head.operations = mp->req.operations;
	head.msg_id = cpu_to_le32(mp->head.msg_id);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(head);
	msg.data = mp->req.data;
	msg.len = mp->req.data_len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	ret = hmdfs_sendmessage(peer, &msg);
	if (unlikely(ret)) {
		mp_put(mp);
		goto out;
	}

	queue_delayed_work(peer->async_wq, &mp->d_work, timeout * HZ);
	/*
	 * The work may not have been queued yet when its response arrives,
	 * resulting in meaningless waiting. So we use the memory barrier to
	 * tell the recv thread whether the work has been queued.
	 */
	smp_store_release(&mp->wfired, true);
out:
	hmdfs_dec_msg_idr_process(peer);
	return ret;
}
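
/*
 * Illustrative (hypothetical) caller of the async path for a command
 * with a registered wakeup callback:
 *
 *	struct hmdfs_req req = {
 *		.operations.command = F_SYNCFS,
 *		.timeout = TIMEOUT_CONFIG,
 *		.data = syncfs_buf,
 *		.data_len = syncfs_len,
 *	};
 *
 *	err = hmdfs_send_async_request(peer, &req);
 *
 * The response (or -ETIME on timeout) is later delivered to
 * hmdfs_recv_syncfs_cb() through g_async_req_callbacks.
 */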

static int hmdfs_record_async_readdir(struct hmdfs_peer *con,
				      struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	if (sbi->async_readdir_prohibit) {
		spin_unlock(&sbi->async_readdir_msg_lock);
		return -EINTR;
	}

	list_add(&msg_wq->async_msg, &sbi->async_readdir_msg_list);
	spin_unlock(&sbi->async_readdir_msg_lock);

	return 0;
}

static void hmdfs_untrack_async_readdir(struct hmdfs_peer *con,
					struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	list_del(&msg_wq->async_msg);
	spin_unlock(&sbi->async_readdir_msg_lock);
}

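/*
 * Send a synchronous request and sleep (interruptibly) until the response
 * arrives or the per-command timeout expires. F_ITERATE waits are also
 * tracked on the sb's async readdir list so they can be aborted once
 * async readdir is prohibited. On success the response payload is handed
 * back through sm->out_buf and sm->out_len.
 *
 * Illustrative (hypothetical) caller, which owns and must free out_buf:
 *
 *	struct hmdfs_send_command sctx = {
 *		.operations.command = F_GETATTR,
 *		.data = req_buf,
 *		.len = req_len,
 *	};
 *
 *	ret = hmdfs_sendmessage_request(con, &sctx);
 *	if (!ret)
 *		kfree(sctx.out_buf);
 */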
int hmdfs_sendmessage_request(struct hmdfs_peer *con,
			      struct hmdfs_send_command *sm)
{
	int time_left;
	int ret = 0;
	struct sendmsg_wait_queue *msg_wq = NULL;
	struct hmdfs_send_data msg;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout =
		get_cmd_timeout(con->sbi, sm->operations.command);
	struct hmdfs_head_cmd *head = NULL;
	bool dec = false;

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto free_filp;
	}

	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err_ratelimited("send msg %d with uninitialized timeout",
				      sm->operations.command);
		ret = -EINVAL;
		goto free_filp;
	}

	head = kzalloc(sizeof(struct hmdfs_head_cmd), GFP_KERNEL);
	if (!head) {
		ret = -ENOMEM;
		goto free_filp;
	}

	sm->out_buf = NULL;
	head->magic = HMDFS_MSG_MAGIC;
	head->version = HMDFS_VERSION;
	head->operations = sm->operations;
	head->data_len = cpu_to_le32(outlen);
	head->ret_code = cpu_to_le32(sm->ret_code);
	head->reserved = 0;
	head->reserved1 = 0;
	if (timeout != TIMEOUT_NONE) {
		msg_wq = kzalloc(sizeof(*msg_wq), GFP_KERNEL);
		if (!msg_wq) {
			ret = -ENOMEM;
			goto free_filp;
		}
		ret = msg_init(con, msg_wq);
		if (ret) {
			kfree(msg_wq);
			msg_wq = NULL;
			goto free_filp;
		}
		dec = true;
		head->msg_id = cpu_to_le32(msg_wq->head.msg_id);
		if (sm->operations.command == F_ITERATE)
			msg_wq->recv_info.local_filp = sm->local_filp;
	}
	msg.head = head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err_ratelimited("send err sm->device_id, %lld, msg_id %u",
				      con->device_id, head->msg_id);
		goto free;
	}

	if (timeout == TIMEOUT_NONE)
		goto free;

	hmdfs_dec_msg_idr_process(con);
	dec = false;

	if (sm->operations.command == F_ITERATE) {
		ret = hmdfs_record_async_readdir(con, msg_wq);
		if (ret) {
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_SPC);
			goto free;
		}
	}

	time_left = wait_event_interruptible_timeout(
		msg_wq->response_q,
		(atomic_read(&msg_wq->valid) == MSG_Q_END_RECV), timeout * HZ);

	if (sm->operations.command == F_ITERATE)
		hmdfs_untrack_async_readdir(con, msg_wq);

	if (time_left == -ERESTARTSYS || time_left == 0) {
		hmdfs_err("timeout err sm->device_id %lld,  msg_id %d cmd %d",
			  con->device_id, head->msg_id,
			  head->operations.command);
		if (sm->operations.command == F_ITERATE)
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_NET);
		ret = -ETIME;
		hmdfs_client_resp_statis(con->sbi, sm->operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);
		goto free;
	}
	sm->out_buf = msg_wq->buf;
	msg_wq->buf = NULL;
	sm->out_len = msg_wq->size - sizeof(struct hmdfs_head_cmd);
	ret = msg_wq->ret;

free:
	if (msg_wq)
		msg_put(msg_wq);
	if (dec)
		hmdfs_dec_msg_idr_process(con);
	kfree(head);
	return ret;

free_filp:
	if (sm->local_filp)
		fput(sm->local_filp);
	kfree(head);
	return ret;
}

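/*
 * Send one slice of a file as a response message: the slice descriptor
 * rides in msg.sdesc and the payload in msg.data, so the receiver can
 * write the slice back at slice_sn * slice_size.
 */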
static int hmdfs_send_slice(struct hmdfs_peer *con, struct hmdfs_head_cmd *cmd,
			    struct slice_descriptor *sdesc, void *slice_buf)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	int content_size = le32_to_cpu(sdesc->content_size);
	int msg_len = sizeof(struct hmdfs_head_cmd) + content_size +
		      sizeof(struct slice_descriptor);

	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(msg_len);
	head.ret_code = cpu_to_le32(0);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.sdesc = sdesc;
	/* sdesc_len is kept in host byte order; no le32_to_cpu() needed */
	msg.sdesc_len = sizeof(struct slice_descriptor);
	msg.data = slice_buf;
	msg.len = content_size;

	ret = hmdfs_sendmessage(con, &msg);

	return ret;
}

int hmdfs_readfile_response(struct hmdfs_peer *con, struct hmdfs_head_cmd *head,
			    struct file *filp)
{
	int ret;
	const unsigned int slice_size = PAGE_SIZE;
	char *slice_buf = NULL;
	loff_t file_offset = 0, file_size;
	ssize_t size;
	struct slice_descriptor sdesc;
	unsigned int slice_sn = 0;

	if (!filp)
		return hmdfs_sendmessage_response(con, head, 0, NULL, 0);

	sdesc.slice_size = cpu_to_le32(slice_size);
	file_size = i_size_read(file_inode(filp));
	file_size = round_up(file_size, slice_size);
	sdesc.num_slices = cpu_to_le32(file_size / slice_size);

	slice_buf = kmalloc(slice_size, GFP_KERNEL);
	if (!slice_buf) {
		ret = -ENOMEM;
		goto out;
	}

	while (1) {
		sdesc.slice_sn = cpu_to_le32(slice_sn++);
		size = kernel_read(filp, slice_buf, (size_t)slice_size,
				   &file_offset);
		if (IS_ERR_VALUE(size)) {
			ret = (int)size;
			goto out;
		}
		sdesc.content_size = cpu_to_le32(size);
		ret = hmdfs_send_slice(con, head, &sdesc, slice_buf);
		if (ret) {
			hmdfs_info("Cannot send file slice %d ",
				   le32_to_cpu(sdesc.slice_sn));
			break;
		}
		if (file_offset >= i_size_read(file_inode(filp)))
			break;
	}

out:
	kfree(slice_buf);
	if (ret)
		hmdfs_sendmessage_response(con, head, 0, NULL, ret);
	return ret;
}

static void asw_release(struct kref *kref)
{
	struct hmdfs_async_work *asw = NULL;
	struct hmdfs_peer *peer = NULL;

	asw = (struct hmdfs_async_work *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = asw->head.peer;
	idr_remove(&peer->msg_idr, asw->head.msg_id);
	spin_unlock(&peer->idr_lock);
	kfree(asw);
}

void asw_put(struct hmdfs_async_work *asw)
{
	kref_put_lock(&asw->head.ref, asw_release, &asw->head.peer->idr_lock);
}

void hmdfs_recv_page_work_fn(struct work_struct *ptr)
{
	struct hmdfs_async_work *async_work =
		container_of(ptr, struct hmdfs_async_work, d_work.work);

	hmdfs_client_resp_statis(async_work->head.peer->sbi,
				 F_READPAGE, HMDFS_RESP_TIMEOUT, 0, 0);
	hmdfs_err_ratelimited("timeout and release page, msg_id:%u",
			      async_work->head.msg_id);
	asw_done(async_work);
}

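/*
 * Send an asynchronous readpage request. The timeout work is queued
 * before the message is sent, so the locked page passed in via
 * sm->out_buf is always completed eventually, either by the response
 * path or by hmdfs_recv_page_work_fn() on timeout.
 */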
int hmdfs_sendpage_request(struct hmdfs_peer *con,
			   struct hmdfs_send_command *sm)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_async_work *async_work = NULL;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	struct hmdfs_head_cmd head;
	unsigned int timeout;
	unsigned long start = jiffies;

	WARN_ON(!sm->out_buf);

	timeout = get_cmd_timeout(con->sbi, sm->operations.command);
	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err("send msg %d with uninitialized timeout",
			  sm->operations.command);
		ret = -EINVAL;
		goto unlock;
	}

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto unlock;
	}

	memset(&head, 0, sizeof(head));
	head.magic = HMDFS_MSG_MAGIC;
	head.version = HMDFS_VERSION;
	head.operations = sm->operations;
	head.data_len = cpu_to_le32(outlen);
	head.ret_code = cpu_to_le32(sm->ret_code);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	async_work = kzalloc(sizeof(*async_work), GFP_KERNEL);
	if (!async_work) {
		ret = -ENOMEM;
		goto unlock;
	}
	async_work->start = start;
	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_PAGE, async_work);
	if (ret) {
		hmdfs_err("alloc msg_id failed, err %d", ret);
		goto unlock;
	}
	head.msg_id = cpu_to_le32(async_work->head.msg_id);
	async_work->page = sm->out_buf;
	asw_get(async_work);
	INIT_DELAYED_WORK(&async_work->d_work, hmdfs_recv_page_work_fn);
	ret = queue_delayed_work(con->async_wq, &async_work->d_work,
				 timeout * HZ);
	if (!ret) {
		hmdfs_err("queue_delayed_work failed, msg_id %u", head.msg_id);
		ret = -EINVAL;
		goto fail_and_unlock_page;
	}
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err("send err sm->device_id, %lld, msg_id %u",
			  con->device_id, head.msg_id);
		if (!cancel_delayed_work(&async_work->d_work)) {
			hmdfs_err("cancel async work err");
			asw_put(async_work);
			hmdfs_dec_msg_idr_process(con);
			goto out;
		}
		goto fail_and_unlock_page;
	}

	asw_put(async_work);
	hmdfs_dec_msg_idr_process(con);
	return 0;

fail_and_unlock_page:
	asw_put(async_work);
	asw_done(async_work);
	hmdfs_dec_msg_idr_process(con);
	return ret;
unlock:
	kfree(async_work);
	unlock_page(sm->out_buf);
out:
	return ret;
}

static void hmdfs_request_handle_sync(struct hmdfs_peer *con,
				      struct hmdfs_head_cmd *head, void *buf)
{
	unsigned long start = jiffies;
	const struct cred *saved_cred = hmdfs_override_fsids(true);

	if (!saved_cred) {
		hmdfs_err("prepare cred failed!");
		kfree(buf);
		return;
	}

	s_recv_callbacks[head->operations.command](con, head, buf);
	hmdfs_statistic(con->sbi, head->operations.command, jiffies - start);

	kfree(buf);

	hmdfs_revert_fsids(saved_cred);
}

static void hmdfs_msg_handle_sync(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf)
{
	const struct cred *old_cred = hmdfs_override_creds(con->sbi->cred);

	/*
	 * Reuse PF_NPROC_EXCEEDED as an indication of hmdfs server context:
	 * 1. PF_NPROC_EXCEEDED will be set by setreuid()/setuid()/setresuid();
	 *    we assume kworkers will not call these syscalls.
	 * 2. PF_NPROC_EXCEEDED will be cleared by execv(), and kworkers
	 *    will not call it.
	 */
	current->flags |= PF_NPROC_EXCEEDED;
	hmdfs_request_handle_sync(con, head, buf);
	current->flags &= ~PF_NPROC_EXCEEDED;

	hmdfs_revert_creds(old_cred);
}

static void hmdfs_request_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);

	hmdfs_msg_handle_sync(desp->peer, desp->head, desp->buf);
	peer_put(desp->peer);
	kfree(desp->head);
	kfree(desp);
}

static int hmdfs_msg_handle_async(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf,
				  struct workqueue_struct *wq,
				  void (*work_fn)(struct work_struct *ptr))
{
	struct work_handler_desp *desp = NULL;
	struct hmdfs_head_cmd *dup_head = NULL;
	int ret;

	desp = kzalloc(sizeof(*desp), GFP_KERNEL);
	if (!desp) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	dup_head = kzalloc(sizeof(*dup_head), GFP_KERNEL);
	if (!dup_head) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	*dup_head = *head;
	desp->peer = con;
	desp->head = dup_head;
	desp->buf = buf;
	INIT_WORK(&desp->work, work_fn);

	peer_get(con);
	queue_work(wq, &desp->work);

	ret = 0;
	return ret;

exit_desp:
	kfree(desp);
	return ret;
}

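/*
 * Dispatch an incoming request: metadata operations are deferred to the
 * per-peer req_handle_wq, while F_READPAGE/F_WRITEPAGE are handled
 * synchronously on the receiving thread.
 */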
static int hmdfs_request_recv(struct hmdfs_peer *con,
			      struct hmdfs_head_cmd *head, void *buf)
{
	int ret;

	if (head->operations.command >= F_SIZE ||
	    !s_recv_callbacks[head->operations.command]) {
		ret = -EINVAL;
		hmdfs_err("NULL callback, command %d",
			  head->operations.command);
		goto out;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_ITERATE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
	case F_ATOMIC_OPEN:
		ret = hmdfs_msg_handle_async(con, head, buf, con->req_handle_wq,
					     hmdfs_request_work_fn);
		break;
	case F_WRITEPAGE:
	case F_READPAGE:
		hmdfs_msg_handle_sync(con, head, buf);
		ret = 0;
		break;
	default:
		hmdfs_err("Fatal! Unexpected request command %d",
			  head->operations.command);
		ret = -EINVAL;
	}

out:
	return ret;
}

void hmdfs_response_wakeup(struct sendmsg_wait_queue *msg_info,
			   __u32 ret_code, __u32 data_len, void *buf)
{
	msg_info->ret = ret_code;
	msg_info->size = data_len;
	msg_info->buf = buf;
	atomic_set(&msg_info->valid, MSG_Q_END_RECV);
	wake_up_interruptible(&msg_info->response_q);
}

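/*
 * Store one received file slice at the offset encoded by its sequence
 * number; once num_slices slices have landed, flip the receive state to
 * FILE_RECV_SUCC.
 */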
static int hmdfs_readfile_slice(struct sendmsg_wait_queue *msg_info,
				struct work_handler_desp *desp)
{
	struct slice_descriptor *sdesc = desp->buf;
	void *slice_buf = sdesc + 1;
	struct file_recv_info *recv_info = &msg_info->recv_info;
	struct file *filp = recv_info->local_filp;
	loff_t offset;
	ssize_t written_size;

	if (atomic_read(&recv_info->state) != FILE_RECV_PROCESS)
		return -EBUSY;

	offset = le32_to_cpu(sdesc->slice_size) * le32_to_cpu(sdesc->slice_sn);

	written_size = kernel_write(filp, slice_buf,
				    le32_to_cpu(sdesc->content_size), &offset);
	if (IS_ERR_VALUE(written_size)) {
		atomic_set(&recv_info->state, FILE_RECV_ERR_SPC);
		hmdfs_info("Fatal! Cannot store a file slice %d/%d, ret = %d",
			   le32_to_cpu(sdesc->slice_sn),
			   le32_to_cpu(sdesc->num_slices), (int)written_size);
		return (int)written_size;
	}

	if (atomic_inc_return(&recv_info->local_fslices) >=
	    le32_to_cpu(sdesc->num_slices))
		atomic_set(&recv_info->state, FILE_RECV_SUCC);
	return 0;
}

static void hmdfs_file_response_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);
	struct sendmsg_wait_queue *msg_info = NULL;
	int ret;
	atomic_t *pstate = NULL;
	u8 cmd = desp->head->operations.command;
	const struct cred *old_cred =
		hmdfs_override_creds(desp->peer->sbi->cred);

	msg_info = (struct sendmsg_wait_queue *)hmdfs_find_msg_head(desp->peer,
					le32_to_cpu(desp->head->msg_id));
	if (!msg_info || atomic_read(&msg_info->valid) != MSG_Q_SEND) {
		hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_DELAY,
					 0, 0);
		hmdfs_info("cannot find msg(id %d)",
			   le32_to_cpu(desp->head->msg_id));
		goto free;
	}

	ret = le32_to_cpu(desp->head->ret_code);
	if (ret || le32_to_cpu(desp->head->data_len) == sizeof(*desp->head))
		goto wakeup;
	ret = hmdfs_readfile_slice(msg_info, desp);
	pstate = &msg_info->recv_info.state;
	if (ret || atomic_read(pstate) != FILE_RECV_PROCESS)
		goto wakeup;
	goto free;

wakeup:
	hmdfs_response_wakeup(msg_info, ret, sizeof(struct hmdfs_head_cmd),
			      NULL);
	hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_NORMAL,
				 msg_info->start, jiffies);
free:
	if (msg_info)
		msg_put(msg_info);
	peer_put(desp->peer);
	hmdfs_revert_creds(old_cred);

	kfree(desp->buf);
	kfree(desp->head);
	kfree(desp);
}

static void hmdfs_wait_mp_wfired(struct hmdfs_msg_parasite *mp)
{
	/*
	 * We only cancel queued works, so wait until the sender has
	 * actually queued the delayed work; pairs with the
	 * smp_store_release() in hmdfs_send_async_request().
	 */
	while (unlikely(!smp_load_acquire(&mp->wfired)))
		usleep_range(ACQUIRE_WFIRED_INTVAL_USEC_MIN,
			     ACQUIRE_WFIRED_INTVAL_USEC_MAX);
}

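/*
 * Match a response to its pending message by msg_id and wake the waiter:
 * synchronous waiters are woken directly, while async requests have
 * their timeout work cancelled and requeued for immediate completion.
 * Returns -EINVAL if no matching message is found.
 */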
int hmdfs_response_handle_sync(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	struct sendmsg_wait_queue *msg_info = NULL;
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_msg_idr_head *msg_head = NULL;
	u32 msg_id = le32_to_cpu(head->msg_id);
	bool woke = false;
	u8 cmd = head->operations.command;

	msg_head = hmdfs_find_msg_head(con, msg_id);
	if (!msg_head)
		goto out;

	switch (msg_head->type) {
	case MSG_IDR_MESSAGE_SYNC:
		msg_info = (struct sendmsg_wait_queue *)msg_head;
		if (atomic_read(&msg_info->valid) == MSG_Q_SEND) {
			hmdfs_response_wakeup(msg_info,
					      le32_to_cpu(head->ret_code),
					      le32_to_cpu(head->data_len), buf);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL,
						 msg_info->start, jiffies);
			woke = true;
		}

		msg_put(msg_info);
		break;
	case MSG_IDR_MESSAGE_ASYNC:
		mp = (struct hmdfs_msg_parasite *)msg_head;

		hmdfs_wait_mp_wfired(mp);
		if (cancel_delayed_work(&mp->d_work)) {
			mp->resp.out_buf = buf;
			mp->resp.out_len =
				le32_to_cpu(head->data_len) - sizeof(*head);
			mp->resp.ret_code = le32_to_cpu(head->ret_code);
			queue_delayed_work(con->async_wq, &mp->d_work, 0);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL, mp->start,
						 jiffies);
			woke = true;
		}
		mp_put(mp);
		break;
	default:
		hmdfs_err("receive incorrect msg type %d msg_id %d cmd %d",
			  msg_head->type, msg_id, cmd);
		break;
	}

	if (likely(woke))
		return 0;
out:
	hmdfs_client_resp_statis(con->sbi, cmd, HMDFS_RESP_DELAY, 0, 0);
	hmdfs_info("cannot find msg_id %d cmd %d", msg_id, cmd);
	return -EINVAL;
}

static int hmdfs_response_recv(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	__u16 command = head->operations.command;
	int ret;

	if (command >= F_SIZE) {
		ret = -EINVAL;
		return ret;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_READPAGE:
	case F_WRITEPAGE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
		ret = hmdfs_response_handle_sync(con, head, buf);
		return ret;

	case F_ITERATE:
		ret = hmdfs_msg_handle_async(con, head, buf, con->async_wq,
					     hmdfs_file_response_work_fn);
		return ret;

	default:
		hmdfs_err("Fatal! Unexpected response command %d",
			  head->operations.command);
		ret = -EINVAL;
		return ret;
	}
}

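/*
 * Entry point for every message received from a peer: verify it, then
 * dispatch by cmd_flag to the request or response path. The data buffer
 * is freed here only on error; otherwise ownership passes to the
 * handlers.
 */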
void hmdfs_recv_mesg_callback(struct hmdfs_peer *con, void *head,
			      void *buf)
{
	struct hmdfs_head_cmd *hmdfs_head = (struct hmdfs_head_cmd *)head;

	trace_hmdfs_recv_mesg_callback(hmdfs_head);

	if (hmdfs_message_verify(con, hmdfs_head, buf) < 0) {
		hmdfs_info("Message %d has been abandoned", hmdfs_head->msg_id);
		goto out_err;
	}

	switch (hmdfs_head->operations.cmd_flag) {
	case C_REQUEST:
		if (hmdfs_request_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	case C_RESPONSE:
		if (hmdfs_response_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	default:
		hmdfs_err("Fatal! Unexpected msg cmd %d",
			  hmdfs_head->operations.cmd_flag);
		goto out_err;
	}
	return;

out_err:
	kfree(buf);
}

void hmdfs_wakeup_parasite(struct hmdfs_msg_parasite *mp)
{
	hmdfs_wait_mp_wfired(mp);
	if (!cancel_delayed_work(&mp->d_work))
		hmdfs_err("cancel parasite work err msg_id=%d cmd=%d",
			  mp->head.msg_id, mp->req.operations.command);
	else
		async_request_cb_on_wakeup_fn(&mp->d_work.work);
}

void hmdfs_wakeup_async_work(struct hmdfs_async_work *async_work)
{
	if (!cancel_delayed_work(&async_work->d_work))
		hmdfs_err("cancel async work err msg_id=%d",
			  async_work->head.msg_id);
	else
		hmdfs_recv_page_work_fn(&async_work->d_work.work);
}