1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/fs.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
7 #include <linux/gfp.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
12 #include <linux/bits.h>
13 #include <linux/ktime.h>
14
15 #include "super.h"
16 #include "mds_client.h"
17
18 #include <linux/ceph/ceph_features.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/pagelist.h>
22 #include <linux/ceph/auth.h>
23 #include <linux/ceph/debugfs.h>
24
25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
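/*
 * Cap on the size of a single reconnect pagelist; beyond this the client
 * splits the reconnect into multiple messages (when the MDS allows it,
 * see ceph_reconnect_state.allow_multi below). The margin below INT_MAX
 * is presumably there because message payload lengths are carried in
 * 32-bit fields.
 */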
26
27 /*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage. Metadata is
31 * partitioned hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible for managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is an MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we send periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issued remain valid. If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
51
52 struct ceph_reconnect_state {
53 struct ceph_mds_session *session;
54 int nr_caps, nr_realms;
55 struct ceph_pagelist *pagelist;
56 unsigned msg_version;
57 bool allow_multi;
58 };
59
60 static void __wake_requests(struct ceph_mds_client *mdsc,
61 struct list_head *head);
62 static void ceph_cap_release_work(struct work_struct *work);
63 static void ceph_cap_reclaim_work(struct work_struct *work);
64
65 static const struct ceph_connection_operations mds_con_ops;
66
67
68 /*
69 * mds reply parsing
70 */
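/*
 * When the peer supports all features (features == (u64)-1), each reply
 * blob below is wrapped in a small versioned header that the
 * parse_reply_info_*() helpers decode first:
 *
 *	u8  struct_v       encoding version (must be >= 1)
 *	u8  struct_compat  oldest compatible version (we require 1)
 *	u32 struct_len     length of the payload that follows
 *
 * Unknown trailing fields are skipped by advancing *p to the end of the
 * struct_len region.
 */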
71
72 static int parse_reply_info_quota(void **p, void *end,
73 struct ceph_mds_reply_info_in *info)
74 {
75 u8 struct_v, struct_compat;
76 u32 struct_len;
77
78 ceph_decode_8_safe(p, end, struct_v, bad);
79 ceph_decode_8_safe(p, end, struct_compat, bad);
80 /* struct_v is expected to be >= 1. we only
81 * understand encoding with struct_compat == 1. */
82 if (!struct_v || struct_compat != 1)
83 goto bad;
84 ceph_decode_32_safe(p, end, struct_len, bad);
85 ceph_decode_need(p, end, struct_len, bad);
86 end = *p + struct_len;
87 ceph_decode_64_safe(p, end, info->max_bytes, bad);
88 ceph_decode_64_safe(p, end, info->max_files, bad);
89 *p = end;
90 return 0;
91 bad:
92 return -EIO;
93 }
94
95 /*
96 * parse individual inode info
97 */
98 static int parse_reply_info_in(void **p, void *end,
99 struct ceph_mds_reply_info_in *info,
100 u64 features)
101 {
102 int err = 0;
103 u8 struct_v = 0;
104
105 if (features == (u64)-1) {
106 u32 struct_len;
107 u8 struct_compat;
108 ceph_decode_8_safe(p, end, struct_v, bad);
109 ceph_decode_8_safe(p, end, struct_compat, bad);
110 /* struct_v is expected to be >= 1. we only understand
111 * encoding with struct_compat == 1. */
112 if (!struct_v || struct_compat != 1)
113 goto bad;
114 ceph_decode_32_safe(p, end, struct_len, bad);
115 ceph_decode_need(p, end, struct_len, bad);
116 end = *p + struct_len;
117 }
118
119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120 info->in = *p;
121 *p += sizeof(struct ceph_mds_reply_inode) +
122 sizeof(*info->in->fragtree.splits) *
123 le32_to_cpu(info->in->fragtree.nsplits);
124
125 ceph_decode_32_safe(p, end, info->symlink_len, bad);
126 ceph_decode_need(p, end, info->symlink_len, bad);
127 info->symlink = *p;
128 *p += info->symlink_len;
129
130 ceph_decode_copy_safe(p, end, &info->dir_layout,
131 sizeof(info->dir_layout), bad);
132 ceph_decode_32_safe(p, end, info->xattr_len, bad);
133 ceph_decode_need(p, end, info->xattr_len, bad);
134 info->xattr_data = *p;
135 *p += info->xattr_len;
136
137 if (features == (u64)-1) {
138 /* inline data */
139 ceph_decode_64_safe(p, end, info->inline_version, bad);
140 ceph_decode_32_safe(p, end, info->inline_len, bad);
141 ceph_decode_need(p, end, info->inline_len, bad);
142 info->inline_data = *p;
143 *p += info->inline_len;
144 /* quota */
145 err = parse_reply_info_quota(p, end, info);
146 if (err < 0)
147 goto out_bad;
148 /* pool namespace */
149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150 if (info->pool_ns_len > 0) {
151 ceph_decode_need(p, end, info->pool_ns_len, bad);
152 info->pool_ns_data = *p;
153 *p += info->pool_ns_len;
154 }
155
156 /* btime */
157 ceph_decode_need(p, end, sizeof(info->btime), bad);
158 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160 /* change attribute */
161 ceph_decode_64_safe(p, end, info->change_attr, bad);
162
163 /* dir pin */
164 if (struct_v >= 2) {
165 ceph_decode_32_safe(p, end, info->dir_pin, bad);
166 } else {
167 info->dir_pin = -ENODATA;
168 }
169
170 /* snapshot birth time, remains zero for v<=2 */
171 if (struct_v >= 3) {
172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173 ceph_decode_copy(p, &info->snap_btime,
174 sizeof(info->snap_btime));
175 } else {
176 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177 }
178
179 *p = end;
180 } else {
181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
182 ceph_decode_64_safe(p, end, info->inline_version, bad);
183 ceph_decode_32_safe(p, end, info->inline_len, bad);
184 ceph_decode_need(p, end, info->inline_len, bad);
185 info->inline_data = *p;
186 *p += info->inline_len;
187 } else
188 info->inline_version = CEPH_INLINE_NONE;
189
190 if (features & CEPH_FEATURE_MDS_QUOTA) {
191 err = parse_reply_info_quota(p, end, info);
192 if (err < 0)
193 goto out_bad;
194 } else {
195 info->max_bytes = 0;
196 info->max_files = 0;
197 }
198
199 info->pool_ns_len = 0;
200 info->pool_ns_data = NULL;
201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
203 if (info->pool_ns_len > 0) {
204 ceph_decode_need(p, end, info->pool_ns_len, bad);
205 info->pool_ns_data = *p;
206 *p += info->pool_ns_len;
207 }
208 }
209
210 if (features & CEPH_FEATURE_FS_BTIME) {
211 ceph_decode_need(p, end, sizeof(info->btime), bad);
212 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
213 ceph_decode_64_safe(p, end, info->change_attr, bad);
214 }
215
216 info->dir_pin = -ENODATA;
217 /* info->snap_btime remains zero */
218 }
219 return 0;
220 bad:
221 err = -EIO;
222 out_bad:
223 return err;
224 }
225
226 static int parse_reply_info_dir(void **p, void *end,
227 struct ceph_mds_reply_dirfrag **dirfrag,
228 u64 features)
229 {
230 if (features == (u64)-1) {
231 u8 struct_v, struct_compat;
232 u32 struct_len;
233 ceph_decode_8_safe(p, end, struct_v, bad);
234 ceph_decode_8_safe(p, end, struct_compat, bad);
235 /* struct_v is expected to be >= 1. we only understand
236 * encoding whose struct_compat == 1. */
237 if (!struct_v || struct_compat != 1)
238 goto bad;
239 ceph_decode_32_safe(p, end, struct_len, bad);
240 ceph_decode_need(p, end, struct_len, bad);
241 end = *p + struct_len;
242 }
243
244 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
245 *dirfrag = *p;
246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
247 if (unlikely(*p > end))
248 goto bad;
249 if (features == (u64)-1)
250 *p = end;
251 return 0;
252 bad:
253 return -EIO;
254 }
255
256 static int parse_reply_info_lease(void **p, void *end,
257 struct ceph_mds_reply_lease **lease,
258 u64 features)
259 {
260 if (features == (u64)-1) {
261 u8 struct_v, struct_compat;
262 u32 struct_len;
263 ceph_decode_8_safe(p, end, struct_v, bad);
264 ceph_decode_8_safe(p, end, struct_compat, bad);
265 /* struct_v is expected to be >= 1. we only understand
266 * encoding whose struct_compat == 1. */
267 if (!struct_v || struct_compat != 1)
268 goto bad;
269 ceph_decode_32_safe(p, end, struct_len, bad);
270 ceph_decode_need(p, end, struct_len, bad);
271 end = *p + struct_len;
272 }
273
274 ceph_decode_need(p, end, sizeof(**lease), bad);
275 *lease = *p;
276 *p += sizeof(**lease);
277 if (features == (u64)-1)
278 *p = end;
279 return 0;
280 bad:
281 return -EIO;
282 }
283
284 /*
285 * parse a normal reply, which may contain a (dir+)dentry and/or a
286 * target inode.
287 */
288 static int parse_reply_info_trace(void **p, void *end,
289 struct ceph_mds_reply_info_parsed *info,
290 u64 features)
291 {
292 int err;
293
294 if (info->head->is_dentry) {
295 err = parse_reply_info_in(p, end, &info->diri, features);
296 if (err < 0)
297 goto out_bad;
298
299 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
300 if (err < 0)
301 goto out_bad;
302
303 ceph_decode_32_safe(p, end, info->dname_len, bad);
304 ceph_decode_need(p, end, info->dname_len, bad);
305 info->dname = *p;
306 *p += info->dname_len;
307
308 err = parse_reply_info_lease(p, end, &info->dlease, features);
309 if (err < 0)
310 goto out_bad;
311 }
312
313 if (info->head->is_target) {
314 err = parse_reply_info_in(p, end, &info->targeti, features);
315 if (err < 0)
316 goto out_bad;
317 }
318
319 if (unlikely(*p != end))
320 goto bad;
321 return 0;
322
323 bad:
324 err = -EIO;
325 out_bad:
326 pr_err("problem parsing mds trace %d\n", err);
327 return err;
328 }
329
330 /*
331 * parse readdir results
332 */
333 static int parse_reply_info_readdir(void **p, void *end,
334 struct ceph_mds_reply_info_parsed *info,
335 u64 features)
336 {
337 u32 num, i = 0;
338 int err;
339
340 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
341 if (err < 0)
342 goto out_bad;
343
344 ceph_decode_need(p, end, sizeof(num) + 2, bad);
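/* the extra 2 bytes reserved above hold the u16 readdir flags decoded below */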
345 num = ceph_decode_32(p);
346 {
347 u16 flags = ceph_decode_16(p);
348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
352 }
353 if (num == 0)
354 goto done;
355
356 BUG_ON(!info->dir_entries);
357 if ((unsigned long)(info->dir_entries + num) >
358 (unsigned long)info->dir_entries + info->dir_buf_size) {
359 pr_err("dir contents are larger than expected\n");
360 WARN_ON(1);
361 goto bad;
362 }
363
364 info->dir_nr = num;
365 while (num) {
366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
367 /* dentry */
368 ceph_decode_32_safe(p, end, rde->name_len, bad);
369 ceph_decode_need(p, end, rde->name_len, bad);
370 rde->name = *p;
371 *p += rde->name_len;
372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
373
374 /* dentry lease */
375 err = parse_reply_info_lease(p, end, &rde->lease, features);
376 if (err)
377 goto out_bad;
378 /* inode */
379 err = parse_reply_info_in(p, end, &rde->inode, features);
380 if (err < 0)
381 goto out_bad;
382 /* ceph_readdir_prepopulate() will update it */
383 rde->offset = 0;
384 i++;
385 num--;
386 }
387
388 done:
389 /* Skip over any unrecognized fields */
390 *p = end;
391 return 0;
392
393 bad:
394 err = -EIO;
395 out_bad:
396 pr_err("problem parsing dir contents %d\n", err);
397 return err;
398 }
399
400 /*
401 * parse fcntl F_GETLK results
402 */
403 static int parse_reply_info_filelock(void **p, void *end,
404 struct ceph_mds_reply_info_parsed *info,
405 u64 features)
406 {
407 if (*p + sizeof(*info->filelock_reply) > end)
408 goto bad;
409
410 info->filelock_reply = *p;
411
412 /* Skip over any unrecognized fields */
413 *p = end;
414 return 0;
415 bad:
416 return -EIO;
417 }
418
419
420 #if BITS_PER_LONG == 64
421
422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1)
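/*
 * xa_mk_value(1) tags the integer 1 as an xarray "value entry", so it can be
 * stored in s_delegated_inos without any allocation; here it is simply a
 * sentinel meaning "this delegated inode number is still available".
 */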
423
424 static int ceph_parse_deleg_inos(void **p, void *end,
425 struct ceph_mds_session *s)
426 {
427 u32 sets;
428
429 ceph_decode_32_safe(p, end, sets, bad);
430 dout("got %u sets of delegated inodes\n", sets);
431 while (sets--) {
432 u64 start, len, ino;
433
434 ceph_decode_64_safe(p, end, start, bad);
435 ceph_decode_64_safe(p, end, len, bad);
436
437 /* Don't accept a delegation of system inodes */
438 if (start < CEPH_INO_SYSTEM_BASE) {
439 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
440 start, len);
441 continue;
442 }
443 while (len--) {
444 int err = xa_insert(&s->s_delegated_inos, ino = start++,
445 DELEGATED_INO_AVAILABLE,
446 GFP_KERNEL);
447 if (!err) {
448 dout("added delegated inode 0x%llx\n",
449 start - 1);
450 } else if (err == -EBUSY) {
451 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
452 start - 1);
453 } else {
454 return err;
455 }
456 }
457 }
458 return 0;
459 bad:
460 return -EIO;
461 }
462
463 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
464 {
465 unsigned long ino;
466 void *val;
467
468 xa_for_each(&s->s_delegated_inos, ino, val) {
469 val = xa_erase(&s->s_delegated_inos, ino);
470 if (val == DELEGATED_INO_AVAILABLE)
471 return ino;
472 }
473 return 0;
474 }
475
476 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
477 {
478 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
479 GFP_KERNEL);
480 }
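/*
 * Rough usage sketch (illustrative only, not code from this file): a caller
 * performing an asynchronous create would consume a delegated inode number
 * and hand it back if the operation cannot be submitted:
 *
 *	u64 ino = ceph_get_deleg_ino(session);
 *	if (!ino)
 *		return -EAGAIN;		// fall back to a synchronous create
 *	...
 *	if (submit_failed)		// hypothetical error path
 *		ceph_restore_deleg_ino(session, ino);
 */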
481 #else /* BITS_PER_LONG == 64 */
482 /*
483 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
484 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
485 * and bottom words?
486 */
487 static int ceph_parse_deleg_inos(void **p, void *end,
488 struct ceph_mds_session *s)
489 {
490 u32 sets;
491
492 ceph_decode_32_safe(p, end, sets, bad);
493 if (sets)
494 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
495 return 0;
496 bad:
497 return -EIO;
498 }
499
500 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
501 {
502 return 0;
503 }
504
505 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
506 {
507 return 0;
508 }
509 #endif /* BITS_PER_LONG == 64 */
510
511 /*
512 * parse create results
513 */
514 static int parse_reply_info_create(void **p, void *end,
515 struct ceph_mds_reply_info_parsed *info,
516 u64 features, struct ceph_mds_session *s)
517 {
518 int ret;
519
520 if (features == (u64)-1 ||
521 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
522 if (*p == end) {
523 /* Malformed reply? */
524 info->has_create_ino = false;
525 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
526 u8 struct_v, struct_compat;
527 u32 len;
528
529 info->has_create_ino = true;
530 ceph_decode_8_safe(p, end, struct_v, bad);
531 ceph_decode_8_safe(p, end, struct_compat, bad);
532 ceph_decode_32_safe(p, end, len, bad);
533 ceph_decode_64_safe(p, end, info->ino, bad);
534 ret = ceph_parse_deleg_inos(p, end, s);
535 if (ret)
536 return ret;
537 } else {
538 /* legacy */
539 ceph_decode_64_safe(p, end, info->ino, bad);
540 info->has_create_ino = true;
541 }
542 } else {
543 if (*p != end)
544 goto bad;
545 }
546
547 /* Skip over any unrecognized fields */
548 *p = end;
549 return 0;
550 bad:
551 return -EIO;
552 }
553
554 /*
555 * parse extra results
556 */
557 static int parse_reply_info_extra(void **p, void *end,
558 struct ceph_mds_reply_info_parsed *info,
559 u64 features, struct ceph_mds_session *s)
560 {
561 u32 op = le32_to_cpu(info->head->op);
562
563 if (op == CEPH_MDS_OP_GETFILELOCK)
564 return parse_reply_info_filelock(p, end, info, features);
565 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
566 return parse_reply_info_readdir(p, end, info, features);
567 else if (op == CEPH_MDS_OP_CREATE)
568 return parse_reply_info_create(p, end, info, features, s);
569 else
570 return -EIO;
571 }
572
573 /*
574 * parse entire mds reply
575 */
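/*
 * Layout of the reply front payload handled here (each blob is preceded by a
 * u32 length):
 *
 *	struct ceph_mds_reply_head
 *	u32 len; trace blob   (dentry and/or target inode, see above)
 *	u32 len; extra blob   (op-specific: readdir/filelock/create)
 *	u32 len; snap blob    (stashed in info->snapblob for later use)
 */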
576 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
577 struct ceph_mds_reply_info_parsed *info,
578 u64 features)
579 {
580 void *p, *end;
581 u32 len;
582 int err;
583
584 info->head = msg->front.iov_base;
585 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
586 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
587
588 /* trace */
589 ceph_decode_32_safe(&p, end, len, bad);
590 if (len > 0) {
591 ceph_decode_need(&p, end, len, bad);
592 err = parse_reply_info_trace(&p, p+len, info, features);
593 if (err < 0)
594 goto out_bad;
595 }
596
597 /* extra */
598 ceph_decode_32_safe(&p, end, len, bad);
599 if (len > 0) {
600 ceph_decode_need(&p, end, len, bad);
601 err = parse_reply_info_extra(&p, p+len, info, features, s);
602 if (err < 0)
603 goto out_bad;
604 }
605
606 /* snap blob */
607 ceph_decode_32_safe(&p, end, len, bad);
608 info->snapblob_len = len;
609 info->snapblob = p;
610 p += len;
611
612 if (p != end)
613 goto bad;
614 return 0;
615
616 bad:
617 err = -EIO;
618 out_bad:
619 pr_err("mds parse_reply err %d\n", err);
620 return err;
621 }
622
623 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
624 {
625 if (!info->dir_entries)
626 return;
627 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
628 }
629
630
631 /*
632 * sessions
633 */
634 const char *ceph_session_state_name(int s)
635 {
636 switch (s) {
637 case CEPH_MDS_SESSION_NEW: return "new";
638 case CEPH_MDS_SESSION_OPENING: return "opening";
639 case CEPH_MDS_SESSION_OPEN: return "open";
640 case CEPH_MDS_SESSION_HUNG: return "hung";
641 case CEPH_MDS_SESSION_CLOSING: return "closing";
642 case CEPH_MDS_SESSION_CLOSED: return "closed";
643 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
644 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
645 case CEPH_MDS_SESSION_REJECTED: return "rejected";
646 default: return "???";
647 }
648 }
649
650 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
651 {
652 if (refcount_inc_not_zero(&s->s_ref)) {
653 dout("mdsc get_session %p %d -> %d\n", s,
654 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
655 return s;
656 } else {
657 dout("mdsc get_session %p 0 -- FAIL\n", s);
658 return NULL;
659 }
660 }
661
662 void ceph_put_mds_session(struct ceph_mds_session *s)
663 {
664 if (IS_ERR_OR_NULL(s))
665 return;
666
667 dout("mdsc put_session %p %d -> %d\n", s,
668 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
669 if (refcount_dec_and_test(&s->s_ref)) {
670 if (s->s_auth.authorizer)
671 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
672 WARN_ON(mutex_is_locked(&s->s_mutex));
673 xa_destroy(&s->s_delegated_inos);
674 kfree(s);
675 }
676 }
677
678 /*
679 * called under mdsc->mutex
680 */
681 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
682 int mds)
683 {
684 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
685 return NULL;
686 return ceph_get_mds_session(mdsc->sessions[mds]);
687 }
688
689 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
690 {
691 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
692 return false;
693 else
694 return true;
695 }
696
697 static int __verify_registered_session(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *s)
699 {
700 if (s->s_mds >= mdsc->max_sessions ||
701 mdsc->sessions[s->s_mds] != s)
702 return -ENOENT;
703 return 0;
704 }
705
706 /*
707 * create+register a new session for given mds.
708 * called under mdsc->mutex.
709 */
710 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
711 int mds)
712 {
713 struct ceph_mds_session *s;
714
715 if (mds >= mdsc->mdsmap->possible_max_rank)
716 return ERR_PTR(-EINVAL);
717
718 s = kzalloc(sizeof(*s), GFP_NOFS);
719 if (!s)
720 return ERR_PTR(-ENOMEM);
721
722 if (mds >= mdsc->max_sessions) {
723 int newmax = 1 << get_count_order(mds + 1);
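/* round the array size up to the next power of two, e.g. mds 5 -> newmax 8 */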
724 struct ceph_mds_session **sa;
725
726 dout("%s: realloc to %d\n", __func__, newmax);
727 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
728 if (!sa)
729 goto fail_realloc;
730 if (mdsc->sessions) {
731 memcpy(sa, mdsc->sessions,
732 mdsc->max_sessions * sizeof(void *));
733 kfree(mdsc->sessions);
734 }
735 mdsc->sessions = sa;
736 mdsc->max_sessions = newmax;
737 }
738
739 dout("%s: mds%d\n", __func__, mds);
740 s->s_mdsc = mdsc;
741 s->s_mds = mds;
742 s->s_state = CEPH_MDS_SESSION_NEW;
743 s->s_ttl = 0;
744 s->s_seq = 0;
745 mutex_init(&s->s_mutex);
746
747 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
748
749 spin_lock_init(&s->s_gen_ttl_lock);
750 s->s_cap_gen = 1;
751 s->s_cap_ttl = jiffies - 1;
752
753 spin_lock_init(&s->s_cap_lock);
754 s->s_renew_requested = 0;
755 s->s_renew_seq = 0;
756 INIT_LIST_HEAD(&s->s_caps);
757 s->s_nr_caps = 0;
758 refcount_set(&s->s_ref, 1);
759 INIT_LIST_HEAD(&s->s_waiting);
760 INIT_LIST_HEAD(&s->s_unsafe);
761 xa_init(&s->s_delegated_inos);
762 s->s_num_cap_releases = 0;
763 s->s_cap_reconnect = 0;
764 s->s_cap_iterator = NULL;
765 INIT_LIST_HEAD(&s->s_cap_releases);
766 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
767
768 INIT_LIST_HEAD(&s->s_cap_dirty);
769 INIT_LIST_HEAD(&s->s_cap_flushing);
770
771 mdsc->sessions[mds] = s;
772 atomic_inc(&mdsc->num_sessions);
773 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
774
775 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
776 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
777
778 return s;
779
780 fail_realloc:
781 kfree(s);
782 return ERR_PTR(-ENOMEM);
783 }
784
785 /*
786 * called under mdsc->mutex
787 */
788 static void __unregister_session(struct ceph_mds_client *mdsc,
789 struct ceph_mds_session *s)
790 {
791 dout("__unregister_session mds%d %p\n", s->s_mds, s);
792 BUG_ON(mdsc->sessions[s->s_mds] != s);
793 mdsc->sessions[s->s_mds] = NULL;
794 ceph_con_close(&s->s_con);
795 ceph_put_mds_session(s);
796 atomic_dec(&mdsc->num_sessions);
797 }
798
799 /*
800 * drop session refs in request.
801 *
802 * should be last request ref, or hold mdsc->mutex
803 */
804 static void put_request_session(struct ceph_mds_request *req)
805 {
806 if (req->r_session) {
807 ceph_put_mds_session(req->r_session);
808 req->r_session = NULL;
809 }
810 }
811
812 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
813 void (*cb)(struct ceph_mds_session *),
814 bool check_state)
815 {
816 int mds;
817
818 mutex_lock(&mdsc->mutex);
819 for (mds = 0; mds < mdsc->max_sessions; ++mds) {
820 struct ceph_mds_session *s;
821
822 s = __ceph_lookup_mds_session(mdsc, mds);
823 if (!s)
824 continue;
825
826 if (check_state && !check_session_state(s)) {
827 ceph_put_mds_session(s);
828 continue;
829 }
830
831 mutex_unlock(&mdsc->mutex);
832 cb(s);
833 ceph_put_mds_session(s);
834 mutex_lock(&mdsc->mutex);
835 }
836 mutex_unlock(&mdsc->mutex);
837 }
838
839 void ceph_mdsc_release_request(struct kref *kref)
840 {
841 struct ceph_mds_request *req = container_of(kref,
842 struct ceph_mds_request,
843 r_kref);
844 ceph_mdsc_release_dir_caps_no_check(req);
845 destroy_reply_info(&req->r_reply_info);
846 if (req->r_request)
847 ceph_msg_put(req->r_request);
848 if (req->r_reply)
849 ceph_msg_put(req->r_reply);
850 if (req->r_inode) {
851 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
852 /* avoid calling iput_final() in mds dispatch threads */
853 ceph_async_iput(req->r_inode);
854 }
855 if (req->r_parent) {
856 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
857 ceph_async_iput(req->r_parent);
858 }
859 ceph_async_iput(req->r_target_inode);
860 if (req->r_dentry)
861 dput(req->r_dentry);
862 if (req->r_old_dentry)
863 dput(req->r_old_dentry);
864 if (req->r_old_dentry_dir) {
865 /*
866 * track (and drop pins for) r_old_dentry_dir
867 * separately, since r_old_dentry's d_parent may have
868 * changed between the dir mutex being dropped and
869 * this request being freed.
870 */
871 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
872 CEPH_CAP_PIN);
873 ceph_async_iput(req->r_old_dentry_dir);
874 }
875 kfree(req->r_path1);
876 kfree(req->r_path2);
877 if (req->r_pagelist)
878 ceph_pagelist_release(req->r_pagelist);
879 put_request_session(req);
880 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
881 WARN_ON_ONCE(!list_empty(&req->r_wait));
882 kmem_cache_free(ceph_mds_request_cachep, req);
883 }
884
885 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
886
887 /*
888 * lookup request, bump ref if found.
889 *
890 * called under mdsc->mutex.
891 */
892 static struct ceph_mds_request *
893 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
894 {
895 struct ceph_mds_request *req;
896
897 req = lookup_request(&mdsc->request_tree, tid);
898 if (req)
899 ceph_mdsc_get_request(req);
900
901 return req;
902 }
903
904 /*
905 * Register an in-flight request, and assign a tid. Link to the directory
906 * we are modifying (if any).
907 *
908 * Called under mdsc->mutex.
909 */
910 static void __register_request(struct ceph_mds_client *mdsc,
911 struct ceph_mds_request *req,
912 struct inode *dir)
913 {
914 int ret = 0;
915
916 req->r_tid = ++mdsc->last_tid;
917 if (req->r_num_caps) {
918 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
919 req->r_num_caps);
920 if (ret < 0) {
921 pr_err("__register_request %p "
922 "failed to reserve caps: %d\n", req, ret);
923 /* set req->r_err to fail early from __do_request */
924 req->r_err = ret;
925 return;
926 }
927 }
928 dout("__register_request %p tid %lld\n", req, req->r_tid);
929 ceph_mdsc_get_request(req);
930 insert_request(&mdsc->request_tree, req);
931
932 req->r_uid = current_fsuid();
933 req->r_gid = current_fsgid();
934
935 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
936 mdsc->oldest_tid = req->r_tid;
937
938 if (dir) {
939 struct ceph_inode_info *ci = ceph_inode(dir);
940
941 ihold(dir);
942 req->r_unsafe_dir = dir;
943 spin_lock(&ci->i_unsafe_lock);
944 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
945 spin_unlock(&ci->i_unsafe_lock);
946 }
947 }
948
949 static void __unregister_request(struct ceph_mds_client *mdsc,
950 struct ceph_mds_request *req)
951 {
952 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
953
954 /* Never leave an unregistered request on an unsafe list! */
955 list_del_init(&req->r_unsafe_item);
956
957 if (req->r_tid == mdsc->oldest_tid) {
958 struct rb_node *p = rb_next(&req->r_node);
959 mdsc->oldest_tid = 0;
960 while (p) {
961 struct ceph_mds_request *next_req =
962 rb_entry(p, struct ceph_mds_request, r_node);
963 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
964 mdsc->oldest_tid = next_req->r_tid;
965 break;
966 }
967 p = rb_next(p);
968 }
969 }
970
971 erase_request(&mdsc->request_tree, req);
972
973 if (req->r_unsafe_dir) {
974 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
975 spin_lock(&ci->i_unsafe_lock);
976 list_del_init(&req->r_unsafe_dir_item);
977 spin_unlock(&ci->i_unsafe_lock);
978 }
979 if (req->r_target_inode &&
980 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
981 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
982 spin_lock(&ci->i_unsafe_lock);
983 list_del_init(&req->r_unsafe_target_item);
984 spin_unlock(&ci->i_unsafe_lock);
985 }
986
987 if (req->r_unsafe_dir) {
988 /* avoid calling iput_final() in mds dispatch threads */
989 ceph_async_iput(req->r_unsafe_dir);
990 req->r_unsafe_dir = NULL;
991 }
992
993 complete_all(&req->r_safe_completion);
994
995 ceph_mdsc_put_request(req);
996 }
997
998 /*
999 * Walk back up the dentry tree until we hit a dentry representing a
1000 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1001 * when calling this) to ensure that the objects won't disappear while we're
1002 * working with them. Once we hit a candidate dentry, we attempt to take a
1003 * reference to it, and return that as the result.
1004 */
1005 static struct inode *get_nonsnap_parent(struct dentry *dentry)
1006 {
1007 struct inode *inode = NULL;
1008
1009 while (dentry && !IS_ROOT(dentry)) {
1010 inode = d_inode_rcu(dentry);
1011 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1012 break;
1013 dentry = dentry->d_parent;
1014 }
1015 if (inode)
1016 inode = igrab(inode);
1017 return inode;
1018 }
1019
1020 /*
1021 * Choose mds to send request to next. If there is a hint set in the
1022 * request (e.g., due to a prior forward hint from the mds), use that.
1023 * Otherwise, consult frag tree and/or caps to identify the
1024 * appropriate mds. If all else fails, choose randomly.
1025 *
1026 * Called under mdsc->mutex.
1027 */
1028 static int __choose_mds(struct ceph_mds_client *mdsc,
1029 struct ceph_mds_request *req,
1030 bool *random)
1031 {
1032 struct inode *inode;
1033 struct ceph_inode_info *ci;
1034 struct ceph_cap *cap;
1035 int mode = req->r_direct_mode;
1036 int mds = -1;
1037 u32 hash = req->r_direct_hash;
1038 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1039
1040 if (random)
1041 *random = false;
1042
1043 /*
1044 * is there a specific mds we should try? ignore hint if we have
1045 * no session and the mds is not up (active or recovering).
1046 */
1047 if (req->r_resend_mds >= 0 &&
1048 (__have_session(mdsc, req->r_resend_mds) ||
1049 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1050 dout("%s using resend_mds mds%d\n", __func__,
1051 req->r_resend_mds);
1052 return req->r_resend_mds;
1053 }
1054
1055 if (mode == USE_RANDOM_MDS)
1056 goto random;
1057
1058 inode = NULL;
1059 if (req->r_inode) {
1060 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1061 inode = req->r_inode;
1062 ihold(inode);
1063 } else {
1064 /* req->r_dentry is non-null for LSSNAP request */
1065 rcu_read_lock();
1066 inode = get_nonsnap_parent(req->r_dentry);
1067 rcu_read_unlock();
1068 dout("%s using snapdir's parent %p\n", __func__, inode);
1069 }
1070 } else if (req->r_dentry) {
1071 /* ignore race with rename; old or new d_parent is okay */
1072 struct dentry *parent;
1073 struct inode *dir;
1074
1075 rcu_read_lock();
1076 parent = READ_ONCE(req->r_dentry->d_parent);
1077 dir = req->r_parent ? : d_inode_rcu(parent);
1078
1079 if (!dir || dir->i_sb != mdsc->fsc->sb) {
1080 /* not this fs or parent went negative */
1081 inode = d_inode(req->r_dentry);
1082 if (inode)
1083 ihold(inode);
1084 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1085 /* direct snapped/virtual snapdir requests
1086 * based on parent dir inode */
1087 inode = get_nonsnap_parent(parent);
1088 dout("%s using nonsnap parent %p\n", __func__, inode);
1089 } else {
1090 /* dentry target */
1091 inode = d_inode(req->r_dentry);
1092 if (!inode || mode == USE_AUTH_MDS) {
1093 /* dir + name */
1094 inode = igrab(dir);
1095 hash = ceph_dentry_hash(dir, req->r_dentry);
1096 is_hash = true;
1097 } else {
1098 ihold(inode);
1099 }
1100 }
1101 rcu_read_unlock();
1102 }
1103
1104 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1105 hash, mode);
1106 if (!inode)
1107 goto random;
1108 ci = ceph_inode(inode);
1109
1110 if (is_hash && S_ISDIR(inode->i_mode)) {
1111 struct ceph_inode_frag frag;
1112 int found;
1113
1114 ceph_choose_frag(ci, hash, &frag, &found);
1115 if (found) {
1116 if (mode == USE_ANY_MDS && frag.ndist > 0) {
1117 u8 r;
1118
1119 /* choose a random replica */
1120 get_random_bytes(&r, 1);
1121 r %= frag.ndist;
1122 mds = frag.dist[r];
1123 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1124 __func__, inode, ceph_vinop(inode),
1125 frag.frag, mds, (int)r, frag.ndist);
1126 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1127 CEPH_MDS_STATE_ACTIVE &&
1128 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1129 goto out;
1130 }
1131
1132 /* since this file/dir wasn't known to be
1133 * replicated, we want to look for the
1134 * authoritative mds. */
1135 if (frag.mds >= 0) {
1136 /* choose auth mds */
1137 mds = frag.mds;
1138 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1139 __func__, inode, ceph_vinop(inode),
1140 frag.frag, mds);
1141 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1142 CEPH_MDS_STATE_ACTIVE) {
1143 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1144 mds))
1145 goto out;
1146 }
1147 }
1148 mode = USE_AUTH_MDS;
1149 }
1150 }
1151
1152 spin_lock(&ci->i_ceph_lock);
1153 cap = NULL;
1154 if (mode == USE_AUTH_MDS)
1155 cap = ci->i_auth_cap;
1156 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1157 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1158 if (!cap) {
1159 spin_unlock(&ci->i_ceph_lock);
1160 ceph_async_iput(inode);
1161 goto random;
1162 }
1163 mds = cap->session->s_mds;
1164 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1165 inode, ceph_vinop(inode), mds,
1166 cap == ci->i_auth_cap ? "auth " : "", cap);
1167 spin_unlock(&ci->i_ceph_lock);
1168 out:
1169 /* avoid calling iput_final() while holding mdsc->mutex or
1170 * in mds dispatch threads */
1171 ceph_async_iput(inode);
1172 return mds;
1173
1174 random:
1175 if (random)
1176 *random = true;
1177
1178 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1179 dout("%s chose random mds%d\n", __func__, mds);
1180 return mds;
1181 }
1182
1183
1184 /*
1185 * session messages
1186 */
1187 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1188 {
1189 struct ceph_msg *msg;
1190 struct ceph_mds_session_head *h;
1191
1192 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1193 false);
1194 if (!msg) {
1195 pr_err("ENOMEM creating session %s msg\n",
1196 ceph_session_op_name(op));
1197 return NULL;
1198 }
1199 h = msg->front.iov_base;
1200 h->op = cpu_to_le32(op);
1201 h->seq = cpu_to_le64(seq);
1202
1203 return msg;
1204 }
1205
1206 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1207 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
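/*
 * FEATURE_BYTES(c) is the size of a bitmap that can hold the highest supported
 * feature bit (feature_bits[] is assumed to be sorted ascending), rounded up to
 * a whole 64-bit word. With a highest bit of, say, 17 it yields
 * DIV_ROUND_UP(18, 64) * 8 = 8 bytes.
 */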
1208 static int encode_supported_features(void **p, void *end)
1209 {
1210 static const size_t count = ARRAY_SIZE(feature_bits);
1211
1212 if (count > 0) {
1213 size_t i;
1214 size_t size = FEATURE_BYTES(count);
1215 unsigned long bit;
1216
1217 if (WARN_ON_ONCE(*p + 4 + size > end))
1218 return -ERANGE;
1219
1220 ceph_encode_32(p, size);
1221 memset(*p, 0, size);
1222 for (i = 0; i < count; i++) {
1223 bit = feature_bits[i];
1224 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1225 }
1226 *p += size;
1227 } else {
1228 if (WARN_ON_ONCE(*p + 4 > end))
1229 return -ERANGE;
1230
1231 ceph_encode_32(p, 0);
1232 }
1233
1234 return 0;
1235 }
1236
1237 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1238 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1239 static int encode_metric_spec(void **p, void *end)
1240 {
1241 static const size_t count = ARRAY_SIZE(metric_bits);
1242
1243 /* header */
1244 if (WARN_ON_ONCE(*p + 2 > end))
1245 return -ERANGE;
1246
1247 ceph_encode_8(p, 1); /* version */
1248 ceph_encode_8(p, 1); /* compat */
1249
1250 if (count > 0) {
1251 size_t i;
1252 size_t size = METRIC_BYTES(count);
1253
1254 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1255 return -ERANGE;
1256
1257 /* metric spec info length */
1258 ceph_encode_32(p, 4 + size);
1259
1260 /* metric spec */
1261 ceph_encode_32(p, size);
1262 memset(*p, 0, size);
1263 for (i = 0; i < count; i++)
1264 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1265 *p += size;
1266 } else {
1267 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1268 return -ERANGE;
1269
1270 /* metric spec info length */
1271 ceph_encode_32(p, 4);
1272 /* metric spec */
1273 ceph_encode_32(p, 0);
1274 }
1275
1276 return 0;
1277 }
1278
1279 /*
1280 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1281 * to include additional client metadata fields.
1282 */
1283 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1284 {
1285 struct ceph_msg *msg;
1286 struct ceph_mds_session_head *h;
1287 int i = -1;
1288 int extra_bytes = 0;
1289 int metadata_key_count = 0;
1290 struct ceph_options *opt = mdsc->fsc->client->options;
1291 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1292 size_t size, count;
1293 void *p, *end;
1294 int ret;
1295
1296 const char* metadata[][2] = {
1297 {"hostname", mdsc->nodename},
1298 {"kernel_version", init_utsname()->release},
1299 {"entity_id", opt->name ? : ""},
1300 {"root", fsopt->server_path ? : "/"},
1301 {NULL, NULL}
1302 };
1303
1304 /* Calculate serialized length of metadata */
1305 extra_bytes = 4; /* map length */
1306 for (i = 0; metadata[i][0]; ++i) {
1307 extra_bytes += 8 + strlen(metadata[i][0]) +
1308 strlen(metadata[i][1]);
1309 metadata_key_count++;
1310 }
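/* the 8 per entry above covers the two u32 length prefixes (key and value)
 * that precede each string in the encoded map<string,string> */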
1311
1312 /* supported feature */
1313 size = 0;
1314 count = ARRAY_SIZE(feature_bits);
1315 if (count > 0)
1316 size = FEATURE_BYTES(count);
1317 extra_bytes += 4 + size;
1318
1319 /* metric spec */
1320 size = 0;
1321 count = ARRAY_SIZE(metric_bits);
1322 if (count > 0)
1323 size = METRIC_BYTES(count);
1324 extra_bytes += 2 + 4 + 4 + size;
1325
1326 /* Allocate the message */
1327 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1328 GFP_NOFS, false);
1329 if (!msg) {
1330 pr_err("ENOMEM creating session open msg\n");
1331 return ERR_PTR(-ENOMEM);
1332 }
1333 p = msg->front.iov_base;
1334 end = p + msg->front.iov_len;
1335
1336 h = p;
1337 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1338 h->seq = cpu_to_le64(seq);
1339
1340 /*
1341 * Serialize client metadata into waiting buffer space, using
1342 * the format that userspace expects for map<string, string>
1343 *
1344 * ClientSession messages with metadata are v4
1345 */
1346 msg->hdr.version = cpu_to_le16(4);
1347 msg->hdr.compat_version = cpu_to_le16(1);
1348
1349 /* The write pointer, following the session_head structure */
1350 p += sizeof(*h);
1351
1352 /* Number of entries in the map */
1353 ceph_encode_32(&p, metadata_key_count);
1354
1355 /* Two length-prefixed strings for each entry in the map */
1356 for (i = 0; metadata[i][0]; ++i) {
1357 size_t const key_len = strlen(metadata[i][0]);
1358 size_t const val_len = strlen(metadata[i][1]);
1359
1360 ceph_encode_32(&p, key_len);
1361 memcpy(p, metadata[i][0], key_len);
1362 p += key_len;
1363 ceph_encode_32(&p, val_len);
1364 memcpy(p, metadata[i][1], val_len);
1365 p += val_len;
1366 }
1367
1368 ret = encode_supported_features(&p, end);
1369 if (ret) {
1370 pr_err("encode_supported_features failed!\n");
1371 ceph_msg_put(msg);
1372 return ERR_PTR(ret);
1373 }
1374
1375 ret = encode_metric_spec(&p, end);
1376 if (ret) {
1377 pr_err("encode_metric_spec failed!\n");
1378 ceph_msg_put(msg);
1379 return ERR_PTR(ret);
1380 }
1381
1382 msg->front.iov_len = p - msg->front.iov_base;
1383 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1384
1385 return msg;
1386 }
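/*
 * Resulting front payload, as built above:
 *
 *	struct ceph_mds_session_head       (op = REQUEST_OPEN, seq)
 *	u32 nr_entries;                    client metadata map
 *	  { u32 key_len; key; u32 val_len; val; } * nr_entries
 *	u32 len; feature bitmap            (encode_supported_features)
 *	u8 ver; u8 compat; u32 info_len;   metric spec (encode_metric_spec)
 *	  u32 len; metric bitmap
 */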
1387
1388 /*
1389 * send session open request.
1390 *
1391 * called under mdsc->mutex
1392 */
1393 static int __open_session(struct ceph_mds_client *mdsc,
1394 struct ceph_mds_session *session)
1395 {
1396 struct ceph_msg *msg;
1397 int mstate;
1398 int mds = session->s_mds;
1399
1400 /* wait for mds to go active? */
1401 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1402 dout("open_session to mds%d (%s)\n", mds,
1403 ceph_mds_state_name(mstate));
1404 session->s_state = CEPH_MDS_SESSION_OPENING;
1405 session->s_renew_requested = jiffies;
1406
1407 /* send connect message */
1408 msg = create_session_open_msg(mdsc, session->s_seq);
1409 if (IS_ERR(msg))
1410 return PTR_ERR(msg);
1411 ceph_con_send(&session->s_con, msg);
1412 return 0;
1413 }
1414
1415 /*
1416 * open sessions for any export targets for the given mds
1417 *
1418 * called under mdsc->mutex
1419 */
1420 static struct ceph_mds_session *
1421 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1422 {
1423 struct ceph_mds_session *session;
1424 int ret;
1425
1426 session = __ceph_lookup_mds_session(mdsc, target);
1427 if (!session) {
1428 session = register_session(mdsc, target);
1429 if (IS_ERR(session))
1430 return session;
1431 }
1432 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1433 session->s_state == CEPH_MDS_SESSION_CLOSING) {
1434 ret = __open_session(mdsc, session);
1435 if (ret)
1436 return ERR_PTR(ret);
1437 }
1438
1439 return session;
1440 }
1441
1442 struct ceph_mds_session *
1443 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1444 {
1445 struct ceph_mds_session *session;
1446
1447 dout("open_export_target_session to mds%d\n", target);
1448
1449 mutex_lock(&mdsc->mutex);
1450 session = __open_export_target_session(mdsc, target);
1451 mutex_unlock(&mdsc->mutex);
1452
1453 return session;
1454 }
1455
1456 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1457 struct ceph_mds_session *session)
1458 {
1459 struct ceph_mds_info *mi;
1460 struct ceph_mds_session *ts;
1461 int i, mds = session->s_mds;
1462
1463 if (mds >= mdsc->mdsmap->possible_max_rank)
1464 return;
1465
1466 mi = &mdsc->mdsmap->m_info[mds];
1467 dout("open_export_target_sessions for mds%d (%d targets)\n",
1468 session->s_mds, mi->num_export_targets);
1469
1470 for (i = 0; i < mi->num_export_targets; i++) {
1471 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1472 ceph_put_mds_session(ts);
1473 }
1474 }
1475
1476 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1477 struct ceph_mds_session *session)
1478 {
1479 mutex_lock(&mdsc->mutex);
1480 __open_export_target_sessions(mdsc, session);
1481 mutex_unlock(&mdsc->mutex);
1482 }
1483
1484 /*
1485 * session caps
1486 */
1487
1488 static void detach_cap_releases(struct ceph_mds_session *session,
1489 struct list_head *target)
1490 {
1491 lockdep_assert_held(&session->s_cap_lock);
1492
1493 list_splice_init(&session->s_cap_releases, target);
1494 session->s_num_cap_releases = 0;
1495 dout("dispose_cap_releases mds%d\n", session->s_mds);
1496 }
1497
1498 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1499 struct list_head *dispose)
1500 {
1501 while (!list_empty(dispose)) {
1502 struct ceph_cap *cap;
1503 /* put each cap that was queued for a release message */
1504 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1505 list_del(&cap->session_caps);
1506 ceph_put_cap(mdsc, cap);
1507 }
1508 }
1509
1510 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1511 struct ceph_mds_session *session)
1512 {
1513 struct ceph_mds_request *req;
1514 struct rb_node *p;
1515
1516 dout("cleanup_session_requests mds%d\n", session->s_mds);
1517 mutex_lock(&mdsc->mutex);
1518 while (!list_empty(&session->s_unsafe)) {
1519 req = list_first_entry(&session->s_unsafe,
1520 struct ceph_mds_request, r_unsafe_item);
1521 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1522 req->r_tid);
1523 if (req->r_target_inode)
1524 mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1525 if (req->r_unsafe_dir)
1526 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1527 __unregister_request(mdsc, req);
1528 }
1529 /* zero r_attempts, so kick_requests() will re-send requests */
1530 p = rb_first(&mdsc->request_tree);
1531 while (p) {
1532 req = rb_entry(p, struct ceph_mds_request, r_node);
1533 p = rb_next(p);
1534 if (req->r_session &&
1535 req->r_session->s_mds == session->s_mds)
1536 req->r_attempts = 0;
1537 }
1538 mutex_unlock(&mdsc->mutex);
1539 }
1540
1541 /*
1542 * Helper to safely iterate over all caps associated with a session, with
1543 * special care taken to handle a racing __ceph_remove_cap().
1544 *
1545 * Caller must hold session s_mutex.
1546 */
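/*
 * Minimal usage sketch with a hypothetical callback (for illustration only,
 * not a helper defined in this file):
 *
 *	static int count_cap_cb(struct inode *inode, struct ceph_cap *cap,
 *				void *arg)
 *	{
 *		(*(int *)arg)++;
 *		return 0;	// a negative return stops the iteration
 *	}
 *
 *	int n = 0;
 *	ceph_iterate_session_caps(session, count_cap_cb, &n);
 */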
1547 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1548 int (*cb)(struct inode *, struct ceph_cap *,
1549 void *), void *arg)
1550 {
1551 struct list_head *p;
1552 struct ceph_cap *cap;
1553 struct inode *inode, *last_inode = NULL;
1554 struct ceph_cap *old_cap = NULL;
1555 int ret;
1556
1557 dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1558 spin_lock(&session->s_cap_lock);
1559 p = session->s_caps.next;
1560 while (p != &session->s_caps) {
1561 cap = list_entry(p, struct ceph_cap, session_caps);
1562 inode = igrab(&cap->ci->vfs_inode);
1563 if (!inode) {
1564 p = p->next;
1565 continue;
1566 }
1567 session->s_cap_iterator = cap;
1568 spin_unlock(&session->s_cap_lock);
1569
1570 if (last_inode) {
1571 /* avoid calling iput_final() while holding
1572 * s_mutex or in mds dispatch threads */
1573 ceph_async_iput(last_inode);
1574 last_inode = NULL;
1575 }
1576 if (old_cap) {
1577 ceph_put_cap(session->s_mdsc, old_cap);
1578 old_cap = NULL;
1579 }
1580
1581 ret = cb(inode, cap, arg);
1582 last_inode = inode;
1583
1584 spin_lock(&session->s_cap_lock);
1585 p = p->next;
1586 if (!cap->ci) {
1587 dout("iterate_session_caps finishing cap %p removal\n",
1588 cap);
1589 BUG_ON(cap->session != session);
1590 cap->session = NULL;
1591 list_del_init(&cap->session_caps);
1592 session->s_nr_caps--;
1593 atomic64_dec(&session->s_mdsc->metric.total_caps);
1594 if (cap->queue_release)
1595 __ceph_queue_cap_release(session, cap);
1596 else
1597 old_cap = cap; /* put_cap it w/o locks held */
1598 }
1599 if (ret < 0)
1600 goto out;
1601 }
1602 ret = 0;
1603 out:
1604 session->s_cap_iterator = NULL;
1605 spin_unlock(&session->s_cap_lock);
1606
1607 ceph_async_iput(last_inode);
1608 if (old_cap)
1609 ceph_put_cap(session->s_mdsc, old_cap);
1610
1611 return ret;
1612 }
1613
1614 static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1615 {
1616 struct ceph_inode_info *ci = ceph_inode(inode);
1617 struct ceph_cap_snap *capsnap;
1618 int capsnap_release = 0;
1619
1620 lockdep_assert_held(&ci->i_ceph_lock);
1621
1622 dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1623
1624 while (!list_empty(&ci->i_cap_snaps)) {
1625 capsnap = list_first_entry(&ci->i_cap_snaps,
1626 struct ceph_cap_snap, ci_item);
1627 __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1628 ceph_put_snap_context(capsnap->context);
1629 ceph_put_cap_snap(capsnap);
1630 capsnap_release++;
1631 }
1632 wake_up_all(&ci->i_cap_wq);
1633 wake_up_all(&mdsc->cap_flushing_wq);
1634 return capsnap_release;
1635 }
1636
1637 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1638 void *arg)
1639 {
1640 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1641 struct ceph_mds_client *mdsc = fsc->mdsc;
1642 struct ceph_inode_info *ci = ceph_inode(inode);
1643 LIST_HEAD(to_remove);
1644 bool dirty_dropped = false;
1645 bool invalidate = false;
1646 int capsnap_release = 0;
1647
1648 dout("removing cap %p, ci is %p, inode is %p\n",
1649 cap, ci, &ci->vfs_inode);
1650 spin_lock(&ci->i_ceph_lock);
1651 __ceph_remove_cap(cap, false);
1652 if (!ci->i_auth_cap) {
1653 struct ceph_cap_flush *cf;
1654
1655 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1656 if (inode->i_data.nrpages > 0)
1657 invalidate = true;
1658 if (ci->i_wrbuffer_ref > 0)
1659 mapping_set_error(&inode->i_data, -EIO);
1660 }
1661
1662 while (!list_empty(&ci->i_cap_flush_list)) {
1663 cf = list_first_entry(&ci->i_cap_flush_list,
1664 struct ceph_cap_flush, i_list);
1665 list_move(&cf->i_list, &to_remove);
1666 }
1667
1668 spin_lock(&mdsc->cap_dirty_lock);
1669
1670 list_for_each_entry(cf, &to_remove, i_list)
1671 list_del_init(&cf->g_list);
1672
1673 if (!list_empty(&ci->i_dirty_item)) {
1674 pr_warn_ratelimited(
1675 " dropping dirty %s state for %p %lld\n",
1676 ceph_cap_string(ci->i_dirty_caps),
1677 inode, ceph_ino(inode));
1678 ci->i_dirty_caps = 0;
1679 list_del_init(&ci->i_dirty_item);
1680 dirty_dropped = true;
1681 }
1682 if (!list_empty(&ci->i_flushing_item)) {
1683 pr_warn_ratelimited(
1684 " dropping dirty+flushing %s state for %p %lld\n",
1685 ceph_cap_string(ci->i_flushing_caps),
1686 inode, ceph_ino(inode));
1687 ci->i_flushing_caps = 0;
1688 list_del_init(&ci->i_flushing_item);
1689 mdsc->num_cap_flushing--;
1690 dirty_dropped = true;
1691 }
1692 spin_unlock(&mdsc->cap_dirty_lock);
1693
1694 if (dirty_dropped) {
1695 mapping_set_error(inode->i_mapping, -EIO);
1696
1697 if (ci->i_wrbuffer_ref_head == 0 &&
1698 ci->i_wr_ref == 0 &&
1699 ci->i_dirty_caps == 0 &&
1700 ci->i_flushing_caps == 0) {
1701 ceph_put_snap_context(ci->i_head_snapc);
1702 ci->i_head_snapc = NULL;
1703 }
1704 }
1705
1706 if (atomic_read(&ci->i_filelock_ref) > 0) {
1707 /* make further file lock syscall return -EIO */
1708 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1709 pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1710 inode, ceph_ino(inode));
1711 }
1712
1713 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1714 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1715 ci->i_prealloc_cap_flush = NULL;
1716 }
1717
1718 if (!list_empty(&ci->i_cap_snaps))
1719 capsnap_release = remove_capsnaps(mdsc, inode);
1720 }
1721 spin_unlock(&ci->i_ceph_lock);
1722 while (!list_empty(&to_remove)) {
1723 struct ceph_cap_flush *cf;
1724 cf = list_first_entry(&to_remove,
1725 struct ceph_cap_flush, i_list);
1726 list_del_init(&cf->i_list);
1727 if (!cf->is_capsnap)
1728 ceph_free_cap_flush(cf);
1729 }
1730
1731 wake_up_all(&ci->i_cap_wq);
1732 if (invalidate)
1733 ceph_queue_invalidate(inode);
1734 if (dirty_dropped)
1735 iput(inode);
1736 while (capsnap_release--)
1737 iput(inode);
1738 return 0;
1739 }
1740
1741 /*
1742 * caller must hold session s_mutex
1743 */
1744 static void remove_session_caps(struct ceph_mds_session *session)
1745 {
1746 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1747 struct super_block *sb = fsc->sb;
1748 LIST_HEAD(dispose);
1749
1750 dout("remove_session_caps on %p\n", session);
1751 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1752
1753 wake_up_all(&fsc->mdsc->cap_flushing_wq);
1754
1755 spin_lock(&session->s_cap_lock);
1756 if (session->s_nr_caps > 0) {
1757 struct inode *inode;
1758 struct ceph_cap *cap, *prev = NULL;
1759 struct ceph_vino vino;
1760 /*
1761 * iterate_session_caps() skips inodes that are being
1762 * deleted, so we need to wait until deletions are complete.
1763 * __wait_on_freeing_inode() is designed for the job,
1764 * but it is not exported, so use the inode lookup function
1765 * to access it.
1766 */
1767 while (!list_empty(&session->s_caps)) {
1768 cap = list_entry(session->s_caps.next,
1769 struct ceph_cap, session_caps);
1770 if (cap == prev)
1771 break;
1772 prev = cap;
1773 vino = cap->ci->i_vino;
1774 spin_unlock(&session->s_cap_lock);
1775
1776 inode = ceph_find_inode(sb, vino);
1777 /* avoid calling iput_final() while holding s_mutex */
1778 ceph_async_iput(inode);
1779
1780 spin_lock(&session->s_cap_lock);
1781 }
1782 }
1783
1784 // detach queued cap releases; s_cap_lock is dropped further below
1785 detach_cap_releases(session, &dispose);
1786
1787 BUG_ON(session->s_nr_caps > 0);
1788 BUG_ON(!list_empty(&session->s_cap_flushing));
1789 spin_unlock(&session->s_cap_lock);
1790 dispose_cap_releases(session->s_mdsc, &dispose);
1791 }
1792
1793 enum {
1794 RECONNECT,
1795 RENEWCAPS,
1796 FORCE_RO,
1797 };
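/* event values handed to wake_up_session_cb() through its opaque void *arg */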
1798
1799 /*
1800 * wake up any threads waiting on this session's caps. if the cap is
1801 * old (didn't get renewed on the client reconnect), reduce it to CEPH_CAP_PIN.
1802 *
1803 * caller must hold s_mutex.
1804 */
1805 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1806 void *arg)
1807 {
1808 struct ceph_inode_info *ci = ceph_inode(inode);
1809 unsigned long ev = (unsigned long)arg;
1810
1811 if (ev == RECONNECT) {
1812 spin_lock(&ci->i_ceph_lock);
1813 ci->i_wanted_max_size = 0;
1814 ci->i_requested_max_size = 0;
1815 spin_unlock(&ci->i_ceph_lock);
1816 } else if (ev == RENEWCAPS) {
1817 if (cap->cap_gen < cap->session->s_cap_gen) {
1818 /* mds did not re-issue stale cap */
1819 spin_lock(&ci->i_ceph_lock);
1820 cap->issued = cap->implemented = CEPH_CAP_PIN;
1821 spin_unlock(&ci->i_ceph_lock);
1822 }
1823 } else if (ev == FORCE_RO) {
1824 }
1825 wake_up_all(&ci->i_cap_wq);
1826 return 0;
1827 }
1828
1829 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1830 {
1831 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1832 ceph_iterate_session_caps(session, wake_up_session_cb,
1833 (void *)(unsigned long)ev);
1834 }
1835
1836 /*
1837 * Send periodic message to MDS renewing all currently held caps. The
1838 * ack will reset the expiration for all caps from this session.
1839 *
1840 * caller holds s_mutex
1841 */
1842 static int send_renew_caps(struct ceph_mds_client *mdsc,
1843 struct ceph_mds_session *session)
1844 {
1845 struct ceph_msg *msg;
1846 int state;
1847
1848 if (time_after_eq(jiffies, session->s_cap_ttl) &&
1849 time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1850 pr_info("mds%d caps stale\n", session->s_mds);
1851 session->s_renew_requested = jiffies;
1852
1853 /* do not try to renew caps until a recovering mds has reconnected
1854 * with its clients. */
1855 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1856 if (state < CEPH_MDS_STATE_RECONNECT) {
1857 dout("send_renew_caps ignoring mds%d (%s)\n",
1858 session->s_mds, ceph_mds_state_name(state));
1859 return 0;
1860 }
1861
1862 dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1863 ceph_mds_state_name(state));
1864 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1865 ++session->s_renew_seq);
1866 if (!msg)
1867 return -ENOMEM;
1868 ceph_con_send(&session->s_con, msg);
1869 return 0;
1870 }
1871
1872 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1873 struct ceph_mds_session *session, u64 seq)
1874 {
1875 struct ceph_msg *msg;
1876
1877 dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1878 session->s_mds, ceph_session_state_name(session->s_state), seq);
1879 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1880 if (!msg)
1881 return -ENOMEM;
1882 ceph_con_send(&session->s_con, msg);
1883 return 0;
1884 }
1885
1886
1887 /*
1888 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1889 *
1890 * Called under session->s_mutex
1891 */
1892 static void renewed_caps(struct ceph_mds_client *mdsc,
1893 struct ceph_mds_session *session, int is_renew)
1894 {
1895 int was_stale;
1896 int wake = 0;
1897
1898 spin_lock(&session->s_cap_lock);
1899 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1900
1901 session->s_cap_ttl = session->s_renew_requested +
1902 mdsc->mdsmap->m_session_timeout*HZ;
1903
1904 if (was_stale) {
1905 if (time_before(jiffies, session->s_cap_ttl)) {
1906 pr_info("mds%d caps renewed\n", session->s_mds);
1907 wake = 1;
1908 } else {
1909 pr_info("mds%d caps still stale\n", session->s_mds);
1910 }
1911 }
1912 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1913 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1914 time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1915 spin_unlock(&session->s_cap_lock);
1916
1917 if (wake)
1918 wake_up_session_caps(session, RENEWCAPS);
1919 }
1920
1921 /*
1922 * send a session close request
1923 */
1924 static int request_close_session(struct ceph_mds_session *session)
1925 {
1926 struct ceph_msg *msg;
1927
1928 dout("request_close_session mds%d state %s seq %lld\n",
1929 session->s_mds, ceph_session_state_name(session->s_state),
1930 session->s_seq);
1931 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1932 session->s_seq);
1933 if (!msg)
1934 return -ENOMEM;
1935 ceph_con_send(&session->s_con, msg);
1936 return 1;
1937 }
1938
1939 /*
1940 * Called with s_mutex held.
1941 */
1942 static int __close_session(struct ceph_mds_client *mdsc,
1943 struct ceph_mds_session *session)
1944 {
1945 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1946 return 0;
1947 session->s_state = CEPH_MDS_SESSION_CLOSING;
1948 return request_close_session(session);
1949 }
1950
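/*
 * If every child of a directory dentry is negative, prune them so that the
 * directory itself can be dropped; returns true when nothing positive
 * remains underneath (trivially true for non-directories).
 */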
1951 static bool drop_negative_children(struct dentry *dentry)
1952 {
1953 struct dentry *child;
1954 bool all_negative = true;
1955
1956 if (!d_is_dir(dentry))
1957 goto out;
1958
1959 spin_lock(&dentry->d_lock);
1960 list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1961 if (d_really_is_positive(child)) {
1962 all_negative = false;
1963 break;
1964 }
1965 }
1966 spin_unlock(&dentry->d_lock);
1967
1968 if (all_negative)
1969 shrink_dcache_parent(dentry);
1970 out:
1971 return all_negative;
1972 }
1973
1974 /*
1975 * Trim old(er) caps.
1976 *
1977 * Because we can't cache an inode without one or more caps, we do
1978 * this indirectly: if a cap is unused, we prune its aliases, at which
1979  * point the inode will hopefully get dropped too.
1980 *
1981 * Yes, this is a bit sloppy. Our only real goal here is to respond to
1982 * memory pressure from the MDS, though, so it needn't be perfect.
1983 */
1984 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1985 {
1986 int *remaining = arg;
1987 struct ceph_inode_info *ci = ceph_inode(inode);
1988 int used, wanted, oissued, mine;
1989
1990 if (*remaining <= 0)
1991 return -1;
1992
1993 spin_lock(&ci->i_ceph_lock);
1994 mine = cap->issued | cap->implemented;
1995 used = __ceph_caps_used(ci);
1996 wanted = __ceph_caps_file_wanted(ci);
1997 oissued = __ceph_caps_issued_other(ci, cap);
1998
1999 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2000 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2001 ceph_cap_string(used), ceph_cap_string(wanted));
2002 if (cap == ci->i_auth_cap) {
2003 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2004 !list_empty(&ci->i_cap_snaps))
2005 goto out;
2006 if ((used | wanted) & CEPH_CAP_ANY_WR)
2007 goto out;
2008 /* Note: it's possible that i_filelock_ref becomes non-zero
2009  * after dropping auth caps. It doesn't hurt because the reply to the
2010  * lock MDS request will re-add auth caps. */
2011 if (atomic_read(&ci->i_filelock_ref) > 0)
2012 goto out;
2013 }
2014 /* The inode has cached pages, but it's no longer used.
2015  * We can safely drop it. */
2016 if (S_ISREG(inode->i_mode) &&
2017 wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2018 !(oissued & CEPH_CAP_FILE_CACHE)) {
2019 used = 0;
2020 oissued = 0;
2021 }
2022 if ((used | wanted) & ~oissued & mine)
2023 goto out; /* we need these caps */
2024
2025 if (oissued) {
2026 /* we aren't the only cap.. just remove us */
2027 __ceph_remove_cap(cap, true);
2028 (*remaining)--;
2029 } else {
2030 struct dentry *dentry;
2031 /* try dropping referring dentries */
2032 spin_unlock(&ci->i_ceph_lock);
2033 dentry = d_find_any_alias(inode);
2034 if (dentry && drop_negative_children(dentry)) {
2035 int count;
2036 dput(dentry);
2037 d_prune_aliases(inode);
2038 count = atomic_read(&inode->i_count);
2039 if (count == 1)
2040 (*remaining)--;
2041 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2042 inode, cap, count);
2043 } else {
2044 dput(dentry);
2045 }
2046 return 0;
2047 }
2048
2049 out:
2050 spin_unlock(&ci->i_ceph_lock);
2051 return 0;
2052 }
2053
2054 /*
2055 * Trim session cap count down to some max number.
2056 */
2057 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2058 struct ceph_mds_session *session,
2059 int max_caps)
2060 {
2061 int trim_caps = session->s_nr_caps - max_caps;
2062
2063 dout("trim_caps mds%d start: %d / %d, trim %d\n",
2064 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2065 if (trim_caps > 0) {
2066 int remaining = trim_caps;
2067
2068 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2069 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2070 session->s_mds, session->s_nr_caps, max_caps,
2071 trim_caps - remaining);
2072 }
2073
2074 ceph_flush_cap_releases(mdsc, session);
2075 return 0;
2076 }
2077
2078 static int check_caps_flush(struct ceph_mds_client *mdsc,
2079 u64 want_flush_tid)
2080 {
2081 int ret = 1;
2082
2083 spin_lock(&mdsc->cap_dirty_lock);
2084 if (!list_empty(&mdsc->cap_flush_list)) {
2085 struct ceph_cap_flush *cf =
2086 list_first_entry(&mdsc->cap_flush_list,
2087 struct ceph_cap_flush, g_list);
2088 if (cf->tid <= want_flush_tid) {
2089 dout("check_caps_flush still flushing tid "
2090 "%llu <= %llu\n", cf->tid, want_flush_tid);
2091 ret = 0;
2092 }
2093 }
2094 spin_unlock(&mdsc->cap_dirty_lock);
2095 return ret;
2096 }
2097
2098 /*
2099  * wait until all cap flushes up to and including want_flush_tid have
2100  * been acknowledged by the MDS (the flushes themselves are initiated
2101  * elsewhere; this only waits).
2102 */
2103 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2104 u64 want_flush_tid)
2105 {
2106 dout("check_caps_flush want %llu\n", want_flush_tid);
2107
2108 wait_event(mdsc->cap_flushing_wq,
2109 check_caps_flush(mdsc, want_flush_tid));
2110
2111 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2112 }
2113
2114 /*
2115 * called under s_mutex
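 *
 * Build and send CEPH_MSG_CLIENT_CAPRELEASE messages from the queued cap
 * releases, batching up to CEPH_CAPS_PER_RELEASE items per message and
 * appending the current OSD epoch barrier to each one.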
2116 */
2117 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2118 struct ceph_mds_session *session)
2119 {
2120 struct ceph_msg *msg = NULL;
2121 struct ceph_mds_cap_release *head;
2122 struct ceph_mds_cap_item *item;
2123 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2124 struct ceph_cap *cap;
2125 LIST_HEAD(tmp_list);
2126 int num_cap_releases;
2127 __le32 barrier, *cap_barrier;
2128
2129 down_read(&osdc->lock);
2130 barrier = cpu_to_le32(osdc->epoch_barrier);
2131 up_read(&osdc->lock);
2132
2133 spin_lock(&session->s_cap_lock);
2134 again:
2135 list_splice_init(&session->s_cap_releases, &tmp_list);
2136 num_cap_releases = session->s_num_cap_releases;
2137 session->s_num_cap_releases = 0;
2138 spin_unlock(&session->s_cap_lock);
2139
2140 while (!list_empty(&tmp_list)) {
2141 if (!msg) {
2142 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2143 PAGE_SIZE, GFP_NOFS, false);
2144 if (!msg)
2145 goto out_err;
2146 head = msg->front.iov_base;
2147 head->num = cpu_to_le32(0);
2148 msg->front.iov_len = sizeof(*head);
2149
2150 msg->hdr.version = cpu_to_le16(2);
2151 msg->hdr.compat_version = cpu_to_le16(1);
2152 }
2153
2154 cap = list_first_entry(&tmp_list, struct ceph_cap,
2155 session_caps);
2156 list_del(&cap->session_caps);
2157 num_cap_releases--;
2158
2159 head = msg->front.iov_base;
2160 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2161 &head->num);
2162 item = msg->front.iov_base + msg->front.iov_len;
2163 item->ino = cpu_to_le64(cap->cap_ino);
2164 item->cap_id = cpu_to_le64(cap->cap_id);
2165 item->migrate_seq = cpu_to_le32(cap->mseq);
2166 item->seq = cpu_to_le32(cap->issue_seq);
2167 msg->front.iov_len += sizeof(*item);
2168
2169 ceph_put_cap(mdsc, cap);
2170
2171 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2172 // Append cap_barrier field
2173 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2174 *cap_barrier = barrier;
2175 msg->front.iov_len += sizeof(*cap_barrier);
2176
2177 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2178 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2179 ceph_con_send(&session->s_con, msg);
2180 msg = NULL;
2181 }
2182 }
2183
2184 BUG_ON(num_cap_releases != 0);
2185
2186 spin_lock(&session->s_cap_lock);
2187 if (!list_empty(&session->s_cap_releases))
2188 goto again;
2189 spin_unlock(&session->s_cap_lock);
2190
2191 if (msg) {
2192 // Append cap_barrier field
2193 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2194 *cap_barrier = barrier;
2195 msg->front.iov_len += sizeof(*cap_barrier);
2196
2197 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2198 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2199 ceph_con_send(&session->s_con, msg);
2200 }
2201 return;
2202 out_err:
2203 pr_err("send_cap_releases mds%d, failed to allocate message\n",
2204 session->s_mds);
2205 spin_lock(&session->s_cap_lock);
2206 list_splice(&tmp_list, &session->s_cap_releases);
2207 session->s_num_cap_releases += num_cap_releases;
2208 spin_unlock(&session->s_cap_lock);
2209 }
2210
2211 static void ceph_cap_release_work(struct work_struct *work)
2212 {
2213 struct ceph_mds_session *session =
2214 container_of(work, struct ceph_mds_session, s_cap_release_work);
2215
2216 mutex_lock(&session->s_mutex);
2217 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2218 session->s_state == CEPH_MDS_SESSION_HUNG)
2219 ceph_send_cap_releases(session->s_mdsc, session);
2220 mutex_unlock(&session->s_mutex);
2221 ceph_put_mds_session(session);
2222 }
2223
2224 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2225 struct ceph_mds_session *session)
2226 {
2227 if (mdsc->stopping)
2228 return;
2229
2230 ceph_get_mds_session(session);
2231 if (queue_work(mdsc->fsc->cap_wq,
2232 &session->s_cap_release_work)) {
2233 dout("cap release work queued\n");
2234 } else {
2235 ceph_put_mds_session(session);
2236 dout("failed to queue cap release work\n");
2237 }
2238 }
2239
2240 /*
2241 * caller holds session->s_cap_lock
2242 */
2243 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2244 struct ceph_cap *cap)
2245 {
2246 list_add_tail(&cap->session_caps, &session->s_cap_releases);
2247 session->s_num_cap_releases++;
2248
2249 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2250 ceph_flush_cap_releases(session->s_mdsc, session);
2251 }
2252
2253 static void ceph_cap_reclaim_work(struct work_struct *work)
2254 {
2255 struct ceph_mds_client *mdsc =
2256 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2257 int ret = ceph_trim_dentries(mdsc);
2258 if (ret == -EAGAIN)
2259 ceph_queue_cap_reclaim_work(mdsc);
2260 }
2261
2262 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2263 {
2264 if (mdsc->stopping)
2265 return;
2266
2267 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2268 dout("caps reclaim work queued\n");
2269 } else {
2270 dout("failed to queue caps reclaim work\n");
2271 }
2272 }
2273
2274 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2275 {
2276 int val;
2277 if (!nr)
2278 return;
2279 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
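	/* kick off the reclaim work roughly once per CEPH_CAPS_PER_RELEASE entries */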
2280 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2281 atomic_set(&mdsc->cap_reclaim_pending, 0);
2282 ceph_queue_cap_reclaim_work(mdsc);
2283 }
2284 }
2285
2286 /*
2287 * requests
2288 */
2289
2290 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2291 struct inode *dir)
2292 {
2293 struct ceph_inode_info *ci = ceph_inode(dir);
2294 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2295 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2296 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2297 unsigned int num_entries;
2298 int order;
2299
2300 spin_lock(&ci->i_ceph_lock);
2301 num_entries = ci->i_files + ci->i_subdirs;
2302 spin_unlock(&ci->i_ceph_lock);
2303 num_entries = max(num_entries, 1U);
2304 num_entries = min(num_entries, opt->max_readdir);
2305
2306 order = get_order(size * num_entries);
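	/* fall back to progressively smaller allocations if high-order pages fail */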
2307 while (order >= 0) {
2308 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2309 __GFP_NOWARN,
2310 order);
2311 if (rinfo->dir_entries)
2312 break;
2313 order--;
2314 }
2315 if (!rinfo->dir_entries)
2316 return -ENOMEM;
2317
2318 num_entries = (PAGE_SIZE << order) / size;
2319 num_entries = min(num_entries, opt->max_readdir);
2320
2321 rinfo->dir_buf_size = PAGE_SIZE << order;
2322 req->r_num_caps = num_entries + 1;
2323 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2324 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2325 return 0;
2326 }
2327
2328 /*
2329 * Create an mds request.
2330 */
2331 struct ceph_mds_request *
2332 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2333 {
2334 struct ceph_mds_request *req;
2335
2336 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2337 if (!req)
2338 return ERR_PTR(-ENOMEM);
2339
2340 mutex_init(&req->r_fill_mutex);
2341 req->r_mdsc = mdsc;
2342 req->r_started = jiffies;
2343 req->r_start_latency = ktime_get();
2344 req->r_resend_mds = -1;
2345 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2346 INIT_LIST_HEAD(&req->r_unsafe_target_item);
2347 req->r_fmode = -1;
2348 kref_init(&req->r_kref);
2349 RB_CLEAR_NODE(&req->r_node);
2350 INIT_LIST_HEAD(&req->r_wait);
2351 init_completion(&req->r_completion);
2352 init_completion(&req->r_safe_completion);
2353 INIT_LIST_HEAD(&req->r_unsafe_item);
2354
2355 ktime_get_coarse_real_ts64(&req->r_stamp);
2356
2357 req->r_op = op;
2358 req->r_direct_mode = mode;
2359 return req;
2360 }
2361
2362 /*
2363  * return oldest (lowest tid) request in the request tree, or its tid; NULL/0 if none.
2364 *
2365 * called under mdsc->mutex.
2366 */
2367 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2368 {
2369 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2370 return NULL;
2371 return rb_entry(rb_first(&mdsc->request_tree),
2372 struct ceph_mds_request, r_node);
2373 }
2374
2375 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2376 {
2377 return mdsc->oldest_tid;
2378 }
2379
2380 /*
2381  * Build a dentry's path.  The caller must free it with ceph_mdsc_free_path().  Based
2382 * on build_path_from_dentry in fs/cifs/dir.c.
2383 *
2384 * If @stop_on_nosnap, generate path relative to the first non-snapped
2385 * inode.
2386 *
2387 * Encode hidden .snap dirs as a double /, i.e.
2388 * foo/.snap/bar -> foo//bar
2389 */
2390 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2391 int stop_on_nosnap)
2392 {
2393 struct dentry *temp;
2394 char *path;
2395 int pos;
2396 unsigned seq;
2397 u64 base;
2398
2399 if (!dentry)
2400 return ERR_PTR(-EINVAL);
2401
2402 path = __getname();
2403 if (!path)
2404 return ERR_PTR(-ENOMEM);
2405 retry:
2406 pos = PATH_MAX - 1;
2407 path[pos] = '\0';
2408
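	/*
	 * Fill the buffer backwards from the end, walking up d_parent under
	 * RCU; the rename seqlock detects a concurrent rename, in which case
	 * we retry.
	 */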
2409 seq = read_seqbegin(&rename_lock);
2410 rcu_read_lock();
2411 temp = dentry;
2412 for (;;) {
2413 struct inode *inode;
2414
2415 spin_lock(&temp->d_lock);
2416 inode = d_inode(temp);
2417 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2418 dout("build_path path+%d: %p SNAPDIR\n",
2419 pos, temp);
2420 } else if (stop_on_nosnap && inode && dentry != temp &&
2421 ceph_snap(inode) == CEPH_NOSNAP) {
2422 spin_unlock(&temp->d_lock);
2423 pos++; /* get rid of any prepended '/' */
2424 break;
2425 } else {
2426 pos -= temp->d_name.len;
2427 if (pos < 0) {
2428 spin_unlock(&temp->d_lock);
2429 break;
2430 }
2431 memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2432 }
2433 spin_unlock(&temp->d_lock);
2434 temp = READ_ONCE(temp->d_parent);
2435
2436 /* Are we at the root? */
2437 if (IS_ROOT(temp))
2438 break;
2439
2440 /* Are we out of buffer? */
2441 if (--pos < 0)
2442 break;
2443
2444 path[pos] = '/';
2445 }
2446 base = ceph_ino(d_inode(temp));
2447 rcu_read_unlock();
2448
2449 if (read_seqretry(&rename_lock, seq))
2450 goto retry;
2451
2452 if (pos < 0) {
2453 /*
2454 * A rename didn't occur, but somehow we didn't end up where
2455 * we thought we would. Throw a warning and try again.
2456 */
2457 pr_warn("build_path did not end path lookup where "
2458 "expected, pos is %d\n", pos);
2459 goto retry;
2460 }
2461
2462 *pbase = base;
2463 *plen = PATH_MAX - 1 - pos;
2464 dout("build_path on %p %d built %llx '%.*s'\n",
2465 dentry, d_count(dentry), base, *plen, path + pos);
2466 return path + pos;
2467 }
2468
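/*
 * When the caller indicates the parent is locked and the parent is not part
 * of a snapshot, just encode the parent ino + dentry name; otherwise fall
 * back to building a full path from the dentry.
 */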
2469 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2470 const char **ppath, int *ppathlen, u64 *pino,
2471 bool *pfreepath, bool parent_locked)
2472 {
2473 char *path;
2474
2475 rcu_read_lock();
2476 if (!dir)
2477 dir = d_inode_rcu(dentry->d_parent);
2478 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2479 *pino = ceph_ino(dir);
2480 rcu_read_unlock();
2481 *ppath = dentry->d_name.name;
2482 *ppathlen = dentry->d_name.len;
2483 return 0;
2484 }
2485 rcu_read_unlock();
2486 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2487 if (IS_ERR(path))
2488 return PTR_ERR(path);
2489 *ppath = path;
2490 *pfreepath = true;
2491 return 0;
2492 }
2493
2494 static int build_inode_path(struct inode *inode,
2495 const char **ppath, int *ppathlen, u64 *pino,
2496 bool *pfreepath)
2497 {
2498 struct dentry *dentry;
2499 char *path;
2500
2501 if (ceph_snap(inode) == CEPH_NOSNAP) {
2502 *pino = ceph_ino(inode);
2503 *ppathlen = 0;
2504 return 0;
2505 }
2506 dentry = d_find_alias(inode);
2507 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2508 dput(dentry);
2509 if (IS_ERR(path))
2510 return PTR_ERR(path);
2511 *ppath = path;
2512 *pfreepath = true;
2513 return 0;
2514 }
2515
2516 /*
2517 * request arguments may be specified via an inode *, a dentry *, or
2518 * an explicit ino+path.
2519 */
2520 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2521 struct inode *rdiri, const char *rpath,
2522 u64 rino, const char **ppath, int *pathlen,
2523 u64 *ino, bool *freepath, bool parent_locked)
2524 {
2525 int r = 0;
2526
2527 if (rinode) {
2528 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2529 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2530 ceph_snap(rinode));
2531 } else if (rdentry) {
2532 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2533 freepath, parent_locked);
2534 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2535 *ppath);
2536 } else if (rpath || rino) {
2537 *ino = rino;
2538 *ppath = rpath;
2539 *pathlen = rpath ? strlen(rpath) : 0;
2540 dout(" path %.*s\n", *pathlen, rpath);
2541 }
2542
2543 return r;
2544 }
2545
2546 /*
2547 * called under mdsc->mutex
2548 */
2549 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2550 struct ceph_mds_request *req,
2551 int mds, bool drop_cap_releases)
2552 {
2553 struct ceph_msg *msg;
2554 struct ceph_mds_request_head *head;
2555 const char *path1 = NULL;
2556 const char *path2 = NULL;
2557 u64 ino1 = 0, ino2 = 0;
2558 int pathlen1 = 0, pathlen2 = 0;
2559 bool freepath1 = false, freepath2 = false;
2560 int len;
2561 u16 releases;
2562 void *p, *end;
2563 int ret;
2564
2565 ret = set_request_path_attr(req->r_inode, req->r_dentry,
2566 req->r_parent, req->r_path1, req->r_ino1.ino,
2567 &path1, &pathlen1, &ino1, &freepath1,
2568 test_bit(CEPH_MDS_R_PARENT_LOCKED,
2569 &req->r_req_flags));
2570 if (ret < 0) {
2571 msg = ERR_PTR(ret);
2572 goto out;
2573 }
2574
2575 /* If r_old_dentry is set, then assume that its parent is locked */
2576 ret = set_request_path_attr(NULL, req->r_old_dentry,
2577 req->r_old_dentry_dir,
2578 req->r_path2, req->r_ino2.ino,
2579 &path2, &pathlen2, &ino2, &freepath2, true);
2580 if (ret < 0) {
2581 msg = ERR_PTR(ret);
2582 goto out_free1;
2583 }
2584
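	/*
	 * Front section: request head, two encoded filepaths (each a version
	 * byte + ino + string length + string) and a timestamp, plus the cap
	 * releases accounted for below.
	 */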
2585 len = sizeof(*head) +
2586 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2587 sizeof(struct ceph_timespec);
2588
2589 /* calculate (max) length for cap releases */
2590 len += sizeof(struct ceph_mds_request_release) *
2591 (!!req->r_inode_drop + !!req->r_dentry_drop +
2592 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2593 if (req->r_dentry_drop)
2594 len += pathlen1;
2595 if (req->r_old_dentry_drop)
2596 len += pathlen2;
2597
2598 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2599 if (!msg) {
2600 msg = ERR_PTR(-ENOMEM);
2601 goto out_free2;
2602 }
2603
2604 msg->hdr.version = cpu_to_le16(2);
2605 msg->hdr.tid = cpu_to_le64(req->r_tid);
2606
2607 head = msg->front.iov_base;
2608 p = msg->front.iov_base + sizeof(*head);
2609 end = msg->front.iov_base + msg->front.iov_len;
2610
2611 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2612 head->op = cpu_to_le32(req->r_op);
2613 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2614 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2615 head->ino = cpu_to_le64(req->r_deleg_ino);
2616 head->args = req->r_args;
2617
2618 ceph_encode_filepath(&p, end, ino1, path1);
2619 ceph_encode_filepath(&p, end, ino2, path2);
2620
2621 /* make note of release offset, in case we need to replay */
2622 req->r_request_release_offset = p - msg->front.iov_base;
2623
2624 /* cap releases */
2625 releases = 0;
2626 if (req->r_inode_drop)
2627 releases += ceph_encode_inode_release(&p,
2628 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2629 mds, req->r_inode_drop, req->r_inode_unless,
2630 req->r_op == CEPH_MDS_OP_READDIR);
2631 if (req->r_dentry_drop)
2632 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2633 req->r_parent, mds, req->r_dentry_drop,
2634 req->r_dentry_unless);
2635 if (req->r_old_dentry_drop)
2636 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2637 req->r_old_dentry_dir, mds,
2638 req->r_old_dentry_drop,
2639 req->r_old_dentry_unless);
2640 if (req->r_old_inode_drop)
2641 releases += ceph_encode_inode_release(&p,
2642 d_inode(req->r_old_dentry),
2643 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2644
2645 if (drop_cap_releases) {
2646 releases = 0;
2647 p = msg->front.iov_base + req->r_request_release_offset;
2648 }
2649
2650 head->num_releases = cpu_to_le16(releases);
2651
2652 /* time stamp */
2653 {
2654 struct ceph_timespec ts;
2655 ceph_encode_timespec64(&ts, &req->r_stamp);
2656 ceph_encode_copy(&p, &ts, sizeof(ts));
2657 }
2658
2659 if (WARN_ON_ONCE(p > end)) {
2660 ceph_msg_put(msg);
2661 msg = ERR_PTR(-ERANGE);
2662 goto out_free2;
2663 }
2664
2665 msg->front.iov_len = p - msg->front.iov_base;
2666 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2667
2668 if (req->r_pagelist) {
2669 struct ceph_pagelist *pagelist = req->r_pagelist;
2670 ceph_msg_data_add_pagelist(msg, pagelist);
2671 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2672 } else {
2673 msg->hdr.data_len = 0;
2674 }
2675
2676 msg->hdr.data_off = cpu_to_le16(0);
2677
2678 out_free2:
2679 if (freepath2)
2680 ceph_mdsc_free_path((char *)path2, pathlen2);
2681 out_free1:
2682 if (freepath1)
2683 ceph_mdsc_free_path((char *)path1, pathlen1);
2684 out:
2685 return msg;
2686 }
2687
2688 /*
2689 * called under mdsc->mutex if error, under no mutex if
2690 * success.
2691 */
2692 static void complete_request(struct ceph_mds_client *mdsc,
2693 struct ceph_mds_request *req)
2694 {
2695 req->r_end_latency = ktime_get();
2696
2697 if (req->r_callback)
2698 req->r_callback(mdsc, req);
2699 complete_all(&req->r_completion);
2700 }
2701
2702 /*
2703 * called under mdsc->mutex
2704 */
2705 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2706 struct ceph_mds_request *req,
2707 int mds, bool drop_cap_releases)
2708 {
2709 struct ceph_mds_request_head *rhead;
2710 struct ceph_msg *msg;
2711 int flags = 0;
2712
2713 req->r_attempts++;
2714 if (req->r_inode) {
2715 struct ceph_cap *cap =
2716 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2717
2718 if (cap)
2719 req->r_sent_on_mseq = cap->mseq;
2720 else
2721 req->r_sent_on_mseq = -1;
2722 }
2723 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2724 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2725
2726 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2727 void *p;
2728 /*
2729 * Replay. Do not regenerate message (and rebuild
2730 * paths, etc.); just use the original message.
2731 * Rebuilding paths will break for renames because
2732 * d_move mangles the src name.
2733 */
2734 msg = req->r_request;
2735 rhead = msg->front.iov_base;
2736
2737 flags = le32_to_cpu(rhead->flags);
2738 flags |= CEPH_MDS_FLAG_REPLAY;
2739 rhead->flags = cpu_to_le32(flags);
2740
2741 if (req->r_target_inode)
2742 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2743
2744 rhead->num_retry = req->r_attempts - 1;
2745
2746 /* remove cap/dentry releases from message */
2747 rhead->num_releases = 0;
2748
2749 /* time stamp */
2750 p = msg->front.iov_base + req->r_request_release_offset;
2751 {
2752 struct ceph_timespec ts;
2753 ceph_encode_timespec64(&ts, &req->r_stamp);
2754 ceph_encode_copy(&p, &ts, sizeof(ts));
2755 }
2756
2757 msg->front.iov_len = p - msg->front.iov_base;
2758 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2759 return 0;
2760 }
2761
2762 if (req->r_request) {
2763 ceph_msg_put(req->r_request);
2764 req->r_request = NULL;
2765 }
2766 msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2767 if (IS_ERR(msg)) {
2768 req->r_err = PTR_ERR(msg);
2769 return PTR_ERR(msg);
2770 }
2771 req->r_request = msg;
2772
2773 rhead = msg->front.iov_base;
2774 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2775 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2776 flags |= CEPH_MDS_FLAG_REPLAY;
2777 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2778 flags |= CEPH_MDS_FLAG_ASYNC;
2779 if (req->r_parent)
2780 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2781 rhead->flags = cpu_to_le32(flags);
2782 rhead->num_fwd = req->r_num_fwd;
2783 rhead->num_retry = req->r_attempts - 1;
2784
2785 dout(" r_parent = %p\n", req->r_parent);
2786 return 0;
2787 }
2788
2789 /*
2790 * called under mdsc->mutex
2791 */
2792 static int __send_request(struct ceph_mds_client *mdsc,
2793 struct ceph_mds_session *session,
2794 struct ceph_mds_request *req,
2795 bool drop_cap_releases)
2796 {
2797 int err;
2798
2799 err = __prepare_send_request(mdsc, req, session->s_mds,
2800 drop_cap_releases);
2801 if (!err) {
2802 ceph_msg_get(req->r_request);
2803 ceph_con_send(&session->s_con, req->r_request);
2804 }
2805
2806 return err;
2807 }
2808
2809 /*
2810 * send request, or put it on the appropriate wait list.
2811 */
2812 static void __do_request(struct ceph_mds_client *mdsc,
2813 struct ceph_mds_request *req)
2814 {
2815 struct ceph_mds_session *session = NULL;
2816 int mds = -1;
2817 int err = 0;
2818 bool random;
2819
2820 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2821 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2822 __unregister_request(mdsc, req);
2823 return;
2824 }
2825
2826 if (req->r_timeout &&
2827 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2828 dout("do_request timed out\n");
2829 err = -ETIMEDOUT;
2830 goto finish;
2831 }
2832 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2833 dout("do_request forced umount\n");
2834 err = -EIO;
2835 goto finish;
2836 }
2837 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2838 if (mdsc->mdsmap_err) {
2839 err = mdsc->mdsmap_err;
2840 dout("do_request mdsmap err %d\n", err);
2841 goto finish;
2842 }
2843 if (mdsc->mdsmap->m_epoch == 0) {
2844 dout("do_request no mdsmap, waiting for map\n");
2845 list_add(&req->r_wait, &mdsc->waiting_for_map);
2846 return;
2847 }
2848 if (!(mdsc->fsc->mount_options->flags &
2849 CEPH_MOUNT_OPT_MOUNTWAIT) &&
2850 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2851 err = -EHOSTUNREACH;
2852 goto finish;
2853 }
2854 }
2855
2856 put_request_session(req);
2857
2858 mds = __choose_mds(mdsc, req, &random);
2859 if (mds < 0 ||
2860 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2861 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2862 err = -EJUKEBOX;
2863 goto finish;
2864 }
2865 dout("do_request no mds or not active, waiting for map\n");
2866 list_add(&req->r_wait, &mdsc->waiting_for_map);
2867 return;
2868 }
2869
2870 /* get, open session */
2871 session = __ceph_lookup_mds_session(mdsc, mds);
2872 if (!session) {
2873 session = register_session(mdsc, mds);
2874 if (IS_ERR(session)) {
2875 err = PTR_ERR(session);
2876 goto finish;
2877 }
2878 }
2879 req->r_session = ceph_get_mds_session(session);
2880
2881 dout("do_request mds%d session %p state %s\n", mds, session,
2882 ceph_session_state_name(session->s_state));
2883 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2884 session->s_state != CEPH_MDS_SESSION_HUNG) {
2885 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2886 err = -EACCES;
2887 goto out_session;
2888 }
2889 /*
2890 * We cannot queue async requests since the caps and delegated
2891 * inodes are bound to the session. Just return -EJUKEBOX and
2892 * let the caller retry a sync request in that case.
2893 */
2894 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895 err = -EJUKEBOX;
2896 goto out_session;
2897 }
2898 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2899 session->s_state == CEPH_MDS_SESSION_CLOSING) {
2900 err = __open_session(mdsc, session);
2901 if (err)
2902 goto out_session;
2903 /* retry the same mds later */
2904 if (random)
2905 req->r_resend_mds = mds;
2906 }
2907 list_add(&req->r_wait, &session->s_waiting);
2908 goto out_session;
2909 }
2910
2911 /* send request */
2912 req->r_resend_mds = -1; /* forget any previous mds hint */
2913
2914 if (req->r_request_started == 0) /* note request start time */
2915 req->r_request_started = jiffies;
2916
2917 err = __send_request(mdsc, session, req, false);
2918
2919 out_session:
2920 ceph_put_mds_session(session);
2921 finish:
2922 if (err) {
2923 dout("__do_request early error %d\n", err);
2924 req->r_err = err;
2925 complete_request(mdsc, req);
2926 __unregister_request(mdsc, req);
2927 }
2928 return;
2929 }
2930
2931 /*
2932 * called under mdsc->mutex
2933 */
2934 static void __wake_requests(struct ceph_mds_client *mdsc,
2935 struct list_head *head)
2936 {
2937 struct ceph_mds_request *req;
2938 LIST_HEAD(tmp_list);
2939
2940 list_splice_init(head, &tmp_list);
2941
2942 while (!list_empty(&tmp_list)) {
2943 req = list_entry(tmp_list.next,
2944 struct ceph_mds_request, r_wait);
2945 list_del_init(&req->r_wait);
2946 dout(" wake request %p tid %llu\n", req, req->r_tid);
2947 __do_request(mdsc, req);
2948 }
2949 }
2950
2951 /*
2952 * Wake up threads with requests pending for @mds, so that they can
2953 * resubmit their requests to a possibly different mds.
2954 */
2955 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2956 {
2957 struct ceph_mds_request *req;
2958 struct rb_node *p = rb_first(&mdsc->request_tree);
2959
2960 dout("kick_requests mds%d\n", mds);
2961 while (p) {
2962 req = rb_entry(p, struct ceph_mds_request, r_node);
2963 p = rb_next(p);
2964 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2965 continue;
2966 if (req->r_attempts > 0)
2967 continue; /* only new requests */
2968 if (req->r_session &&
2969 req->r_session->s_mds == mds) {
2970 dout(" kicking tid %llu\n", req->r_tid);
2971 list_del_init(&req->r_wait);
2972 __do_request(mdsc, req);
2973 }
2974 }
2975 }
2976
2977 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2978 struct ceph_mds_request *req)
2979 {
2980 int err = 0;
2981
2982 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2983 if (req->r_inode)
2984 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2985 if (req->r_parent) {
2986 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2987 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2988 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2989 spin_lock(&ci->i_ceph_lock);
2990 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2991 __ceph_touch_fmode(ci, mdsc, fmode);
2992 spin_unlock(&ci->i_ceph_lock);
2993 ihold(req->r_parent);
2994 }
2995 if (req->r_old_dentry_dir)
2996 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2997 CEPH_CAP_PIN);
2998
2999 if (req->r_inode) {
3000 err = ceph_wait_on_async_create(req->r_inode);
3001 if (err) {
3002 dout("%s: wait for async create returned: %d\n",
3003 __func__, err);
3004 return err;
3005 }
3006 }
3007
3008 if (!err && req->r_old_inode) {
3009 err = ceph_wait_on_async_create(req->r_old_inode);
3010 if (err) {
3011 dout("%s: wait for async create returned: %d\n",
3012 __func__, err);
3013 return err;
3014 }
3015 }
3016
3017 dout("submit_request on %p for inode %p\n", req, dir);
3018 mutex_lock(&mdsc->mutex);
3019 __register_request(mdsc, req, dir);
3020 __do_request(mdsc, req);
3021 err = req->r_err;
3022 mutex_unlock(&mdsc->mutex);
3023 return err;
3024 }
3025
3026 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3027 struct ceph_mds_request *req)
3028 {
3029 int err;
3030
3031 /* wait */
3032 dout("do_request waiting\n");
3033 if (!req->r_timeout && req->r_wait_for_completion) {
3034 err = req->r_wait_for_completion(mdsc, req);
3035 } else {
3036 long timeleft = wait_for_completion_killable_timeout(
3037 &req->r_completion,
3038 ceph_timeout_jiffies(req->r_timeout));
3039 if (timeleft > 0)
3040 err = 0;
3041 else if (!timeleft)
3042 err = -ETIMEDOUT; /* timed out */
3043 else
3044 err = timeleft; /* killed */
3045 }
3046 dout("do_request waited, got %d\n", err);
3047 mutex_lock(&mdsc->mutex);
3048
3049 /* only abort if we didn't race with a real reply */
3050 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3051 err = le32_to_cpu(req->r_reply_info.head->result);
3052 } else if (err < 0) {
3053 dout("aborted request %lld with %d\n", req->r_tid, err);
3054
3055 /*
3056 * ensure we aren't running concurrently with
3057 * ceph_fill_trace or ceph_readdir_prepopulate, which
3058 * rely on locks (dir mutex) held by our caller.
3059 */
3060 mutex_lock(&req->r_fill_mutex);
3061 req->r_err = err;
3062 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3063 mutex_unlock(&req->r_fill_mutex);
3064
3065 if (req->r_parent &&
3066 (req->r_op & CEPH_MDS_OP_WRITE))
3067 ceph_invalidate_dir_request(req);
3068 } else {
3069 err = req->r_err;
3070 }
3071
3072 mutex_unlock(&mdsc->mutex);
3073 return err;
3074 }
3075
3076 /*
3077  * Synchronously perform an mds request.  Take care of all of the
3078 * session setup, forwarding, retry details.
3079 */
3080 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3081 struct inode *dir,
3082 struct ceph_mds_request *req)
3083 {
3084 int err;
3085
3086 dout("do_request on %p\n", req);
3087
3088 /* issue */
3089 err = ceph_mdsc_submit_request(mdsc, dir, req);
3090 if (!err)
3091 err = ceph_mdsc_wait_request(mdsc, req);
3092 dout("do_request %p done, result %d\n", req, err);
3093 return err;
3094 }
3095
3096 /*
3097 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3098 * namespace request.
3099 */
3100 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3101 {
3102 struct inode *dir = req->r_parent;
3103 struct inode *old_dir = req->r_old_dentry_dir;
3104
3105 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3106
3107 ceph_dir_clear_complete(dir);
3108 if (old_dir)
3109 ceph_dir_clear_complete(old_dir);
3110 if (req->r_dentry)
3111 ceph_invalidate_dentry_lease(req->r_dentry);
3112 if (req->r_old_dentry)
3113 ceph_invalidate_dentry_lease(req->r_old_dentry);
3114 }
3115
3116 /*
3117 * Handle mds reply.
3118 *
3119 * We take the session mutex and parse and process the reply immediately.
3120 * This preserves the logical ordering of replies, capabilities, etc., sent
3121 * by the MDS as they are applied to our local cache.
3122 */
3123 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3124 {
3125 struct ceph_mds_client *mdsc = session->s_mdsc;
3126 struct ceph_mds_request *req;
3127 struct ceph_mds_reply_head *head = msg->front.iov_base;
3128 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
3129 struct ceph_snap_realm *realm;
3130 u64 tid;
3131 int err, result;
3132 int mds = session->s_mds;
3133
3134 if (msg->front.iov_len < sizeof(*head)) {
3135 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3136 ceph_msg_dump(msg);
3137 return;
3138 }
3139
3140 /* get request, session */
3141 tid = le64_to_cpu(msg->hdr.tid);
3142 mutex_lock(&mdsc->mutex);
3143 req = lookup_get_request(mdsc, tid);
3144 if (!req) {
3145 dout("handle_reply on unknown tid %llu\n", tid);
3146 mutex_unlock(&mdsc->mutex);
3147 return;
3148 }
3149 dout("handle_reply %p\n", req);
3150
3151 /* correct session? */
3152 if (req->r_session != session) {
3153 pr_err("mdsc_handle_reply got %llu on session mds%d"
3154 " not mds%d\n", tid, session->s_mds,
3155 req->r_session ? req->r_session->s_mds : -1);
3156 mutex_unlock(&mdsc->mutex);
3157 goto out;
3158 }
3159
3160 /* dup? */
3161 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3162 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3163 pr_warn("got a dup %s reply on %llu from mds%d\n",
3164 head->safe ? "safe" : "unsafe", tid, mds);
3165 mutex_unlock(&mdsc->mutex);
3166 goto out;
3167 }
3168 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3169 pr_warn("got unsafe after safe on %llu from mds%d\n",
3170 tid, mds);
3171 mutex_unlock(&mdsc->mutex);
3172 goto out;
3173 }
3174
3175 result = le32_to_cpu(head->result);
3176
3177 /*
3178 * Handle an ESTALE
3179 * if we're not talking to the authority, send to them
3180 * if the authority has changed while we weren't looking,
3181 * send to new authority
3182 * Otherwise we just have to return an ESTALE
3183 */
3184 if (result == -ESTALE) {
3185 dout("got ESTALE on request %llu\n", req->r_tid);
3186 req->r_resend_mds = -1;
3187 if (req->r_direct_mode != USE_AUTH_MDS) {
3188 dout("not using auth, setting for that now\n");
3189 req->r_direct_mode = USE_AUTH_MDS;
3190 __do_request(mdsc, req);
3191 mutex_unlock(&mdsc->mutex);
3192 goto out;
3193 } else {
3194 int mds = __choose_mds(mdsc, req, NULL);
3195 if (mds >= 0 && mds != req->r_session->s_mds) {
3196 dout("but auth changed, so resending\n");
3197 __do_request(mdsc, req);
3198 mutex_unlock(&mdsc->mutex);
3199 goto out;
3200 }
3201 }
3202 dout("have to return ESTALE on request %llu\n", req->r_tid);
3203 }
3204
3205
3206 if (head->safe) {
3207 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3208 __unregister_request(mdsc, req);
3209
3210 /* last request during umount? */
3211 if (mdsc->stopping && !__get_oldest_req(mdsc))
3212 complete_all(&mdsc->safe_umount_waiters);
3213
3214 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3215 /*
3216 * We already handled the unsafe response, now do the
3217 * cleanup. No need to examine the response; the MDS
3218 * doesn't include any result info in the safe
3219 * response. And even if it did, there is nothing
3220 * useful we could do with a revised return value.
3221 */
3222 dout("got safe reply %llu, mds%d\n", tid, mds);
3223
3224 mutex_unlock(&mdsc->mutex);
3225 goto out;
3226 }
3227 } else {
3228 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3229 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3230 }
3231
3232 dout("handle_reply tid %lld result %d\n", tid, result);
3233 rinfo = &req->r_reply_info;
3234 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3235 err = parse_reply_info(session, msg, rinfo, (u64)-1);
3236 else
3237 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3238 mutex_unlock(&mdsc->mutex);
3239
3240 mutex_lock(&session->s_mutex);
3241 if (err < 0) {
3242 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3243 ceph_msg_dump(msg);
3244 goto out_err;
3245 }
3246
3247 /* snap trace */
3248 realm = NULL;
3249 if (rinfo->snapblob_len) {
3250 down_write(&mdsc->snap_rwsem);
3251 ceph_update_snap_trace(mdsc, rinfo->snapblob,
3252 rinfo->snapblob + rinfo->snapblob_len,
3253 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3254 &realm);
3255 downgrade_write(&mdsc->snap_rwsem);
3256 } else {
3257 down_read(&mdsc->snap_rwsem);
3258 }
3259
3260 /* insert trace into our cache */
3261 mutex_lock(&req->r_fill_mutex);
3262 current->journal_info = req;
3263 err = ceph_fill_trace(mdsc->fsc->sb, req);
3264 if (err == 0) {
3265 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3266 req->r_op == CEPH_MDS_OP_LSSNAP))
3267 ceph_readdir_prepopulate(req, req->r_session);
3268 }
3269 current->journal_info = NULL;
3270 mutex_unlock(&req->r_fill_mutex);
3271
3272 up_read(&mdsc->snap_rwsem);
3273 if (realm)
3274 ceph_put_snap_realm(mdsc, realm);
3275
3276 if (err == 0) {
3277 if (req->r_target_inode &&
3278 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3279 struct ceph_inode_info *ci =
3280 ceph_inode(req->r_target_inode);
3281 spin_lock(&ci->i_unsafe_lock);
3282 list_add_tail(&req->r_unsafe_target_item,
3283 &ci->i_unsafe_iops);
3284 spin_unlock(&ci->i_unsafe_lock);
3285 }
3286
3287 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3288 }
3289 out_err:
3290 mutex_lock(&mdsc->mutex);
3291 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3292 if (err) {
3293 req->r_err = err;
3294 } else {
3295 req->r_reply = ceph_msg_get(msg);
3296 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3297 }
3298 } else {
3299 dout("reply arrived after request %lld was aborted\n", tid);
3300 }
3301 mutex_unlock(&mdsc->mutex);
3302
3303 mutex_unlock(&session->s_mutex);
3304
3305 /* kick calling process */
3306 complete_request(mdsc, req);
3307
3308 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
3309 req->r_end_latency, err);
3310 out:
3311 ceph_mdsc_put_request(req);
3312 return;
3313 }
3314
3315
3316
3317 /*
3318 * handle mds notification that our request has been forwarded.
3319 */
3320 static void handle_forward(struct ceph_mds_client *mdsc,
3321 struct ceph_mds_session *session,
3322 struct ceph_msg *msg)
3323 {
3324 struct ceph_mds_request *req;
3325 u64 tid = le64_to_cpu(msg->hdr.tid);
3326 u32 next_mds;
3327 u32 fwd_seq;
3328 int err = -EINVAL;
3329 void *p = msg->front.iov_base;
3330 void *end = p + msg->front.iov_len;
3331
3332 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3333 next_mds = ceph_decode_32(&p);
3334 fwd_seq = ceph_decode_32(&p);
3335
3336 mutex_lock(&mdsc->mutex);
3337 req = lookup_get_request(mdsc, tid);
3338 if (!req) {
3339 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3340 goto out; /* dup reply? */
3341 }
3342
3343 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3344 dout("forward tid %llu aborted, unregistering\n", tid);
3345 __unregister_request(mdsc, req);
3346 } else if (fwd_seq <= req->r_num_fwd) {
3347 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3348 tid, next_mds, req->r_num_fwd, fwd_seq);
3349 } else {
3350 /* resend. forward race not possible; mds would drop */
3351 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3352 BUG_ON(req->r_err);
3353 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3354 req->r_attempts = 0;
3355 req->r_num_fwd = fwd_seq;
3356 req->r_resend_mds = next_mds;
3357 put_request_session(req);
3358 __do_request(mdsc, req);
3359 }
3360 ceph_mdsc_put_request(req);
3361 out:
3362 mutex_unlock(&mdsc->mutex);
3363 return;
3364
3365 bad:
3366 pr_err("mdsc_handle_forward decode error err=%d\n", err);
3367 }
3368
3369 static int __decode_session_metadata(void **p, void *end,
3370 bool *blocklisted)
3371 {
3372 /* map<string,string> */
3373 u32 n;
3374 bool err_str;
3375 ceph_decode_32_safe(p, end, n, bad);
3376 while (n-- > 0) {
3377 u32 len;
3378 ceph_decode_32_safe(p, end, len, bad);
3379 ceph_decode_need(p, end, len, bad);
3380 err_str = !strncmp(*p, "error_string", len);
3381 *p += len;
3382 ceph_decode_32_safe(p, end, len, bad);
3383 ceph_decode_need(p, end, len, bad);
3384 /*
3385 * Match "blocklisted (blacklisted)" from newer MDSes,
3386 * or "blacklisted" from older MDSes.
3387 */
3388 if (err_str && strnstr(*p, "blacklisted", len))
3389 *blocklisted = true;
3390 *p += len;
3391 }
3392 return 0;
3393 bad:
3394 return -1;
3395 }
3396
3397 /*
3398 * handle a mds session control message
3399 */
3400 static void handle_session(struct ceph_mds_session *session,
3401 struct ceph_msg *msg)
3402 {
3403 struct ceph_mds_client *mdsc = session->s_mdsc;
3404 int mds = session->s_mds;
3405 int msg_version = le16_to_cpu(msg->hdr.version);
3406 void *p = msg->front.iov_base;
3407 void *end = p + msg->front.iov_len;
3408 struct ceph_mds_session_head *h;
3409 u32 op;
3410 u64 seq, features = 0;
3411 int wake = 0;
3412 bool blocklisted = false;
3413
3414 /* decode */
3415 ceph_decode_need(&p, end, sizeof(*h), bad);
3416 h = p;
3417 p += sizeof(*h);
3418
3419 op = le32_to_cpu(h->op);
3420 seq = le64_to_cpu(h->seq);
3421
3422 if (msg_version >= 3) {
3423 u32 len;
3424 /* version >= 2, metadata */
3425 if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3426 goto bad;
3427 /* version >= 3, feature bits */
3428 ceph_decode_32_safe(&p, end, len, bad);
3429 if (len) {
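			/* only the low 64 feature bits are used; skip any remainder */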
3430 ceph_decode_64_safe(&p, end, features, bad);
3431 p += len - sizeof(features);
3432 }
3433 }
3434
3435 mutex_lock(&mdsc->mutex);
3436 if (op == CEPH_SESSION_CLOSE) {
3437 ceph_get_mds_session(session);
3438 __unregister_session(mdsc, session);
3439 }
3440 /* FIXME: this ttl calculation is generous */
3441 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3442 mutex_unlock(&mdsc->mutex);
3443
3444 mutex_lock(&session->s_mutex);
3445
3446 dout("handle_session mds%d %s %p state %s seq %llu\n",
3447 mds, ceph_session_op_name(op), session,
3448 ceph_session_state_name(session->s_state), seq);
3449
3450 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3451 session->s_state = CEPH_MDS_SESSION_OPEN;
3452 pr_info("mds%d came back\n", session->s_mds);
3453 }
3454
3455 switch (op) {
3456 case CEPH_SESSION_OPEN:
3457 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3458 pr_info("mds%d reconnect success\n", session->s_mds);
3459 session->s_state = CEPH_MDS_SESSION_OPEN;
3460 session->s_features = features;
3461 renewed_caps(mdsc, session, 0);
3462 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3463 metric_schedule_delayed(&mdsc->metric);
3464 wake = 1;
3465 if (mdsc->stopping)
3466 __close_session(mdsc, session);
3467 break;
3468
3469 case CEPH_SESSION_RENEWCAPS:
3470 if (session->s_renew_seq == seq)
3471 renewed_caps(mdsc, session, 1);
3472 break;
3473
3474 case CEPH_SESSION_CLOSE:
3475 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3476 pr_info("mds%d reconnect denied\n", session->s_mds);
3477 session->s_state = CEPH_MDS_SESSION_CLOSED;
3478 cleanup_session_requests(mdsc, session);
3479 remove_session_caps(session);
3480 wake = 2; /* for good measure */
3481 wake_up_all(&mdsc->session_close_wq);
3482 break;
3483
3484 case CEPH_SESSION_STALE:
3485 pr_info("mds%d caps went stale, renewing\n",
3486 session->s_mds);
3487 spin_lock(&session->s_gen_ttl_lock);
3488 session->s_cap_gen++;
3489 session->s_cap_ttl = jiffies - 1;
3490 spin_unlock(&session->s_gen_ttl_lock);
3491 send_renew_caps(mdsc, session);
3492 break;
3493
3494 case CEPH_SESSION_RECALL_STATE:
3495 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3496 break;
3497
3498 case CEPH_SESSION_FLUSHMSG:
3499 /* flush cap releases */
3500 spin_lock(&session->s_cap_lock);
3501 if (session->s_num_cap_releases)
3502 ceph_flush_cap_releases(mdsc, session);
3503 spin_unlock(&session->s_cap_lock);
3504
3505 send_flushmsg_ack(mdsc, session, seq);
3506 break;
3507
3508 case CEPH_SESSION_FORCE_RO:
3509 dout("force_session_readonly %p\n", session);
3510 spin_lock(&session->s_cap_lock);
3511 session->s_readonly = true;
3512 spin_unlock(&session->s_cap_lock);
3513 wake_up_session_caps(session, FORCE_RO);
3514 break;
3515
3516 case CEPH_SESSION_REJECT:
3517 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3518 pr_info("mds%d rejected session\n", session->s_mds);
3519 session->s_state = CEPH_MDS_SESSION_REJECTED;
3520 cleanup_session_requests(mdsc, session);
3521 remove_session_caps(session);
3522 if (blocklisted)
3523 mdsc->fsc->blocklisted = true;
3524 wake = 2; /* for good measure */
3525 break;
3526
3527 default:
3528 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3529 WARN_ON(1);
3530 }
3531
3532 mutex_unlock(&session->s_mutex);
3533 if (wake) {
3534 mutex_lock(&mdsc->mutex);
3535 __wake_requests(mdsc, &session->s_waiting);
3536 if (wake == 2)
3537 kick_requests(mdsc, mds);
3538 mutex_unlock(&mdsc->mutex);
3539 }
3540 if (op == CEPH_SESSION_CLOSE)
3541 ceph_put_mds_session(session);
3542 return;
3543
3544 bad:
3545 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3546 (int)msg->front.iov_len);
3547 ceph_msg_dump(msg);
3548 return;
3549 }
3550
3551 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3552 {
3553 int dcaps;
3554
3555 dcaps = xchg(&req->r_dir_caps, 0);
3556 if (dcaps) {
3557 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3558 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3559 }
3560 }
3561
3562 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3563 {
3564 int dcaps;
3565
3566 dcaps = xchg(&req->r_dir_caps, 0);
3567 if (dcaps) {
3568 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3569 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3570 dcaps);
3571 }
3572 }
3573
3574 /*
3575 * called under session->mutex.
3576 */
3577 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3578 struct ceph_mds_session *session)
3579 {
3580 struct ceph_mds_request *req, *nreq;
3581 struct rb_node *p;
3582
3583 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3584
3585 mutex_lock(&mdsc->mutex);
3586 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3587 __send_request(mdsc, session, req, true);
3588
3589 /*
3590  * Also re-send old requests when the MDS enters the reconnect stage, so
3591  * that the MDS can process completed requests in its clientreplay stage.
3592 */
3593 p = rb_first(&mdsc->request_tree);
3594 while (p) {
3595 req = rb_entry(p, struct ceph_mds_request, r_node);
3596 p = rb_next(p);
3597 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3598 continue;
3599 if (req->r_attempts == 0)
3600 continue; /* only old requests */
3601 if (!req->r_session)
3602 continue;
3603 if (req->r_session->s_mds != session->s_mds)
3604 continue;
3605
3606 ceph_mdsc_release_dir_caps_no_check(req);
3607
3608 __send_request(mdsc, session, req, true);
3609 }
3610 mutex_unlock(&mdsc->mutex);
3611 }
3612
3613 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3614 {
3615 struct ceph_msg *reply;
3616 struct ceph_pagelist *_pagelist;
3617 struct page *page;
3618 __le32 *addr;
3619 int err = -ENOMEM;
3620
3621 if (!recon_state->allow_multi)
3622 return -ENOSPC;
3623
3624 /* can't handle message that contains both caps and realm */
3625 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3626
3627 /* pre-allocate new pagelist */
3628 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3629 if (!_pagelist)
3630 return -ENOMEM;
3631
3632 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3633 if (!reply)
3634 goto fail_msg;
3635
3636 /* placeholder for nr_caps */
3637 err = ceph_pagelist_encode_32(_pagelist, 0);
3638 if (err < 0)
3639 goto fail;
3640
3641 if (recon_state->nr_caps) {
3642 /* currently encoding caps */
3643 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3644 if (err)
3645 goto fail;
3646 } else {
3647 /* placeholder for nr_realms (currently encoding realms) */
3648 err = ceph_pagelist_encode_32(_pagelist, 0);
3649 if (err < 0)
3650 goto fail;
3651 }
3652
3653 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3654 if (err)
3655 goto fail;
3656
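	/*
	 * Patch the placeholder counters at the start of the outgoing pagelist
	 * with the final number of caps (or realms) it carries.
	 */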
3657 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3658 addr = kmap_atomic(page);
3659 if (recon_state->nr_caps) {
3660 /* currently encoding caps */
3661 *addr = cpu_to_le32(recon_state->nr_caps);
3662 } else {
3663 /* currently encoding realms */
3664 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3665 }
3666 kunmap_atomic(addr);
3667
3668 reply->hdr.version = cpu_to_le16(5);
3669 reply->hdr.compat_version = cpu_to_le16(4);
3670
3671 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3672 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3673
3674 ceph_con_send(&recon_state->session->s_con, reply);
3675 ceph_pagelist_release(recon_state->pagelist);
3676
3677 recon_state->pagelist = _pagelist;
3678 recon_state->nr_caps = 0;
3679 recon_state->nr_realms = 0;
3680 recon_state->msg_version = 5;
3681 return 0;
3682 fail:
3683 ceph_msg_put(reply);
3684 fail_msg:
3685 ceph_pagelist_release(_pagelist);
3686 return err;
3687 }
3688
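/*
 * Find the dentry to report as the primary link for this inode: for a
 * directory, its non-root alias; for other inodes, a hashed alias marked
 * CEPH_DENTRY_PRIMARY_LINK.  Returns a referenced dentry or NULL.
 */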
3689 static struct dentry* d_find_primary(struct inode *inode)
3690 {
3691 struct dentry *alias, *dn = NULL;
3692
3693 if (hlist_empty(&inode->i_dentry))
3694 return NULL;
3695
3696 spin_lock(&inode->i_lock);
3697 if (hlist_empty(&inode->i_dentry))
3698 goto out_unlock;
3699
3700 if (S_ISDIR(inode->i_mode)) {
3701 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3702 if (!IS_ROOT(alias))
3703 dn = dget(alias);
3704 goto out_unlock;
3705 }
3706
3707 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3708 spin_lock(&alias->d_lock);
3709 if (!d_unhashed(alias) &&
3710 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3711 dn = dget_dlock(alias);
3712 }
3713 spin_unlock(&alias->d_lock);
3714 if (dn)
3715 break;
3716 }
3717 out_unlock:
3718 spin_unlock(&inode->i_lock);
3719 return dn;
3720 }
3721
3722 /*
3723 * Encode information about a cap for a reconnect with the MDS.
3724 */
3725 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3726 void *arg)
3727 {
3728 union {
3729 struct ceph_mds_cap_reconnect v2;
3730 struct ceph_mds_cap_reconnect_v1 v1;
3731 } rec;
3732 struct ceph_inode_info *ci = cap->ci;
3733 struct ceph_reconnect_state *recon_state = arg;
3734 struct ceph_pagelist *pagelist = recon_state->pagelist;
3735 struct dentry *dentry;
3736 char *path;
3737 int pathlen = 0, err;
3738 u64 pathbase;
3739 u64 snap_follows;
3740
3741 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3742 inode, ceph_vinop(inode), cap, cap->cap_id,
3743 ceph_cap_string(cap->issued));
3744
3745 dentry = d_find_primary(inode);
3746 if (dentry) {
3747 /* set pathbase to parent dir when msg_version >= 2 */
3748 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3749 recon_state->msg_version >= 2);
3750 dput(dentry);
3751 if (IS_ERR(path)) {
3752 err = PTR_ERR(path);
3753 goto out_err;
3754 }
3755 } else {
3756 path = NULL;
3757 pathbase = 0;
3758 }
3759
3760 spin_lock(&ci->i_ceph_lock);
3761 cap->seq = 0; /* reset cap seq */
3762 cap->issue_seq = 0; /* and issue_seq */
3763 cap->mseq = 0; /* and migrate_seq */
3764 cap->cap_gen = cap->session->s_cap_gen;
3765
3766 /* These are lost when the session goes away */
3767 if (S_ISDIR(inode->i_mode)) {
3768 if (cap->issued & CEPH_CAP_DIR_CREATE) {
3769 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3770 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3771 }
3772 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3773 }
3774
3775 if (recon_state->msg_version >= 2) {
3776 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3777 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3778 rec.v2.issued = cpu_to_le32(cap->issued);
3779 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3780 rec.v2.pathbase = cpu_to_le64(pathbase);
3781 rec.v2.flock_len = (__force __le32)
3782 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3783 } else {
3784 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3785 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3786 rec.v1.issued = cpu_to_le32(cap->issued);
3787 rec.v1.size = cpu_to_le64(inode->i_size);
3788 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3789 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3790 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3791 rec.v1.pathbase = cpu_to_le64(pathbase);
3792 }
3793
3794 if (list_empty(&ci->i_cap_snaps)) {
3795 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3796 } else {
3797 struct ceph_cap_snap *capsnap =
3798 list_first_entry(&ci->i_cap_snaps,
3799 struct ceph_cap_snap, ci_item);
3800 snap_follows = capsnap->follows;
3801 }
3802 spin_unlock(&ci->i_ceph_lock);
3803
3804 if (recon_state->msg_version >= 2) {
3805 int num_fcntl_locks, num_flock_locks;
3806 struct ceph_filelock *flocks = NULL;
3807 size_t struct_len, total_len = sizeof(u64);
3808 u8 struct_v = 0;
3809
3810 encode_again:
3811 if (rec.v2.flock_len) {
3812 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3813 } else {
3814 num_fcntl_locks = 0;
3815 num_flock_locks = 0;
3816 }
3817 if (num_fcntl_locks + num_flock_locks > 0) {
3818 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3819 sizeof(struct ceph_filelock),
3820 GFP_NOFS);
3821 if (!flocks) {
3822 err = -ENOMEM;
3823 goto out_err;
3824 }
3825 err = ceph_encode_locks_to_buffer(inode, flocks,
3826 num_fcntl_locks,
3827 num_flock_locks);
3828 if (err) {
3829 kfree(flocks);
3830 flocks = NULL;
3831 if (err == -ENOSPC)
3832 goto encode_again;
3833 goto out_err;
3834 }
3835 } else {
3836 kfree(flocks);
3837 flocks = NULL;
3838 }
3839
3840 if (recon_state->msg_version >= 3) {
3841 /* version, compat_version and struct_len */
3842 total_len += 2 * sizeof(u8) + sizeof(u32);
3843 struct_v = 2;
3844 }
3845 /*
3846 * number of encoded locks is stable, so copy to pagelist
3847 */
3848 struct_len = 2 * sizeof(u32) +
3849 (num_fcntl_locks + num_flock_locks) *
3850 sizeof(struct ceph_filelock);
3851 rec.v2.flock_len = cpu_to_le32(struct_len);
3852
3853 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3854
3855 if (struct_v >= 2)
3856 struct_len += sizeof(u64); /* snap_follows */
3857
3858 total_len += struct_len;
3859
3860 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3861 err = send_reconnect_partial(recon_state);
3862 if (err)
3863 goto out_freeflocks;
3864 pagelist = recon_state->pagelist;
3865 }
3866
3867 err = ceph_pagelist_reserve(pagelist, total_len);
3868 if (err)
3869 goto out_freeflocks;
3870
3871 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3872 if (recon_state->msg_version >= 3) {
3873 ceph_pagelist_encode_8(pagelist, struct_v);
3874 ceph_pagelist_encode_8(pagelist, 1);
3875 ceph_pagelist_encode_32(pagelist, struct_len);
3876 }
3877 ceph_pagelist_encode_string(pagelist, path, pathlen);
3878 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3879 ceph_locks_to_pagelist(flocks, pagelist,
3880 num_fcntl_locks, num_flock_locks);
3881 if (struct_v >= 2)
3882 ceph_pagelist_encode_64(pagelist, snap_follows);
3883 out_freeflocks:
3884 kfree(flocks);
3885 } else {
3886 err = ceph_pagelist_reserve(pagelist,
3887 sizeof(u64) + sizeof(u32) +
3888 pathlen + sizeof(rec.v1));
3889 if (err)
3890 goto out_err;
3891
3892 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3893 ceph_pagelist_encode_string(pagelist, path, pathlen);
3894 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3895 }
3896
3897 out_err:
3898 ceph_mdsc_free_path(path, pathlen);
3899 if (!err)
3900 recon_state->nr_caps++;
3901 return err;
3902 }
3903
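/*
 * Encode a ceph_mds_snaprealm_reconnect record for every snap realm we
 * track, flushing a partial reconnect message whenever the pagelist would
 * exceed RECONNECT_MAX_SIZE.
 */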
3904 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3905 struct ceph_reconnect_state *recon_state)
3906 {
3907 struct rb_node *p;
3908 struct ceph_pagelist *pagelist = recon_state->pagelist;
3909 int err = 0;
3910
3911 if (recon_state->msg_version >= 4) {
3912 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3913 if (err < 0)
3914 goto fail;
3915 }
3916
3917 /*
3918 	 * Snaprealms: we provide the MDS with the ino, seq (version), and
3919 	 * parent for all of our realms.  If the MDS has any newer info,
3920 	 * it will tell us.
3921 */
3922 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3923 struct ceph_snap_realm *realm =
3924 rb_entry(p, struct ceph_snap_realm, node);
3925 struct ceph_mds_snaprealm_reconnect sr_rec;
3926
3927 if (recon_state->msg_version >= 4) {
3928 size_t need = sizeof(u8) * 2 + sizeof(u32) +
3929 sizeof(sr_rec);
3930
3931 if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3932 err = send_reconnect_partial(recon_state);
3933 if (err)
3934 goto fail;
3935 pagelist = recon_state->pagelist;
3936 }
3937
3938 err = ceph_pagelist_reserve(pagelist, need);
3939 if (err)
3940 goto fail;
3941
3942 ceph_pagelist_encode_8(pagelist, 1);
3943 ceph_pagelist_encode_8(pagelist, 1);
3944 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3945 }
3946
3947 dout(" adding snap realm %llx seq %lld parent %llx\n",
3948 realm->ino, realm->seq, realm->parent_ino);
3949 sr_rec.ino = cpu_to_le64(realm->ino);
3950 sr_rec.seq = cpu_to_le64(realm->seq);
3951 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3952
3953 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3954 if (err)
3955 goto fail;
3956
3957 recon_state->nr_realms++;
3958 }
3959 fail:
3960 return err;
3961 }
3962
3963
3964 /*
3965 * If an MDS fails and recovers, clients need to reconnect in order to
3966 * reestablish shared state. This includes all caps issued through
3967 * this session _and_ the snap_realm hierarchy. Because it's not
3968  * clear which snap realms the MDS cares about, we send everything we
3969  * know about; that ensures we'll then get any new info the
3970 * recovering MDS might have.
3971 *
3972 * This is a relatively heavyweight operation, but it's rare.
3973 */
3974 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3975 struct ceph_mds_session *session)
3976 {
3977 struct ceph_msg *reply;
3978 int mds = session->s_mds;
3979 int err = -ENOMEM;
3980 struct ceph_reconnect_state recon_state = {
3981 .session = session,
3982 };
3983 LIST_HEAD(dispose);
3984
3985 pr_info("mds%d reconnect start\n", mds);
3986
3987 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3988 if (!recon_state.pagelist)
3989 goto fail_nopagelist;
3990
3991 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3992 if (!reply)
3993 goto fail_nomsg;
3994
3995 xa_destroy(&session->s_delegated_inos);
3996
3997 mutex_lock(&session->s_mutex);
3998 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3999 session->s_seq = 0;
4000
4001 dout("session %p state %s\n", session,
4002 ceph_session_state_name(session->s_state));
4003
4004 spin_lock(&session->s_gen_ttl_lock);
4005 session->s_cap_gen++;
4006 spin_unlock(&session->s_gen_ttl_lock);
4007
4008 spin_lock(&session->s_cap_lock);
4009 /* don't know if session is readonly */
4010 session->s_readonly = 0;
4011 /*
4012 * notify __ceph_remove_cap() that we are composing cap reconnect.
4013 	 * If a cap gets released before being added to the cap reconnect,
4014 	 * __ceph_remove_cap() should skip queuing the cap release.
4015 */
4016 session->s_cap_reconnect = 1;
4017 /* drop old cap expires; we're about to reestablish that state */
4018 detach_cap_releases(session, &dispose);
4019 spin_unlock(&session->s_cap_lock);
4020 dispose_cap_releases(mdsc, &dispose);
4021
4022 /* trim unused caps to reduce MDS's cache rejoin time */
4023 if (mdsc->fsc->sb->s_root)
4024 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4025
4026 ceph_con_close(&session->s_con);
4027 ceph_con_open(&session->s_con,
4028 CEPH_ENTITY_TYPE_MDS, mds,
4029 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4030
4031 /* replay unsafe requests */
4032 replay_unsafe_requests(mdsc, session);
4033
4034 ceph_early_kick_flushing_caps(mdsc, session);
4035
4036 down_read(&mdsc->snap_rwsem);
4037
4038 /* placeholder for nr_caps */
4039 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4040 if (err)
4041 goto fail;
4042
4043 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4044 recon_state.msg_version = 3;
4045 recon_state.allow_multi = true;
4046 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4047 recon_state.msg_version = 3;
4048 } else {
4049 recon_state.msg_version = 2;
4050 }
4051 	/* traverse this session's caps */
4052 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4053
4054 spin_lock(&session->s_cap_lock);
4055 session->s_cap_reconnect = 0;
4056 spin_unlock(&session->s_cap_lock);
4057
4058 if (err < 0)
4059 goto fail;
4060
4061 /* check if all realms can be encoded into current message */
4062 if (mdsc->num_snap_realms) {
4063 size_t total_len =
4064 recon_state.pagelist->length +
4065 mdsc->num_snap_realms *
4066 sizeof(struct ceph_mds_snaprealm_reconnect);
4067 if (recon_state.msg_version >= 4) {
4068 /* number of realms */
4069 total_len += sizeof(u32);
4070 /* version, compat_version and struct_len */
4071 total_len += mdsc->num_snap_realms *
4072 (2 * sizeof(u8) + sizeof(u32));
4073 }
4074 if (total_len > RECONNECT_MAX_SIZE) {
4075 if (!recon_state.allow_multi) {
4076 err = -ENOSPC;
4077 goto fail;
4078 }
4079 if (recon_state.nr_caps) {
4080 err = send_reconnect_partial(&recon_state);
4081 if (err)
4082 goto fail;
4083 }
4084 recon_state.msg_version = 5;
4085 }
4086 }
4087
4088 err = encode_snap_realms(mdsc, &recon_state);
4089 if (err < 0)
4090 goto fail;
4091
4092 if (recon_state.msg_version >= 5) {
4093 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4094 if (err < 0)
4095 goto fail;
4096 }
4097
4098 if (recon_state.nr_caps || recon_state.nr_realms) {
4099 struct page *page =
4100 list_first_entry(&recon_state.pagelist->head,
4101 struct page, lru);
4102 __le32 *addr = kmap_atomic(page);
4103 if (recon_state.nr_caps) {
4104 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4105 *addr = cpu_to_le32(recon_state.nr_caps);
4106 } else if (recon_state.msg_version >= 4) {
4107 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4108 }
4109 kunmap_atomic(addr);
4110 }
4111
4112 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4113 if (recon_state.msg_version >= 4)
4114 reply->hdr.compat_version = cpu_to_le16(4);
4115
4116 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4117 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4118
4119 ceph_con_send(&session->s_con, reply);
4120
4121 mutex_unlock(&session->s_mutex);
4122
4123 mutex_lock(&mdsc->mutex);
4124 __wake_requests(mdsc, &session->s_waiting);
4125 mutex_unlock(&mdsc->mutex);
4126
4127 up_read(&mdsc->snap_rwsem);
4128 ceph_pagelist_release(recon_state.pagelist);
4129 return;
4130
4131 fail:
4132 ceph_msg_put(reply);
4133 up_read(&mdsc->snap_rwsem);
4134 mutex_unlock(&session->s_mutex);
4135 fail_nomsg:
4136 ceph_pagelist_release(recon_state.pagelist);
4137 fail_nopagelist:
4138 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4139 return;
4140 }
4141
4142
4143 /*
4144 * compare old and new mdsmaps, kicking requests
4145 * and closing out old connections as necessary
4146 *
4147 * called under mdsc->mutex.
4148 */
4149 static void check_new_map(struct ceph_mds_client *mdsc,
4150 struct ceph_mdsmap *newmap,
4151 struct ceph_mdsmap *oldmap)
4152 {
4153 int i;
4154 int oldstate, newstate;
4155 struct ceph_mds_session *s;
4156
4157 dout("check_new_map new %u old %u\n",
4158 newmap->m_epoch, oldmap->m_epoch);
4159
4160 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4161 if (!mdsc->sessions[i])
4162 continue;
4163 s = mdsc->sessions[i];
4164 oldstate = ceph_mdsmap_get_state(oldmap, i);
4165 newstate = ceph_mdsmap_get_state(newmap, i);
4166
4167 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4168 i, ceph_mds_state_name(oldstate),
4169 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4170 ceph_mds_state_name(newstate),
4171 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4172 ceph_session_state_name(s->s_state));
4173
4174 if (i >= newmap->possible_max_rank) {
4175 /* force close session for stopped mds */
4176 ceph_get_mds_session(s);
4177 __unregister_session(mdsc, s);
4178 __wake_requests(mdsc, &s->s_waiting);
4179 mutex_unlock(&mdsc->mutex);
4180
4181 mutex_lock(&s->s_mutex);
4182 cleanup_session_requests(mdsc, s);
4183 remove_session_caps(s);
4184 mutex_unlock(&s->s_mutex);
4185
4186 ceph_put_mds_session(s);
4187
4188 mutex_lock(&mdsc->mutex);
4189 kick_requests(mdsc, i);
4190 continue;
4191 }
4192
4193 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4194 ceph_mdsmap_get_addr(newmap, i),
4195 sizeof(struct ceph_entity_addr))) {
4196 /* just close it */
4197 mutex_unlock(&mdsc->mutex);
4198 mutex_lock(&s->s_mutex);
4199 mutex_lock(&mdsc->mutex);
4200 ceph_con_close(&s->s_con);
4201 mutex_unlock(&s->s_mutex);
4202 s->s_state = CEPH_MDS_SESSION_RESTARTING;
4203 } else if (oldstate == newstate) {
4204 continue; /* nothing new with this mds */
4205 }
4206
4207 /*
4208 * send reconnect?
4209 */
4210 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4211 newstate >= CEPH_MDS_STATE_RECONNECT) {
4212 mutex_unlock(&mdsc->mutex);
4213 send_mds_reconnect(mdsc, s);
4214 mutex_lock(&mdsc->mutex);
4215 }
4216
4217 /*
4218 		 * kick requests on any mds that has gone active.
4219 */
4220 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4221 newstate >= CEPH_MDS_STATE_ACTIVE) {
4222 if (oldstate != CEPH_MDS_STATE_CREATING &&
4223 oldstate != CEPH_MDS_STATE_STARTING)
4224 pr_info("mds%d recovery completed\n", s->s_mds);
4225 kick_requests(mdsc, i);
4226 mutex_unlock(&mdsc->mutex);
4227 mutex_lock(&s->s_mutex);
4228 mutex_lock(&mdsc->mutex);
4229 ceph_kick_flushing_caps(mdsc, s);
4230 mutex_unlock(&s->s_mutex);
4231 wake_up_session_caps(s, RECONNECT);
4232 }
4233 }
4234
4235 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4236 s = mdsc->sessions[i];
4237 if (!s)
4238 continue;
4239 if (!ceph_mdsmap_is_laggy(newmap, i))
4240 continue;
4241 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4242 s->s_state == CEPH_MDS_SESSION_HUNG ||
4243 s->s_state == CEPH_MDS_SESSION_CLOSING) {
4244 dout(" connecting to export targets of laggy mds%d\n",
4245 i);
4246 __open_export_target_sessions(mdsc, s);
4247 }
4248 }
4249 }
4250
4251
4252
4253 /*
4254 * leases
4255 */
4256
4257 /*
4258 * caller must hold session s_mutex, dentry->d_lock
4259 */
4260 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4261 {
4262 struct ceph_dentry_info *di = ceph_dentry(dentry);
4263
4264 ceph_put_mds_session(di->lease_session);
4265 di->lease_session = NULL;
4266 }
4267
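/*
 * Handle a CLIENT_LEASE message from the MDS: drop the dentry lease on a
 * REVOKE (and ack it), or record the renewed lease duration on a RENEW.
 */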
4268 static void handle_lease(struct ceph_mds_client *mdsc,
4269 struct ceph_mds_session *session,
4270 struct ceph_msg *msg)
4271 {
4272 struct super_block *sb = mdsc->fsc->sb;
4273 struct inode *inode;
4274 struct dentry *parent, *dentry;
4275 struct ceph_dentry_info *di;
4276 int mds = session->s_mds;
4277 struct ceph_mds_lease *h = msg->front.iov_base;
4278 u32 seq;
4279 struct ceph_vino vino;
4280 struct qstr dname;
4281 int release = 0;
4282
4283 dout("handle_lease from mds%d\n", mds);
4284
4285 /* decode */
4286 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4287 goto bad;
4288 vino.ino = le64_to_cpu(h->ino);
4289 vino.snap = CEPH_NOSNAP;
4290 seq = le32_to_cpu(h->seq);
4291 dname.len = get_unaligned_le32(h + 1);
4292 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4293 goto bad;
4294 dname.name = (void *)(h + 1) + sizeof(u32);
4295
4296 /* lookup inode */
4297 inode = ceph_find_inode(sb, vino);
4298 dout("handle_lease %s, ino %llx %p %.*s\n",
4299 ceph_lease_op_name(h->action), vino.ino, inode,
4300 dname.len, dname.name);
4301
4302 mutex_lock(&session->s_mutex);
4303 inc_session_sequence(session);
4304
4305 if (!inode) {
4306 dout("handle_lease no inode %llx\n", vino.ino);
4307 goto release;
4308 }
4309
4310 /* dentry */
4311 parent = d_find_alias(inode);
4312 if (!parent) {
4313 dout("no parent dentry on inode %p\n", inode);
4314 WARN_ON(1);
4315 goto release; /* hrm... */
4316 }
4317 dname.hash = full_name_hash(parent, dname.name, dname.len);
4318 dentry = d_lookup(parent, &dname);
4319 dput(parent);
4320 if (!dentry)
4321 goto release;
4322
4323 spin_lock(&dentry->d_lock);
4324 di = ceph_dentry(dentry);
4325 switch (h->action) {
4326 case CEPH_MDS_LEASE_REVOKE:
4327 if (di->lease_session == session) {
4328 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4329 h->seq = cpu_to_le32(di->lease_seq);
4330 __ceph_mdsc_drop_dentry_lease(dentry);
4331 }
4332 release = 1;
4333 break;
4334
4335 case CEPH_MDS_LEASE_RENEW:
4336 if (di->lease_session == session &&
4337 di->lease_gen == session->s_cap_gen &&
4338 di->lease_renew_from &&
4339 di->lease_renew_after == 0) {
4340 unsigned long duration =
4341 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4342
4343 di->lease_seq = seq;
4344 di->time = di->lease_renew_from + duration;
4345 di->lease_renew_after = di->lease_renew_from +
4346 (duration >> 1);
4347 di->lease_renew_from = 0;
4348 }
4349 break;
4350 }
4351 spin_unlock(&dentry->d_lock);
4352 dput(dentry);
4353
4354 if (!release)
4355 goto out;
4356
4357 release:
4358 /* let's just reuse the same message */
4359 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4360 ceph_msg_get(msg);
4361 ceph_con_send(&session->s_con, msg);
4362
4363 out:
4364 mutex_unlock(&session->s_mutex);
4365 /* avoid calling iput_final() in mds dispatch threads */
4366 ceph_async_iput(inode);
4367 return;
4368
4369 bad:
4370 pr_err("corrupt lease message\n");
4371 ceph_msg_dump(msg);
4372 }
4373
4374 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4375 struct dentry *dentry, char action,
4376 u32 seq)
4377 {
4378 struct ceph_msg *msg;
4379 struct ceph_mds_lease *lease;
4380 struct inode *dir;
4381 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4382
4383 	dout("lease_send_msg dentry %p %s to mds%d\n",
4384 dentry, ceph_lease_op_name(action), session->s_mds);
4385
4386 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4387 if (!msg)
4388 return;
4389 lease = msg->front.iov_base;
4390 lease->action = action;
4391 lease->seq = cpu_to_le32(seq);
4392
4393 spin_lock(&dentry->d_lock);
4394 dir = d_inode(dentry->d_parent);
4395 lease->ino = cpu_to_le64(ceph_ino(dir));
4396 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4397
4398 put_unaligned_le32(dentry->d_name.len, lease + 1);
4399 memcpy((void *)(lease + 1) + 4,
4400 dentry->d_name.name, dentry->d_name.len);
4401 spin_unlock(&dentry->d_lock);
4402 /*
4403 * if this is a preemptive lease RELEASE, no need to
4404 * flush request stream, since the actual request will
4405 * soon follow.
4406 */
4407 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4408
4409 ceph_con_send(&session->s_con, msg);
4410 }
4411
4412 /*
4413  * lock/unlock the session, to wait for ongoing session activities to finish
4414 */
4415 static void lock_unlock_session(struct ceph_mds_session *s)
4416 {
4417 mutex_lock(&s->s_mutex);
4418 mutex_unlock(&s->s_mutex);
4419 }
4420
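/*
 * If we have been blocklisted and the CLEANRECOVER mount option
 * ("recover_session=clean") is set, force a reconnect to the cluster,
 * but at most once every 30 minutes.
 */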
4421 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4422 {
4423 struct ceph_fs_client *fsc = mdsc->fsc;
4424
4425 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4426 return;
4427
4428 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4429 return;
4430
4431 if (!READ_ONCE(fsc->blocklisted))
4432 return;
4433
4434 if (fsc->last_auto_reconnect &&
4435 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4436 return;
4437
4438 pr_info("auto reconnect after blocklisted\n");
4439 fsc->last_auto_reconnect = jiffies;
4440 ceph_force_reconnect(fsc->sb);
4441 }
4442
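/*
 * Return true if the session is still usable.  An OPEN session whose TTL
 * has expired is marked HUNG (but still usable); NEW, CLOSING, RESTARTING,
 * CLOSED and REJECTED sessions are reported as unusable.
 */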
4443 bool check_session_state(struct ceph_mds_session *s)
4444 {
4445 switch (s->s_state) {
4446 case CEPH_MDS_SESSION_OPEN:
4447 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4448 s->s_state = CEPH_MDS_SESSION_HUNG;
4449 pr_info("mds%d hung\n", s->s_mds);
4450 }
4451 break;
4452 case CEPH_MDS_SESSION_CLOSING:
4453 /* Should never reach this when we're unmounting */
4454 WARN_ON_ONCE(s->s_ttl);
4455 fallthrough;
4456 case CEPH_MDS_SESSION_NEW:
4457 case CEPH_MDS_SESSION_RESTARTING:
4458 case CEPH_MDS_SESSION_CLOSED:
4459 case CEPH_MDS_SESSION_REJECTED:
4460 return false;
4461 }
4462
4463 return true;
4464 }
4465
4466 /*
4467 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4468 * then we need to retransmit that request.
4469 */
4470 void inc_session_sequence(struct ceph_mds_session *s)
4471 {
4472 lockdep_assert_held(&s->s_mutex);
4473
4474 s->s_seq++;
4475
4476 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4477 int ret;
4478
4479 dout("resending session close request for mds%d\n", s->s_mds);
4480 ret = request_close_session(s);
4481 if (ret < 0)
4482 pr_err("unable to close session to mds%d: %d\n",
4483 s->s_mds, ret);
4484 }
4485 }
4486
4487 /*
4488 * delayed work -- periodically trim expired leases, renew caps with mds. If
4489 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4490 * workqueue delay value of 5 secs will be used.
4491 */
4492 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4493 {
4494 unsigned long max_delay = HZ * 5;
4495
4496 /* 5 secs default delay */
4497 if (!delay || (delay > max_delay))
4498 delay = max_delay;
4499 schedule_delayed_work(&mdsc->delayed_work,
4500 round_jiffies_relative(delay));
4501 }
4502
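/*
 * Periodic housekeeping: renew sessions (or just send keepalives), send
 * queued cap releases, check delayed caps, trim the snapid map, possibly
 * trigger auto-recovery, and then re-arm the delayed work.
 */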
4503 static void delayed_work(struct work_struct *work)
4504 {
4505 struct ceph_mds_client *mdsc =
4506 container_of(work, struct ceph_mds_client, delayed_work.work);
4507 unsigned long delay;
4508 int renew_interval;
4509 int renew_caps;
4510 int i;
4511
4512 dout("mdsc delayed_work\n");
4513
4514 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
4515 return;
4516
4517 mutex_lock(&mdsc->mutex);
4518 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4519 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4520 mdsc->last_renew_caps);
4521 if (renew_caps)
4522 mdsc->last_renew_caps = jiffies;
4523
4524 for (i = 0; i < mdsc->max_sessions; i++) {
4525 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4526 if (!s)
4527 continue;
4528
4529 if (!check_session_state(s)) {
4530 ceph_put_mds_session(s);
4531 continue;
4532 }
4533 mutex_unlock(&mdsc->mutex);
4534
4535 mutex_lock(&s->s_mutex);
4536 if (renew_caps)
4537 send_renew_caps(mdsc, s);
4538 else
4539 ceph_con_keepalive(&s->s_con);
4540 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4541 s->s_state == CEPH_MDS_SESSION_HUNG)
4542 ceph_send_cap_releases(mdsc, s);
4543 mutex_unlock(&s->s_mutex);
4544 ceph_put_mds_session(s);
4545
4546 mutex_lock(&mdsc->mutex);
4547 }
4548 mutex_unlock(&mdsc->mutex);
4549
4550 delay = ceph_check_delayed_caps(mdsc);
4551
4552 ceph_queue_cap_reclaim_work(mdsc);
4553
4554 ceph_trim_snapid_map(mdsc);
4555
4556 maybe_recover_session(mdsc);
4557
4558 schedule_delayed(mdsc, delay);
4559 }
4560
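/*
 * Allocate and initialize the MDS client state for a new mount: locks,
 * trees, lists, work items and the metrics structure.
 */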
4561 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4562
4563 {
4564 struct ceph_mds_client *mdsc;
4565 int err;
4566
4567 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4568 if (!mdsc)
4569 return -ENOMEM;
4570 mdsc->fsc = fsc;
4571 mutex_init(&mdsc->mutex);
4572 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4573 if (!mdsc->mdsmap) {
4574 err = -ENOMEM;
4575 goto err_mdsc;
4576 }
4577
4578 init_completion(&mdsc->safe_umount_waiters);
4579 init_waitqueue_head(&mdsc->session_close_wq);
4580 INIT_LIST_HEAD(&mdsc->waiting_for_map);
4581 mdsc->sessions = NULL;
4582 atomic_set(&mdsc->num_sessions, 0);
4583 mdsc->max_sessions = 0;
4584 mdsc->stopping = 0;
4585 atomic64_set(&mdsc->quotarealms_count, 0);
4586 mdsc->quotarealms_inodes = RB_ROOT;
4587 mutex_init(&mdsc->quotarealms_inodes_mutex);
4588 mdsc->last_snap_seq = 0;
4589 init_rwsem(&mdsc->snap_rwsem);
4590 mdsc->snap_realms = RB_ROOT;
4591 INIT_LIST_HEAD(&mdsc->snap_empty);
4592 mdsc->num_snap_realms = 0;
4593 spin_lock_init(&mdsc->snap_empty_lock);
4594 mdsc->last_tid = 0;
4595 mdsc->oldest_tid = 0;
4596 mdsc->request_tree = RB_ROOT;
4597 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4598 mdsc->last_renew_caps = jiffies;
4599 INIT_LIST_HEAD(&mdsc->cap_delay_list);
4600 INIT_LIST_HEAD(&mdsc->cap_wait_list);
4601 spin_lock_init(&mdsc->cap_delay_lock);
4602 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4603 spin_lock_init(&mdsc->snap_flush_lock);
4604 mdsc->last_cap_flush_tid = 1;
4605 INIT_LIST_HEAD(&mdsc->cap_flush_list);
4606 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4607 mdsc->num_cap_flushing = 0;
4608 spin_lock_init(&mdsc->cap_dirty_lock);
4609 init_waitqueue_head(&mdsc->cap_flushing_wq);
4610 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4611 atomic_set(&mdsc->cap_reclaim_pending, 0);
4612 err = ceph_metric_init(&mdsc->metric);
4613 if (err)
4614 goto err_mdsmap;
4615
4616 spin_lock_init(&mdsc->dentry_list_lock);
4617 INIT_LIST_HEAD(&mdsc->dentry_leases);
4618 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4619
4620 ceph_caps_init(mdsc);
4621 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4622
4623 spin_lock_init(&mdsc->snapid_map_lock);
4624 mdsc->snapid_map_tree = RB_ROOT;
4625 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4626
4627 init_rwsem(&mdsc->pool_perm_rwsem);
4628 mdsc->pool_perm_tree = RB_ROOT;
4629
4630 strscpy(mdsc->nodename, utsname()->nodename,
4631 sizeof(mdsc->nodename));
4632
4633 fsc->mdsc = mdsc;
4634 return 0;
4635
4636 err_mdsmap:
4637 kfree(mdsc->mdsmap);
4638 err_mdsc:
4639 kfree(mdsc);
4640 return err;
4641 }
4642
4643 /*
4644 * Wait for safe replies on open mds requests. If we time out, drop
4645 * all requests from the tree to avoid dangling dentry refs.
4646 */
4647 static void wait_requests(struct ceph_mds_client *mdsc)
4648 {
4649 struct ceph_options *opts = mdsc->fsc->client->options;
4650 struct ceph_mds_request *req;
4651
4652 mutex_lock(&mdsc->mutex);
4653 if (__get_oldest_req(mdsc)) {
4654 mutex_unlock(&mdsc->mutex);
4655
4656 dout("wait_requests waiting for requests\n");
4657 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4658 ceph_timeout_jiffies(opts->mount_timeout));
4659
4660 /* tear down remaining requests */
4661 mutex_lock(&mdsc->mutex);
4662 while ((req = __get_oldest_req(mdsc))) {
4663 dout("wait_requests timed out on tid %llu\n",
4664 req->r_tid);
4665 list_del_init(&req->r_wait);
4666 __unregister_request(mdsc, req);
4667 }
4668 }
4669 mutex_unlock(&mdsc->mutex);
4670 dout("wait_requests done\n");
4671 }
4672
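/*
 * Ask an MDS to flush its journal (mdlog) so pending cap flushes and
 * unsafe requests can be acknowledged sooner.
 */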
4673 void send_flush_mdlog(struct ceph_mds_session *s)
4674 {
4675 struct ceph_msg *msg;
4676
4677 /*
4678 * Pre-luminous MDS crashes when it sees an unknown session request
4679 */
4680 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4681 return;
4682
4683 mutex_lock(&s->s_mutex);
4684 	dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
4685 ceph_session_state_name(s->s_state), s->s_seq);
4686 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4687 s->s_seq);
4688 if (!msg) {
4689 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4690 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4691 } else {
4692 ceph_con_send(&s->s_con, msg);
4693 }
4694 mutex_unlock(&s->s_mutex);
4695 }
4696
4697 /*
4698 * called before mount is ro, and before dentries are torn down.
4699 * (hmm, does this still race with new lookups?)
4700 */
4701 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4702 {
4703 dout("pre_umount\n");
4704 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
4705
4706 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4707 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4708 ceph_flush_dirty_caps(mdsc);
4709 wait_requests(mdsc);
4710
4711 /*
4712 * wait for reply handlers to drop their request refs and
4713 * their inode/dcache refs
4714 */
4715 ceph_msgr_flush();
4716
4717 ceph_cleanup_quotarealms_inodes(mdsc);
4718 }
4719
4720 /*
4721 * wait for all write mds requests to flush.
4722 */
4723 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4724 {
4725 struct ceph_mds_request *req = NULL, *nextreq;
4726 struct rb_node *n;
4727
4728 mutex_lock(&mdsc->mutex);
4729 dout("wait_unsafe_requests want %lld\n", want_tid);
4730 restart:
4731 req = __get_oldest_req(mdsc);
4732 while (req && req->r_tid <= want_tid) {
4733 /* find next request */
4734 n = rb_next(&req->r_node);
4735 if (n)
4736 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4737 else
4738 nextreq = NULL;
4739 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4740 (req->r_op & CEPH_MDS_OP_WRITE)) {
4741 /* write op */
4742 ceph_mdsc_get_request(req);
4743 if (nextreq)
4744 ceph_mdsc_get_request(nextreq);
4745 mutex_unlock(&mdsc->mutex);
4746 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4747 req->r_tid, want_tid);
4748 wait_for_completion(&req->r_safe_completion);
4749 mutex_lock(&mdsc->mutex);
4750 ceph_mdsc_put_request(req);
4751 if (!nextreq)
4752 break; /* next dne before, so we're done! */
4753 if (RB_EMPTY_NODE(&nextreq->r_node)) {
4754 /* next request was removed from tree */
4755 ceph_mdsc_put_request(nextreq);
4756 goto restart;
4757 }
4758 ceph_mdsc_put_request(nextreq); /* won't go away */
4759 }
4760 req = nextreq;
4761 }
4762 mutex_unlock(&mdsc->mutex);
4763 dout("wait_unsafe_requests done\n");
4764 }
4765
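/*
 * Flush dirty caps and wait for all write MDS requests and cap flushes
 * issued so far to be acknowledged.
 */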
4766 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4767 {
4768 u64 want_tid, want_flush;
4769
4770 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4771 return;
4772
4773 dout("sync\n");
4774 mutex_lock(&mdsc->mutex);
4775 want_tid = mdsc->last_tid;
4776 mutex_unlock(&mdsc->mutex);
4777
4778 ceph_flush_dirty_caps(mdsc);
4779 spin_lock(&mdsc->cap_dirty_lock);
4780 want_flush = mdsc->last_cap_flush_tid;
4781 if (!list_empty(&mdsc->cap_flush_list)) {
4782 struct ceph_cap_flush *cf =
4783 list_last_entry(&mdsc->cap_flush_list,
4784 struct ceph_cap_flush, g_list);
4785 cf->wake = true;
4786 }
4787 spin_unlock(&mdsc->cap_dirty_lock);
4788
4789 dout("sync want tid %lld flush_seq %lld\n",
4790 want_tid, want_flush);
4791
4792 wait_unsafe_requests(mdsc, want_tid);
4793 wait_caps_flush(mdsc, want_flush);
4794 }
4795
4796 /*
4797 * true if all sessions are closed, or we force unmount
4798 */
4799 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4800 {
4801 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4802 return true;
4803 return atomic_read(&mdsc->num_sessions) <= skipped;
4804 }
4805
4806 /*
4807 * called after sb is ro.
4808 */
4809 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4810 {
4811 struct ceph_options *opts = mdsc->fsc->client->options;
4812 struct ceph_mds_session *session;
4813 int i;
4814 int skipped = 0;
4815
4816 dout("close_sessions\n");
4817
4818 /* close sessions */
4819 mutex_lock(&mdsc->mutex);
4820 for (i = 0; i < mdsc->max_sessions; i++) {
4821 session = __ceph_lookup_mds_session(mdsc, i);
4822 if (!session)
4823 continue;
4824 mutex_unlock(&mdsc->mutex);
4825 mutex_lock(&session->s_mutex);
4826 if (__close_session(mdsc, session) <= 0)
4827 skipped++;
4828 mutex_unlock(&session->s_mutex);
4829 ceph_put_mds_session(session);
4830 mutex_lock(&mdsc->mutex);
4831 }
4832 mutex_unlock(&mdsc->mutex);
4833
4834 dout("waiting for sessions to close\n");
4835 wait_event_timeout(mdsc->session_close_wq,
4836 done_closing_sessions(mdsc, skipped),
4837 ceph_timeout_jiffies(opts->mount_timeout));
4838
4839 /* tear down remaining sessions */
4840 mutex_lock(&mdsc->mutex);
4841 for (i = 0; i < mdsc->max_sessions; i++) {
4842 if (mdsc->sessions[i]) {
4843 session = ceph_get_mds_session(mdsc->sessions[i]);
4844 __unregister_session(mdsc, session);
4845 mutex_unlock(&mdsc->mutex);
4846 mutex_lock(&session->s_mutex);
4847 remove_session_caps(session);
4848 mutex_unlock(&session->s_mutex);
4849 ceph_put_mds_session(session);
4850 mutex_lock(&mdsc->mutex);
4851 }
4852 }
4853 WARN_ON(!list_empty(&mdsc->cap_delay_list));
4854 mutex_unlock(&mdsc->mutex);
4855
4856 ceph_cleanup_snapid_map(mdsc);
4857 ceph_cleanup_empty_realms(mdsc);
4858
4859 cancel_work_sync(&mdsc->cap_reclaim_work);
4860 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4861
4862 dout("stopped\n");
4863 }
4864
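/*
 * Forcibly close all MDS sessions, waking waiters and kicking outstanding
 * requests so a forced unmount can make progress.
 */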
4865 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4866 {
4867 struct ceph_mds_session *session;
4868 int mds;
4869
4870 dout("force umount\n");
4871
4872 mutex_lock(&mdsc->mutex);
4873 for (mds = 0; mds < mdsc->max_sessions; mds++) {
4874 session = __ceph_lookup_mds_session(mdsc, mds);
4875 if (!session)
4876 continue;
4877
4878 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4879 __unregister_session(mdsc, session);
4880 __wake_requests(mdsc, &session->s_waiting);
4881 mutex_unlock(&mdsc->mutex);
4882
4883 mutex_lock(&session->s_mutex);
4884 __close_session(mdsc, session);
4885 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4886 cleanup_session_requests(mdsc, session);
4887 remove_session_caps(session);
4888 }
4889 mutex_unlock(&session->s_mutex);
4890 ceph_put_mds_session(session);
4891
4892 mutex_lock(&mdsc->mutex);
4893 kick_requests(mdsc, mds);
4894 }
4895 __wake_requests(mdsc, &mdsc->waiting_for_map);
4896 mutex_unlock(&mdsc->mutex);
4897 }
4898
4899 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4900 {
4901 dout("stop\n");
4902 /*
4903 	 * Make sure the delayed work has stopped before releasing
4904 	 * the resources.
4905 	 *
4906 	 * cancel_delayed_work_sync() only guarantees that the work
4907 	 * finishes executing, but the delayed work may re-arm itself
4908 	 * again after that.
4909 */
4910 flush_delayed_work(&mdsc->delayed_work);
4911
4912 if (mdsc->mdsmap)
4913 ceph_mdsmap_destroy(mdsc->mdsmap);
4914 kfree(mdsc->sessions);
4915 ceph_caps_finalize(mdsc);
4916 ceph_pool_perm_destroy(mdsc);
4917 }
4918
4919 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4920 {
4921 struct ceph_mds_client *mdsc = fsc->mdsc;
4922 dout("mdsc_destroy %p\n", mdsc);
4923
4924 if (!mdsc)
4925 return;
4926
4927 /* flush out any connection work with references to us */
4928 ceph_msgr_flush();
4929
4930 ceph_mdsc_stop(mdsc);
4931
4932 ceph_metric_destroy(&mdsc->metric);
4933
4934 fsc->mdsc = NULL;
4935 kfree(mdsc);
4936 dout("mdsc_destroy %p done\n", mdsc);
4937 }
4938
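/*
 * Handle an FSMap message from the monitor: look up the file system named
 * by the mds_namespace mount option, remember its fscid, and subscribe to
 * that file system's MDS map.
 */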
4939 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4940 {
4941 struct ceph_fs_client *fsc = mdsc->fsc;
4942 const char *mds_namespace = fsc->mount_options->mds_namespace;
4943 void *p = msg->front.iov_base;
4944 void *end = p + msg->front.iov_len;
4945 u32 epoch;
4946 u32 map_len;
4947 u32 num_fs;
4948 u32 mount_fscid = (u32)-1;
4949 u8 struct_v, struct_cv;
4950 int err = -EINVAL;
4951
4952 ceph_decode_need(&p, end, sizeof(u32), bad);
4953 epoch = ceph_decode_32(&p);
4954
4955 dout("handle_fsmap epoch %u\n", epoch);
4956
4957 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4958 struct_v = ceph_decode_8(&p);
4959 struct_cv = ceph_decode_8(&p);
4960 map_len = ceph_decode_32(&p);
4961
4962 ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4963 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4964
4965 num_fs = ceph_decode_32(&p);
4966 while (num_fs-- > 0) {
4967 void *info_p, *info_end;
4968 u32 info_len;
4969 u8 info_v, info_cv;
4970 u32 fscid, namelen;
4971
4972 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4973 info_v = ceph_decode_8(&p);
4974 info_cv = ceph_decode_8(&p);
4975 info_len = ceph_decode_32(&p);
4976 ceph_decode_need(&p, end, info_len, bad);
4977 info_p = p;
4978 info_end = p + info_len;
4979 p = info_end;
4980
4981 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4982 fscid = ceph_decode_32(&info_p);
4983 namelen = ceph_decode_32(&info_p);
4984 ceph_decode_need(&info_p, info_end, namelen, bad);
4985
4986 if (mds_namespace &&
4987 strlen(mds_namespace) == namelen &&
4988 !strncmp(mds_namespace, (char *)info_p, namelen)) {
4989 mount_fscid = fscid;
4990 break;
4991 }
4992 }
4993
4994 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4995 if (mount_fscid != (u32)-1) {
4996 fsc->client->monc.fs_cluster_id = mount_fscid;
4997 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4998 0, true);
4999 ceph_monc_renew_subs(&fsc->client->monc);
5000 } else {
5001 err = -ENOENT;
5002 goto err_out;
5003 }
5004 return;
5005
5006 bad:
5007 pr_err("error decoding fsmap\n");
5008 err_out:
5009 mutex_lock(&mdsc->mutex);
5010 mdsc->mdsmap_err = err;
5011 __wake_requests(mdsc, &mdsc->waiting_for_map);
5012 mutex_unlock(&mdsc->mutex);
5013 }
5014
5015 /*
5016 * handle mds map update.
5017 */
5018 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5019 {
5020 u32 epoch;
5021 u32 maplen;
5022 void *p = msg->front.iov_base;
5023 void *end = p + msg->front.iov_len;
5024 struct ceph_mdsmap *newmap, *oldmap;
5025 struct ceph_fsid fsid;
5026 int err = -EINVAL;
5027
5028 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5029 ceph_decode_copy(&p, &fsid, sizeof(fsid));
5030 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5031 return;
5032 epoch = ceph_decode_32(&p);
5033 maplen = ceph_decode_32(&p);
5034 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5035
5036 /* do we need it? */
5037 mutex_lock(&mdsc->mutex);
5038 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5039 dout("handle_map epoch %u <= our %u\n",
5040 epoch, mdsc->mdsmap->m_epoch);
5041 mutex_unlock(&mdsc->mutex);
5042 return;
5043 }
5044
5045 newmap = ceph_mdsmap_decode(&p, end);
5046 if (IS_ERR(newmap)) {
5047 err = PTR_ERR(newmap);
5048 goto bad_unlock;
5049 }
5050
5051 /* swap into place */
5052 if (mdsc->mdsmap) {
5053 oldmap = mdsc->mdsmap;
5054 mdsc->mdsmap = newmap;
5055 check_new_map(mdsc, newmap, oldmap);
5056 ceph_mdsmap_destroy(oldmap);
5057 } else {
5058 mdsc->mdsmap = newmap; /* first mds map */
5059 }
5060 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5061 MAX_LFS_FILESIZE);
5062
5063 __wake_requests(mdsc, &mdsc->waiting_for_map);
5064 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5065 mdsc->mdsmap->m_epoch);
5066
5067 mutex_unlock(&mdsc->mutex);
5068 schedule_delayed(mdsc, 0);
5069 return;
5070
5071 bad_unlock:
5072 mutex_unlock(&mdsc->mutex);
5073 bad:
5074 pr_err("error decoding mdsmap %d\n", err);
5075 return;
5076 }
5077
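/*
 * Messenger connection reference counting: each connection pins its MDS
 * session via con_get()/con_put().
 */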
5078 static struct ceph_connection *con_get(struct ceph_connection *con)
5079 {
5080 struct ceph_mds_session *s = con->private;
5081
5082 if (ceph_get_mds_session(s))
5083 return con;
5084 return NULL;
5085 }
5086
5087 static void con_put(struct ceph_connection *con)
5088 {
5089 struct ceph_mds_session *s = con->private;
5090
5091 ceph_put_mds_session(s);
5092 }
5093
5094 /*
5095 * if the client is unresponsive for long enough, the mds will kill
5096 * the session entirely.
5097 */
5098 static void peer_reset(struct ceph_connection *con)
5099 {
5100 struct ceph_mds_session *s = con->private;
5101 struct ceph_mds_client *mdsc = s->s_mdsc;
5102
5103 pr_warn("mds%d closed our session\n", s->s_mds);
5104 send_mds_reconnect(mdsc, s);
5105 }
5106
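/*
 * Dispatch an incoming MDS message to the appropriate handler after
 * verifying the session is still registered.
 */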
5107 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5108 {
5109 struct ceph_mds_session *s = con->private;
5110 struct ceph_mds_client *mdsc = s->s_mdsc;
5111 int type = le16_to_cpu(msg->hdr.type);
5112
5113 mutex_lock(&mdsc->mutex);
5114 if (__verify_registered_session(mdsc, s) < 0) {
5115 mutex_unlock(&mdsc->mutex);
5116 goto out;
5117 }
5118 mutex_unlock(&mdsc->mutex);
5119
5120 switch (type) {
5121 case CEPH_MSG_MDS_MAP:
5122 ceph_mdsc_handle_mdsmap(mdsc, msg);
5123 break;
5124 case CEPH_MSG_FS_MAP_USER:
5125 ceph_mdsc_handle_fsmap(mdsc, msg);
5126 break;
5127 case CEPH_MSG_CLIENT_SESSION:
5128 handle_session(s, msg);
5129 break;
5130 case CEPH_MSG_CLIENT_REPLY:
5131 handle_reply(s, msg);
5132 break;
5133 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5134 handle_forward(mdsc, s, msg);
5135 break;
5136 case CEPH_MSG_CLIENT_CAPS:
5137 ceph_handle_caps(s, msg);
5138 break;
5139 case CEPH_MSG_CLIENT_SNAP:
5140 ceph_handle_snap(mdsc, s, msg);
5141 break;
5142 case CEPH_MSG_CLIENT_LEASE:
5143 handle_lease(mdsc, s, msg);
5144 break;
5145 case CEPH_MSG_CLIENT_QUOTA:
5146 ceph_handle_quota(mdsc, s, msg);
5147 break;
5148
5149 default:
5150 pr_err("received unknown message type %d %s\n", type,
5151 ceph_msg_type_name(type));
5152 }
5153 out:
5154 ceph_msg_put(msg);
5155 }
5156
5157 /*
5158 * authentication
5159 */
5160
5161 /*
5162 * Note: returned pointer is the address of a structure that's
5163 * managed separately. Caller must *not* attempt to free it.
5164 */
5165 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5166 int *proto, int force_new)
5167 {
5168 struct ceph_mds_session *s = con->private;
5169 struct ceph_mds_client *mdsc = s->s_mdsc;
5170 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5171 struct ceph_auth_handshake *auth = &s->s_auth;
5172
5173 if (force_new && auth->authorizer) {
5174 ceph_auth_destroy_authorizer(auth->authorizer);
5175 auth->authorizer = NULL;
5176 }
5177 if (!auth->authorizer) {
5178 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5179 auth);
5180 if (ret)
5181 return ERR_PTR(ret);
5182 } else {
5183 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5184 auth);
5185 if (ret)
5186 return ERR_PTR(ret);
5187 }
5188 *proto = ac->protocol;
5189
5190 return auth;
5191 }
5192
5193 static int add_authorizer_challenge(struct ceph_connection *con,
5194 void *challenge_buf, int challenge_buf_len)
5195 {
5196 struct ceph_mds_session *s = con->private;
5197 struct ceph_mds_client *mdsc = s->s_mdsc;
5198 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5199
5200 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5201 challenge_buf, challenge_buf_len);
5202 }
5203
5204 static int verify_authorizer_reply(struct ceph_connection *con)
5205 {
5206 struct ceph_mds_session *s = con->private;
5207 struct ceph_mds_client *mdsc = s->s_mdsc;
5208 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5209
5210 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
5211 }
5212
5213 static int invalidate_authorizer(struct ceph_connection *con)
5214 {
5215 struct ceph_mds_session *s = con->private;
5216 struct ceph_mds_client *mdsc = s->s_mdsc;
5217 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5218
5219 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5220
5221 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5222 }
5223
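/*
 * Allocate a message for an incoming frame, reusing any message the
 * messenger has already attached to the connection (con->in_msg).
 */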
5224 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5225 struct ceph_msg_header *hdr, int *skip)
5226 {
5227 struct ceph_msg *msg;
5228 int type = (int) le16_to_cpu(hdr->type);
5229 int front_len = (int) le32_to_cpu(hdr->front_len);
5230
5231 if (con->in_msg)
5232 return con->in_msg;
5233
5234 *skip = 0;
5235 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5236 if (!msg) {
5237 pr_err("unable to allocate msg type %d len %d\n",
5238 type, front_len);
5239 return NULL;
5240 }
5241
5242 return msg;
5243 }
5244
5245 static int mds_sign_message(struct ceph_msg *msg)
5246 {
5247 struct ceph_mds_session *s = msg->con->private;
5248 struct ceph_auth_handshake *auth = &s->s_auth;
5249
5250 return ceph_auth_sign_message(auth, msg);
5251 }
5252
5253 static int mds_check_message_signature(struct ceph_msg *msg)
5254 {
5255 struct ceph_mds_session *s = msg->con->private;
5256 struct ceph_auth_handshake *auth = &s->s_auth;
5257
5258 return ceph_auth_check_message_signature(auth, msg);
5259 }
5260
5261 static const struct ceph_connection_operations mds_con_ops = {
5262 .get = con_get,
5263 .put = con_put,
5264 .dispatch = dispatch,
5265 .get_authorizer = get_authorizer,
5266 .add_authorizer_challenge = add_authorizer_challenge,
5267 .verify_authorizer_reply = verify_authorizer_reply,
5268 .invalidate_authorizer = invalidate_authorizer,
5269 .peer_reset = peer_reset,
5270 .alloc_msg = mds_alloc_msg,
5271 .sign_message = mds_sign_message,
5272 .check_message_signature = mds_check_message_signature,
5273 };
5274
5275 /* eof */
5276