// SPDX-License-Identifier: GPL-2.0
/*
 * fs/hmdfs/comm/socket_adapter.c
 *
 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
 */

#include "socket_adapter.h"

#include <linux/file.h>
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/net.h>
#include <linux/pagemap.h>
#include <net/sock.h>

#include "authority/authentication.h"
#include "comm/device_node.h"
#include "hmdfs_client.h"
#include "hmdfs_server.h"
#include "hmdfs_trace.h"
#include "message_verify.h"

#define ACQUIRE_WFIRED_INTVAL_USEC_MIN 10
#define ACQUIRE_WFIRED_INTVAL_USEC_MAX 30

typedef void (*request_callback)(struct hmdfs_peer *, struct hmdfs_head_cmd *,
				 void *);
typedef void (*response_callback)(struct hmdfs_peer *,
				  struct sendmsg_wait_queue *, void *, size_t);

static const request_callback s_recv_callbacks[F_SIZE] = {
	[F_OPEN] = hmdfs_server_open,
	[F_READPAGE] = hmdfs_server_readpage,
	[F_RELEASE] = hmdfs_server_release,
	[F_WRITEPAGE] = hmdfs_server_writepage,
	[F_ITERATE] = hmdfs_server_readdir,
	[F_MKDIR] = hmdfs_server_mkdir,
	[F_CREATE] = hmdfs_server_create,
	[F_RMDIR] = hmdfs_server_rmdir,
	[F_UNLINK] = hmdfs_server_unlink,
	[F_RENAME] = hmdfs_server_rename,
	[F_SETATTR] = hmdfs_server_setattr,
	[F_STATFS] = hmdfs_server_statfs,
	[F_DROP_PUSH] = hmdfs_server_get_drop_push,
	[F_GETATTR] = hmdfs_server_getattr,
	[F_FSYNC] = hmdfs_server_fsync,
	[F_SYNCFS] = hmdfs_server_syncfs,
	[F_GETXATTR] = hmdfs_server_getxattr,
	[F_SETXATTR] = hmdfs_server_setxattr,
	[F_LISTXATTR] = hmdfs_server_listxattr,
	[F_READPAGES] = hmdfs_server_readpages,
	[F_READPAGES_OPEN] = hmdfs_server_readpages_open,
	[F_ATOMIC_OPEN] = hmdfs_server_atomic_open,
};

typedef void (*file_request_callback)(struct hmdfs_peer *,
				      struct hmdfs_send_command *);

struct async_req_callbacks {
	void (*on_wakeup)(struct hmdfs_peer *peer, const struct hmdfs_req *req,
			  const struct hmdfs_resp *resp);
};

static const struct async_req_callbacks g_async_req_callbacks[F_SIZE] = {
	[F_SYNCFS] = { .on_wakeup = hmdfs_recv_syncfs_cb },
	[F_WRITEPAGE] = { .on_wakeup = hmdfs_writepage_cb },
};

static void msg_release(struct kref *kref)
{
	struct sendmsg_wait_queue *msg_wq;
	struct hmdfs_peer *con;

	msg_wq = (struct sendmsg_wait_queue *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	con = msg_wq->head.peer;
	idr_remove(&con->msg_idr, msg_wq->head.msg_id);
	spin_unlock(&con->idr_lock);

	kfree(msg_wq->buf);
	if (msg_wq->recv_info.local_filp)
		fput(msg_wq->recv_info.local_filp);
	kfree(msg_wq);
}

// Always remember to find before put, and make sure con is available
void msg_put(struct sendmsg_wait_queue *msg_wq)
{
	kref_put_lock(&msg_wq->head.ref, msg_release,
		      &msg_wq->head.peer->idr_lock);
}

static void recv_info_init(struct file_recv_info *recv_info)
{
	memset(recv_info, 0, sizeof(struct file_recv_info));
	atomic_set(&recv_info->local_fslices, 0);
	atomic_set(&recv_info->state, FILE_RECV_PROCESS);
}

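/*
 * Allocate a sync-message ID in the peer's msg_idr and initialize the
 * wait queue used to block until the response (or a timeout) arrives.
 */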
static int msg_init(struct hmdfs_peer *con, struct sendmsg_wait_queue *msg_wq)
{
	int ret = 0;
	struct file_recv_info *recv_info = &msg_wq->recv_info;

	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_MESSAGE_SYNC, msg_wq);
	if (unlikely(ret))
		return ret;

	atomic_set(&msg_wq->valid, MSG_Q_SEND);
	init_waitqueue_head(&msg_wq->response_q);
	recv_info_init(recv_info);
	msg_wq->start = jiffies;
	return 0;
}

static inline void statistic_con_sb_dirty(struct hmdfs_peer *con,
					  const struct hmdfs_cmd *op)
{
	if (op->command == F_WRITEPAGE && op->cmd_flag == C_REQUEST)
		atomic64_inc(&con->sb_dirty_count);
}

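/*
 * Send a fully built message to the peer over its TCP connection.
 * Retries while the node stays online: a missing connection triggers an
 * offline notification, and -ESHUTDOWN marks the connection stopped and
 * queues reget_work to rebuild the session. Runs under the superblock's
 * system credentials and updates the send statistics on return.
 */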
int hmdfs_sendmessage(struct hmdfs_peer *node, struct hmdfs_send_data *msg)
{
	int ret = 0;
	struct connection *connect = NULL;
	struct tcp_handle *tcp = NULL;
	struct hmdfs_head_cmd *head = msg->head;
	const struct cred *old_cred;

	if (!node) {
		hmdfs_err("node NULL when send cmd %d",
			  head->operations.command);
		ret = -EAGAIN;
		goto out_err;
	} else if (node->status != NODE_STAT_ONLINE) {
		hmdfs_err("device %llu OFFLINE %d when send cmd %d",
			  node->device_id, node->status,
			  head->operations.command);
		ret = -EAGAIN;
		goto out;
	}

	old_cred = hmdfs_override_creds(node->sbi->system_cred);

	do {
		connect = get_conn_impl(node, CONNECT_TYPE_TCP);
		if (!connect) {
			hmdfs_info_ratelimited(
				"device %llu no connection available when send cmd %d, get new session",
				node->device_id, head->operations.command);
			if (node->status != NODE_STAT_OFFLINE) {
				struct notify_param param;

				memcpy(param.remote_cid, node->cid,
				       HMDFS_CID_SIZE);
				param.notify = NOTIFY_OFFLINE;
				param.fd = INVALID_SOCKET_FD;
				notify(node, &param);
			}
			ret = -EAGAIN;
			goto revert_cred;
		}

		ret = connect->send_message(connect, msg);
		if (ret == -ESHUTDOWN) {
			hmdfs_info("device %llu send cmd %d message fail, connection stop",
				   node->device_id, head->operations.command);
			connect->status = CONNECT_STAT_STOP;
			tcp = connect->connect_handle;
			if (node->status != NODE_STAT_OFFLINE) {
				connection_get(connect);
				if (!queue_work(node->reget_conn_wq,
						&connect->reget_work))
					connection_put(connect);
			}
			connection_put(connect);
			/*
			 * node->status being OFFLINE cannot ensure that
			 * node_seq will have been increased before
			 * hmdfs_sendmessage() returns.
			 */
			hmdfs_node_inc_evt_seq(node);
		} else {
			connection_put(connect);
			goto revert_cred;
		}
	} while (node->status != NODE_STAT_OFFLINE);
revert_cred:
	hmdfs_revert_creds(old_cred);

	if (!ret)
		statistic_con_sb_dirty(node, &head->operations);
out:
	if (node->version == DFS_2_0 &&
	    head->operations.cmd_flag == C_REQUEST)
		hmdfs_client_snd_statis(node->sbi,
					head->operations.command, ret);
	else if (node->version == DFS_2_0 &&
		 head->operations.cmd_flag == C_RESPONSE)
		hmdfs_server_snd_statis(node->sbi,
					head->operations.command, ret);
out_err:
	return ret;
}

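/*
 * Build a C_RESPONSE header for @cmd (echoing its msg_id and reserved
 * fields), attach @buf of @data_len bytes and the given @ret_code, and
 * send it back to the requesting peer.
 */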
int hmdfs_sendmessage_response(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *cmd, __u32 data_len,
			       void *buf, __u32 ret_code)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;

	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(data_len + sizeof(struct hmdfs_head_cmd));
	head.ret_code = cpu_to_le32(ret_code);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;
	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = buf;
	msg.len = data_len;
	msg.sdesc = NULL;
	msg.sdesc_len = 0;

	ret = hmdfs_sendmessage(con, &msg);
	return ret;
}

static void mp_release(struct kref *kref)
{
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_peer *peer = NULL;

	mp = (struct hmdfs_msg_parasite *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = mp->head.peer;
	idr_remove(&peer->msg_idr, mp->head.msg_id);
	spin_unlock(&peer->idr_lock);

	peer_put(peer);
	kfree(mp->resp.out_buf);
	kfree(mp);
}

void mp_put(struct hmdfs_msg_parasite *mp)
{
	kref_put_lock(&mp->head.ref, mp_release, &mp->head.peer->idr_lock);
}

static void async_request_cb_on_wakeup_fn(struct work_struct *w)
{
	struct hmdfs_msg_parasite *mp =
		container_of(w, struct hmdfs_msg_parasite, d_work.work);
	struct async_req_callbacks cbs;
	const struct cred *old_cred =
		hmdfs_override_creds(mp->head.peer->sbi->cred);

	if (mp->resp.ret_code == -ETIME)
		hmdfs_client_resp_statis(mp->head.peer->sbi,
					 mp->req.operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);

	cbs = g_async_req_callbacks[mp->req.operations.command];
	if (cbs.on_wakeup)
		(*cbs.on_wakeup)(mp->head.peer, &mp->req, &mp->resp);
	mp_put(mp);
	hmdfs_revert_creds(old_cred);
}

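/*
 * Allocate a message parasite for an async request: take an async message
 * ID from the peer's msg_idr, hold a reference on the peer, and pre-set
 * the response code to -ETIME so a timeout needs no extra handling.
 */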
static struct hmdfs_msg_parasite *mp_alloc(struct hmdfs_peer *peer,
					   const struct hmdfs_req *req)
{
	struct hmdfs_msg_parasite *mp = kzalloc(sizeof(*mp), GFP_KERNEL);
	int ret;

	if (unlikely(!mp))
		return ERR_PTR(-ENOMEM);

	ret = hmdfs_alloc_msg_idr(peer, MSG_IDR_MESSAGE_ASYNC, mp);
	if (unlikely(ret)) {
		kfree(mp);
		return ERR_PTR(ret);
	}

	mp->start = jiffies;
	peer_get(mp->head.peer);
	mp->resp.ret_code = -ETIME;
	INIT_DELAYED_WORK(&mp->d_work, async_request_cb_on_wakeup_fn);
	mp->wfired = false;
	mp->req = *req;
	return mp;
}

/**
 * hmdfs_send_async_request - send out an async request
 * @peer: target device node
 * @req: request descriptor + necessary contexts
 *
 * Send out a request synchronously and wait for its response asynchronously.
 * Return -ESHUTDOWN when the device node is unreachable
 * Return -EAGAIN if the network is recovering
 * Return -ENOMEM if out of memory
 *
 * Register g_async_req_callbacks to receive the response
 */
int hmdfs_send_async_request(struct hmdfs_peer *peer,
			     const struct hmdfs_req *req)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	struct hmdfs_msg_parasite *mp = NULL;
	size_t msg_len = req->data_len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout;

	if (req->timeout == TIMEOUT_CONFIG)
		timeout = get_cmd_timeout(peer->sbi, req->operations.command);
	else
		timeout = req->timeout;
	if (timeout == TIMEOUT_UNINIT || timeout == TIMEOUT_NONE) {
		hmdfs_err("send msg %d with uninitialized/invalid timeout",
			  req->operations.command);
		return -EINVAL;
	}

	if (!hmdfs_is_node_online(peer))
		return -EAGAIN;

	mp = mp_alloc(peer, req);
	if (IS_ERR(mp))
		return PTR_ERR(mp);
	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.data_len = cpu_to_le32(msg_len);
	head.operations = mp->req.operations;
	head.msg_id = cpu_to_le32(mp->head.msg_id);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(head);
	msg.data = mp->req.data;
	msg.len = mp->req.data_len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	ret = hmdfs_sendmessage(peer, &msg);
	if (unlikely(ret)) {
		mp_put(mp);
		goto out;
	}

	queue_delayed_work(peer->async_wq, &mp->d_work, timeout * HZ);
	/*
	 * The work may not have been queued yet when its response arrives,
	 * which would result in meaningless waiting. So we use a memory
	 * barrier to tell the receiving thread whether the work has been
	 * queued.
	 */
	smp_store_release(&mp->wfired, true);
out:
	hmdfs_dec_msg_idr_process(peer);
	return ret;
}

static int hmdfs_record_async_readdir(struct hmdfs_peer *con,
				      struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	if (sbi->async_readdir_prohibit) {
		spin_unlock(&sbi->async_readdir_msg_lock);
		return -EINTR;
	}

	list_add(&msg_wq->async_msg, &sbi->async_readdir_msg_list);
	spin_unlock(&sbi->async_readdir_msg_lock);

	return 0;
}

static void hmdfs_untrack_async_readdir(struct hmdfs_peer *con,
					struct sendmsg_wait_queue *msg_wq)
{
	struct hmdfs_sb_info *sbi = con->sbi;

	spin_lock(&sbi->async_readdir_msg_lock);
	list_del(&msg_wq->async_msg);
	spin_unlock(&sbi->async_readdir_msg_lock);
}

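/*
 * Send a request and, unless the command's timeout is TIMEOUT_NONE, block
 * until the response wakes us through the message wait queue or the
 * timeout expires. Readdir (F_ITERATE) requests are also tracked on the
 * superblock's async readdir list while waiting. On success the response
 * payload is handed back through sm->out_buf and sm->out_len.
 */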
int hmdfs_sendmessage_request(struct hmdfs_peer *con,
			      struct hmdfs_send_command *sm)
{
	int time_left;
	int ret = 0;
	struct sendmsg_wait_queue *msg_wq = NULL;
	struct hmdfs_send_data msg;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	unsigned int timeout =
		get_cmd_timeout(con->sbi, sm->operations.command);
	struct hmdfs_head_cmd *head = NULL;
	bool dec = false;

	if (!hmdfs_is_node_online(con))
		return -EAGAIN;

	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err_ratelimited("send msg %d with uninitialized timeout",
				      sm->operations.command);
		return -EINVAL;
	}

	head = kzalloc(sizeof(struct hmdfs_head_cmd), GFP_KERNEL);
	if (!head)
		return -ENOMEM;

	sm->out_buf = NULL;
	head->magic = HMDFS_MSG_MAGIC;
	head->version = DFS_2_0;
	head->operations = sm->operations;
	head->data_len = cpu_to_le32(outlen);
	head->ret_code = cpu_to_le32(sm->ret_code);
	head->reserved = 0;
	head->reserved1 = 0;
	if (timeout != TIMEOUT_NONE) {
		msg_wq = kzalloc(sizeof(*msg_wq), GFP_KERNEL);
		if (!msg_wq) {
			ret = -ENOMEM;
			goto free;
		}
		ret = msg_init(con, msg_wq);
		if (ret) {
			kfree(msg_wq);
			msg_wq = NULL;
			goto free;
		}
		dec = true;
		head->msg_id = cpu_to_le32(msg_wq->head.msg_id);
		if (sm->operations.command == F_ITERATE)
			msg_wq->recv_info.local_filp = sm->local_filp;
	}
	msg.head = head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err_ratelimited("send err sm->device_id, %lld, msg_id %u",
				      con->device_id, head->msg_id);
		goto free;
	}

	if (timeout == TIMEOUT_NONE)
		goto free;

	hmdfs_dec_msg_idr_process(con);
	dec = false;

	if (sm->operations.command == F_ITERATE) {
		ret = hmdfs_record_async_readdir(con, msg_wq);
		if (ret) {
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_SPC);
			goto free;
		}
	}

	time_left = wait_event_interruptible_timeout(
		msg_wq->response_q,
		(atomic_read(&msg_wq->valid) == MSG_Q_END_RECV), timeout * HZ);

	if (sm->operations.command == F_ITERATE)
		hmdfs_untrack_async_readdir(con, msg_wq);

	if (time_left == -ERESTARTSYS || time_left == 0) {
		hmdfs_err("timeout err sm->device_id %lld,  msg_id %d cmd %d",
			  con->device_id, head->msg_id,
			  head->operations.command);
		if (sm->operations.command == F_ITERATE)
			atomic_set(&msg_wq->recv_info.state, FILE_RECV_ERR_NET);
		ret = -ETIME;
		hmdfs_client_resp_statis(con->sbi, sm->operations.command,
					 HMDFS_RESP_TIMEOUT, 0, 0);
		goto free;
	}
	sm->out_buf = msg_wq->buf;
	msg_wq->buf = NULL;
	sm->out_len = msg_wq->size - sizeof(struct hmdfs_head_cmd);
	ret = msg_wq->ret;

free:
	if (msg_wq)
		msg_put(msg_wq);
	if (dec)
		hmdfs_dec_msg_idr_process(con);
	kfree(head);
	return ret;
}

static int hmdfs_send_slice(struct hmdfs_peer *con, struct hmdfs_head_cmd *cmd,
			    struct slice_descriptor *sdesc, void *slice_buf)
{
	int ret;
	struct hmdfs_send_data msg;
	struct hmdfs_head_cmd head;
	int content_size = le32_to_cpu(sdesc->content_size);
	int msg_len = sizeof(struct hmdfs_head_cmd) + content_size +
		      sizeof(struct slice_descriptor);

	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = cmd->operations;
	head.operations.cmd_flag = C_RESPONSE;
	head.data_len = cpu_to_le32(msg_len);
	head.ret_code = cpu_to_le32(0);
	head.msg_id = cmd->msg_id;
	head.reserved = cmd->reserved;
	head.reserved1 = cmd->reserved1;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.sdesc = sdesc;
	msg.sdesc_len = le32_to_cpu(sizeof(struct slice_descriptor));
	msg.data = slice_buf;
	msg.len = content_size;

	ret = hmdfs_sendmessage(con, &msg);

	return ret;
}

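/*
 * Stream the contents of @filp back to the requesting peer as a series of
 * PAGE_SIZE slices, each carried in a C_RESPONSE message together with a
 * slice descriptor. A NULL @filp is answered with an empty response; if
 * reading or sending fails, an empty response carrying the error code is
 * sent instead.
 */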
int hmdfs_readfile_response(struct hmdfs_peer *con, struct hmdfs_head_cmd *head,
			    struct file *filp)
{
	int ret;
	const unsigned int slice_size = PAGE_SIZE;
	char *slice_buf = NULL;
	loff_t file_offset = 0, file_size;
	ssize_t size;
	struct slice_descriptor sdesc;
	unsigned int slice_sn = 0;

	if (!filp)
		return hmdfs_sendmessage_response(con, head, 0, NULL, 0);

	sdesc.slice_size = cpu_to_le32(slice_size);
	file_size = i_size_read(file_inode(filp));
	file_size = round_up(file_size, slice_size);
	sdesc.num_slices = cpu_to_le32(file_size / slice_size);

	slice_buf = kmalloc(slice_size, GFP_KERNEL);
	if (!slice_buf) {
		ret = -ENOMEM;
		goto out;
	}

	while (1) {
		sdesc.slice_sn = cpu_to_le32(slice_sn++);
		size = kernel_read(filp, slice_buf, (size_t)slice_size,
				   &file_offset);
		if (IS_ERR_VALUE(size)) {
			ret = (int)size;
			goto out;
		}
		sdesc.content_size = cpu_to_le32(size);
		ret = hmdfs_send_slice(con, head, &sdesc, slice_buf);
		if (ret) {
			hmdfs_info("Cannot send file slice %d ",
				   le32_to_cpu(sdesc.slice_sn));
			break;
		}
		if (file_offset >= i_size_read(file_inode(filp)))
			break;
	}

out:
	kfree(slice_buf);
	if (ret)
		hmdfs_sendmessage_response(con, head, 0, NULL, ret);
	return ret;
}

static void asw_release(struct kref *kref)
{
	struct hmdfs_async_work *asw = NULL;
	struct hmdfs_peer *peer = NULL;

	asw = (struct hmdfs_async_work *)container_of(kref,
			struct hmdfs_msg_idr_head, ref);
	peer = asw->head.peer;
	idr_remove(&peer->msg_idr, asw->head.msg_id);
	spin_unlock(&peer->idr_lock);
	kfree(asw);
}

void asw_put(struct hmdfs_async_work *asw)
{
	kref_put_lock(&asw->head.ref, asw_release, &asw->head.peer->idr_lock);
}

void hmdfs_recv_page_work_fn(struct work_struct *ptr)
{
	struct hmdfs_async_work *async_work =
		container_of(ptr, struct hmdfs_async_work, d_work.work);

	if (async_work->head.peer->version >= DFS_2_0)
		hmdfs_client_resp_statis(async_work->head.peer->sbi,
					 F_READPAGE, HMDFS_RESP_TIMEOUT, 0, 0);
	hmdfs_err_ratelimited("timeout and release page, msg_id:%u",
			      async_work->head.msg_id);
	asw_done(async_work);
}

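/*
 * Send an asynchronous page read request. The locked page (sm->out_buf)
 * is attached to an hmdfs_async_work entry in the peer's msg_idr and a
 * delayed work is armed so the page is released if no response arrives
 * before the command timeout; on the early failure paths the page is
 * unlocked before returning.
 */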
int hmdfs_sendpage_request(struct hmdfs_peer *con,
			   struct hmdfs_send_command *sm)
{
	int ret = 0;
	struct hmdfs_send_data msg;
	struct hmdfs_async_work *async_work = NULL;
	size_t outlen = sm->len + sizeof(struct hmdfs_head_cmd);
	struct hmdfs_head_cmd head;
	unsigned int timeout;
	unsigned long start = jiffies;

	WARN_ON(!sm->out_buf);

	timeout = get_cmd_timeout(con->sbi, sm->operations.command);
	if (timeout == TIMEOUT_UNINIT) {
		hmdfs_err("send msg %d with uninitialized timeout",
			  sm->operations.command);
		ret = -EINVAL;
		goto unlock;
	}

	if (!hmdfs_is_node_online(con)) {
		ret = -EAGAIN;
		goto unlock;
	}

	memset(&head, 0, sizeof(head));
	head.magic = HMDFS_MSG_MAGIC;
	head.version = DFS_2_0;
	head.operations = sm->operations;
	head.data_len = cpu_to_le32(outlen);
	head.ret_code = cpu_to_le32(sm->ret_code);
	head.reserved = 0;
	head.reserved1 = 0;

	msg.head = &head;
	msg.head_len = sizeof(struct hmdfs_head_cmd);
	msg.data = sm->data;
	msg.len = sm->len;
	msg.sdesc_len = 0;
	msg.sdesc = NULL;

	async_work = kzalloc(sizeof(*async_work), GFP_KERNEL);
	if (!async_work) {
		ret = -ENOMEM;
		goto unlock;
	}
	async_work->start = start;
	ret = hmdfs_alloc_msg_idr(con, MSG_IDR_PAGE, async_work);
	if (ret) {
		hmdfs_err("alloc msg_id failed, err %d", ret);
		goto unlock;
	}
	head.msg_id = cpu_to_le32(async_work->head.msg_id);
	async_work->page = sm->out_buf;
	asw_get(async_work);
	INIT_DELAYED_WORK(&async_work->d_work, hmdfs_recv_page_work_fn);
	ret = queue_delayed_work(con->async_wq, &async_work->d_work,
				 timeout * HZ);
	if (!ret) {
		hmdfs_err("queue_delayed_work failed, msg_id %u", head.msg_id);
		goto fail_and_unlock_page;
	}
	ret = hmdfs_sendmessage(con, &msg);
	if (ret) {
		hmdfs_err("send err sm->device_id, %lld, msg_id %u",
			  con->device_id, head.msg_id);
		if (!cancel_delayed_work(&async_work->d_work)) {
			hmdfs_err("cancel async work err");
			asw_put(async_work);
			hmdfs_dec_msg_idr_process(con);
			goto out;
		}
		goto fail_and_unlock_page;
	}

	asw_put(async_work);
	hmdfs_dec_msg_idr_process(con);
	return 0;

fail_and_unlock_page:
	asw_put(async_work);
	asw_done(async_work);
	hmdfs_dec_msg_idr_process(con);
	return ret;
unlock:
	kfree(async_work);
	unlock_page(sm->out_buf);
out:
	return ret;
}

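/*
 * Dispatch a remote request to its handler in s_recv_callbacks under
 * overridden fsids, record the handling time, and free the receive buffer
 * once the handler returns.
 */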
static void hmdfs_request_handle_sync(struct hmdfs_peer *con,
				      struct hmdfs_head_cmd *head, void *buf)
{
	unsigned long start = jiffies;
	const struct cred *saved_cred = hmdfs_override_fsids(true);

	if (!saved_cred) {
		hmdfs_err("prepare cred failed!");
		kfree(buf);
		return;
	}

	s_recv_callbacks[head->operations.command](con, head, buf);
	hmdfs_statistic(con->sbi, head->operations.command, jiffies - start);

	kfree(buf);

	hmdfs_revert_fsids(saved_cred);
}

static void hmdfs_msg_handle_sync(struct hmdfs_peer *con,
				 struct hmdfs_head_cmd *head, void *buf)
{
	const struct cred *old_cred = hmdfs_override_creds(con->sbi->cred);

	/*
	 * Reuse PF_NPROC_EXCEEDED as an indication of hmdfs server context:
	 * 1. PF_NPROC_EXCEEDED will be set by setreuid()/setuid()/setresuid();
	 *    we assume a kworker will not call these syscalls.
	 * 2. PF_NPROC_EXCEEDED will be cleared by execv(), and a kworker
	 *    will not call it either.
	 */
	current->flags |= PF_NPROC_EXCEEDED;
	hmdfs_request_handle_sync(con, head, buf);
	current->flags &= ~PF_NPROC_EXCEEDED;

	hmdfs_revert_creds(old_cred);
}


static void hmdfs_request_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);

	hmdfs_msg_handle_sync(desp->peer, desp->head, desp->buf);
	peer_put(desp->peer);
	kfree(desp->head);
	kfree(desp);
}

static int hmdfs_msg_handle_async(struct hmdfs_peer *con,
				  struct hmdfs_head_cmd *head, void *buf,
				  struct workqueue_struct *wq,
				  void (*work_fn)(struct work_struct *ptr))
{
	struct work_handler_desp *desp = NULL;
	struct hmdfs_head_cmd *dup_head = NULL;
	int ret;

	desp = kzalloc(sizeof(*desp), GFP_KERNEL);
	if (!desp) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	dup_head = kzalloc(sizeof(*dup_head), GFP_KERNEL);
	if (!dup_head) {
		ret = -ENOMEM;
		goto exit_desp;
	}

	*dup_head = *head;
	desp->peer = con;
	desp->head = dup_head;
	desp->buf = buf;
	INIT_WORK(&desp->work, work_fn);

	peer_get(con);
	queue_work(wq, &desp->work);

	ret = 0;
	return ret;

exit_desp:
	kfree(desp);
	return ret;
}

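/*
 * Dispatch an incoming C_REQUEST. Metadata commands are handed off to the
 * request workqueue and handled asynchronously, while data-path commands
 * (readpage, readpages, writepage) are handled synchronously on the
 * receive path.
 */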
static int hmdfs_request_recv(struct hmdfs_peer *con,
			      struct hmdfs_head_cmd *head, void *buf)
{
	int ret;

	if (head->operations.command >= F_SIZE ||
	    !s_recv_callbacks[head->operations.command]) {
		ret = -EINVAL;
		hmdfs_err("NULL callback, command %d",
			  head->operations.command);
		goto out;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_ITERATE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
	case F_READPAGES_OPEN:
	case F_ATOMIC_OPEN:
		ret = hmdfs_msg_handle_async(con, head, buf, con->req_handle_wq,
					     hmdfs_request_work_fn);
		break;
	case F_WRITEPAGE:
	case F_READPAGE:
	case F_READPAGES:
		hmdfs_msg_handle_sync(con, head, buf);
		ret = 0;
		break;
	default:
		hmdfs_err("Fatal! Unexpected request command %d",
			  head->operations.command);
		ret = -EINVAL;
	}

out:
	return ret;
}

void hmdfs_response_wakeup(struct sendmsg_wait_queue *msg_info,
			   __u32 ret_code, __u32 data_len, void *buf)
{
	msg_info->ret = ret_code;
	msg_info->size = data_len;
	msg_info->buf = buf;
	atomic_set(&msg_info->valid, MSG_Q_END_RECV);
	wake_up_interruptible(&msg_info->response_q);
}

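/*
 * Write one received file slice into the local file at the offset encoded
 * by its slice number. The receive state flips to FILE_RECV_SUCC once the
 * last slice has been stored, or to FILE_RECV_ERR_SPC if the write fails.
 */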
static int hmdfs_readfile_slice(struct sendmsg_wait_queue *msg_info,
				struct work_handler_desp *desp)
{
	struct slice_descriptor *sdesc = desp->buf;
	void *slice_buf = sdesc + 1;
	struct file_recv_info *recv_info = &msg_info->recv_info;
	struct file *filp = recv_info->local_filp;
	loff_t offset;
	ssize_t written_size;

	if (atomic_read(&recv_info->state) != FILE_RECV_PROCESS)
		return -EBUSY;

	offset = le32_to_cpu(sdesc->slice_size) * le32_to_cpu(sdesc->slice_sn);

	written_size = kernel_write(filp, slice_buf,
				    le32_to_cpu(sdesc->content_size), &offset);
	if (IS_ERR_VALUE(written_size)) {
		atomic_set(&recv_info->state, FILE_RECV_ERR_SPC);
		hmdfs_info("Fatal! Cannot store a file slice %d/%d, ret = %d",
			   le32_to_cpu(sdesc->slice_sn),
			   le32_to_cpu(sdesc->num_slices), (int)written_size);
		return (int)written_size;
	}

	if (atomic_inc_return(&recv_info->local_fslices) >=
	    le32_to_cpu(sdesc->num_slices))
		atomic_set(&recv_info->state, FILE_RECV_SUCC);
	return 0;
}

static void hmdfs_file_response_work_fn(struct work_struct *ptr)
{
	struct work_handler_desp *desp =
		container_of(ptr, struct work_handler_desp, work);
	struct sendmsg_wait_queue *msg_info = NULL;
	int ret;
	atomic_t *pstate = NULL;
	u8 cmd = desp->head->operations.command;
	const struct cred *old_cred =
		hmdfs_override_creds(desp->peer->sbi->cred);

	msg_info = (struct sendmsg_wait_queue *)hmdfs_find_msg_head(desp->peer,
					le32_to_cpu(desp->head->msg_id));
	if (!msg_info || atomic_read(&msg_info->valid) != MSG_Q_SEND) {
		hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_DELAY,
					 0, 0);
		hmdfs_info("cannot find msg(id %d)",
			   le32_to_cpu(desp->head->msg_id));
		goto free;
	}

	ret = le32_to_cpu(desp->head->ret_code);
	if (ret || le32_to_cpu(desp->head->data_len) == sizeof(*desp->head))
		goto wakeup;
	ret = hmdfs_readfile_slice(msg_info, desp);
	pstate = &msg_info->recv_info.state;
	if (ret || atomic_read(pstate) != FILE_RECV_PROCESS)
		goto wakeup;
	goto free;

wakeup:
	hmdfs_response_wakeup(msg_info, ret, sizeof(struct hmdfs_head_cmd),
			      NULL);
	hmdfs_client_resp_statis(desp->peer->sbi, cmd, HMDFS_RESP_NORMAL,
				 msg_info->start, jiffies);
free:
	if (msg_info)
		msg_put(msg_info);
	peer_put(desp->peer);
	hmdfs_revert_creds(old_cred);

	kfree(desp->buf);
	kfree(desp->head);
	kfree(desp);
}

static void hmdfs_wait_mp_wfired(struct hmdfs_msg_parasite *mp)
{
	/* We just cancel queued works */
	while (unlikely(!smp_load_acquire(&mp->wfired)))
		usleep_range(ACQUIRE_WFIRED_INTVAL_USEC_MIN,
			     ACQUIRE_WFIRED_INTVAL_USEC_MAX);
}

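/*
 * Match a response against the peer's msg_idr entry for its msg_id. A
 * synchronous waiter is woken with the payload, while an async parasite
 * has its delayed timeout work cancelled and re-queued immediately so its
 * completion callback runs. Returns -EINVAL when no waiter is found.
 */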
int hmdfs_response_handle_sync(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	struct sendmsg_wait_queue *msg_info = NULL;
	struct hmdfs_msg_parasite *mp = NULL;
	struct hmdfs_msg_idr_head *msg_head = NULL;
	u32 msg_id = le32_to_cpu(head->msg_id);
	bool woke = false;
	u8 cmd = head->operations.command;

	msg_head = hmdfs_find_msg_head(con, msg_id);
	if (!msg_head)
		goto out;

	switch (msg_head->type) {
	case MSG_IDR_MESSAGE_SYNC:
		msg_info = (struct sendmsg_wait_queue *)msg_head;
		if (atomic_read(&msg_info->valid) == MSG_Q_SEND) {
			hmdfs_response_wakeup(msg_info,
					      le32_to_cpu(head->ret_code),
					      le32_to_cpu(head->data_len), buf);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL,
						 msg_info->start, jiffies);
			woke = true;
		}

		msg_put(msg_info);
		break;
	case MSG_IDR_MESSAGE_ASYNC:
		mp = (struct hmdfs_msg_parasite *)msg_head;

		hmdfs_wait_mp_wfired(mp);
		if (cancel_delayed_work(&mp->d_work)) {
			mp->resp.out_buf = buf;
			mp->resp.out_len =
				le32_to_cpu(head->data_len) - sizeof(*head);
			mp->resp.ret_code = le32_to_cpu(head->ret_code);
			queue_delayed_work(con->async_wq, &mp->d_work, 0);
			hmdfs_client_resp_statis(con->sbi, cmd,
						 HMDFS_RESP_NORMAL, mp->start,
						 jiffies);
			woke = true;
		}
		mp_put(mp);
		break;
	default:
		hmdfs_err("receive incorrect msg type %d msg_id %d cmd %d",
			  msg_head->type, msg_id, cmd);
		break;
	}

	if (likely(woke))
		return 0;
out:
	hmdfs_client_resp_statis(con->sbi, cmd, HMDFS_RESP_DELAY, 0, 0);
	hmdfs_info("cannot find msg_id %d cmd %d", msg_id, cmd);
	return -EINVAL;
}

static int hmdfs_response_recv(struct hmdfs_peer *con,
			       struct hmdfs_head_cmd *head, void *buf)
{
	__u16 command = head->operations.command;
	int ret;

	if (command >= F_SIZE) {
		ret = -EINVAL;
		return ret;
	}

	switch (head->operations.command) {
	case F_OPEN:
	case F_RELEASE:
	case F_READPAGE:
	case F_WRITEPAGE:
	case F_MKDIR:
	case F_RMDIR:
	case F_CREATE:
	case F_UNLINK:
	case F_RENAME:
	case F_SETATTR:
	case F_STATFS:
	case F_CONNECT_REKEY:
	case F_DROP_PUSH:
	case F_GETATTR:
	case F_FSYNC:
	case F_SYNCFS:
	case F_GETXATTR:
	case F_SETXATTR:
	case F_LISTXATTR:
		ret = hmdfs_response_handle_sync(con, head, buf);
		return ret;

	case F_ITERATE:
		ret = hmdfs_msg_handle_async(con, head, buf, con->async_wq,
					     hmdfs_file_response_work_fn);
		return ret;

	default:
		hmdfs_err("Fatal! Unexpected response command %d",
			  head->operations.command);
		ret = -EINVAL;
		return ret;
	}
}

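/*
 * Entry point for every message received from a peer: verify it, then
 * route it to the request or response path based on cmd_flag. The receive
 * buffer is freed here when verification fails or a handler reports an
 * error; otherwise the selected handler owns it.
 */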
static void hmdfs_recv_mesg_callback(struct hmdfs_peer *con, void *head,
				     void *buf)
{
	struct hmdfs_head_cmd *hmdfs_head = (struct hmdfs_head_cmd *)head;

	trace_hmdfs_recv_mesg_callback(hmdfs_head);

	if (hmdfs_message_verify(con, hmdfs_head, buf) < 0) {
		hmdfs_info("Message %d has been abandoned", hmdfs_head->msg_id);
		goto out_err;
	}

	switch (hmdfs_head->operations.cmd_flag) {
	case C_REQUEST:
		if (hmdfs_request_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	case C_RESPONSE:
		if (hmdfs_response_recv(con, hmdfs_head, buf) < 0)
			goto out_err;
		break;

	default:
		hmdfs_err("Fatal! Unexpected msg cmd %d",
			  hmdfs_head->operations.cmd_flag);
		break;
	}
	return;

out_err:
	kfree(buf);
}

static inline void hmdfs_recv_page_callback(struct hmdfs_peer *con,
					    struct hmdfs_head_cmd *head,
					    int err, void *data)
{
	if (head->operations.command == F_READPAGE)
		hmdfs_client_recv_readpage(head, err, data);
}

static const struct connection_operations conn_operations[] = {
	[PROTOCOL_VERSION] = {
		.recvmsg = hmdfs_recv_mesg_callback,
		.recvpage = hmdfs_recv_page_callback,
		/* remote device operations */
		.remote_file_fops =
			&hmdfs_dev_file_fops_remote,
		.remote_file_iops =
			&hmdfs_dev_file_iops_remote,
		.remote_file_aops =
			&hmdfs_dev_file_aops_remote,
		.remote_unlink =
			hmdfs_dev_unlink_from_con,
		.remote_readdir =
			hmdfs_dev_readdir_from_con,
	}
};

const struct connection_operations *hmdfs_get_peer_operation(__u8 version)
{
	if (version <= INVALID_VERSION || version >= MAX_VERSION)
		return NULL;

	if (version <= USERSPACE_MAX_VER)
		return &(conn_operations[USERDFS_VERSION]);
	else
		return &(conn_operations[PROTOCOL_VERSION]);
}

void hmdfs_wakeup_parasite(struct hmdfs_msg_parasite *mp)
{
	hmdfs_wait_mp_wfired(mp);
	if (!cancel_delayed_work(&mp->d_work))
		hmdfs_err("cancel parasite work err msg_id=%d cmd=%d",
			  mp->head.msg_id, mp->req.operations.command);
	else
		async_request_cb_on_wakeup_fn(&mp->d_work.work);
}

void hmdfs_wakeup_async_work(struct hmdfs_async_work *async_work)
{
	if (!cancel_delayed_work(&async_work->d_work))
		hmdfs_err("cancel async work err msg_id=%d",
			  async_work->head.msg_id);
	else
		hmdfs_recv_page_work_fn(&async_work->d_work.work);
}
1148