1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/fs.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
7 #include <linux/gfp.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
12 #include <linux/bits.h>
13 #include <linux/ktime.h>
14
15 #include "super.h"
16 #include "mds_client.h"
17
18 #include <linux/ceph/ceph_features.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/pagelist.h>
22 #include <linux/ceph/auth.h>
23 #include <linux/ceph/debugfs.h>
24
25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26
27 /*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage. Metadata is
31 * partitioned hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible for managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is an MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we send periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issued remain valid. If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
51
52 struct ceph_reconnect_state {
53 struct ceph_mds_session *session;
54 int nr_caps, nr_realms;
55 struct ceph_pagelist *pagelist;
56 unsigned msg_version;
57 bool allow_multi;
58 };
59
60 static void __wake_requests(struct ceph_mds_client *mdsc,
61 struct list_head *head);
62 static void ceph_cap_release_work(struct work_struct *work);
63 static void ceph_cap_reclaim_work(struct work_struct *work);
64
65 static const struct ceph_connection_operations mds_con_ops;
66
67
68 /*
69 * mds reply parsing
70 */
71
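/*
 * parse the quota block of an inode record: a versioned header followed
 * by the max_bytes and max_files limits
 */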
72 static int parse_reply_info_quota(void **p, void *end,
73 struct ceph_mds_reply_info_in *info)
74 {
75 u8 struct_v, struct_compat;
76 u32 struct_len;
77
78 ceph_decode_8_safe(p, end, struct_v, bad);
79 ceph_decode_8_safe(p, end, struct_compat, bad);
80 /* struct_v is expected to be >= 1. we only
81 * understand encoding with struct_compat == 1. */
82 if (!struct_v || struct_compat != 1)
83 goto bad;
84 ceph_decode_32_safe(p, end, struct_len, bad);
85 ceph_decode_need(p, end, struct_len, bad);
86 end = *p + struct_len;
87 ceph_decode_64_safe(p, end, info->max_bytes, bad);
88 ceph_decode_64_safe(p, end, info->max_files, bad);
89 *p = end;
90 return 0;
91 bad:
92 return -EIO;
93 }
94
95 /*
96 * parse individual inode info
97 */
98 static int parse_reply_info_in(void **p, void *end,
99 struct ceph_mds_reply_info_in *info,
100 u64 features)
101 {
102 int err = 0;
103 u8 struct_v = 0;
104
105 if (features == (u64)-1) {
106 u32 struct_len;
107 u8 struct_compat;
108 ceph_decode_8_safe(p, end, struct_v, bad);
109 ceph_decode_8_safe(p, end, struct_compat, bad);
110 /* struct_v is expected to be >= 1. we only understand
111 * encoding with struct_compat == 1. */
112 if (!struct_v || struct_compat != 1)
113 goto bad;
114 ceph_decode_32_safe(p, end, struct_len, bad);
115 ceph_decode_need(p, end, struct_len, bad);
116 end = *p + struct_len;
117 }
118
119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120 info->in = *p;
121 *p += sizeof(struct ceph_mds_reply_inode) +
122 sizeof(*info->in->fragtree.splits) *
123 le32_to_cpu(info->in->fragtree.nsplits);
124
125 ceph_decode_32_safe(p, end, info->symlink_len, bad);
126 ceph_decode_need(p, end, info->symlink_len, bad);
127 info->symlink = *p;
128 *p += info->symlink_len;
129
130 ceph_decode_copy_safe(p, end, &info->dir_layout,
131 sizeof(info->dir_layout), bad);
132 ceph_decode_32_safe(p, end, info->xattr_len, bad);
133 ceph_decode_need(p, end, info->xattr_len, bad);
134 info->xattr_data = *p;
135 *p += info->xattr_len;
136
137 if (features == (u64)-1) {
138 /* inline data */
139 ceph_decode_64_safe(p, end, info->inline_version, bad);
140 ceph_decode_32_safe(p, end, info->inline_len, bad);
141 ceph_decode_need(p, end, info->inline_len, bad);
142 info->inline_data = *p;
143 *p += info->inline_len;
144 /* quota */
145 err = parse_reply_info_quota(p, end, info);
146 if (err < 0)
147 goto out_bad;
148 /* pool namespace */
149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150 if (info->pool_ns_len > 0) {
151 ceph_decode_need(p, end, info->pool_ns_len, bad);
152 info->pool_ns_data = *p;
153 *p += info->pool_ns_len;
154 }
155
156 /* btime */
157 ceph_decode_need(p, end, sizeof(info->btime), bad);
158 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160 /* change attribute */
161 ceph_decode_64_safe(p, end, info->change_attr, bad);
162
163 /* dir pin */
164 if (struct_v >= 2) {
165 ceph_decode_32_safe(p, end, info->dir_pin, bad);
166 } else {
167 info->dir_pin = -ENODATA;
168 }
169
170 /* snapshot birth time, remains zero for v<=2 */
171 if (struct_v >= 3) {
172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173 ceph_decode_copy(p, &info->snap_btime,
174 sizeof(info->snap_btime));
175 } else {
176 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177 }
178
179 *p = end;
180 } else {
181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
182 ceph_decode_64_safe(p, end, info->inline_version, bad);
183 ceph_decode_32_safe(p, end, info->inline_len, bad);
184 ceph_decode_need(p, end, info->inline_len, bad);
185 info->inline_data = *p;
186 *p += info->inline_len;
187 } else
188 info->inline_version = CEPH_INLINE_NONE;
189
190 if (features & CEPH_FEATURE_MDS_QUOTA) {
191 err = parse_reply_info_quota(p, end, info);
192 if (err < 0)
193 goto out_bad;
194 } else {
195 info->max_bytes = 0;
196 info->max_files = 0;
197 }
198
199 info->pool_ns_len = 0;
200 info->pool_ns_data = NULL;
201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
203 if (info->pool_ns_len > 0) {
204 ceph_decode_need(p, end, info->pool_ns_len, bad);
205 info->pool_ns_data = *p;
206 *p += info->pool_ns_len;
207 }
208 }
209
210 if (features & CEPH_FEATURE_FS_BTIME) {
211 ceph_decode_need(p, end, sizeof(info->btime), bad);
212 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
213 ceph_decode_64_safe(p, end, info->change_attr, bad);
214 }
215
216 info->dir_pin = -ENODATA;
217 /* info->snap_btime remains zero */
218 }
219 return 0;
220 bad:
221 err = -EIO;
222 out_bad:
223 return err;
224 }
225
226 static int parse_reply_info_dir(void **p, void *end,
227 struct ceph_mds_reply_dirfrag **dirfrag,
228 u64 features)
229 {
230 if (features == (u64)-1) {
231 u8 struct_v, struct_compat;
232 u32 struct_len;
233 ceph_decode_8_safe(p, end, struct_v, bad);
234 ceph_decode_8_safe(p, end, struct_compat, bad);
235 /* struct_v is expected to be >= 1. we only understand
236 * encoding whose struct_compat == 1. */
237 if (!struct_v || struct_compat != 1)
238 goto bad;
239 ceph_decode_32_safe(p, end, struct_len, bad);
240 ceph_decode_need(p, end, struct_len, bad);
241 end = *p + struct_len;
242 }
243
244 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
245 *dirfrag = *p;
246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
247 if (unlikely(*p > end))
248 goto bad;
249 if (features == (u64)-1)
250 *p = end;
251 return 0;
252 bad:
253 return -EIO;
254 }
255
256 static int parse_reply_info_lease(void **p, void *end,
257 struct ceph_mds_reply_lease **lease,
258 u64 features)
259 {
260 if (features == (u64)-1) {
261 u8 struct_v, struct_compat;
262 u32 struct_len;
263 ceph_decode_8_safe(p, end, struct_v, bad);
264 ceph_decode_8_safe(p, end, struct_compat, bad);
265 /* struct_v is expected to be >= 1. we only understand
266 * encoding whose struct_compat == 1. */
267 if (!struct_v || struct_compat != 1)
268 goto bad;
269 ceph_decode_32_safe(p, end, struct_len, bad);
270 ceph_decode_need(p, end, struct_len, bad);
271 end = *p + struct_len;
272 }
273
274 ceph_decode_need(p, end, sizeof(**lease), bad);
275 *lease = *p;
276 *p += sizeof(**lease);
277 if (features == (u64)-1)
278 *p = end;
279 return 0;
280 bad:
281 return -EIO;
282 }
283
284 /*
285 * parse a normal reply, which may contain a (dir+)dentry and/or a
286 * target inode.
287 */
288 static int parse_reply_info_trace(void **p, void *end,
289 struct ceph_mds_reply_info_parsed *info,
290 u64 features)
291 {
292 int err;
293
294 if (info->head->is_dentry) {
295 err = parse_reply_info_in(p, end, &info->diri, features);
296 if (err < 0)
297 goto out_bad;
298
299 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
300 if (err < 0)
301 goto out_bad;
302
303 ceph_decode_32_safe(p, end, info->dname_len, bad);
304 ceph_decode_need(p, end, info->dname_len, bad);
305 info->dname = *p;
306 *p += info->dname_len;
307
308 err = parse_reply_info_lease(p, end, &info->dlease, features);
309 if (err < 0)
310 goto out_bad;
311 }
312
313 if (info->head->is_target) {
314 err = parse_reply_info_in(p, end, &info->targeti, features);
315 if (err < 0)
316 goto out_bad;
317 }
318
319 if (unlikely(*p != end))
320 goto bad;
321 return 0;
322
323 bad:
324 err = -EIO;
325 out_bad:
326 pr_err("problem parsing mds trace %d\n", err);
327 return err;
328 }
329
330 /*
331 * parse readdir results
332 */
333 static int parse_reply_info_readdir(void **p, void *end,
334 struct ceph_mds_reply_info_parsed *info,
335 u64 features)
336 {
337 u32 num, i = 0;
338 int err;
339
340 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
341 if (err < 0)
342 goto out_bad;
343
344 ceph_decode_need(p, end, sizeof(num) + 2, bad);
345 num = ceph_decode_32(p);
346 {
347 u16 flags = ceph_decode_16(p);
348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
352 }
353 if (num == 0)
354 goto done;
355
356 BUG_ON(!info->dir_entries);
357 if ((unsigned long)(info->dir_entries + num) >
358 (unsigned long)info->dir_entries + info->dir_buf_size) {
359 pr_err("dir contents are larger than expected\n");
360 WARN_ON(1);
361 goto bad;
362 }
363
364 info->dir_nr = num;
365 while (num) {
366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
367 /* dentry */
368 ceph_decode_32_safe(p, end, rde->name_len, bad);
369 ceph_decode_need(p, end, rde->name_len, bad);
370 rde->name = *p;
371 *p += rde->name_len;
372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
373
374 /* dentry lease */
375 err = parse_reply_info_lease(p, end, &rde->lease, features);
376 if (err)
377 goto out_bad;
378 /* inode */
379 err = parse_reply_info_in(p, end, &rde->inode, features);
380 if (err < 0)
381 goto out_bad;
382 /* ceph_readdir_prepopulate() will update it */
383 rde->offset = 0;
384 i++;
385 num--;
386 }
387
388 done:
389 /* Skip over any unrecognized fields */
390 *p = end;
391 return 0;
392
393 bad:
394 err = -EIO;
395 out_bad:
396 pr_err("problem parsing dir contents %d\n", err);
397 return err;
398 }
399
400 /*
401 * parse fcntl F_GETLK results
402 */
403 static int parse_reply_info_filelock(void **p, void *end,
404 struct ceph_mds_reply_info_parsed *info,
405 u64 features)
406 {
407 if (*p + sizeof(*info->filelock_reply) > end)
408 goto bad;
409
410 info->filelock_reply = *p;
411
412 /* Skip over any unrecognized fields */
413 *p = end;
414 return 0;
415 bad:
416 return -EIO;
417 }
418
419
420 #if BITS_PER_LONG == 64
421
422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1)
423
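/*
 * decode the inode-number ranges the MDS has delegated to this session
 * and record each inode in s_delegated_inos; delegations that fall in
 * the reserved (system) range are ignored
 */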
424 static int ceph_parse_deleg_inos(void **p, void *end,
425 struct ceph_mds_session *s)
426 {
427 u32 sets;
428
429 ceph_decode_32_safe(p, end, sets, bad);
430 dout("got %u sets of delegated inodes\n", sets);
431 while (sets--) {
432 u64 start, len, ino;
433
434 ceph_decode_64_safe(p, end, start, bad);
435 ceph_decode_64_safe(p, end, len, bad);
436
437 /* Don't accept a delegation of system inodes */
438 if (start < CEPH_INO_SYSTEM_BASE) {
439 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
440 start, len);
441 continue;
442 }
443 while (len--) {
444 int err = xa_insert(&s->s_delegated_inos, ino = start++,
445 DELEGATED_INO_AVAILABLE,
446 GFP_KERNEL);
447 if (!err) {
448 dout("added delegated inode 0x%llx\n",
449 start - 1);
450 } else if (err == -EBUSY) {
451 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
452 start - 1);
453 } else {
454 return err;
455 }
456 }
457 }
458 return 0;
459 bad:
460 return -EIO;
461 }
462
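/* claim one unused delegated inode number from the session, or 0 if none */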
463 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
464 {
465 unsigned long ino;
466 void *val;
467
468 xa_for_each(&s->s_delegated_inos, ino, val) {
469 val = xa_erase(&s->s_delegated_inos, ino);
470 if (val == DELEGATED_INO_AVAILABLE)
471 return ino;
472 }
473 return 0;
474 }
475
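/* return an unused delegated inode number to the session's pool */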
476 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
477 {
478 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
479 GFP_KERNEL);
480 }
481 #else /* BITS_PER_LONG == 64 */
482 /*
483 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
484 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
485 * and bottom words?
486 */
487 static int ceph_parse_deleg_inos(void **p, void *end,
488 struct ceph_mds_session *s)
489 {
490 u32 sets;
491
492 ceph_decode_32_safe(p, end, sets, bad);
493 if (sets)
494 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
495 return 0;
496 bad:
497 return -EIO;
498 }
499
500 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
501 {
502 return 0;
503 }
504
505 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
506 {
507 return 0;
508 }
509 #endif /* BITS_PER_LONG == 64 */
510
511 /*
512 * parse create results
513 */
514 static int parse_reply_info_create(void **p, void *end,
515 struct ceph_mds_reply_info_parsed *info,
516 u64 features, struct ceph_mds_session *s)
517 {
518 int ret;
519
520 if (features == (u64)-1 ||
521 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
522 if (*p == end) {
523 /* Malformed reply? */
524 info->has_create_ino = false;
525 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
526 u8 struct_v, struct_compat;
527 u32 len;
528
529 info->has_create_ino = true;
530 ceph_decode_8_safe(p, end, struct_v, bad);
531 ceph_decode_8_safe(p, end, struct_compat, bad);
532 ceph_decode_32_safe(p, end, len, bad);
533 ceph_decode_64_safe(p, end, info->ino, bad);
534 ret = ceph_parse_deleg_inos(p, end, s);
535 if (ret)
536 return ret;
537 } else {
538 /* legacy */
539 ceph_decode_64_safe(p, end, info->ino, bad);
540 info->has_create_ino = true;
541 }
542 } else {
543 if (*p != end)
544 goto bad;
545 }
546
547 /* Skip over any unrecognized fields */
548 *p = end;
549 return 0;
550 bad:
551 return -EIO;
552 }
553
554 /*
555 * parse extra results
556 */
557 static int parse_reply_info_extra(void **p, void *end,
558 struct ceph_mds_reply_info_parsed *info,
559 u64 features, struct ceph_mds_session *s)
560 {
561 u32 op = le32_to_cpu(info->head->op);
562
563 if (op == CEPH_MDS_OP_GETFILELOCK)
564 return parse_reply_info_filelock(p, end, info, features);
565 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
566 return parse_reply_info_readdir(p, end, info, features);
567 else if (op == CEPH_MDS_OP_CREATE)
568 return parse_reply_info_create(p, end, info, features, s);
569 else
570 return -EIO;
571 }
572
573 /*
574 * parse entire mds reply
575 */
576 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
577 struct ceph_mds_reply_info_parsed *info,
578 u64 features)
579 {
580 void *p, *end;
581 u32 len;
582 int err;
583
584 info->head = msg->front.iov_base;
585 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
586 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
587
588 /* trace */
589 ceph_decode_32_safe(&p, end, len, bad);
590 if (len > 0) {
591 ceph_decode_need(&p, end, len, bad);
592 err = parse_reply_info_trace(&p, p+len, info, features);
593 if (err < 0)
594 goto out_bad;
595 }
596
597 /* extra */
598 ceph_decode_32_safe(&p, end, len, bad);
599 if (len > 0) {
600 ceph_decode_need(&p, end, len, bad);
601 err = parse_reply_info_extra(&p, p+len, info, features, s);
602 if (err < 0)
603 goto out_bad;
604 }
605
606 /* snap blob */
607 ceph_decode_32_safe(&p, end, len, bad);
608 info->snapblob_len = len;
609 info->snapblob = p;
610 p += len;
611
612 if (p != end)
613 goto bad;
614 return 0;
615
616 bad:
617 err = -EIO;
618 out_bad:
619 pr_err("mds parse_reply err %d\n", err);
620 return err;
621 }
622
623 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
624 {
625 if (!info->dir_entries)
626 return;
627 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
628 }
629
630
631 /*
632 * sessions
633 */
634 const char *ceph_session_state_name(int s)
635 {
636 switch (s) {
637 case CEPH_MDS_SESSION_NEW: return "new";
638 case CEPH_MDS_SESSION_OPENING: return "opening";
639 case CEPH_MDS_SESSION_OPEN: return "open";
640 case CEPH_MDS_SESSION_HUNG: return "hung";
641 case CEPH_MDS_SESSION_CLOSING: return "closing";
642 case CEPH_MDS_SESSION_CLOSED: return "closed";
643 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
644 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
645 case CEPH_MDS_SESSION_REJECTED: return "rejected";
646 default: return "???";
647 }
648 }
649
650 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
651 {
652 if (refcount_inc_not_zero(&s->s_ref)) {
653 dout("mdsc get_session %p %d -> %d\n", s,
654 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
655 return s;
656 } else {
657 dout("mdsc get_session %p 0 -- FAIL\n", s);
658 return NULL;
659 }
660 }
661
662 void ceph_put_mds_session(struct ceph_mds_session *s)
663 {
664 if (IS_ERR_OR_NULL(s))
665 return;
666
667 dout("mdsc put_session %p %d -> %d\n", s,
668 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
669 if (refcount_dec_and_test(&s->s_ref)) {
670 if (s->s_auth.authorizer)
671 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
672 WARN_ON(mutex_is_locked(&s->s_mutex));
673 xa_destroy(&s->s_delegated_inos);
674 kfree(s);
675 }
676 }
677
678 /*
679 * called under mdsc->mutex
680 */
681 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
682 int mds)
683 {
684 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
685 return NULL;
686 return ceph_get_mds_session(mdsc->sessions[mds]);
687 }
688
689 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
690 {
691 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
692 return false;
693 else
694 return true;
695 }
696
697 static int __verify_registered_session(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *s)
699 {
700 if (s->s_mds >= mdsc->max_sessions ||
701 mdsc->sessions[s->s_mds] != s)
702 return -ENOENT;
703 return 0;
704 }
705
706 /*
707 * create+register a new session for given mds.
708 * called under mdsc->mutex.
709 */
710 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
711 int mds)
712 {
713 struct ceph_mds_session *s;
714
715 if (mds >= mdsc->mdsmap->possible_max_rank)
716 return ERR_PTR(-EINVAL);
717
718 s = kzalloc(sizeof(*s), GFP_NOFS);
719 if (!s)
720 return ERR_PTR(-ENOMEM);
721
722 if (mds >= mdsc->max_sessions) {
723 int newmax = 1 << get_count_order(mds + 1);
724 struct ceph_mds_session **sa;
725
726 dout("%s: realloc to %d\n", __func__, newmax);
727 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
728 if (!sa)
729 goto fail_realloc;
730 if (mdsc->sessions) {
731 memcpy(sa, mdsc->sessions,
732 mdsc->max_sessions * sizeof(void *));
733 kfree(mdsc->sessions);
734 }
735 mdsc->sessions = sa;
736 mdsc->max_sessions = newmax;
737 }
738
739 dout("%s: mds%d\n", __func__, mds);
740 s->s_mdsc = mdsc;
741 s->s_mds = mds;
742 s->s_state = CEPH_MDS_SESSION_NEW;
743 s->s_ttl = 0;
744 s->s_seq = 0;
745 mutex_init(&s->s_mutex);
746
747 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
748
749 spin_lock_init(&s->s_gen_ttl_lock);
750 s->s_cap_gen = 1;
751 s->s_cap_ttl = jiffies - 1;
752
753 spin_lock_init(&s->s_cap_lock);
754 s->s_renew_requested = 0;
755 s->s_renew_seq = 0;
756 INIT_LIST_HEAD(&s->s_caps);
757 s->s_nr_caps = 0;
758 refcount_set(&s->s_ref, 1);
759 INIT_LIST_HEAD(&s->s_waiting);
760 INIT_LIST_HEAD(&s->s_unsafe);
761 xa_init(&s->s_delegated_inos);
762 s->s_num_cap_releases = 0;
763 s->s_cap_reconnect = 0;
764 s->s_cap_iterator = NULL;
765 INIT_LIST_HEAD(&s->s_cap_releases);
766 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
767
768 INIT_LIST_HEAD(&s->s_cap_dirty);
769 INIT_LIST_HEAD(&s->s_cap_flushing);
770
771 mdsc->sessions[mds] = s;
772 atomic_inc(&mdsc->num_sessions);
773 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
774
775 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
776 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
777
778 return s;
779
780 fail_realloc:
781 kfree(s);
782 return ERR_PTR(-ENOMEM);
783 }
784
785 /*
786 * called under mdsc->mutex
787 */
788 static void __unregister_session(struct ceph_mds_client *mdsc,
789 struct ceph_mds_session *s)
790 {
791 dout("__unregister_session mds%d %p\n", s->s_mds, s);
792 BUG_ON(mdsc->sessions[s->s_mds] != s);
793 mdsc->sessions[s->s_mds] = NULL;
794 ceph_con_close(&s->s_con);
795 ceph_put_mds_session(s);
796 atomic_dec(&mdsc->num_sessions);
797 }
798
799 /*
800 * drop session refs in request.
801 *
802 * should be last request ref, or hold mdsc->mutex
803 */
804 static void put_request_session(struct ceph_mds_request *req)
805 {
806 if (req->r_session) {
807 ceph_put_mds_session(req->r_session);
808 req->r_session = NULL;
809 }
810 }
811
812 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
813 void (*cb)(struct ceph_mds_session *),
814 bool check_state)
815 {
816 int mds;
817
818 mutex_lock(&mdsc->mutex);
819 for (mds = 0; mds < mdsc->max_sessions; ++mds) {
820 struct ceph_mds_session *s;
821
822 s = __ceph_lookup_mds_session(mdsc, mds);
823 if (!s)
824 continue;
825
826 if (check_state && !check_session_state(s)) {
827 ceph_put_mds_session(s);
828 continue;
829 }
830
831 mutex_unlock(&mdsc->mutex);
832 cb(s);
833 ceph_put_mds_session(s);
834 mutex_lock(&mdsc->mutex);
835 }
836 mutex_unlock(&mdsc->mutex);
837 }
838
839 void ceph_mdsc_release_request(struct kref *kref)
840 {
841 struct ceph_mds_request *req = container_of(kref,
842 struct ceph_mds_request,
843 r_kref);
844 ceph_mdsc_release_dir_caps_no_check(req);
845 destroy_reply_info(&req->r_reply_info);
846 if (req->r_request)
847 ceph_msg_put(req->r_request);
848 if (req->r_reply)
849 ceph_msg_put(req->r_reply);
850 if (req->r_inode) {
851 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
852 /* avoid calling iput_final() in mds dispatch threads */
853 ceph_async_iput(req->r_inode);
854 }
855 if (req->r_parent) {
856 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
857 ceph_async_iput(req->r_parent);
858 }
859 ceph_async_iput(req->r_target_inode);
860 if (req->r_dentry)
861 dput(req->r_dentry);
862 if (req->r_old_dentry)
863 dput(req->r_old_dentry);
864 if (req->r_old_dentry_dir) {
865 /*
866 * track (and drop pins for) r_old_dentry_dir
867 * separately, since r_old_dentry's d_parent may have
868 * changed between the dir mutex being dropped and
869 * this request being freed.
870 */
871 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
872 CEPH_CAP_PIN);
873 ceph_async_iput(req->r_old_dentry_dir);
874 }
875 kfree(req->r_path1);
876 kfree(req->r_path2);
877 if (req->r_pagelist)
878 ceph_pagelist_release(req->r_pagelist);
879 put_request_session(req);
880 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
881 WARN_ON_ONCE(!list_empty(&req->r_wait));
882 kmem_cache_free(ceph_mds_request_cachep, req);
883 }
884
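/* generate the lookup/insert/erase helpers for the tid-indexed request tree */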
885 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
886
887 /*
888 * lookup request, bump ref if found.
889 *
890 * called under mdsc->mutex.
891 */
892 static struct ceph_mds_request *
893 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
894 {
895 struct ceph_mds_request *req;
896
897 req = lookup_request(&mdsc->request_tree, tid);
898 if (req)
899 ceph_mdsc_get_request(req);
900
901 return req;
902 }
903
904 /*
905 * Register an in-flight request, and assign a tid. Link to the directory
906 * we are modifying (if any).
907 *
908 * Called under mdsc->mutex.
909 */
910 static void __register_request(struct ceph_mds_client *mdsc,
911 struct ceph_mds_request *req,
912 struct inode *dir)
913 {
914 int ret = 0;
915
916 req->r_tid = ++mdsc->last_tid;
917 if (req->r_num_caps) {
918 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
919 req->r_num_caps);
920 if (ret < 0) {
921 pr_err("__register_request %p "
922 "failed to reserve caps: %d\n", req, ret);
923 /* set req->r_err to fail early from __do_request */
924 req->r_err = ret;
925 return;
926 }
927 }
928 dout("__register_request %p tid %lld\n", req, req->r_tid);
929 ceph_mdsc_get_request(req);
930 insert_request(&mdsc->request_tree, req);
931
932 req->r_uid = current_fsuid();
933 req->r_gid = current_fsgid();
934
935 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
936 mdsc->oldest_tid = req->r_tid;
937
938 if (dir) {
939 struct ceph_inode_info *ci = ceph_inode(dir);
940
941 ihold(dir);
942 req->r_unsafe_dir = dir;
943 spin_lock(&ci->i_unsafe_lock);
944 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
945 spin_unlock(&ci->i_unsafe_lock);
946 }
947 }
948
949 static void __unregister_request(struct ceph_mds_client *mdsc,
950 struct ceph_mds_request *req)
951 {
952 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
953
954 /* Never leave an unregistered request on an unsafe list! */
955 list_del_init(&req->r_unsafe_item);
956
957 if (req->r_tid == mdsc->oldest_tid) {
958 struct rb_node *p = rb_next(&req->r_node);
959 mdsc->oldest_tid = 0;
960 while (p) {
961 struct ceph_mds_request *next_req =
962 rb_entry(p, struct ceph_mds_request, r_node);
963 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
964 mdsc->oldest_tid = next_req->r_tid;
965 break;
966 }
967 p = rb_next(p);
968 }
969 }
970
971 erase_request(&mdsc->request_tree, req);
972
973 if (req->r_unsafe_dir) {
974 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
975 spin_lock(&ci->i_unsafe_lock);
976 list_del_init(&req->r_unsafe_dir_item);
977 spin_unlock(&ci->i_unsafe_lock);
978 }
979 if (req->r_target_inode &&
980 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
981 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
982 spin_lock(&ci->i_unsafe_lock);
983 list_del_init(&req->r_unsafe_target_item);
984 spin_unlock(&ci->i_unsafe_lock);
985 }
986
987 if (req->r_unsafe_dir) {
988 /* avoid calling iput_final() in mds dispatch threads */
989 ceph_async_iput(req->r_unsafe_dir);
990 req->r_unsafe_dir = NULL;
991 }
992
993 complete_all(&req->r_safe_completion);
994
995 ceph_mdsc_put_request(req);
996 }
997
998 /*
999 * Walk back up the dentry tree until we hit a dentry representing a
1000 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1001 * when calling this) to ensure that the objects won't disappear while we're
1002 * working with them. Once we hit a candidate dentry, we attempt to take a
1003 * reference to it, and return that as the result.
1004 */
1005 static struct inode *get_nonsnap_parent(struct dentry *dentry)
1006 {
1007 struct inode *inode = NULL;
1008
1009 while (dentry && !IS_ROOT(dentry)) {
1010 inode = d_inode_rcu(dentry);
1011 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1012 break;
1013 dentry = dentry->d_parent;
1014 }
1015 if (inode)
1016 inode = igrab(inode);
1017 return inode;
1018 }
1019
1020 /*
1021 * Choose mds to send request to next. If there is a hint set in the
1022 * request (e.g., due to a prior forward hint from the mds), use that.
1023 * Otherwise, consult frag tree and/or caps to identify the
1024 * appropriate mds. If all else fails, choose randomly.
1025 *
1026 * Called under mdsc->mutex.
1027 */
1028 static int __choose_mds(struct ceph_mds_client *mdsc,
1029 struct ceph_mds_request *req,
1030 bool *random)
1031 {
1032 struct inode *inode;
1033 struct ceph_inode_info *ci;
1034 struct ceph_cap *cap;
1035 int mode = req->r_direct_mode;
1036 int mds = -1;
1037 u32 hash = req->r_direct_hash;
1038 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1039
1040 if (random)
1041 *random = false;
1042
1043 /*
1044 * is there a specific mds we should try? ignore hint if we have
1045 * no session and the mds is not up (active or recovering).
1046 */
1047 if (req->r_resend_mds >= 0 &&
1048 (__have_session(mdsc, req->r_resend_mds) ||
1049 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1050 dout("%s using resend_mds mds%d\n", __func__,
1051 req->r_resend_mds);
1052 return req->r_resend_mds;
1053 }
1054
1055 if (mode == USE_RANDOM_MDS)
1056 goto random;
1057
1058 inode = NULL;
1059 if (req->r_inode) {
1060 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1061 inode = req->r_inode;
1062 ihold(inode);
1063 } else {
1064 /* req->r_dentry is non-null for LSSNAP request */
1065 rcu_read_lock();
1066 inode = get_nonsnap_parent(req->r_dentry);
1067 rcu_read_unlock();
1068 dout("%s using snapdir's parent %p\n", __func__, inode);
1069 }
1070 } else if (req->r_dentry) {
1071 /* ignore race with rename; old or new d_parent is okay */
1072 struct dentry *parent;
1073 struct inode *dir;
1074
1075 rcu_read_lock();
1076 parent = READ_ONCE(req->r_dentry->d_parent);
1077 dir = req->r_parent ? : d_inode_rcu(parent);
1078
1079 if (!dir || dir->i_sb != mdsc->fsc->sb) {
1080 /* not this fs or parent went negative */
1081 inode = d_inode(req->r_dentry);
1082 if (inode)
1083 ihold(inode);
1084 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1085 /* direct snapped/virtual snapdir requests
1086 * based on parent dir inode */
1087 inode = get_nonsnap_parent(parent);
1088 dout("%s using nonsnap parent %p\n", __func__, inode);
1089 } else {
1090 /* dentry target */
1091 inode = d_inode(req->r_dentry);
1092 if (!inode || mode == USE_AUTH_MDS) {
1093 /* dir + name */
1094 inode = igrab(dir);
1095 hash = ceph_dentry_hash(dir, req->r_dentry);
1096 is_hash = true;
1097 } else {
1098 ihold(inode);
1099 }
1100 }
1101 rcu_read_unlock();
1102 }
1103
1104 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1105 hash, mode);
1106 if (!inode)
1107 goto random;
1108 ci = ceph_inode(inode);
1109
1110 if (is_hash && S_ISDIR(inode->i_mode)) {
1111 struct ceph_inode_frag frag;
1112 int found;
1113
1114 ceph_choose_frag(ci, hash, &frag, &found);
1115 if (found) {
1116 if (mode == USE_ANY_MDS && frag.ndist > 0) {
1117 u8 r;
1118
1119 /* choose a random replica */
1120 get_random_bytes(&r, 1);
1121 r %= frag.ndist;
1122 mds = frag.dist[r];
1123 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1124 __func__, inode, ceph_vinop(inode),
1125 frag.frag, mds, (int)r, frag.ndist);
1126 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1127 CEPH_MDS_STATE_ACTIVE &&
1128 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1129 goto out;
1130 }
1131
1132 /* since this file/dir wasn't known to be
1133 * replicated, then we want to look for the
1134 * authoritative mds. */
1135 if (frag.mds >= 0) {
1136 /* choose auth mds */
1137 mds = frag.mds;
1138 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1139 __func__, inode, ceph_vinop(inode),
1140 frag.frag, mds);
1141 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1142 CEPH_MDS_STATE_ACTIVE) {
1143 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1144 mds))
1145 goto out;
1146 }
1147 }
1148 mode = USE_AUTH_MDS;
1149 }
1150 }
1151
1152 spin_lock(&ci->i_ceph_lock);
1153 cap = NULL;
1154 if (mode == USE_AUTH_MDS)
1155 cap = ci->i_auth_cap;
1156 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1157 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1158 if (!cap) {
1159 spin_unlock(&ci->i_ceph_lock);
1160 ceph_async_iput(inode);
1161 goto random;
1162 }
1163 mds = cap->session->s_mds;
1164 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1165 inode, ceph_vinop(inode), mds,
1166 cap == ci->i_auth_cap ? "auth " : "", cap);
1167 spin_unlock(&ci->i_ceph_lock);
1168 out:
1169 /* avoid calling iput_final() while holding mdsc->mutex or
1170 * in mds dispatch threads */
1171 ceph_async_iput(inode);
1172 return mds;
1173
1174 random:
1175 if (random)
1176 *random = true;
1177
1178 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1179 dout("%s chose random mds%d\n", __func__, mds);
1180 return mds;
1181 }
1182
1183
1184 /*
1185 * session messages
1186 */
1187 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1188 {
1189 struct ceph_msg *msg;
1190 struct ceph_mds_session_head *h;
1191
1192 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1193 false);
1194 if (!msg) {
1195 pr_err("ENOMEM creating session %s msg\n",
1196 ceph_session_op_name(op));
1197 return NULL;
1198 }
1199 h = msg->front.iov_base;
1200 h->op = cpu_to_le32(op);
1201 h->seq = cpu_to_le64(seq);
1202
1203 return msg;
1204 }
1205
1206 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1207 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
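/*
 * encode the client's supported feature bits as a length-prefixed byte
 * array; FEATURE_BYTES rounds the highest supported bit up to a whole
 * number of 64-bit words and returns the size in bytes
 */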
1208 static int encode_supported_features(void **p, void *end)
1209 {
1210 static const size_t count = ARRAY_SIZE(feature_bits);
1211
1212 if (count > 0) {
1213 size_t i;
1214 size_t size = FEATURE_BYTES(count);
1215 unsigned long bit;
1216
1217 if (WARN_ON_ONCE(*p + 4 + size > end))
1218 return -ERANGE;
1219
1220 ceph_encode_32(p, size);
1221 memset(*p, 0, size);
1222 for (i = 0; i < count; i++) {
1223 bit = feature_bits[i];
1224 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1225 }
1226 *p += size;
1227 } else {
1228 if (WARN_ON_ONCE(*p + 4 > end))
1229 return -ERANGE;
1230
1231 ceph_encode_32(p, 0);
1232 }
1233
1234 return 0;
1235 }
1236
1237 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1238 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
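/*
 * encode the metric spec: a version/compat header followed by a
 * length-prefixed bitmap of the metrics this client can send
 */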
1239 static int encode_metric_spec(void **p, void *end)
1240 {
1241 static const size_t count = ARRAY_SIZE(metric_bits);
1242
1243 /* header */
1244 if (WARN_ON_ONCE(*p + 2 > end))
1245 return -ERANGE;
1246
1247 ceph_encode_8(p, 1); /* version */
1248 ceph_encode_8(p, 1); /* compat */
1249
1250 if (count > 0) {
1251 size_t i;
1252 size_t size = METRIC_BYTES(count);
1253
1254 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1255 return -ERANGE;
1256
1257 /* metric spec info length */
1258 ceph_encode_32(p, 4 + size);
1259
1260 /* metric spec */
1261 ceph_encode_32(p, size);
1262 memset(*p, 0, size);
1263 for (i = 0; i < count; i++)
1264 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1265 *p += size;
1266 } else {
1267 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1268 return -ERANGE;
1269
1270 /* metric spec info length */
1271 ceph_encode_32(p, 4);
1272 /* metric spec */
1273 ceph_encode_32(p, 0);
1274 }
1275
1276 return 0;
1277 }
1278
1279 /*
1280 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1281 * to include additional client metadata fields.
1282 */
1283 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1284 {
1285 struct ceph_msg *msg;
1286 struct ceph_mds_session_head *h;
1287 int i = -1;
1288 int extra_bytes = 0;
1289 int metadata_key_count = 0;
1290 struct ceph_options *opt = mdsc->fsc->client->options;
1291 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1292 size_t size, count;
1293 void *p, *end;
1294 int ret;
1295
1296 const char* metadata[][2] = {
1297 {"hostname", mdsc->nodename},
1298 {"kernel_version", init_utsname()->release},
1299 {"entity_id", opt->name ? : ""},
1300 {"root", fsopt->server_path ? : "/"},
1301 {NULL, NULL}
1302 };
1303
1304 /* Calculate serialized length of metadata */
1305 extra_bytes = 4; /* map length */
1306 for (i = 0; metadata[i][0]; ++i) {
1307 extra_bytes += 8 + strlen(metadata[i][0]) +
1308 strlen(metadata[i][1]);
1309 metadata_key_count++;
1310 }
1311
1312 /* supported feature */
1313 size = 0;
1314 count = ARRAY_SIZE(feature_bits);
1315 if (count > 0)
1316 size = FEATURE_BYTES(count);
1317 extra_bytes += 4 + size;
1318
1319 /* metric spec */
1320 size = 0;
1321 count = ARRAY_SIZE(metric_bits);
1322 if (count > 0)
1323 size = METRIC_BYTES(count);
1324 extra_bytes += 2 + 4 + 4 + size;
1325
1326 /* Allocate the message */
1327 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1328 GFP_NOFS, false);
1329 if (!msg) {
1330 pr_err("ENOMEM creating session open msg\n");
1331 return ERR_PTR(-ENOMEM);
1332 }
1333 p = msg->front.iov_base;
1334 end = p + msg->front.iov_len;
1335
1336 h = p;
1337 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1338 h->seq = cpu_to_le64(seq);
1339
1340 /*
1341 * Serialize client metadata into waiting buffer space, using
1342 * the format that userspace expects for map<string, string>
1343 *
1344 * ClientSession messages with metadata are v4
1345 */
1346 msg->hdr.version = cpu_to_le16(4);
1347 msg->hdr.compat_version = cpu_to_le16(1);
1348
1349 /* The write pointer, following the session_head structure */
1350 p += sizeof(*h);
1351
1352 /* Number of entries in the map */
1353 ceph_encode_32(&p, metadata_key_count);
1354
1355 /* Two length-prefixed strings for each entry in the map */
1356 for (i = 0; metadata[i][0]; ++i) {
1357 size_t const key_len = strlen(metadata[i][0]);
1358 size_t const val_len = strlen(metadata[i][1]);
1359
1360 ceph_encode_32(&p, key_len);
1361 memcpy(p, metadata[i][0], key_len);
1362 p += key_len;
1363 ceph_encode_32(&p, val_len);
1364 memcpy(p, metadata[i][1], val_len);
1365 p += val_len;
1366 }
1367
1368 ret = encode_supported_features(&p, end);
1369 if (ret) {
1370 pr_err("encode_supported_features failed!\n");
1371 ceph_msg_put(msg);
1372 return ERR_PTR(ret);
1373 }
1374
1375 ret = encode_metric_spec(&p, end);
1376 if (ret) {
1377 pr_err("encode_metric_spec failed!\n");
1378 ceph_msg_put(msg);
1379 return ERR_PTR(ret);
1380 }
1381
1382 msg->front.iov_len = p - msg->front.iov_base;
1383 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1384
1385 return msg;
1386 }
1387
1388 /*
1389 * send session open request.
1390 *
1391 * called under mdsc->mutex
1392 */
1393 static int __open_session(struct ceph_mds_client *mdsc,
1394 struct ceph_mds_session *session)
1395 {
1396 struct ceph_msg *msg;
1397 int mstate;
1398 int mds = session->s_mds;
1399
1400 /* wait for mds to go active? */
1401 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1402 dout("open_session to mds%d (%s)\n", mds,
1403 ceph_mds_state_name(mstate));
1404 session->s_state = CEPH_MDS_SESSION_OPENING;
1405 session->s_renew_requested = jiffies;
1406
1407 /* send connect message */
1408 msg = create_session_open_msg(mdsc, session->s_seq);
1409 if (IS_ERR(msg))
1410 return PTR_ERR(msg);
1411 ceph_con_send(&session->s_con, msg);
1412 return 0;
1413 }
1414
1415 /*
1416 * open sessions for any export targets for the given mds
1417 *
1418 * called under mdsc->mutex
1419 */
1420 static struct ceph_mds_session *
1421 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1422 {
1423 struct ceph_mds_session *session;
1424 int ret;
1425
1426 session = __ceph_lookup_mds_session(mdsc, target);
1427 if (!session) {
1428 session = register_session(mdsc, target);
1429 if (IS_ERR(session))
1430 return session;
1431 }
1432 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1433 session->s_state == CEPH_MDS_SESSION_CLOSING) {
1434 ret = __open_session(mdsc, session);
1435 if (ret)
1436 return ERR_PTR(ret);
1437 }
1438
1439 return session;
1440 }
1441
1442 struct ceph_mds_session *
1443 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1444 {
1445 struct ceph_mds_session *session;
1446
1447 dout("open_export_target_session to mds%d\n", target);
1448
1449 mutex_lock(&mdsc->mutex);
1450 session = __open_export_target_session(mdsc, target);
1451 mutex_unlock(&mdsc->mutex);
1452
1453 return session;
1454 }
1455
1456 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1457 struct ceph_mds_session *session)
1458 {
1459 struct ceph_mds_info *mi;
1460 struct ceph_mds_session *ts;
1461 int i, mds = session->s_mds;
1462
1463 if (mds >= mdsc->mdsmap->possible_max_rank)
1464 return;
1465
1466 mi = &mdsc->mdsmap->m_info[mds];
1467 dout("open_export_target_sessions for mds%d (%d targets)\n",
1468 session->s_mds, mi->num_export_targets);
1469
1470 for (i = 0; i < mi->num_export_targets; i++) {
1471 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1472 ceph_put_mds_session(ts);
1473 }
1474 }
1475
1476 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1477 struct ceph_mds_session *session)
1478 {
1479 mutex_lock(&mdsc->mutex);
1480 __open_export_target_sessions(mdsc, session);
1481 mutex_unlock(&mdsc->mutex);
1482 }
1483
1484 /*
1485 * session caps
1486 */
1487
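/*
 * detach queued cap releases from the session list so they can be
 * disposed of after s_cap_lock is dropped
 */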
1488 static void detach_cap_releases(struct ceph_mds_session *session,
1489 struct list_head *target)
1490 {
1491 lockdep_assert_held(&session->s_cap_lock);
1492
1493 list_splice_init(&session->s_cap_releases, target);
1494 session->s_num_cap_releases = 0;
1495 dout("dispose_cap_releases mds%d\n", session->s_mds);
1496 }
1497
1498 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1499 struct list_head *dispose)
1500 {
1501 while (!list_empty(dispose)) {
1502 struct ceph_cap *cap;
1503 /* zero out the in-progress message */
1504 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1505 list_del(&cap->session_caps);
1506 ceph_put_cap(mdsc, cap);
1507 }
1508 }
1509
1510 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1511 struct ceph_mds_session *session)
1512 {
1513 struct ceph_mds_request *req;
1514 struct rb_node *p;
1515
1516 dout("cleanup_session_requests mds%d\n", session->s_mds);
1517 mutex_lock(&mdsc->mutex);
1518 while (!list_empty(&session->s_unsafe)) {
1519 req = list_first_entry(&session->s_unsafe,
1520 struct ceph_mds_request, r_unsafe_item);
1521 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1522 req->r_tid);
1523 if (req->r_target_inode)
1524 mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1525 if (req->r_unsafe_dir)
1526 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1527 __unregister_request(mdsc, req);
1528 }
1529 /* zero r_attempts, so kick_requests() will re-send requests */
1530 p = rb_first(&mdsc->request_tree);
1531 while (p) {
1532 req = rb_entry(p, struct ceph_mds_request, r_node);
1533 p = rb_next(p);
1534 if (req->r_session &&
1535 req->r_session->s_mds == session->s_mds)
1536 req->r_attempts = 0;
1537 }
1538 mutex_unlock(&mdsc->mutex);
1539 }
1540
1541 /*
1542 * Helper to safely iterate over all caps associated with a session, with
1543 * special care taken to handle a racing __ceph_remove_cap().
1544 *
1545 * Caller must hold session s_mutex.
1546 */
1547 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1548 int (*cb)(struct inode *, struct ceph_cap *,
1549 void *), void *arg)
1550 {
1551 struct list_head *p;
1552 struct ceph_cap *cap;
1553 struct inode *inode, *last_inode = NULL;
1554 struct ceph_cap *old_cap = NULL;
1555 int ret;
1556
1557 dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1558 spin_lock(&session->s_cap_lock);
1559 p = session->s_caps.next;
1560 while (p != &session->s_caps) {
1561 cap = list_entry(p, struct ceph_cap, session_caps);
1562 inode = igrab(&cap->ci->vfs_inode);
1563 if (!inode) {
1564 p = p->next;
1565 continue;
1566 }
1567 session->s_cap_iterator = cap;
1568 spin_unlock(&session->s_cap_lock);
1569
1570 if (last_inode) {
1571 /* avoid calling iput_final() while holding
1572 * s_mutex or in mds dispatch threads */
1573 ceph_async_iput(last_inode);
1574 last_inode = NULL;
1575 }
1576 if (old_cap) {
1577 ceph_put_cap(session->s_mdsc, old_cap);
1578 old_cap = NULL;
1579 }
1580
1581 ret = cb(inode, cap, arg);
1582 last_inode = inode;
1583
1584 spin_lock(&session->s_cap_lock);
1585 p = p->next;
1586 if (!cap->ci) {
1587 dout("iterate_session_caps finishing cap %p removal\n",
1588 cap);
1589 BUG_ON(cap->session != session);
1590 cap->session = NULL;
1591 list_del_init(&cap->session_caps);
1592 session->s_nr_caps--;
1593 atomic64_dec(&session->s_mdsc->metric.total_caps);
1594 if (cap->queue_release)
1595 __ceph_queue_cap_release(session, cap);
1596 else
1597 old_cap = cap; /* put_cap it w/o locks held */
1598 }
1599 if (ret < 0)
1600 goto out;
1601 }
1602 ret = 0;
1603 out:
1604 session->s_cap_iterator = NULL;
1605 spin_unlock(&session->s_cap_lock);
1606
1607 ceph_async_iput(last_inode);
1608 if (old_cap)
1609 ceph_put_cap(session->s_mdsc, old_cap);
1610
1611 return ret;
1612 }
1613
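/*
 * drop all cap snaps on an inode; caller must hold i_ceph_lock. Returns
 * the number of capsnaps released so the caller can drop the matching
 * inode references.
 */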
1614 static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1615 {
1616 struct ceph_inode_info *ci = ceph_inode(inode);
1617 struct ceph_cap_snap *capsnap;
1618 int capsnap_release = 0;
1619
1620 lockdep_assert_held(&ci->i_ceph_lock);
1621
1622 dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1623
1624 while (!list_empty(&ci->i_cap_snaps)) {
1625 capsnap = list_first_entry(&ci->i_cap_snaps,
1626 struct ceph_cap_snap, ci_item);
1627 __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1628 ceph_put_snap_context(capsnap->context);
1629 ceph_put_cap_snap(capsnap);
1630 capsnap_release++;
1631 }
1632 wake_up_all(&ci->i_cap_wq);
1633 wake_up_all(&mdsc->cap_flushing_wq);
1634 return capsnap_release;
1635 }
1636
1637 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1638 void *arg)
1639 {
1640 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1641 struct ceph_mds_client *mdsc = fsc->mdsc;
1642 struct ceph_inode_info *ci = ceph_inode(inode);
1643 LIST_HEAD(to_remove);
1644 bool dirty_dropped = false;
1645 bool invalidate = false;
1646 int capsnap_release = 0;
1647
1648 dout("removing cap %p, ci is %p, inode is %p\n",
1649 cap, ci, &ci->vfs_inode);
1650 spin_lock(&ci->i_ceph_lock);
1651 __ceph_remove_cap(cap, false);
1652 if (!ci->i_auth_cap) {
1653 struct ceph_cap_flush *cf;
1654
1655 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1656 if (inode->i_data.nrpages > 0)
1657 invalidate = true;
1658 if (ci->i_wrbuffer_ref > 0)
1659 mapping_set_error(&inode->i_data, -EIO);
1660 }
1661
1662 while (!list_empty(&ci->i_cap_flush_list)) {
1663 cf = list_first_entry(&ci->i_cap_flush_list,
1664 struct ceph_cap_flush, i_list);
1665 list_move(&cf->i_list, &to_remove);
1666 }
1667
1668 spin_lock(&mdsc->cap_dirty_lock);
1669
1670 list_for_each_entry(cf, &to_remove, i_list)
1671 list_del_init(&cf->g_list);
1672
1673 if (!list_empty(&ci->i_dirty_item)) {
1674 pr_warn_ratelimited(
1675 " dropping dirty %s state for %p %lld\n",
1676 ceph_cap_string(ci->i_dirty_caps),
1677 inode, ceph_ino(inode));
1678 ci->i_dirty_caps = 0;
1679 list_del_init(&ci->i_dirty_item);
1680 dirty_dropped = true;
1681 }
1682 if (!list_empty(&ci->i_flushing_item)) {
1683 pr_warn_ratelimited(
1684 " dropping dirty+flushing %s state for %p %lld\n",
1685 ceph_cap_string(ci->i_flushing_caps),
1686 inode, ceph_ino(inode));
1687 ci->i_flushing_caps = 0;
1688 list_del_init(&ci->i_flushing_item);
1689 mdsc->num_cap_flushing--;
1690 dirty_dropped = true;
1691 }
1692 spin_unlock(&mdsc->cap_dirty_lock);
1693
1694 if (dirty_dropped) {
1695 mapping_set_error(inode->i_mapping, -EIO);
1696
1697 if (ci->i_wrbuffer_ref_head == 0 &&
1698 ci->i_wr_ref == 0 &&
1699 ci->i_dirty_caps == 0 &&
1700 ci->i_flushing_caps == 0) {
1701 ceph_put_snap_context(ci->i_head_snapc);
1702 ci->i_head_snapc = NULL;
1703 }
1704 }
1705
1706 if (atomic_read(&ci->i_filelock_ref) > 0) {
1707 /* make further file lock syscall return -EIO */
1708 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1709 pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1710 inode, ceph_ino(inode));
1711 }
1712
1713 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1714 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1715 ci->i_prealloc_cap_flush = NULL;
1716 }
1717
1718 if (!list_empty(&ci->i_cap_snaps))
1719 capsnap_release = remove_capsnaps(mdsc, inode);
1720 }
1721 spin_unlock(&ci->i_ceph_lock);
1722 while (!list_empty(&to_remove)) {
1723 struct ceph_cap_flush *cf;
1724 cf = list_first_entry(&to_remove,
1725 struct ceph_cap_flush, i_list);
1726 list_del_init(&cf->i_list);
1727 if (!cf->is_capsnap)
1728 ceph_free_cap_flush(cf);
1729 }
1730
1731 wake_up_all(&ci->i_cap_wq);
1732 if (invalidate)
1733 ceph_queue_invalidate(inode);
1734 if (dirty_dropped)
1735 iput(inode);
1736 while (capsnap_release--)
1737 iput(inode);
1738 return 0;
1739 }
1740
1741 /*
1742 * caller must hold session s_mutex
1743 */
1744 static void remove_session_caps(struct ceph_mds_session *session)
1745 {
1746 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1747 struct super_block *sb = fsc->sb;
1748 LIST_HEAD(dispose);
1749
1750 dout("remove_session_caps on %p\n", session);
1751 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1752
1753 wake_up_all(&fsc->mdsc->cap_flushing_wq);
1754
1755 spin_lock(&session->s_cap_lock);
1756 if (session->s_nr_caps > 0) {
1757 struct inode *inode;
1758 struct ceph_cap *cap, *prev = NULL;
1759 struct ceph_vino vino;
1760 /*
1761 * iterate_session_caps() skips inodes that are being
1762 * deleted, so we need to wait until deletions are complete.
1763 * __wait_on_freeing_inode() is designed for the job,
1764 * but it is not exported, so use lookup inode function
1765 * to access it.
1766 */
1767 while (!list_empty(&session->s_caps)) {
1768 cap = list_entry(session->s_caps.next,
1769 struct ceph_cap, session_caps);
1770 if (cap == prev)
1771 break;
1772 prev = cap;
1773 vino = cap->ci->i_vino;
1774 spin_unlock(&session->s_cap_lock);
1775
1776 inode = ceph_find_inode(sb, vino);
1777 /* avoid calling iput_final() while holding s_mutex */
1778 ceph_async_iput(inode);
1779
1780 spin_lock(&session->s_cap_lock);
1781 }
1782 }
1783
1784 // drop cap expires and unlock s_cap_lock
1785 detach_cap_releases(session, &dispose);
1786
1787 BUG_ON(session->s_nr_caps > 0);
1788 BUG_ON(!list_empty(&session->s_cap_flushing));
1789 spin_unlock(&session->s_cap_lock);
1790 dispose_cap_releases(session->s_mdsc, &dispose);
1791 }
1792
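/* events passed as the arg to wake_up_session_cb() by wake_up_session_caps() */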
1793 enum {
1794 RECONNECT,
1795 RENEWCAPS,
1796 FORCE_RO,
1797 };
1798
1799 /*
1800 * wake up any threads waiting on this session's caps. if the cap is
1801 * old (didn't get renewed on the client reconnect), remove it now.
1802 *
1803 * caller must hold s_mutex.
1804 */
1805 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1806 void *arg)
1807 {
1808 struct ceph_inode_info *ci = ceph_inode(inode);
1809 unsigned long ev = (unsigned long)arg;
1810
1811 if (ev == RECONNECT) {
1812 spin_lock(&ci->i_ceph_lock);
1813 ci->i_wanted_max_size = 0;
1814 ci->i_requested_max_size = 0;
1815 spin_unlock(&ci->i_ceph_lock);
1816 } else if (ev == RENEWCAPS) {
1817 if (cap->cap_gen < cap->session->s_cap_gen) {
1818 /* mds did not re-issue stale cap */
1819 spin_lock(&ci->i_ceph_lock);
1820 cap->issued = cap->implemented = CEPH_CAP_PIN;
1821 spin_unlock(&ci->i_ceph_lock);
1822 }
1823 } else if (ev == FORCE_RO) {
1824 }
1825 wake_up_all(&ci->i_cap_wq);
1826 return 0;
1827 }
1828
1829 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1830 {
1831 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1832 ceph_iterate_session_caps(session, wake_up_session_cb,
1833 (void *)(unsigned long)ev);
1834 }
1835
1836 /*
1837 * Send periodic message to MDS renewing all currently held caps. The
1838 * ack will reset the expiration for all caps from this session.
1839 *
1840 * caller holds s_mutex
1841 */
1842 static int send_renew_caps(struct ceph_mds_client *mdsc,
1843 struct ceph_mds_session *session)
1844 {
1845 struct ceph_msg *msg;
1846 int state;
1847
1848 if (time_after_eq(jiffies, session->s_cap_ttl) &&
1849 time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1850 pr_info("mds%d caps stale\n", session->s_mds);
1851 session->s_renew_requested = jiffies;
1852
1853 /* do not try to renew caps until a recovering mds has reconnected
1854 * with its clients. */
1855 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1856 if (state < CEPH_MDS_STATE_RECONNECT) {
1857 dout("send_renew_caps ignoring mds%d (%s)\n",
1858 session->s_mds, ceph_mds_state_name(state));
1859 return 0;
1860 }
1861
1862 dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1863 ceph_mds_state_name(state));
1864 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1865 ++session->s_renew_seq);
1866 if (!msg)
1867 return -ENOMEM;
1868 ceph_con_send(&session->s_con, msg);
1869 return 0;
1870 }
1871
1872 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1873 struct ceph_mds_session *session, u64 seq)
1874 {
1875 struct ceph_msg *msg;
1876
1877 	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1878 session->s_mds, ceph_session_state_name(session->s_state), seq);
1879 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1880 if (!msg)
1881 return -ENOMEM;
1882 ceph_con_send(&session->s_con, msg);
1883 return 0;
1884 }
1885
1886
1887 /*
1888 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1889 *
1890 * Called under session->s_mutex
1891 */
1892 static void renewed_caps(struct ceph_mds_client *mdsc,
1893 struct ceph_mds_session *session, int is_renew)
1894 {
1895 int was_stale;
1896 int wake = 0;
1897
1898 spin_lock(&session->s_cap_lock);
1899 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1900
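	/* the new ttl is measured from when the renewal was requested,
	 * not from when the ack arrived */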
1901 session->s_cap_ttl = session->s_renew_requested +
1902 mdsc->mdsmap->m_session_timeout*HZ;
1903
1904 if (was_stale) {
1905 if (time_before(jiffies, session->s_cap_ttl)) {
1906 pr_info("mds%d caps renewed\n", session->s_mds);
1907 wake = 1;
1908 } else {
1909 pr_info("mds%d caps still stale\n", session->s_mds);
1910 }
1911 }
1912 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1913 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1914 	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1915 spin_unlock(&session->s_cap_lock);
1916
1917 if (wake)
1918 wake_up_session_caps(session, RENEWCAPS);
1919 }
1920
1921 /*
1922 * send a session close request
1923 */
1924 static int request_close_session(struct ceph_mds_session *session)
1925 {
1926 struct ceph_msg *msg;
1927
1928 dout("request_close_session mds%d state %s seq %lld\n",
1929 session->s_mds, ceph_session_state_name(session->s_state),
1930 session->s_seq);
1931 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1932 session->s_seq);
1933 if (!msg)
1934 return -ENOMEM;
1935 ceph_con_send(&session->s_con, msg);
1936 return 1;
1937 }
1938
1939 /*
1940 * Called with s_mutex held.
1941 */
1942 static int __close_session(struct ceph_mds_client *mdsc,
1943 struct ceph_mds_session *session)
1944 {
1945 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1946 return 0;
1947 session->s_state = CEPH_MDS_SESSION_CLOSING;
1948 return request_close_session(session);
1949 }
1950
1951 static bool drop_negative_children(struct dentry *dentry)
1952 {
1953 struct dentry *child;
1954 bool all_negative = true;
1955
1956 if (!d_is_dir(dentry))
1957 goto out;
1958
1959 spin_lock(&dentry->d_lock);
1960 list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1961 if (d_really_is_positive(child)) {
1962 all_negative = false;
1963 break;
1964 }
1965 }
1966 spin_unlock(&dentry->d_lock);
1967
1968 if (all_negative)
1969 shrink_dcache_parent(dentry);
1970 out:
1971 return all_negative;
1972 }
1973
1974 /*
1975 * Trim old(er) caps.
1976 *
1977 * Because we can't cache an inode without one or more caps, we do
1978 * this indirectly: if a cap is unused, we prune its aliases, at which
1979  * point the inode will hopefully get dropped too.
1980 *
1981 * Yes, this is a bit sloppy. Our only real goal here is to respond to
1982 * memory pressure from the MDS, though, so it needn't be perfect.
1983 */
1984 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1985 {
1986 int *remaining = arg;
1987 struct ceph_inode_info *ci = ceph_inode(inode);
1988 int used, wanted, oissued, mine;
1989
1990 if (*remaining <= 0)
1991 return -1;
1992
1993 spin_lock(&ci->i_ceph_lock);
1994 mine = cap->issued | cap->implemented;
1995 used = __ceph_caps_used(ci);
1996 wanted = __ceph_caps_file_wanted(ci);
1997 oissued = __ceph_caps_issued_other(ci, cap);
1998
1999 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2000 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2001 ceph_cap_string(used), ceph_cap_string(wanted));
2002 if (cap == ci->i_auth_cap) {
2003 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2004 !list_empty(&ci->i_cap_snaps))
2005 goto out;
2006 if ((used | wanted) & CEPH_CAP_ANY_WR)
2007 goto out;
2008 		/* Note: it's possible that i_filelock_ref becomes non-zero
2009 		 * after dropping auth caps.  It doesn't hurt because the reply
2010 		 * to the lock mds request will re-add auth caps. */
2011 if (atomic_read(&ci->i_filelock_ref) > 0)
2012 goto out;
2013 }
2014 	/* The inode has cached pages, but it's no longer used;
2015 	 * we can safely drop it. */
2016 if (S_ISREG(inode->i_mode) &&
2017 wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2018 !(oissued & CEPH_CAP_FILE_CACHE)) {
2019 used = 0;
2020 oissued = 0;
2021 }
2022 if ((used | wanted) & ~oissued & mine)
2023 goto out; /* we need these caps */
2024
2025 if (oissued) {
2026 /* we aren't the only cap.. just remove us */
2027 __ceph_remove_cap(cap, true);
2028 (*remaining)--;
2029 } else {
2030 struct dentry *dentry;
2031 /* try dropping referring dentries */
2032 spin_unlock(&ci->i_ceph_lock);
2033 dentry = d_find_any_alias(inode);
2034 if (dentry && drop_negative_children(dentry)) {
2035 int count;
2036 dput(dentry);
2037 d_prune_aliases(inode);
2038 count = atomic_read(&inode->i_count);
2039 if (count == 1)
2040 (*remaining)--;
2041 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2042 inode, cap, count);
2043 } else {
2044 dput(dentry);
2045 }
2046 return 0;
2047 }
2048
2049 out:
2050 spin_unlock(&ci->i_ceph_lock);
2051 return 0;
2052 }
2053
2054 /*
2055 * Trim session cap count down to some max number.
2056 */
2057 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2058 struct ceph_mds_session *session,
2059 int max_caps)
2060 {
2061 int trim_caps = session->s_nr_caps - max_caps;
2062
2063 dout("trim_caps mds%d start: %d / %d, trim %d\n",
2064 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2065 if (trim_caps > 0) {
2066 int remaining = trim_caps;
2067
2068 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2069 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2070 session->s_mds, session->s_nr_caps, max_caps,
2071 trim_caps - remaining);
2072 }
2073
2074 ceph_flush_cap_releases(mdsc, session);
2075 return 0;
2076 }
2077
2078 static int check_caps_flush(struct ceph_mds_client *mdsc,
2079 u64 want_flush_tid)
2080 {
2081 int ret = 1;
2082
2083 spin_lock(&mdsc->cap_dirty_lock);
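	/* cap_flush_list is kept in tid order, so checking the first
	 * entry is sufficient */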
2084 if (!list_empty(&mdsc->cap_flush_list)) {
2085 struct ceph_cap_flush *cf =
2086 list_first_entry(&mdsc->cap_flush_list,
2087 struct ceph_cap_flush, g_list);
2088 if (cf->tid <= want_flush_tid) {
2089 dout("check_caps_flush still flushing tid "
2090 "%llu <= %llu\n", cf->tid, want_flush_tid);
2091 ret = 0;
2092 }
2093 }
2094 spin_unlock(&mdsc->cap_dirty_lock);
2095 return ret;
2096 }
2097
2098 /*
2099  * wait for in-flight dirty-cap flushes to complete.
2100  *
2101  * returns once everything through want_flush_tid has been flushed.
2102 */
2103 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2104 u64 want_flush_tid)
2105 {
2106 dout("check_caps_flush want %llu\n", want_flush_tid);
2107
2108 wait_event(mdsc->cap_flushing_wq,
2109 check_caps_flush(mdsc, want_flush_tid));
2110
2111 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2112 }
2113
2114 /*
2115 * called under s_mutex
2116 */
2117 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2118 struct ceph_mds_session *session)
2119 {
2120 struct ceph_msg *msg = NULL;
2121 struct ceph_mds_cap_release *head;
2122 struct ceph_mds_cap_item *item;
2123 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2124 struct ceph_cap *cap;
2125 LIST_HEAD(tmp_list);
2126 int num_cap_releases;
2127 __le32 barrier, *cap_barrier;
2128
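	/* sample the OSD epoch barrier; it is appended to every
	 * CAPRELEASE message built below */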
2129 down_read(&osdc->lock);
2130 barrier = cpu_to_le32(osdc->epoch_barrier);
2131 up_read(&osdc->lock);
2132
2133 spin_lock(&session->s_cap_lock);
2134 again:
2135 list_splice_init(&session->s_cap_releases, &tmp_list);
2136 num_cap_releases = session->s_num_cap_releases;
2137 session->s_num_cap_releases = 0;
2138 spin_unlock(&session->s_cap_lock);
2139
2140 while (!list_empty(&tmp_list)) {
2141 if (!msg) {
2142 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2143 PAGE_SIZE, GFP_NOFS, false);
2144 if (!msg)
2145 goto out_err;
2146 head = msg->front.iov_base;
2147 head->num = cpu_to_le32(0);
2148 msg->front.iov_len = sizeof(*head);
2149
2150 msg->hdr.version = cpu_to_le16(2);
2151 msg->hdr.compat_version = cpu_to_le16(1);
2152 }
2153
2154 cap = list_first_entry(&tmp_list, struct ceph_cap,
2155 session_caps);
2156 list_del(&cap->session_caps);
2157 num_cap_releases--;
2158
2159 head = msg->front.iov_base;
2160 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2161 &head->num);
2162 item = msg->front.iov_base + msg->front.iov_len;
2163 item->ino = cpu_to_le64(cap->cap_ino);
2164 item->cap_id = cpu_to_le64(cap->cap_id);
2165 item->migrate_seq = cpu_to_le32(cap->mseq);
2166 item->seq = cpu_to_le32(cap->issue_seq);
2167 msg->front.iov_len += sizeof(*item);
2168
2169 ceph_put_cap(mdsc, cap);
2170
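		/* the message holds a full batch: append the barrier and send
		 * it, then start a fresh message on the next iteration */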
2171 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2172 // Append cap_barrier field
2173 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2174 *cap_barrier = barrier;
2175 msg->front.iov_len += sizeof(*cap_barrier);
2176
2177 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2178 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2179 ceph_con_send(&session->s_con, msg);
2180 msg = NULL;
2181 }
2182 }
2183
2184 BUG_ON(num_cap_releases != 0);
2185
2186 spin_lock(&session->s_cap_lock);
2187 if (!list_empty(&session->s_cap_releases))
2188 goto again;
2189 spin_unlock(&session->s_cap_lock);
2190
2191 if (msg) {
2192 // Append cap_barrier field
2193 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2194 *cap_barrier = barrier;
2195 msg->front.iov_len += sizeof(*cap_barrier);
2196
2197 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2198 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2199 ceph_con_send(&session->s_con, msg);
2200 }
2201 return;
2202 out_err:
2203 pr_err("send_cap_releases mds%d, failed to allocate message\n",
2204 session->s_mds);
2205 spin_lock(&session->s_cap_lock);
2206 list_splice(&tmp_list, &session->s_cap_releases);
2207 session->s_num_cap_releases += num_cap_releases;
2208 spin_unlock(&session->s_cap_lock);
2209 }
2210
2211 static void ceph_cap_release_work(struct work_struct *work)
2212 {
2213 struct ceph_mds_session *session =
2214 container_of(work, struct ceph_mds_session, s_cap_release_work);
2215
2216 mutex_lock(&session->s_mutex);
2217 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2218 session->s_state == CEPH_MDS_SESSION_HUNG)
2219 ceph_send_cap_releases(session->s_mdsc, session);
2220 mutex_unlock(&session->s_mutex);
2221 ceph_put_mds_session(session);
2222 }
2223
2224 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2225 struct ceph_mds_session *session)
2226 {
2227 if (mdsc->stopping)
2228 return;
2229
2230 ceph_get_mds_session(session);
2231 if (queue_work(mdsc->fsc->cap_wq,
2232 &session->s_cap_release_work)) {
2233 dout("cap release work queued\n");
2234 } else {
2235 ceph_put_mds_session(session);
2236 dout("failed to queue cap release work\n");
2237 }
2238 }
2239
2240 /*
2241 * caller holds session->s_cap_lock
2242 */
2243 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2244 struct ceph_cap *cap)
2245 {
2246 list_add_tail(&cap->session_caps, &session->s_cap_releases);
2247 session->s_num_cap_releases++;
2248
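	/* flush once a full batch of CEPH_CAPS_PER_RELEASE releases
	 * has been queued */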
2249 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2250 ceph_flush_cap_releases(session->s_mdsc, session);
2251 }
2252
2253 static void ceph_cap_reclaim_work(struct work_struct *work)
2254 {
2255 struct ceph_mds_client *mdsc =
2256 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2257 int ret = ceph_trim_dentries(mdsc);
2258 if (ret == -EAGAIN)
2259 ceph_queue_cap_reclaim_work(mdsc);
2260 }
2261
2262 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2263 {
2264 if (mdsc->stopping)
2265 return;
2266
2267 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2268 dout("caps reclaim work queued\n");
2269 } else {
2270 		dout("failed to queue caps reclaim work\n");
2271 }
2272 }
2273
2274 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2275 {
2276 int val;
2277 if (!nr)
2278 return;
2279 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
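	/* queue the reclaim work each time the running total crosses a
	 * multiple of CEPH_CAPS_PER_RELEASE */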
2280 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2281 atomic_set(&mdsc->cap_reclaim_pending, 0);
2282 ceph_queue_cap_reclaim_work(mdsc);
2283 }
2284 }
2285
2286 /*
2287 * requests
2288 */
2289
2290 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2291 struct inode *dir)
2292 {
2293 struct ceph_inode_info *ci = ceph_inode(dir);
2294 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2295 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2296 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2297 unsigned int num_entries;
2298 int order;
2299
2300 spin_lock(&ci->i_ceph_lock);
2301 num_entries = ci->i_files + ci->i_subdirs;
2302 spin_unlock(&ci->i_ceph_lock);
2303 num_entries = max(num_entries, 1U);
2304 num_entries = min(num_entries, opt->max_readdir);
2305
2306 order = get_order(size * num_entries);
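	/* fall back to progressively smaller allocations until one succeeds */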
2307 while (order >= 0) {
2308 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2309 __GFP_NOWARN,
2310 order);
2311 if (rinfo->dir_entries)
2312 break;
2313 order--;
2314 }
2315 if (!rinfo->dir_entries)
2316 return -ENOMEM;
2317
2318 num_entries = (PAGE_SIZE << order) / size;
2319 num_entries = min(num_entries, opt->max_readdir);
2320
2321 rinfo->dir_buf_size = PAGE_SIZE << order;
2322 req->r_num_caps = num_entries + 1;
2323 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2324 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2325 return 0;
2326 }
2327
2328 /*
2329 * Create an mds request.
2330 */
2331 struct ceph_mds_request *
2332 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2333 {
2334 struct ceph_mds_request *req;
2335
2336 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2337 if (!req)
2338 return ERR_PTR(-ENOMEM);
2339
2340 mutex_init(&req->r_fill_mutex);
2341 req->r_mdsc = mdsc;
2342 req->r_started = jiffies;
2343 req->r_start_latency = ktime_get();
2344 req->r_resend_mds = -1;
2345 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2346 INIT_LIST_HEAD(&req->r_unsafe_target_item);
2347 req->r_fmode = -1;
2348 kref_init(&req->r_kref);
2349 RB_CLEAR_NODE(&req->r_node);
2350 INIT_LIST_HEAD(&req->r_wait);
2351 init_completion(&req->r_completion);
2352 init_completion(&req->r_safe_completion);
2353 INIT_LIST_HEAD(&req->r_unsafe_item);
2354
2355 ktime_get_coarse_real_ts64(&req->r_stamp);
2356
2357 req->r_op = op;
2358 req->r_direct_mode = mode;
2359 return req;
2360 }
2361
2362 /*
2363  * return the oldest (lowest) request or tid in the request tree; NULL/0 if none.
2364 *
2365 * called under mdsc->mutex.
2366 */
2367 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2368 {
2369 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2370 return NULL;
2371 return rb_entry(rb_first(&mdsc->request_tree),
2372 struct ceph_mds_request, r_node);
2373 }
2374
2375 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2376 {
2377 return mdsc->oldest_tid;
2378 }
2379
2380 /*
2381 * Build a dentry's path. Allocate on heap; caller must kfree. Based
2382 * on build_path_from_dentry in fs/cifs/dir.c.
2383 *
2384 * If @stop_on_nosnap, generate path relative to the first non-snapped
2385 * inode.
2386 *
2387 * Encode hidden .snap dirs as a double /, i.e.
2388 * foo/.snap/bar -> foo//bar
2389 */
2390 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2391 int stop_on_nosnap)
2392 {
2393 struct dentry *temp;
2394 char *path;
2395 int pos;
2396 unsigned seq;
2397 u64 base;
2398
2399 if (!dentry)
2400 return ERR_PTR(-EINVAL);
2401
2402 path = __getname();
2403 if (!path)
2404 return ERR_PTR(-ENOMEM);
2405 retry:
2406 pos = PATH_MAX - 1;
2407 path[pos] = '\0';
2408
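	/* walk from the dentry towards the root, filling the buffer from the
	 * end; the finished path starts at &path[pos] */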
2409 seq = read_seqbegin(&rename_lock);
2410 rcu_read_lock();
2411 temp = dentry;
2412 for (;;) {
2413 struct inode *inode;
2414
2415 spin_lock(&temp->d_lock);
2416 inode = d_inode(temp);
2417 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2418 dout("build_path path+%d: %p SNAPDIR\n",
2419 pos, temp);
2420 } else if (stop_on_nosnap && inode && dentry != temp &&
2421 ceph_snap(inode) == CEPH_NOSNAP) {
2422 spin_unlock(&temp->d_lock);
2423 pos++; /* get rid of any prepended '/' */
2424 break;
2425 } else {
2426 pos -= temp->d_name.len;
2427 if (pos < 0) {
2428 spin_unlock(&temp->d_lock);
2429 break;
2430 }
2431 memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2432 }
2433 spin_unlock(&temp->d_lock);
2434 temp = READ_ONCE(temp->d_parent);
2435
2436 /* Are we at the root? */
2437 if (IS_ROOT(temp))
2438 break;
2439
2440 /* Are we out of buffer? */
2441 if (--pos < 0)
2442 break;
2443
2444 path[pos] = '/';
2445 }
2446 base = ceph_ino(d_inode(temp));
2447 rcu_read_unlock();
2448
2449 if (read_seqretry(&rename_lock, seq))
2450 goto retry;
2451
2452 if (pos < 0) {
2453 /*
2454 * A rename didn't occur, but somehow we didn't end up where
2455 * we thought we would. Throw a warning and try again.
2456 */
2457 pr_warn("build_path did not end path lookup where "
2458 "expected, pos is %d\n", pos);
2459 goto retry;
2460 }
2461
2462 *pbase = base;
2463 *plen = PATH_MAX - 1 - pos;
2464 dout("build_path on %p %d built %llx '%.*s'\n",
2465 dentry, d_count(dentry), base, *plen, path + pos);
2466 return path + pos;
2467 }
2468
2469 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2470 const char **ppath, int *ppathlen, u64 *pino,
2471 bool *pfreepath, bool parent_locked)
2472 {
2473 char *path;
2474
2475 rcu_read_lock();
2476 if (!dir)
2477 dir = d_inode_rcu(dentry->d_parent);
2478 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2479 *pino = ceph_ino(dir);
2480 rcu_read_unlock();
2481 *ppath = dentry->d_name.name;
2482 *ppathlen = dentry->d_name.len;
2483 return 0;
2484 }
2485 rcu_read_unlock();
2486 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2487 if (IS_ERR(path))
2488 return PTR_ERR(path);
2489 *ppath = path;
2490 *pfreepath = true;
2491 return 0;
2492 }
2493
2494 static int build_inode_path(struct inode *inode,
2495 const char **ppath, int *ppathlen, u64 *pino,
2496 bool *pfreepath)
2497 {
2498 struct dentry *dentry;
2499 char *path;
2500
2501 if (ceph_snap(inode) == CEPH_NOSNAP) {
2502 *pino = ceph_ino(inode);
2503 *ppathlen = 0;
2504 return 0;
2505 }
2506 dentry = d_find_alias(inode);
2507 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2508 dput(dentry);
2509 if (IS_ERR(path))
2510 return PTR_ERR(path);
2511 *ppath = path;
2512 *pfreepath = true;
2513 return 0;
2514 }
2515
2516 /*
2517 * request arguments may be specified via an inode *, a dentry *, or
2518 * an explicit ino+path.
2519 */
2520 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2521 struct inode *rdiri, const char *rpath,
2522 u64 rino, const char **ppath, int *pathlen,
2523 u64 *ino, bool *freepath, bool parent_locked)
2524 {
2525 int r = 0;
2526
2527 if (rinode) {
2528 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2529 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2530 ceph_snap(rinode));
2531 } else if (rdentry) {
2532 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2533 freepath, parent_locked);
2534 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2535 *ppath);
2536 } else if (rpath || rino) {
2537 *ino = rino;
2538 *ppath = rpath;
2539 *pathlen = rpath ? strlen(rpath) : 0;
2540 dout(" path %.*s\n", *pathlen, rpath);
2541 }
2542
2543 return r;
2544 }
2545
2546 /*
2547 * called under mdsc->mutex
2548 */
2549 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2550 struct ceph_mds_request *req,
2551 int mds, bool drop_cap_releases)
2552 {
2553 struct ceph_msg *msg;
2554 struct ceph_mds_request_head *head;
2555 const char *path1 = NULL;
2556 const char *path2 = NULL;
2557 u64 ino1 = 0, ino2 = 0;
2558 int pathlen1 = 0, pathlen2 = 0;
2559 bool freepath1 = false, freepath2 = false;
2560 int len;
2561 u16 releases;
2562 void *p, *end;
2563 int ret;
2564
2565 ret = set_request_path_attr(req->r_inode, req->r_dentry,
2566 req->r_parent, req->r_path1, req->r_ino1.ino,
2567 &path1, &pathlen1, &ino1, &freepath1,
2568 test_bit(CEPH_MDS_R_PARENT_LOCKED,
2569 &req->r_req_flags));
2570 if (ret < 0) {
2571 msg = ERR_PTR(ret);
2572 goto out;
2573 }
2574
2575 /* If r_old_dentry is set, then assume that its parent is locked */
2576 ret = set_request_path_attr(NULL, req->r_old_dentry,
2577 req->r_old_dentry_dir,
2578 req->r_path2, req->r_ino2.ino,
2579 &path2, &pathlen2, &ino2, &freepath2, true);
2580 if (ret < 0) {
2581 msg = ERR_PTR(ret);
2582 goto out_free1;
2583 }
2584
2585 len = sizeof(*head) +
2586 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2587 sizeof(struct ceph_timespec);
2588
2589 /* calculate (max) length for cap releases */
2590 len += sizeof(struct ceph_mds_request_release) *
2591 (!!req->r_inode_drop + !!req->r_dentry_drop +
2592 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2593 if (req->r_dentry_drop)
2594 len += pathlen1;
2595 if (req->r_old_dentry_drop)
2596 len += pathlen2;
2597
2598 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2599 if (!msg) {
2600 msg = ERR_PTR(-ENOMEM);
2601 goto out_free2;
2602 }
2603
2604 msg->hdr.version = cpu_to_le16(2);
2605 msg->hdr.tid = cpu_to_le64(req->r_tid);
2606
2607 head = msg->front.iov_base;
2608 p = msg->front.iov_base + sizeof(*head);
2609 end = msg->front.iov_base + msg->front.iov_len;
2610
2611 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2612 head->op = cpu_to_le32(req->r_op);
2613 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2614 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2615 head->ino = cpu_to_le64(req->r_deleg_ino);
2616 head->args = req->r_args;
2617
2618 ceph_encode_filepath(&p, end, ino1, path1);
2619 ceph_encode_filepath(&p, end, ino2, path2);
2620
2621 /* make note of release offset, in case we need to replay */
2622 req->r_request_release_offset = p - msg->front.iov_base;
2623
2624 /* cap releases */
2625 releases = 0;
2626 if (req->r_inode_drop)
2627 releases += ceph_encode_inode_release(&p,
2628 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2629 mds, req->r_inode_drop, req->r_inode_unless,
2630 req->r_op == CEPH_MDS_OP_READDIR);
2631 if (req->r_dentry_drop)
2632 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2633 req->r_parent, mds, req->r_dentry_drop,
2634 req->r_dentry_unless);
2635 if (req->r_old_dentry_drop)
2636 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2637 req->r_old_dentry_dir, mds,
2638 req->r_old_dentry_drop,
2639 req->r_old_dentry_unless);
2640 if (req->r_old_inode_drop)
2641 releases += ceph_encode_inode_release(&p,
2642 d_inode(req->r_old_dentry),
2643 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2644
2645 if (drop_cap_releases) {
2646 releases = 0;
2647 p = msg->front.iov_base + req->r_request_release_offset;
2648 }
2649
2650 head->num_releases = cpu_to_le16(releases);
2651
2652 /* time stamp */
2653 {
2654 struct ceph_timespec ts;
2655 ceph_encode_timespec64(&ts, &req->r_stamp);
2656 ceph_encode_copy(&p, &ts, sizeof(ts));
2657 }
2658
2659 if (WARN_ON_ONCE(p > end)) {
2660 ceph_msg_put(msg);
2661 msg = ERR_PTR(-ERANGE);
2662 goto out_free2;
2663 }
2664
2665 msg->front.iov_len = p - msg->front.iov_base;
2666 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2667
2668 if (req->r_pagelist) {
2669 struct ceph_pagelist *pagelist = req->r_pagelist;
2670 ceph_msg_data_add_pagelist(msg, pagelist);
2671 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2672 } else {
2673 msg->hdr.data_len = 0;
2674 }
2675
2676 msg->hdr.data_off = cpu_to_le16(0);
2677
2678 out_free2:
2679 if (freepath2)
2680 ceph_mdsc_free_path((char *)path2, pathlen2);
2681 out_free1:
2682 if (freepath1)
2683 ceph_mdsc_free_path((char *)path1, pathlen1);
2684 out:
2685 return msg;
2686 }
2687
2688 /*
2689 * called under mdsc->mutex if error, under no mutex if
2690 * success.
2691 */
2692 static void complete_request(struct ceph_mds_client *mdsc,
2693 struct ceph_mds_request *req)
2694 {
2695 req->r_end_latency = ktime_get();
2696
2697 if (req->r_callback)
2698 req->r_callback(mdsc, req);
2699 complete_all(&req->r_completion);
2700 }
2701
2702 /*
2703 * called under mdsc->mutex
2704 */
2705 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2706 struct ceph_mds_request *req,
2707 int mds, bool drop_cap_releases)
2708 {
2709 struct ceph_mds_request_head *rhead;
2710 struct ceph_msg *msg;
2711 int flags = 0;
2712
2713 req->r_attempts++;
2714 if (req->r_inode) {
2715 struct ceph_cap *cap =
2716 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2717
2718 if (cap)
2719 req->r_sent_on_mseq = cap->mseq;
2720 else
2721 req->r_sent_on_mseq = -1;
2722 }
2723 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2724 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2725
2726 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2727 void *p;
2728 /*
2729 * Replay. Do not regenerate message (and rebuild
2730 * paths, etc.); just use the original message.
2731 * Rebuilding paths will break for renames because
2732 * d_move mangles the src name.
2733 */
2734 msg = req->r_request;
2735 rhead = msg->front.iov_base;
2736
2737 flags = le32_to_cpu(rhead->flags);
2738 flags |= CEPH_MDS_FLAG_REPLAY;
2739 rhead->flags = cpu_to_le32(flags);
2740
2741 if (req->r_target_inode)
2742 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2743
2744 rhead->num_retry = req->r_attempts - 1;
2745
2746 /* remove cap/dentry releases from message */
2747 rhead->num_releases = 0;
2748
2749 /* time stamp */
2750 p = msg->front.iov_base + req->r_request_release_offset;
2751 {
2752 struct ceph_timespec ts;
2753 ceph_encode_timespec64(&ts, &req->r_stamp);
2754 ceph_encode_copy(&p, &ts, sizeof(ts));
2755 }
2756
2757 msg->front.iov_len = p - msg->front.iov_base;
2758 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2759 return 0;
2760 }
2761
2762 if (req->r_request) {
2763 ceph_msg_put(req->r_request);
2764 req->r_request = NULL;
2765 }
2766 msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2767 if (IS_ERR(msg)) {
2768 req->r_err = PTR_ERR(msg);
2769 return PTR_ERR(msg);
2770 }
2771 req->r_request = msg;
2772
2773 rhead = msg->front.iov_base;
2774 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2775 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2776 flags |= CEPH_MDS_FLAG_REPLAY;
2777 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2778 flags |= CEPH_MDS_FLAG_ASYNC;
2779 if (req->r_parent)
2780 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2781 rhead->flags = cpu_to_le32(flags);
2782 rhead->num_fwd = req->r_num_fwd;
2783 rhead->num_retry = req->r_attempts - 1;
2784
2785 dout(" r_parent = %p\n", req->r_parent);
2786 return 0;
2787 }
2788
2789 /*
2790 * called under mdsc->mutex
2791 */
2792 static int __send_request(struct ceph_mds_client *mdsc,
2793 struct ceph_mds_session *session,
2794 struct ceph_mds_request *req,
2795 bool drop_cap_releases)
2796 {
2797 int err;
2798
2799 err = __prepare_send_request(mdsc, req, session->s_mds,
2800 drop_cap_releases);
2801 if (!err) {
2802 ceph_msg_get(req->r_request);
2803 ceph_con_send(&session->s_con, req->r_request);
2804 }
2805
2806 return err;
2807 }
2808
2809 /*
2810 * send request, or put it on the appropriate wait list.
2811 */
2812 static void __do_request(struct ceph_mds_client *mdsc,
2813 struct ceph_mds_request *req)
2814 {
2815 struct ceph_mds_session *session = NULL;
2816 int mds = -1;
2817 int err = 0;
2818 bool random;
2819
2820 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2821 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2822 __unregister_request(mdsc, req);
2823 return;
2824 }
2825
2826 if (req->r_timeout &&
2827 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2828 dout("do_request timed out\n");
2829 err = -ETIMEDOUT;
2830 goto finish;
2831 }
2832 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2833 dout("do_request forced umount\n");
2834 err = -EIO;
2835 goto finish;
2836 }
2837 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2838 if (mdsc->mdsmap_err) {
2839 err = mdsc->mdsmap_err;
2840 dout("do_request mdsmap err %d\n", err);
2841 goto finish;
2842 }
2843 if (mdsc->mdsmap->m_epoch == 0) {
2844 dout("do_request no mdsmap, waiting for map\n");
2845 list_add(&req->r_wait, &mdsc->waiting_for_map);
2846 return;
2847 }
2848 if (!(mdsc->fsc->mount_options->flags &
2849 CEPH_MOUNT_OPT_MOUNTWAIT) &&
2850 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2851 err = -EHOSTUNREACH;
2852 goto finish;
2853 }
2854 }
2855
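	/* drop any session reference left over from a previous attempt
	 * before choosing an mds for this one */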
2856 put_request_session(req);
2857
2858 mds = __choose_mds(mdsc, req, &random);
2859 if (mds < 0 ||
2860 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2861 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2862 err = -EJUKEBOX;
2863 goto finish;
2864 }
2865 dout("do_request no mds or not active, waiting for map\n");
2866 list_add(&req->r_wait, &mdsc->waiting_for_map);
2867 return;
2868 }
2869
2870 /* get, open session */
2871 session = __ceph_lookup_mds_session(mdsc, mds);
2872 if (!session) {
2873 session = register_session(mdsc, mds);
2874 if (IS_ERR(session)) {
2875 err = PTR_ERR(session);
2876 goto finish;
2877 }
2878 }
2879 req->r_session = ceph_get_mds_session(session);
2880
2881 dout("do_request mds%d session %p state %s\n", mds, session,
2882 ceph_session_state_name(session->s_state));
2883 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2884 session->s_state != CEPH_MDS_SESSION_HUNG) {
2885 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2886 err = -EACCES;
2887 goto out_session;
2888 }
2889 /*
2890 * We cannot queue async requests since the caps and delegated
2891 * inodes are bound to the session. Just return -EJUKEBOX and
2892 * let the caller retry a sync request in that case.
2893 */
2894 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895 err = -EJUKEBOX;
2896 goto out_session;
2897 }
2898 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2899 session->s_state == CEPH_MDS_SESSION_CLOSING) {
2900 err = __open_session(mdsc, session);
2901 if (err)
2902 goto out_session;
2903 /* retry the same mds later */
2904 if (random)
2905 req->r_resend_mds = mds;
2906 }
2907 list_add(&req->r_wait, &session->s_waiting);
2908 goto out_session;
2909 }
2910
2911 /* send request */
2912 req->r_resend_mds = -1; /* forget any previous mds hint */
2913
2914 if (req->r_request_started == 0) /* note request start time */
2915 req->r_request_started = jiffies;
2916
2917 err = __send_request(mdsc, session, req, false);
2918
2919 out_session:
2920 ceph_put_mds_session(session);
2921 finish:
2922 if (err) {
2923 dout("__do_request early error %d\n", err);
2924 req->r_err = err;
2925 complete_request(mdsc, req);
2926 __unregister_request(mdsc, req);
2927 }
2928 return;
2929 }
2930
2931 /*
2932 * called under mdsc->mutex
2933 */
2934 static void __wake_requests(struct ceph_mds_client *mdsc,
2935 struct list_head *head)
2936 {
2937 struct ceph_mds_request *req;
2938 LIST_HEAD(tmp_list);
2939
2940 list_splice_init(head, &tmp_list);
2941
2942 while (!list_empty(&tmp_list)) {
2943 req = list_entry(tmp_list.next,
2944 struct ceph_mds_request, r_wait);
2945 list_del_init(&req->r_wait);
2946 dout(" wake request %p tid %llu\n", req, req->r_tid);
2947 __do_request(mdsc, req);
2948 }
2949 }
2950
2951 /*
2952 * Wake up threads with requests pending for @mds, so that they can
2953 * resubmit their requests to a possibly different mds.
2954 */
2955 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2956 {
2957 struct ceph_mds_request *req;
2958 struct rb_node *p = rb_first(&mdsc->request_tree);
2959
2960 dout("kick_requests mds%d\n", mds);
2961 while (p) {
2962 req = rb_entry(p, struct ceph_mds_request, r_node);
2963 p = rb_next(p);
2964 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2965 continue;
2966 if (req->r_attempts > 0)
2967 continue; /* only new requests */
2968 if (req->r_session &&
2969 req->r_session->s_mds == mds) {
2970 dout(" kicking tid %llu\n", req->r_tid);
2971 list_del_init(&req->r_wait);
2972 __do_request(mdsc, req);
2973 }
2974 }
2975 }
2976
2977 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2978 struct ceph_mds_request *req)
2979 {
2980 int err = 0;
2981
2982 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2983 if (req->r_inode)
2984 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2985 if (req->r_parent) {
2986 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2987 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2988 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2989 spin_lock(&ci->i_ceph_lock);
2990 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2991 __ceph_touch_fmode(ci, mdsc, fmode);
2992 spin_unlock(&ci->i_ceph_lock);
2993 ihold(req->r_parent);
2994 }
2995 if (req->r_old_dentry_dir)
2996 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2997 CEPH_CAP_PIN);
2998
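	/* if the target inode is still being created asynchronously, wait
	 * for the create reply before sending this request */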
2999 if (req->r_inode) {
3000 err = ceph_wait_on_async_create(req->r_inode);
3001 if (err) {
3002 dout("%s: wait for async create returned: %d\n",
3003 __func__, err);
3004 return err;
3005 }
3006 }
3007
3008 if (!err && req->r_old_inode) {
3009 err = ceph_wait_on_async_create(req->r_old_inode);
3010 if (err) {
3011 dout("%s: wait for async create returned: %d\n",
3012 __func__, err);
3013 return err;
3014 }
3015 }
3016
3017 dout("submit_request on %p for inode %p\n", req, dir);
3018 mutex_lock(&mdsc->mutex);
3019 __register_request(mdsc, req, dir);
3020 __do_request(mdsc, req);
3021 err = req->r_err;
3022 mutex_unlock(&mdsc->mutex);
3023 return err;
3024 }
3025
3026 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3027 struct ceph_mds_request *req)
3028 {
3029 int err;
3030
3031 /* wait */
3032 dout("do_request waiting\n");
3033 if (!req->r_timeout && req->r_wait_for_completion) {
3034 err = req->r_wait_for_completion(mdsc, req);
3035 } else {
3036 long timeleft = wait_for_completion_killable_timeout(
3037 &req->r_completion,
3038 ceph_timeout_jiffies(req->r_timeout));
3039 if (timeleft > 0)
3040 err = 0;
3041 else if (!timeleft)
3042 err = -ETIMEDOUT; /* timed out */
3043 else
3044 err = timeleft; /* killed */
3045 }
3046 dout("do_request waited, got %d\n", err);
3047 mutex_lock(&mdsc->mutex);
3048
3049 /* only abort if we didn't race with a real reply */
3050 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3051 err = le32_to_cpu(req->r_reply_info.head->result);
3052 } else if (err < 0) {
3053 dout("aborted request %lld with %d\n", req->r_tid, err);
3054
3055 /*
3056 * ensure we aren't running concurrently with
3057 * ceph_fill_trace or ceph_readdir_prepopulate, which
3058 * rely on locks (dir mutex) held by our caller.
3059 */
3060 mutex_lock(&req->r_fill_mutex);
3061 req->r_err = err;
3062 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3063 mutex_unlock(&req->r_fill_mutex);
3064
3065 if (req->r_parent &&
3066 (req->r_op & CEPH_MDS_OP_WRITE))
3067 ceph_invalidate_dir_request(req);
3068 } else {
3069 err = req->r_err;
3070 }
3071
3072 mutex_unlock(&mdsc->mutex);
3073 return err;
3074 }
3075
3076 /*
3077  * Synchronously perform an mds request.  Take care of all of the
3078 * session setup, forwarding, retry details.
3079 */
3080 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3081 struct inode *dir,
3082 struct ceph_mds_request *req)
3083 {
3084 int err;
3085
3086 dout("do_request on %p\n", req);
3087
3088 /* issue */
3089 err = ceph_mdsc_submit_request(mdsc, dir, req);
3090 if (!err)
3091 err = ceph_mdsc_wait_request(mdsc, req);
3092 dout("do_request %p done, result %d\n", req, err);
3093 return err;
3094 }
3095
3096 /*
3097 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3098 * namespace request.
3099 */
3100 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3101 {
3102 struct inode *dir = req->r_parent;
3103 struct inode *old_dir = req->r_old_dentry_dir;
3104
3105 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3106
3107 ceph_dir_clear_complete(dir);
3108 if (old_dir)
3109 ceph_dir_clear_complete(old_dir);
3110 if (req->r_dentry)
3111 ceph_invalidate_dentry_lease(req->r_dentry);
3112 if (req->r_old_dentry)
3113 ceph_invalidate_dentry_lease(req->r_old_dentry);
3114 }
3115
3116 /*
3117 * Handle mds reply.
3118 *
3119 * We take the session mutex and parse and process the reply immediately.
3120 * This preserves the logical ordering of replies, capabilities, etc., sent
3121 * by the MDS as they are applied to our local cache.
3122 */
3123 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3124 {
3125 struct ceph_mds_client *mdsc = session->s_mdsc;
3126 struct ceph_mds_request *req;
3127 struct ceph_mds_reply_head *head = msg->front.iov_base;
3128 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
3129 struct ceph_snap_realm *realm;
3130 u64 tid;
3131 int err, result;
3132 int mds = session->s_mds;
3133
3134 if (msg->front.iov_len < sizeof(*head)) {
3135 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3136 ceph_msg_dump(msg);
3137 return;
3138 }
3139
3140 /* get request, session */
3141 tid = le64_to_cpu(msg->hdr.tid);
3142 mutex_lock(&mdsc->mutex);
3143 req = lookup_get_request(mdsc, tid);
3144 if (!req) {
3145 dout("handle_reply on unknown tid %llu\n", tid);
3146 mutex_unlock(&mdsc->mutex);
3147 return;
3148 }
3149 dout("handle_reply %p\n", req);
3150
3151 /* correct session? */
3152 if (req->r_session != session) {
3153 pr_err("mdsc_handle_reply got %llu on session mds%d"
3154 " not mds%d\n", tid, session->s_mds,
3155 req->r_session ? req->r_session->s_mds : -1);
3156 mutex_unlock(&mdsc->mutex);
3157 goto out;
3158 }
3159
3160 /* dup? */
3161 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3162 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3163 pr_warn("got a dup %s reply on %llu from mds%d\n",
3164 head->safe ? "safe" : "unsafe", tid, mds);
3165 mutex_unlock(&mdsc->mutex);
3166 goto out;
3167 }
3168 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3169 pr_warn("got unsafe after safe on %llu from mds%d\n",
3170 tid, mds);
3171 mutex_unlock(&mdsc->mutex);
3172 goto out;
3173 }
3174
3175 result = le32_to_cpu(head->result);
3176
3177 /*
3178 * Handle an ESTALE
3179 * if we're not talking to the authority, send to them
3180 * if the authority has changed while we weren't looking,
3181 * send to new authority
3182 * Otherwise we just have to return an ESTALE
3183 */
3184 if (result == -ESTALE) {
3185 dout("got ESTALE on request %llu\n", req->r_tid);
3186 req->r_resend_mds = -1;
3187 if (req->r_direct_mode != USE_AUTH_MDS) {
3188 dout("not using auth, setting for that now\n");
3189 req->r_direct_mode = USE_AUTH_MDS;
3190 __do_request(mdsc, req);
3191 mutex_unlock(&mdsc->mutex);
3192 goto out;
3193 } else {
3194 int mds = __choose_mds(mdsc, req, NULL);
3195 if (mds >= 0 && mds != req->r_session->s_mds) {
3196 dout("but auth changed, so resending\n");
3197 __do_request(mdsc, req);
3198 mutex_unlock(&mdsc->mutex);
3199 goto out;
3200 }
3201 }
3202 dout("have to return ESTALE on request %llu\n", req->r_tid);
3203 }
3204
3205
3206 if (head->safe) {
3207 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3208 __unregister_request(mdsc, req);
3209
3210 /* last request during umount? */
3211 if (mdsc->stopping && !__get_oldest_req(mdsc))
3212 complete_all(&mdsc->safe_umount_waiters);
3213
3214 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3215 /*
3216 * We already handled the unsafe response, now do the
3217 * cleanup. No need to examine the response; the MDS
3218 * doesn't include any result info in the safe
3219 * response. And even if it did, there is nothing
3220 * useful we could do with a revised return value.
3221 */
3222 dout("got safe reply %llu, mds%d\n", tid, mds);
3223
3224 mutex_unlock(&mdsc->mutex);
3225 goto out;
3226 }
3227 } else {
3228 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3229 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3230 }
3231
3232 dout("handle_reply tid %lld result %d\n", tid, result);
3233 rinfo = &req->r_reply_info;
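	/* sessions with REPLY_ENCODING use the new self-describing reply
	 * format, signalled by passing (u64)-1 as the feature mask */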
3234 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3235 err = parse_reply_info(session, msg, rinfo, (u64)-1);
3236 else
3237 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3238 mutex_unlock(&mdsc->mutex);
3239
3240 mutex_lock(&session->s_mutex);
3241 if (err < 0) {
3242 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3243 ceph_msg_dump(msg);
3244 goto out_err;
3245 }
3246
3247 /* snap trace */
3248 realm = NULL;
3249 if (rinfo->snapblob_len) {
3250 down_write(&mdsc->snap_rwsem);
3251 ceph_update_snap_trace(mdsc, rinfo->snapblob,
3252 rinfo->snapblob + rinfo->snapblob_len,
3253 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3254 &realm);
3255 downgrade_write(&mdsc->snap_rwsem);
3256 } else {
3257 down_read(&mdsc->snap_rwsem);
3258 }
3259
3260 /* insert trace into our cache */
3261 mutex_lock(&req->r_fill_mutex);
3262 current->journal_info = req;
3263 err = ceph_fill_trace(mdsc->fsc->sb, req);
3264 if (err == 0) {
3265 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3266 req->r_op == CEPH_MDS_OP_LSSNAP))
3267 ceph_readdir_prepopulate(req, req->r_session);
3268 }
3269 current->journal_info = NULL;
3270 mutex_unlock(&req->r_fill_mutex);
3271
3272 up_read(&mdsc->snap_rwsem);
3273 if (realm)
3274 ceph_put_snap_realm(mdsc, realm);
3275
3276 if (err == 0) {
3277 if (req->r_target_inode &&
3278 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3279 struct ceph_inode_info *ci =
3280 ceph_inode(req->r_target_inode);
3281 spin_lock(&ci->i_unsafe_lock);
3282 list_add_tail(&req->r_unsafe_target_item,
3283 &ci->i_unsafe_iops);
3284 spin_unlock(&ci->i_unsafe_lock);
3285 }
3286
3287 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3288 }
3289 out_err:
3290 mutex_lock(&mdsc->mutex);
3291 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3292 if (err) {
3293 req->r_err = err;
3294 } else {
3295 req->r_reply = ceph_msg_get(msg);
3296 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3297 }
3298 } else {
3299 dout("reply arrived after request %lld was aborted\n", tid);
3300 }
3301 mutex_unlock(&mdsc->mutex);
3302
3303 mutex_unlock(&session->s_mutex);
3304
3305 /* kick calling process */
3306 complete_request(mdsc, req);
3307
3308 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
3309 req->r_end_latency, err);
3310 out:
3311 ceph_mdsc_put_request(req);
3312 return;
3313 }
3314
3315
3316
3317 /*
3318 * handle mds notification that our request has been forwarded.
3319 */
3320 static void handle_forward(struct ceph_mds_client *mdsc,
3321 struct ceph_mds_session *session,
3322 struct ceph_msg *msg)
3323 {
3324 struct ceph_mds_request *req;
3325 u64 tid = le64_to_cpu(msg->hdr.tid);
3326 u32 next_mds;
3327 u32 fwd_seq;
3328 int err = -EINVAL;
3329 void *p = msg->front.iov_base;
3330 void *end = p + msg->front.iov_len;
3331
3332 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3333 next_mds = ceph_decode_32(&p);
3334 fwd_seq = ceph_decode_32(&p);
3335
3336 mutex_lock(&mdsc->mutex);
3337 req = lookup_get_request(mdsc, tid);
3338 if (!req) {
3339 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3340 goto out; /* dup reply? */
3341 }
3342
3343 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3344 dout("forward tid %llu aborted, unregistering\n", tid);
3345 __unregister_request(mdsc, req);
3346 } else if (fwd_seq <= req->r_num_fwd) {
3347 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3348 tid, next_mds, req->r_num_fwd, fwd_seq);
3349 } else {
3350 /* resend. forward race not possible; mds would drop */
3351 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3352 BUG_ON(req->r_err);
3353 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3354 req->r_attempts = 0;
3355 req->r_num_fwd = fwd_seq;
3356 req->r_resend_mds = next_mds;
3357 put_request_session(req);
3358 __do_request(mdsc, req);
3359 }
3360 ceph_mdsc_put_request(req);
3361 out:
3362 mutex_unlock(&mdsc->mutex);
3363 return;
3364
3365 bad:
3366 pr_err("mdsc_handle_forward decode error err=%d\n", err);
3367 }
3368
3369 static int __decode_session_metadata(void **p, void *end,
3370 bool *blocklisted)
3371 {
3372 /* map<string,string> */
3373 u32 n;
3374 bool err_str;
3375 ceph_decode_32_safe(p, end, n, bad);
3376 while (n-- > 0) {
3377 u32 len;
3378 ceph_decode_32_safe(p, end, len, bad);
3379 ceph_decode_need(p, end, len, bad);
3380 err_str = !strncmp(*p, "error_string", len);
3381 *p += len;
3382 ceph_decode_32_safe(p, end, len, bad);
3383 ceph_decode_need(p, end, len, bad);
3384 /*
3385 * Match "blocklisted (blacklisted)" from newer MDSes,
3386 * or "blacklisted" from older MDSes.
3387 */
3388 if (err_str && strnstr(*p, "blacklisted", len))
3389 *blocklisted = true;
3390 *p += len;
3391 }
3392 return 0;
3393 bad:
3394 return -1;
3395 }
3396
3397 /*
3398  * handle an mds session control message
3399 */
3400 static void handle_session(struct ceph_mds_session *session,
3401 struct ceph_msg *msg)
3402 {
3403 struct ceph_mds_client *mdsc = session->s_mdsc;
3404 int mds = session->s_mds;
3405 int msg_version = le16_to_cpu(msg->hdr.version);
3406 void *p = msg->front.iov_base;
3407 void *end = p + msg->front.iov_len;
3408 struct ceph_mds_session_head *h;
3409 u32 op;
3410 u64 seq, features = 0;
3411 int wake = 0;
3412 bool blocklisted = false;
3413
3414 /* decode */
3415 ceph_decode_need(&p, end, sizeof(*h), bad);
3416 h = p;
3417 p += sizeof(*h);
3418
3419 op = le32_to_cpu(h->op);
3420 seq = le64_to_cpu(h->seq);
3421
3422 if (msg_version >= 3) {
3423 u32 len;
3424 /* version >= 2, metadata */
3425 if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3426 goto bad;
3427 /* version >= 3, feature bits */
3428 ceph_decode_32_safe(&p, end, len, bad);
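		/* only the first 64 feature bits are consumed; any remainder
		 * of the blob is skipped */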
3429 if (len) {
3430 ceph_decode_64_safe(&p, end, features, bad);
3431 p += len - sizeof(features);
3432 }
3433 }
3434
3435 mutex_lock(&mdsc->mutex);
3436 if (op == CEPH_SESSION_CLOSE) {
3437 ceph_get_mds_session(session);
3438 __unregister_session(mdsc, session);
3439 }
3440 /* FIXME: this ttl calculation is generous */
3441 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3442 mutex_unlock(&mdsc->mutex);
3443
3444 mutex_lock(&session->s_mutex);
3445
3446 dout("handle_session mds%d %s %p state %s seq %llu\n",
3447 mds, ceph_session_op_name(op), session,
3448 ceph_session_state_name(session->s_state), seq);
3449
3450 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3451 session->s_state = CEPH_MDS_SESSION_OPEN;
3452 pr_info("mds%d came back\n", session->s_mds);
3453 }
3454
3455 switch (op) {
3456 case CEPH_SESSION_OPEN:
3457 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3458 pr_info("mds%d reconnect success\n", session->s_mds);
3459 session->s_state = CEPH_MDS_SESSION_OPEN;
3460 session->s_features = features;
3461 renewed_caps(mdsc, session, 0);
3462 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3463 metric_schedule_delayed(&mdsc->metric);
3464 wake = 1;
3465 if (mdsc->stopping)
3466 __close_session(mdsc, session);
3467 break;
3468
3469 case CEPH_SESSION_RENEWCAPS:
3470 if (session->s_renew_seq == seq)
3471 renewed_caps(mdsc, session, 1);
3472 break;
3473
3474 case CEPH_SESSION_CLOSE:
3475 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3476 pr_info("mds%d reconnect denied\n", session->s_mds);
3477 session->s_state = CEPH_MDS_SESSION_CLOSED;
3478 cleanup_session_requests(mdsc, session);
3479 remove_session_caps(session);
3480 wake = 2; /* for good measure */
3481 wake_up_all(&mdsc->session_close_wq);
3482 break;
3483
3484 case CEPH_SESSION_STALE:
3485 pr_info("mds%d caps went stale, renewing\n",
3486 session->s_mds);
3487 spin_lock(&session->s_gen_ttl_lock);
3488 session->s_cap_gen++;
3489 session->s_cap_ttl = jiffies - 1;
3490 spin_unlock(&session->s_gen_ttl_lock);
3491 send_renew_caps(mdsc, session);
3492 break;
3493
3494 case CEPH_SESSION_RECALL_STATE:
3495 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3496 break;
3497
3498 case CEPH_SESSION_FLUSHMSG:
3499 send_flushmsg_ack(mdsc, session, seq);
3500 break;
3501
3502 case CEPH_SESSION_FORCE_RO:
3503 dout("force_session_readonly %p\n", session);
3504 spin_lock(&session->s_cap_lock);
3505 session->s_readonly = true;
3506 spin_unlock(&session->s_cap_lock);
3507 wake_up_session_caps(session, FORCE_RO);
3508 break;
3509
3510 case CEPH_SESSION_REJECT:
3511 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3512 pr_info("mds%d rejected session\n", session->s_mds);
3513 session->s_state = CEPH_MDS_SESSION_REJECTED;
3514 cleanup_session_requests(mdsc, session);
3515 remove_session_caps(session);
3516 if (blocklisted)
3517 mdsc->fsc->blocklisted = true;
3518 wake = 2; /* for good measure */
3519 break;
3520
3521 default:
3522 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3523 WARN_ON(1);
3524 }
3525
3526 mutex_unlock(&session->s_mutex);
3527 if (wake) {
3528 mutex_lock(&mdsc->mutex);
3529 __wake_requests(mdsc, &session->s_waiting);
3530 if (wake == 2)
3531 kick_requests(mdsc, mds);
3532 mutex_unlock(&mdsc->mutex);
3533 }
3534 if (op == CEPH_SESSION_CLOSE)
3535 ceph_put_mds_session(session);
3536 return;
3537
3538 bad:
3539 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3540 (int)msg->front.iov_len);
3541 ceph_msg_dump(msg);
3542 return;
3543 }
3544
3545 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3546 {
3547 int dcaps;
3548
3549 dcaps = xchg(&req->r_dir_caps, 0);
3550 if (dcaps) {
3551 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3552 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3553 }
3554 }
3555
3556 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3557 {
3558 int dcaps;
3559
3560 dcaps = xchg(&req->r_dir_caps, 0);
3561 if (dcaps) {
3562 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3563 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3564 dcaps);
3565 }
3566 }
3567
3568 /*
3569 * called under session->mutex.
3570 */
3571 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3572 struct ceph_mds_session *session)
3573 {
3574 struct ceph_mds_request *req, *nreq;
3575 struct rb_node *p;
3576
3577 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3578
3579 mutex_lock(&mdsc->mutex);
3580 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3581 __send_request(mdsc, session, req, true);
3582
3583 /*
3584 	 * Also re-send old requests when the MDS enters the reconnect stage,
3585 	 * so that it can process completed requests in its clientreplay stage.
3586 */
3587 p = rb_first(&mdsc->request_tree);
3588 while (p) {
3589 req = rb_entry(p, struct ceph_mds_request, r_node);
3590 p = rb_next(p);
3591 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3592 continue;
3593 if (req->r_attempts == 0)
3594 continue; /* only old requests */
3595 if (!req->r_session)
3596 continue;
3597 if (req->r_session->s_mds != session->s_mds)
3598 continue;
3599
3600 ceph_mdsc_release_dir_caps_no_check(req);
3601
3602 __send_request(mdsc, session, req, true);
3603 }
3604 mutex_unlock(&mdsc->mutex);
3605 }
3606
3607 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3608 {
3609 struct ceph_msg *reply;
3610 struct ceph_pagelist *_pagelist;
3611 struct page *page;
3612 __le32 *addr;
3613 int err = -ENOMEM;
3614
3615 if (!recon_state->allow_multi)
3616 return -ENOSPC;
3617
3618 /* can't handle message that contains both caps and realm */
3619 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3620
3621 /* pre-allocate new pagelist */
3622 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3623 if (!_pagelist)
3624 return -ENOMEM;
3625
3626 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3627 if (!reply)
3628 goto fail_msg;
3629
3630 /* placeholder for nr_caps */
3631 err = ceph_pagelist_encode_32(_pagelist, 0);
3632 if (err < 0)
3633 goto fail;
3634
3635 if (recon_state->nr_caps) {
3636 /* currently encoding caps */
3637 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3638 if (err)
3639 goto fail;
3640 } else {
3641 		/* placeholder for nr_realms (currently encoding realms) */
3642 err = ceph_pagelist_encode_32(_pagelist, 0);
3643 if (err < 0)
3644 goto fail;
3645 }
3646
3647 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3648 if (err)
3649 goto fail;
3650
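	/* patch the count placeholder(s) at the head of the outgoing
	 * pagelist's first page now that the final counts are known */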
3651 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3652 addr = kmap_atomic(page);
3653 if (recon_state->nr_caps) {
3654 /* currently encoding caps */
3655 *addr = cpu_to_le32(recon_state->nr_caps);
3656 } else {
3657 		/* currently encoding realms */
3658 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3659 }
3660 kunmap_atomic(addr);
3661
3662 reply->hdr.version = cpu_to_le16(5);
3663 reply->hdr.compat_version = cpu_to_le16(4);
3664
3665 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3666 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3667
3668 ceph_con_send(&recon_state->session->s_con, reply);
3669 ceph_pagelist_release(recon_state->pagelist);
3670
3671 recon_state->pagelist = _pagelist;
3672 recon_state->nr_caps = 0;
3673 recon_state->nr_realms = 0;
3674 recon_state->msg_version = 5;
3675 return 0;
3676 fail:
3677 ceph_msg_put(reply);
3678 fail_msg:
3679 ceph_pagelist_release(_pagelist);
3680 return err;
3681 }
3682
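/*
 * Find the dentry to use as the primary link for an inode when building its
 * reconnect record: for directories, the (single) non-root alias; for other
 * inodes, the first hashed alias flagged CEPH_DENTRY_PRIMARY_LINK.  Returns
 * a referenced dentry, or NULL if there is no suitable alias.
 */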
3683 static struct dentry* d_find_primary(struct inode *inode)
3684 {
3685 struct dentry *alias, *dn = NULL;
3686
3687 if (hlist_empty(&inode->i_dentry))
3688 return NULL;
3689
3690 spin_lock(&inode->i_lock);
3691 if (hlist_empty(&inode->i_dentry))
3692 goto out_unlock;
3693
3694 if (S_ISDIR(inode->i_mode)) {
3695 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3696 if (!IS_ROOT(alias))
3697 dn = dget(alias);
3698 goto out_unlock;
3699 }
3700
3701 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3702 spin_lock(&alias->d_lock);
3703 if (!d_unhashed(alias) &&
3704 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3705 dn = dget_dlock(alias);
3706 }
3707 spin_unlock(&alias->d_lock);
3708 if (dn)
3709 break;
3710 }
3711 out_unlock:
3712 spin_unlock(&inode->i_lock);
3713 return dn;
3714 }
3715
3716 /*
3717 * Encode information about a cap for a reconnect with the MDS.
3718 */
3719 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3720 void *arg)
3721 {
3722 union {
3723 struct ceph_mds_cap_reconnect v2;
3724 struct ceph_mds_cap_reconnect_v1 v1;
3725 } rec;
3726 struct ceph_inode_info *ci = cap->ci;
3727 struct ceph_reconnect_state *recon_state = arg;
3728 struct ceph_pagelist *pagelist = recon_state->pagelist;
3729 struct dentry *dentry;
3730 char *path;
3731 int pathlen = 0, err;
3732 u64 pathbase;
3733 u64 snap_follows;
3734
3735 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3736 inode, ceph_vinop(inode), cap, cap->cap_id,
3737 ceph_cap_string(cap->issued));
3738
3739 dentry = d_find_primary(inode);
3740 if (dentry) {
3741 /* set pathbase to parent dir when msg_version >= 2 */
3742 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3743 recon_state->msg_version >= 2);
3744 dput(dentry);
3745 if (IS_ERR(path)) {
3746 err = PTR_ERR(path);
3747 goto out_err;
3748 }
3749 } else {
3750 path = NULL;
3751 pathbase = 0;
3752 }
3753
3754 spin_lock(&ci->i_ceph_lock);
3755 cap->seq = 0; /* reset cap seq */
3756 cap->issue_seq = 0; /* and issue_seq */
3757 cap->mseq = 0; /* and migrate_seq */
3758 cap->cap_gen = cap->session->s_cap_gen;
3759
3760 /* These are lost when the session goes away */
3761 if (S_ISDIR(inode->i_mode)) {
3762 if (cap->issued & CEPH_CAP_DIR_CREATE) {
3763 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3764 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3765 }
3766 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3767 }
3768
3769 if (recon_state->msg_version >= 2) {
3770 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3771 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3772 rec.v2.issued = cpu_to_le32(cap->issued);
3773 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3774 rec.v2.pathbase = cpu_to_le64(pathbase);
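		/*
		 * flock_len is (ab)used as a flag here: encode file locks
		 * unless the inode saw a file lock error.  It is overwritten
		 * below with the byte length of the encoded lock data.
		 */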
3775 rec.v2.flock_len = (__force __le32)
3776 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3777 } else {
3778 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3779 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3780 rec.v1.issued = cpu_to_le32(cap->issued);
3781 rec.v1.size = cpu_to_le64(inode->i_size);
3782 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3783 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3784 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3785 rec.v1.pathbase = cpu_to_le64(pathbase);
3786 }
3787
3788 if (list_empty(&ci->i_cap_snaps)) {
3789 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3790 } else {
3791 struct ceph_cap_snap *capsnap =
3792 list_first_entry(&ci->i_cap_snaps,
3793 struct ceph_cap_snap, ci_item);
3794 snap_follows = capsnap->follows;
3795 }
3796 spin_unlock(&ci->i_ceph_lock);
3797
3798 if (recon_state->msg_version >= 2) {
3799 int num_fcntl_locks, num_flock_locks;
3800 struct ceph_filelock *flocks = NULL;
3801 size_t struct_len, total_len = sizeof(u64);
3802 u8 struct_v = 0;
3803
3804 encode_again:
3805 if (rec.v2.flock_len) {
3806 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3807 } else {
3808 num_fcntl_locks = 0;
3809 num_flock_locks = 0;
3810 }
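		/*
		 * Locks may be added between counting and encoding; if the
		 * buffer turns out to be too small,
		 * ceph_encode_locks_to_buffer() returns -ENOSPC and we
		 * recount and retry.
		 */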
3811 if (num_fcntl_locks + num_flock_locks > 0) {
3812 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3813 sizeof(struct ceph_filelock),
3814 GFP_NOFS);
3815 if (!flocks) {
3816 err = -ENOMEM;
3817 goto out_err;
3818 }
3819 err = ceph_encode_locks_to_buffer(inode, flocks,
3820 num_fcntl_locks,
3821 num_flock_locks);
3822 if (err) {
3823 kfree(flocks);
3824 flocks = NULL;
3825 if (err == -ENOSPC)
3826 goto encode_again;
3827 goto out_err;
3828 }
3829 } else {
3830 kfree(flocks);
3831 flocks = NULL;
3832 }
3833
3834 if (recon_state->msg_version >= 3) {
3835 /* version, compat_version and struct_len */
3836 total_len += 2 * sizeof(u8) + sizeof(u32);
3837 struct_v = 2;
3838 }
3839 /*
3840 * number of encoded locks is stable, so copy to pagelist
3841 */
3842 struct_len = 2 * sizeof(u32) +
3843 (num_fcntl_locks + num_flock_locks) *
3844 sizeof(struct ceph_filelock);
3845 rec.v2.flock_len = cpu_to_le32(struct_len);
3846
3847 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3848
3849 if (struct_v >= 2)
3850 struct_len += sizeof(u64); /* snap_follows */
3851
3852 total_len += struct_len;
3853
3854 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3855 err = send_reconnect_partial(recon_state);
3856 if (err)
3857 goto out_freeflocks;
3858 pagelist = recon_state->pagelist;
3859 }
3860
3861 err = ceph_pagelist_reserve(pagelist, total_len);
3862 if (err)
3863 goto out_freeflocks;
3864
3865 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3866 if (recon_state->msg_version >= 3) {
3867 ceph_pagelist_encode_8(pagelist, struct_v);
3868 ceph_pagelist_encode_8(pagelist, 1);
3869 ceph_pagelist_encode_32(pagelist, struct_len);
3870 }
3871 ceph_pagelist_encode_string(pagelist, path, pathlen);
3872 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3873 ceph_locks_to_pagelist(flocks, pagelist,
3874 num_fcntl_locks, num_flock_locks);
3875 if (struct_v >= 2)
3876 ceph_pagelist_encode_64(pagelist, snap_follows);
3877 out_freeflocks:
3878 kfree(flocks);
3879 } else {
3880 err = ceph_pagelist_reserve(pagelist,
3881 sizeof(u64) + sizeof(u32) +
3882 pathlen + sizeof(rec.v1));
3883 if (err)
3884 goto out_err;
3885
3886 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3887 ceph_pagelist_encode_string(pagelist, path, pathlen);
3888 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3889 }
3890
3891 out_err:
3892 ceph_mdsc_free_path(path, pathlen);
3893 if (!err)
3894 recon_state->nr_caps++;
3895 return err;
3896 }
3897
3898 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3899 struct ceph_reconnect_state *recon_state)
3900 {
3901 struct rb_node *p;
3902 struct ceph_pagelist *pagelist = recon_state->pagelist;
3903 int err = 0;
3904
3905 if (recon_state->msg_version >= 4) {
3906 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3907 if (err < 0)
3908 goto fail;
3909 }
3910
3911 /*
3912 * snaprealms. we provide mds with the ino, seq (version), and
3913 * parent for all of our realms. If the mds has any newer info,
3914 * it will tell us.
3915 */
3916 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3917 struct ceph_snap_realm *realm =
3918 rb_entry(p, struct ceph_snap_realm, node);
3919 struct ceph_mds_snaprealm_reconnect sr_rec;
3920
3921 if (recon_state->msg_version >= 4) {
3922 size_t need = sizeof(u8) * 2 + sizeof(u32) +
3923 sizeof(sr_rec);
3924
3925 if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3926 err = send_reconnect_partial(recon_state);
3927 if (err)
3928 goto fail;
3929 pagelist = recon_state->pagelist;
3930 }
3931
3932 err = ceph_pagelist_reserve(pagelist, need);
3933 if (err)
3934 goto fail;
3935
3936 ceph_pagelist_encode_8(pagelist, 1);
3937 ceph_pagelist_encode_8(pagelist, 1);
3938 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3939 }
3940
3941 dout(" adding snap realm %llx seq %lld parent %llx\n",
3942 realm->ino, realm->seq, realm->parent_ino);
3943 sr_rec.ino = cpu_to_le64(realm->ino);
3944 sr_rec.seq = cpu_to_le64(realm->seq);
3945 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3946
3947 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3948 if (err)
3949 goto fail;
3950
3951 recon_state->nr_realms++;
3952 }
3953 fail:
3954 return err;
3955 }
3956
3957
3958 /*
3959 * If an MDS fails and recovers, clients need to reconnect in order to
3960 * reestablish shared state. This includes all caps issued through
3961 * this session _and_ the snap_realm hierarchy. Because it's not
3962 * clear which snap realms the mds cares about, we send everything we
3963 * know about; that ensures we'll then get any new info the
3964 * recovering MDS might have.
3965 *
3966 * This is a relatively heavyweight operation, but it's rare.
3967 */
3968 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3969 struct ceph_mds_session *session)
3970 {
3971 struct ceph_msg *reply;
3972 int mds = session->s_mds;
3973 int err = -ENOMEM;
3974 struct ceph_reconnect_state recon_state = {
3975 .session = session,
3976 };
3977 LIST_HEAD(dispose);
3978
3979 pr_info("mds%d reconnect start\n", mds);
3980
3981 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3982 if (!recon_state.pagelist)
3983 goto fail_nopagelist;
3984
3985 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3986 if (!reply)
3987 goto fail_nomsg;
3988
3989 xa_destroy(&session->s_delegated_inos);
3990
3991 mutex_lock(&session->s_mutex);
3992 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3993 session->s_seq = 0;
3994
3995 dout("session %p state %s\n", session,
3996 ceph_session_state_name(session->s_state));
3997
3998 spin_lock(&session->s_gen_ttl_lock);
3999 session->s_cap_gen++;
4000 spin_unlock(&session->s_gen_ttl_lock);
4001
4002 spin_lock(&session->s_cap_lock);
4003 /* don't know if session is readonly */
4004 session->s_readonly = 0;
4005 /*
4006 * notify __ceph_remove_cap() that we are composing cap reconnect.
4007 * If a cap get released before being added to the cap reconnect,
4008 * __ceph_remove_cap() should skip queuing cap release.
4009 */
4010 session->s_cap_reconnect = 1;
4011 /* drop old cap expires; we're about to reestablish that state */
4012 detach_cap_releases(session, &dispose);
4013 spin_unlock(&session->s_cap_lock);
4014 dispose_cap_releases(mdsc, &dispose);
4015
4016 /* trim unused caps to reduce MDS's cache rejoin time */
4017 if (mdsc->fsc->sb->s_root)
4018 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4019
4020 ceph_con_close(&session->s_con);
4021 ceph_con_open(&session->s_con,
4022 CEPH_ENTITY_TYPE_MDS, mds,
4023 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4024
4025 /* replay unsafe requests */
4026 replay_unsafe_requests(mdsc, session);
4027
4028 ceph_early_kick_flushing_caps(mdsc, session);
4029
4030 down_read(&mdsc->snap_rwsem);
4031
4032 /* placeholder for nr_caps */
4033 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4034 if (err)
4035 goto fail;
4036
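	/*
	 * Pick the reconnect encoding: v3 plus permission to split the
	 * payload across multiple messages if the session advertises the
	 * multi-reconnect feature, v3 if the peer supports MDSENC, and the
	 * legacy v2 encoding otherwise.
	 */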
4037 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4038 recon_state.msg_version = 3;
4039 recon_state.allow_multi = true;
4040 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4041 recon_state.msg_version = 3;
4042 } else {
4043 recon_state.msg_version = 2;
4044 }
4045 	/* traverse this session's caps */
4046 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4047
4048 spin_lock(&session->s_cap_lock);
4049 session->s_cap_reconnect = 0;
4050 spin_unlock(&session->s_cap_lock);
4051
4052 if (err < 0)
4053 goto fail;
4054
4055 /* check if all realms can be encoded into current message */
4056 if (mdsc->num_snap_realms) {
4057 size_t total_len =
4058 recon_state.pagelist->length +
4059 mdsc->num_snap_realms *
4060 sizeof(struct ceph_mds_snaprealm_reconnect);
4061 if (recon_state.msg_version >= 4) {
4062 /* number of realms */
4063 total_len += sizeof(u32);
4064 /* version, compat_version and struct_len */
4065 total_len += mdsc->num_snap_realms *
4066 (2 * sizeof(u8) + sizeof(u32));
4067 }
4068 if (total_len > RECONNECT_MAX_SIZE) {
4069 if (!recon_state.allow_multi) {
4070 err = -ENOSPC;
4071 goto fail;
4072 }
4073 if (recon_state.nr_caps) {
4074 err = send_reconnect_partial(&recon_state);
4075 if (err)
4076 goto fail;
4077 }
4078 recon_state.msg_version = 5;
4079 }
4080 }
4081
4082 err = encode_snap_realms(mdsc, &recon_state);
4083 if (err < 0)
4084 goto fail;
4085
4086 if (recon_state.msg_version >= 5) {
4087 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4088 if (err < 0)
4089 goto fail;
4090 }
4091
4092 if (recon_state.nr_caps || recon_state.nr_realms) {
4093 struct page *page =
4094 list_first_entry(&recon_state.pagelist->head,
4095 struct page, lru);
4096 __le32 *addr = kmap_atomic(page);
4097 if (recon_state.nr_caps) {
4098 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4099 *addr = cpu_to_le32(recon_state.nr_caps);
4100 } else if (recon_state.msg_version >= 4) {
4101 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4102 }
4103 kunmap_atomic(addr);
4104 }
4105
4106 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4107 if (recon_state.msg_version >= 4)
4108 reply->hdr.compat_version = cpu_to_le16(4);
4109
4110 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4111 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4112
4113 ceph_con_send(&session->s_con, reply);
4114
4115 mutex_unlock(&session->s_mutex);
4116
4117 mutex_lock(&mdsc->mutex);
4118 __wake_requests(mdsc, &session->s_waiting);
4119 mutex_unlock(&mdsc->mutex);
4120
4121 up_read(&mdsc->snap_rwsem);
4122 ceph_pagelist_release(recon_state.pagelist);
4123 return;
4124
4125 fail:
4126 ceph_msg_put(reply);
4127 up_read(&mdsc->snap_rwsem);
4128 mutex_unlock(&session->s_mutex);
4129 fail_nomsg:
4130 ceph_pagelist_release(recon_state.pagelist);
4131 fail_nopagelist:
4132 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4133 return;
4134 }
4135
4136
4137 /*
4138 * compare old and new mdsmaps, kicking requests
4139 * and closing out old connections as necessary
4140 *
4141 * called under mdsc->mutex.
4142 */
4143 static void check_new_map(struct ceph_mds_client *mdsc,
4144 struct ceph_mdsmap *newmap,
4145 struct ceph_mdsmap *oldmap)
4146 {
4147 int i;
4148 int oldstate, newstate;
4149 struct ceph_mds_session *s;
4150
4151 dout("check_new_map new %u old %u\n",
4152 newmap->m_epoch, oldmap->m_epoch);
4153
4154 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4155 if (!mdsc->sessions[i])
4156 continue;
4157 s = mdsc->sessions[i];
4158 oldstate = ceph_mdsmap_get_state(oldmap, i);
4159 newstate = ceph_mdsmap_get_state(newmap, i);
4160
4161 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4162 i, ceph_mds_state_name(oldstate),
4163 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4164 ceph_mds_state_name(newstate),
4165 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4166 ceph_session_state_name(s->s_state));
4167
4168 if (i >= newmap->possible_max_rank) {
4169 /* force close session for stopped mds */
4170 ceph_get_mds_session(s);
4171 __unregister_session(mdsc, s);
4172 __wake_requests(mdsc, &s->s_waiting);
4173 mutex_unlock(&mdsc->mutex);
4174
4175 mutex_lock(&s->s_mutex);
4176 cleanup_session_requests(mdsc, s);
4177 remove_session_caps(s);
4178 mutex_unlock(&s->s_mutex);
4179
4180 ceph_put_mds_session(s);
4181
4182 mutex_lock(&mdsc->mutex);
4183 kick_requests(mdsc, i);
4184 continue;
4185 }
4186
4187 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4188 ceph_mdsmap_get_addr(newmap, i),
4189 sizeof(struct ceph_entity_addr))) {
4190 /* just close it */
4191 mutex_unlock(&mdsc->mutex);
4192 mutex_lock(&s->s_mutex);
4193 mutex_lock(&mdsc->mutex);
4194 ceph_con_close(&s->s_con);
4195 mutex_unlock(&s->s_mutex);
4196 s->s_state = CEPH_MDS_SESSION_RESTARTING;
4197 } else if (oldstate == newstate) {
4198 continue; /* nothing new with this mds */
4199 }
4200
4201 /*
4202 * send reconnect?
4203 */
4204 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4205 newstate >= CEPH_MDS_STATE_RECONNECT) {
4206 mutex_unlock(&mdsc->mutex);
4207 send_mds_reconnect(mdsc, s);
4208 mutex_lock(&mdsc->mutex);
4209 }
4210
4211 /*
4212 * kick request on any mds that has gone active.
4213 */
4214 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4215 newstate >= CEPH_MDS_STATE_ACTIVE) {
4216 if (oldstate != CEPH_MDS_STATE_CREATING &&
4217 oldstate != CEPH_MDS_STATE_STARTING)
4218 pr_info("mds%d recovery completed\n", s->s_mds);
4219 kick_requests(mdsc, i);
4220 mutex_unlock(&mdsc->mutex);
4221 mutex_lock(&s->s_mutex);
4222 mutex_lock(&mdsc->mutex);
4223 ceph_kick_flushing_caps(mdsc, s);
4224 mutex_unlock(&s->s_mutex);
4225 wake_up_session_caps(s, RECONNECT);
4226 }
4227 }
4228
4229 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4230 s = mdsc->sessions[i];
4231 if (!s)
4232 continue;
4233 if (!ceph_mdsmap_is_laggy(newmap, i))
4234 continue;
4235 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4236 s->s_state == CEPH_MDS_SESSION_HUNG ||
4237 s->s_state == CEPH_MDS_SESSION_CLOSING) {
4238 dout(" connecting to export targets of laggy mds%d\n",
4239 i);
4240 __open_export_target_sessions(mdsc, s);
4241 }
4242 }
4243 }
4244
4245
4246
4247 /*
4248 * leases
4249 */
4250
4251 /*
4252 * caller must hold session s_mutex, dentry->d_lock
4253 */
4254 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4255 {
4256 struct ceph_dentry_info *di = ceph_dentry(dentry);
4257
4258 ceph_put_mds_session(di->lease_session);
4259 di->lease_session = NULL;
4260 }
4261
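/*
 * Handle a CLIENT_LEASE message from the MDS: revoke or renew the lease on
 * the named dentry.  Revokes are acked by reusing the incoming message.
 */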
4262 static void handle_lease(struct ceph_mds_client *mdsc,
4263 struct ceph_mds_session *session,
4264 struct ceph_msg *msg)
4265 {
4266 struct super_block *sb = mdsc->fsc->sb;
4267 struct inode *inode;
4268 struct dentry *parent, *dentry;
4269 struct ceph_dentry_info *di;
4270 int mds = session->s_mds;
4271 struct ceph_mds_lease *h = msg->front.iov_base;
4272 u32 seq;
4273 struct ceph_vino vino;
4274 struct qstr dname;
4275 int release = 0;
4276
4277 dout("handle_lease from mds%d\n", mds);
4278
4279 /* decode */
4280 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4281 goto bad;
4282 vino.ino = le64_to_cpu(h->ino);
4283 vino.snap = CEPH_NOSNAP;
4284 seq = le32_to_cpu(h->seq);
4285 dname.len = get_unaligned_le32(h + 1);
4286 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4287 goto bad;
4288 dname.name = (void *)(h + 1) + sizeof(u32);
4289
4290 /* lookup inode */
4291 inode = ceph_find_inode(sb, vino);
4292 dout("handle_lease %s, ino %llx %p %.*s\n",
4293 ceph_lease_op_name(h->action), vino.ino, inode,
4294 dname.len, dname.name);
4295
4296 mutex_lock(&session->s_mutex);
4297 inc_session_sequence(session);
4298
4299 if (!inode) {
4300 dout("handle_lease no inode %llx\n", vino.ino);
4301 goto release;
4302 }
4303
4304 /* dentry */
4305 parent = d_find_alias(inode);
4306 if (!parent) {
4307 dout("no parent dentry on inode %p\n", inode);
4308 WARN_ON(1);
4309 goto release; /* hrm... */
4310 }
4311 dname.hash = full_name_hash(parent, dname.name, dname.len);
4312 dentry = d_lookup(parent, &dname);
4313 dput(parent);
4314 if (!dentry)
4315 goto release;
4316
4317 spin_lock(&dentry->d_lock);
4318 di = ceph_dentry(dentry);
4319 switch (h->action) {
4320 case CEPH_MDS_LEASE_REVOKE:
4321 if (di->lease_session == session) {
4322 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4323 h->seq = cpu_to_le32(di->lease_seq);
4324 __ceph_mdsc_drop_dentry_lease(dentry);
4325 }
4326 release = 1;
4327 break;
4328
4329 case CEPH_MDS_LEASE_RENEW:
4330 if (di->lease_session == session &&
4331 di->lease_gen == session->s_cap_gen &&
4332 di->lease_renew_from &&
4333 di->lease_renew_after == 0) {
4334 unsigned long duration =
4335 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4336
4337 di->lease_seq = seq;
4338 di->time = di->lease_renew_from + duration;
4339 di->lease_renew_after = di->lease_renew_from +
4340 (duration >> 1);
4341 di->lease_renew_from = 0;
4342 }
4343 break;
4344 }
4345 spin_unlock(&dentry->d_lock);
4346 dput(dentry);
4347
4348 if (!release)
4349 goto out;
4350
4351 release:
4352 /* let's just reuse the same message */
4353 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4354 ceph_msg_get(msg);
4355 ceph_con_send(&session->s_con, msg);
4356
4357 out:
4358 mutex_unlock(&session->s_mutex);
4359 /* avoid calling iput_final() in mds dispatch threads */
4360 ceph_async_iput(inode);
4361 return;
4362
4363 bad:
4364 pr_err("corrupt lease message\n");
4365 ceph_msg_dump(msg);
4366 }
4367
4368 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4369 struct dentry *dentry, char action,
4370 u32 seq)
4371 {
4372 struct ceph_msg *msg;
4373 struct ceph_mds_lease *lease;
4374 struct inode *dir;
4375 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4376
4377 	dout("lease_send_msg dentry %p %s to mds%d\n",
4378 dentry, ceph_lease_op_name(action), session->s_mds);
4379
4380 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4381 if (!msg)
4382 return;
4383 lease = msg->front.iov_base;
4384 lease->action = action;
4385 lease->seq = cpu_to_le32(seq);
4386
4387 spin_lock(&dentry->d_lock);
4388 dir = d_inode(dentry->d_parent);
4389 lease->ino = cpu_to_le64(ceph_ino(dir));
4390 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4391
4392 put_unaligned_le32(dentry->d_name.len, lease + 1);
4393 memcpy((void *)(lease + 1) + 4,
4394 dentry->d_name.name, dentry->d_name.len);
4395 spin_unlock(&dentry->d_lock);
4396 /*
4397 * if this is a preemptive lease RELEASE, no need to
4398 * flush request stream, since the actual request will
4399 * soon follow.
4400 */
4401 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4402
4403 ceph_con_send(&session->s_con, msg);
4404 }
4405
4406 /*
4407 * lock/unlock the session to wait for ongoing session activities to finish
4408 */
4409 static void lock_unlock_session(struct ceph_mds_session *s)
4410 {
4411 mutex_lock(&s->s_mutex);
4412 mutex_unlock(&s->s_mutex);
4413 }
4414
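/*
 * If the client has been blocklisted and the recover_session=clean mount
 * option is in effect, transparently force a reconnect to the cluster, but
 * at most once every 30 minutes.
 */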
4415 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4416 {
4417 struct ceph_fs_client *fsc = mdsc->fsc;
4418
4419 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4420 return;
4421
4422 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4423 return;
4424
4425 if (!READ_ONCE(fsc->blocklisted))
4426 return;
4427
4428 if (fsc->last_auto_reconnect &&
4429 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4430 return;
4431
4432 pr_info("auto reconnect after blocklisted\n");
4433 fsc->last_auto_reconnect = jiffies;
4434 ceph_force_reconnect(fsc->sb);
4435 }
4436
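/*
 * Periodic session maintenance helper: mark an OPEN session whose ttl has
 * expired as HUNG, and return false for session states that need no further
 * keepalive or renewal handling.
 */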
4437 bool check_session_state(struct ceph_mds_session *s)
4438 {
4439 switch (s->s_state) {
4440 case CEPH_MDS_SESSION_OPEN:
4441 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4442 s->s_state = CEPH_MDS_SESSION_HUNG;
4443 pr_info("mds%d hung\n", s->s_mds);
4444 }
4445 break;
4446 case CEPH_MDS_SESSION_CLOSING:
4447 /* Should never reach this when we're unmounting */
4448 WARN_ON_ONCE(s->s_ttl);
4449 fallthrough;
4450 case CEPH_MDS_SESSION_NEW:
4451 case CEPH_MDS_SESSION_RESTARTING:
4452 case CEPH_MDS_SESSION_CLOSED:
4453 case CEPH_MDS_SESSION_REJECTED:
4454 return false;
4455 }
4456
4457 return true;
4458 }
4459
4460 /*
4461 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4462 * then we need to retransmit that request.
4463 */
4464 void inc_session_sequence(struct ceph_mds_session *s)
4465 {
4466 lockdep_assert_held(&s->s_mutex);
4467
4468 s->s_seq++;
4469
4470 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4471 int ret;
4472
4473 dout("resending session close request for mds%d\n", s->s_mds);
4474 ret = request_close_session(s);
4475 if (ret < 0)
4476 pr_err("unable to close session to mds%d: %d\n",
4477 s->s_mds, ret);
4478 }
4479 }
4480
4481 /*
4482 * delayed work -- periodically trim expired leases, renew caps with mds. If
4483 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4484 * workqueue delay value of 5 secs will be used.
4485 */
4486 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4487 {
4488 unsigned long max_delay = HZ * 5;
4489
4490 /* 5 secs default delay */
4491 if (!delay || (delay > max_delay))
4492 delay = max_delay;
4493 schedule_delayed_work(&mdsc->delayed_work,
4494 round_jiffies_relative(delay));
4495 }
4496
4497 static void delayed_work(struct work_struct *work)
4498 {
4499 struct ceph_mds_client *mdsc =
4500 container_of(work, struct ceph_mds_client, delayed_work.work);
4501 unsigned long delay;
4502 int renew_interval;
4503 int renew_caps;
4504 int i;
4505
4506 dout("mdsc delayed_work\n");
4507
4508 if (mdsc->stopping)
4509 return;
4510
4511 mutex_lock(&mdsc->mutex);
4512 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4513 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4514 mdsc->last_renew_caps);
4515 if (renew_caps)
4516 mdsc->last_renew_caps = jiffies;
4517
4518 for (i = 0; i < mdsc->max_sessions; i++) {
4519 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4520 if (!s)
4521 continue;
4522
4523 if (!check_session_state(s)) {
4524 ceph_put_mds_session(s);
4525 continue;
4526 }
4527 mutex_unlock(&mdsc->mutex);
4528
4529 mutex_lock(&s->s_mutex);
4530 if (renew_caps)
4531 send_renew_caps(mdsc, s);
4532 else
4533 ceph_con_keepalive(&s->s_con);
4534 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4535 s->s_state == CEPH_MDS_SESSION_HUNG)
4536 ceph_send_cap_releases(mdsc, s);
4537 mutex_unlock(&s->s_mutex);
4538 ceph_put_mds_session(s);
4539
4540 mutex_lock(&mdsc->mutex);
4541 }
4542 mutex_unlock(&mdsc->mutex);
4543
4544 delay = ceph_check_delayed_caps(mdsc);
4545
4546 ceph_queue_cap_reclaim_work(mdsc);
4547
4548 ceph_trim_snapid_map(mdsc);
4549
4550 maybe_recover_session(mdsc);
4551
4552 schedule_delayed(mdsc, delay);
4553 }
4554
4555 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4556
4557 {
4558 struct ceph_mds_client *mdsc;
4559 int err;
4560
4561 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4562 if (!mdsc)
4563 return -ENOMEM;
4564 mdsc->fsc = fsc;
4565 mutex_init(&mdsc->mutex);
4566 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4567 if (!mdsc->mdsmap) {
4568 err = -ENOMEM;
4569 goto err_mdsc;
4570 }
4571
4572 init_completion(&mdsc->safe_umount_waiters);
4573 init_waitqueue_head(&mdsc->session_close_wq);
4574 INIT_LIST_HEAD(&mdsc->waiting_for_map);
4575 mdsc->sessions = NULL;
4576 atomic_set(&mdsc->num_sessions, 0);
4577 mdsc->max_sessions = 0;
4578 mdsc->stopping = 0;
4579 atomic64_set(&mdsc->quotarealms_count, 0);
4580 mdsc->quotarealms_inodes = RB_ROOT;
4581 mutex_init(&mdsc->quotarealms_inodes_mutex);
4582 mdsc->last_snap_seq = 0;
4583 init_rwsem(&mdsc->snap_rwsem);
4584 mdsc->snap_realms = RB_ROOT;
4585 INIT_LIST_HEAD(&mdsc->snap_empty);
4586 mdsc->num_snap_realms = 0;
4587 spin_lock_init(&mdsc->snap_empty_lock);
4588 mdsc->last_tid = 0;
4589 mdsc->oldest_tid = 0;
4590 mdsc->request_tree = RB_ROOT;
4591 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4592 mdsc->last_renew_caps = jiffies;
4593 INIT_LIST_HEAD(&mdsc->cap_delay_list);
4594 INIT_LIST_HEAD(&mdsc->cap_wait_list);
4595 spin_lock_init(&mdsc->cap_delay_lock);
4596 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4597 spin_lock_init(&mdsc->snap_flush_lock);
4598 mdsc->last_cap_flush_tid = 1;
4599 INIT_LIST_HEAD(&mdsc->cap_flush_list);
4600 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4601 mdsc->num_cap_flushing = 0;
4602 spin_lock_init(&mdsc->cap_dirty_lock);
4603 init_waitqueue_head(&mdsc->cap_flushing_wq);
4604 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4605 atomic_set(&mdsc->cap_reclaim_pending, 0);
4606 err = ceph_metric_init(&mdsc->metric);
4607 if (err)
4608 goto err_mdsmap;
4609
4610 spin_lock_init(&mdsc->dentry_list_lock);
4611 INIT_LIST_HEAD(&mdsc->dentry_leases);
4612 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4613
4614 ceph_caps_init(mdsc);
4615 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4616
4617 spin_lock_init(&mdsc->snapid_map_lock);
4618 mdsc->snapid_map_tree = RB_ROOT;
4619 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4620
4621 init_rwsem(&mdsc->pool_perm_rwsem);
4622 mdsc->pool_perm_tree = RB_ROOT;
4623
4624 strscpy(mdsc->nodename, utsname()->nodename,
4625 sizeof(mdsc->nodename));
4626
4627 fsc->mdsc = mdsc;
4628 return 0;
4629
4630 err_mdsmap:
4631 kfree(mdsc->mdsmap);
4632 err_mdsc:
4633 kfree(mdsc);
4634 return err;
4635 }
4636
4637 /*
4638 * Wait for safe replies on open mds requests. If we time out, drop
4639 * all requests from the tree to avoid dangling dentry refs.
4640 */
4641 static void wait_requests(struct ceph_mds_client *mdsc)
4642 {
4643 struct ceph_options *opts = mdsc->fsc->client->options;
4644 struct ceph_mds_request *req;
4645
4646 mutex_lock(&mdsc->mutex);
4647 if (__get_oldest_req(mdsc)) {
4648 mutex_unlock(&mdsc->mutex);
4649
4650 dout("wait_requests waiting for requests\n");
4651 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4652 ceph_timeout_jiffies(opts->mount_timeout));
4653
4654 /* tear down remaining requests */
4655 mutex_lock(&mdsc->mutex);
4656 while ((req = __get_oldest_req(mdsc))) {
4657 dout("wait_requests timed out on tid %llu\n",
4658 req->r_tid);
4659 list_del_init(&req->r_wait);
4660 __unregister_request(mdsc, req);
4661 }
4662 }
4663 mutex_unlock(&mdsc->mutex);
4664 dout("wait_requests done\n");
4665 }
4666
4667 void send_flush_mdlog(struct ceph_mds_session *s)
4668 {
4669 struct ceph_msg *msg;
4670
4671 /*
4672 * Pre-luminous MDS crashes when it sees an unknown session request
4673 */
4674 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4675 return;
4676
4677 mutex_lock(&s->s_mutex);
4678 	dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
4679 ceph_session_state_name(s->s_state), s->s_seq);
4680 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4681 s->s_seq);
4682 if (!msg) {
4683 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4684 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4685 } else {
4686 ceph_con_send(&s->s_con, msg);
4687 }
4688 mutex_unlock(&s->s_mutex);
4689 }
4690
4691 /*
4692 * called before mount is ro, and before dentries are torn down.
4693 * (hmm, does this still race with new lookups?)
4694 */
4695 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4696 {
4697 dout("pre_umount\n");
4698 mdsc->stopping = 1;
4699
4700 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4701 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4702 ceph_flush_dirty_caps(mdsc);
4703 wait_requests(mdsc);
4704
4705 /*
4706 * wait for reply handlers to drop their request refs and
4707 * their inode/dcache refs
4708 */
4709 ceph_msgr_flush();
4710
4711 ceph_cleanup_quotarealms_inodes(mdsc);
4712 }
4713
4714 /*
4715 * wait for all write mds requests to flush.
4716 */
4717 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4718 {
4719 struct ceph_mds_request *req = NULL, *nextreq;
4720 struct rb_node *n;
4721
4722 mutex_lock(&mdsc->mutex);
4723 dout("wait_unsafe_requests want %lld\n", want_tid);
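	/*
	 * Walk the request tree in tid order.  Hold a reference on both the
	 * current and the next request so that mdsc->mutex can be dropped
	 * while waiting for the safe reply, and the walk can be resumed (or
	 * restarted) safely afterwards.
	 */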
4724 restart:
4725 req = __get_oldest_req(mdsc);
4726 while (req && req->r_tid <= want_tid) {
4727 /* find next request */
4728 n = rb_next(&req->r_node);
4729 if (n)
4730 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4731 else
4732 nextreq = NULL;
4733 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4734 (req->r_op & CEPH_MDS_OP_WRITE)) {
4735 /* write op */
4736 ceph_mdsc_get_request(req);
4737 if (nextreq)
4738 ceph_mdsc_get_request(nextreq);
4739 mutex_unlock(&mdsc->mutex);
4740 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4741 req->r_tid, want_tid);
4742 wait_for_completion(&req->r_safe_completion);
4743 mutex_lock(&mdsc->mutex);
4744 ceph_mdsc_put_request(req);
4745 if (!nextreq)
4746 break; /* next dne before, so we're done! */
4747 if (RB_EMPTY_NODE(&nextreq->r_node)) {
4748 /* next request was removed from tree */
4749 ceph_mdsc_put_request(nextreq);
4750 goto restart;
4751 }
4752 ceph_mdsc_put_request(nextreq); /* won't go away */
4753 }
4754 req = nextreq;
4755 }
4756 mutex_unlock(&mdsc->mutex);
4757 dout("wait_unsafe_requests done\n");
4758 }
4759
4760 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4761 {
4762 u64 want_tid, want_flush;
4763
4764 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4765 return;
4766
4767 dout("sync\n");
4768 mutex_lock(&mdsc->mutex);
4769 want_tid = mdsc->last_tid;
4770 mutex_unlock(&mdsc->mutex);
4771
4772 ceph_flush_dirty_caps(mdsc);
4773 spin_lock(&mdsc->cap_dirty_lock);
4774 want_flush = mdsc->last_cap_flush_tid;
4775 if (!list_empty(&mdsc->cap_flush_list)) {
4776 struct ceph_cap_flush *cf =
4777 list_last_entry(&mdsc->cap_flush_list,
4778 struct ceph_cap_flush, g_list);
4779 cf->wake = true;
4780 }
4781 spin_unlock(&mdsc->cap_dirty_lock);
4782
4783 dout("sync want tid %lld flush_seq %lld\n",
4784 want_tid, want_flush);
4785
4786 wait_unsafe_requests(mdsc, want_tid);
4787 wait_caps_flush(mdsc, want_flush);
4788 }
4789
4790 /*
4791 * true if all sessions are closed, or we force unmount
4792 */
4793 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4794 {
4795 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4796 return true;
4797 return atomic_read(&mdsc->num_sessions) <= skipped;
4798 }
4799
4800 /*
4801 * called after sb is ro.
4802 */
4803 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4804 {
4805 struct ceph_options *opts = mdsc->fsc->client->options;
4806 struct ceph_mds_session *session;
4807 int i;
4808 int skipped = 0;
4809
4810 dout("close_sessions\n");
4811
4812 /* close sessions */
4813 mutex_lock(&mdsc->mutex);
4814 for (i = 0; i < mdsc->max_sessions; i++) {
4815 session = __ceph_lookup_mds_session(mdsc, i);
4816 if (!session)
4817 continue;
4818 mutex_unlock(&mdsc->mutex);
4819 mutex_lock(&session->s_mutex);
4820 if (__close_session(mdsc, session) <= 0)
4821 skipped++;
4822 mutex_unlock(&session->s_mutex);
4823 ceph_put_mds_session(session);
4824 mutex_lock(&mdsc->mutex);
4825 }
4826 mutex_unlock(&mdsc->mutex);
4827
4828 dout("waiting for sessions to close\n");
4829 wait_event_timeout(mdsc->session_close_wq,
4830 done_closing_sessions(mdsc, skipped),
4831 ceph_timeout_jiffies(opts->mount_timeout));
4832
4833 /* tear down remaining sessions */
4834 mutex_lock(&mdsc->mutex);
4835 for (i = 0; i < mdsc->max_sessions; i++) {
4836 if (mdsc->sessions[i]) {
4837 session = ceph_get_mds_session(mdsc->sessions[i]);
4838 __unregister_session(mdsc, session);
4839 mutex_unlock(&mdsc->mutex);
4840 mutex_lock(&session->s_mutex);
4841 remove_session_caps(session);
4842 mutex_unlock(&session->s_mutex);
4843 ceph_put_mds_session(session);
4844 mutex_lock(&mdsc->mutex);
4845 }
4846 }
4847 WARN_ON(!list_empty(&mdsc->cap_delay_list));
4848 mutex_unlock(&mdsc->mutex);
4849
4850 ceph_cleanup_snapid_map(mdsc);
4851 ceph_cleanup_empty_realms(mdsc);
4852
4853 cancel_work_sync(&mdsc->cap_reclaim_work);
4854 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4855
4856 dout("stopped\n");
4857 }
4858
4859 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4860 {
4861 struct ceph_mds_session *session;
4862 int mds;
4863
4864 dout("force umount\n");
4865
4866 mutex_lock(&mdsc->mutex);
4867 for (mds = 0; mds < mdsc->max_sessions; mds++) {
4868 session = __ceph_lookup_mds_session(mdsc, mds);
4869 if (!session)
4870 continue;
4871
4872 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4873 __unregister_session(mdsc, session);
4874 __wake_requests(mdsc, &session->s_waiting);
4875 mutex_unlock(&mdsc->mutex);
4876
4877 mutex_lock(&session->s_mutex);
4878 __close_session(mdsc, session);
4879 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4880 cleanup_session_requests(mdsc, session);
4881 remove_session_caps(session);
4882 }
4883 mutex_unlock(&session->s_mutex);
4884 ceph_put_mds_session(session);
4885
4886 mutex_lock(&mdsc->mutex);
4887 kick_requests(mdsc, mds);
4888 }
4889 __wake_requests(mdsc, &mdsc->waiting_for_map);
4890 mutex_unlock(&mdsc->mutex);
4891 }
4892
4893 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4894 {
4895 dout("stop\n");
4896 /*
4897 	 * Make sure the delayed work has stopped before releasing
4898 	 * the resources.
4899 	 *
4900 	 * cancel_delayed_work_sync() only guarantees that the work
4901 	 * finishes executing, but the delayed work may re-arm itself
4902 	 * again after that.
4903 */
4904 flush_delayed_work(&mdsc->delayed_work);
4905
4906 if (mdsc->mdsmap)
4907 ceph_mdsmap_destroy(mdsc->mdsmap);
4908 kfree(mdsc->sessions);
4909 ceph_caps_finalize(mdsc);
4910 ceph_pool_perm_destroy(mdsc);
4911 }
4912
4913 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4914 {
4915 struct ceph_mds_client *mdsc = fsc->mdsc;
4916 dout("mdsc_destroy %p\n", mdsc);
4917
4918 if (!mdsc)
4919 return;
4920
4921 /* flush out any connection work with references to us */
4922 ceph_msgr_flush();
4923
4924 ceph_mdsc_stop(mdsc);
4925
4926 ceph_metric_destroy(&mdsc->metric);
4927
4928 fsc->mdsc = NULL;
4929 kfree(mdsc);
4930 dout("mdsc_destroy %p done\n", mdsc);
4931 }
4932
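/*
 * Handle an FSMAP message from the monitor: look for a filesystem whose name
 * matches the mds_namespace mount option, remember its fscid and subscribe to
 * that filesystem's MDS map.  If no match is found, fail requests waiting for
 * a map with -ENOENT.
 */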
4933 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4934 {
4935 struct ceph_fs_client *fsc = mdsc->fsc;
4936 const char *mds_namespace = fsc->mount_options->mds_namespace;
4937 void *p = msg->front.iov_base;
4938 void *end = p + msg->front.iov_len;
4939 u32 epoch;
4940 u32 map_len;
4941 u32 num_fs;
4942 u32 mount_fscid = (u32)-1;
4943 u8 struct_v, struct_cv;
4944 int err = -EINVAL;
4945
4946 ceph_decode_need(&p, end, sizeof(u32), bad);
4947 epoch = ceph_decode_32(&p);
4948
4949 dout("handle_fsmap epoch %u\n", epoch);
4950
4951 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4952 struct_v = ceph_decode_8(&p);
4953 struct_cv = ceph_decode_8(&p);
4954 map_len = ceph_decode_32(&p);
4955
4956 ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4957 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4958
4959 num_fs = ceph_decode_32(&p);
4960 while (num_fs-- > 0) {
4961 void *info_p, *info_end;
4962 u32 info_len;
4963 u8 info_v, info_cv;
4964 u32 fscid, namelen;
4965
4966 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4967 info_v = ceph_decode_8(&p);
4968 info_cv = ceph_decode_8(&p);
4969 info_len = ceph_decode_32(&p);
4970 ceph_decode_need(&p, end, info_len, bad);
4971 info_p = p;
4972 info_end = p + info_len;
4973 p = info_end;
4974
4975 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4976 fscid = ceph_decode_32(&info_p);
4977 namelen = ceph_decode_32(&info_p);
4978 ceph_decode_need(&info_p, info_end, namelen, bad);
4979
4980 if (mds_namespace &&
4981 strlen(mds_namespace) == namelen &&
4982 !strncmp(mds_namespace, (char *)info_p, namelen)) {
4983 mount_fscid = fscid;
4984 break;
4985 }
4986 }
4987
4988 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4989 if (mount_fscid != (u32)-1) {
4990 fsc->client->monc.fs_cluster_id = mount_fscid;
4991 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4992 0, true);
4993 ceph_monc_renew_subs(&fsc->client->monc);
4994 } else {
4995 err = -ENOENT;
4996 goto err_out;
4997 }
4998 return;
4999
5000 bad:
5001 pr_err("error decoding fsmap\n");
5002 err_out:
5003 mutex_lock(&mdsc->mutex);
5004 mdsc->mdsmap_err = err;
5005 __wake_requests(mdsc, &mdsc->waiting_for_map);
5006 mutex_unlock(&mdsc->mutex);
5007 }
5008
5009 /*
5010 * handle mds map update.
5011 */
5012 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5013 {
5014 u32 epoch;
5015 u32 maplen;
5016 void *p = msg->front.iov_base;
5017 void *end = p + msg->front.iov_len;
5018 struct ceph_mdsmap *newmap, *oldmap;
5019 struct ceph_fsid fsid;
5020 int err = -EINVAL;
5021
5022 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5023 ceph_decode_copy(&p, &fsid, sizeof(fsid));
5024 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5025 return;
5026 epoch = ceph_decode_32(&p);
5027 maplen = ceph_decode_32(&p);
5028 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5029
5030 /* do we need it? */
5031 mutex_lock(&mdsc->mutex);
5032 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5033 dout("handle_map epoch %u <= our %u\n",
5034 epoch, mdsc->mdsmap->m_epoch);
5035 mutex_unlock(&mdsc->mutex);
5036 return;
5037 }
5038
5039 newmap = ceph_mdsmap_decode(&p, end);
5040 if (IS_ERR(newmap)) {
5041 err = PTR_ERR(newmap);
5042 goto bad_unlock;
5043 }
5044
5045 /* swap into place */
5046 if (mdsc->mdsmap) {
5047 oldmap = mdsc->mdsmap;
5048 mdsc->mdsmap = newmap;
5049 check_new_map(mdsc, newmap, oldmap);
5050 ceph_mdsmap_destroy(oldmap);
5051 } else {
5052 mdsc->mdsmap = newmap; /* first mds map */
5053 }
5054 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5055 MAX_LFS_FILESIZE);
5056
5057 __wake_requests(mdsc, &mdsc->waiting_for_map);
5058 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5059 mdsc->mdsmap->m_epoch);
5060
5061 mutex_unlock(&mdsc->mutex);
5062 schedule_delayed(mdsc, 0);
5063 return;
5064
5065 bad_unlock:
5066 mutex_unlock(&mdsc->mutex);
5067 bad:
5068 pr_err("error decoding mdsmap %d\n", err);
5069 return;
5070 }
5071
5072 static struct ceph_connection *con_get(struct ceph_connection *con)
5073 {
5074 struct ceph_mds_session *s = con->private;
5075
5076 if (ceph_get_mds_session(s))
5077 return con;
5078 return NULL;
5079 }
5080
5081 static void con_put(struct ceph_connection *con)
5082 {
5083 struct ceph_mds_session *s = con->private;
5084
5085 ceph_put_mds_session(s);
5086 }
5087
5088 /*
5089 * if the client is unresponsive for long enough, the mds will kill
5090 * the session entirely.
5091 */
5092 static void peer_reset(struct ceph_connection *con)
5093 {
5094 struct ceph_mds_session *s = con->private;
5095 struct ceph_mds_client *mdsc = s->s_mdsc;
5096
5097 pr_warn("mds%d closed our session\n", s->s_mds);
5098 send_mds_reconnect(mdsc, s);
5099 }
5100
5101 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5102 {
5103 struct ceph_mds_session *s = con->private;
5104 struct ceph_mds_client *mdsc = s->s_mdsc;
5105 int type = le16_to_cpu(msg->hdr.type);
5106
5107 mutex_lock(&mdsc->mutex);
5108 if (__verify_registered_session(mdsc, s) < 0) {
5109 mutex_unlock(&mdsc->mutex);
5110 goto out;
5111 }
5112 mutex_unlock(&mdsc->mutex);
5113
5114 switch (type) {
5115 case CEPH_MSG_MDS_MAP:
5116 ceph_mdsc_handle_mdsmap(mdsc, msg);
5117 break;
5118 case CEPH_MSG_FS_MAP_USER:
5119 ceph_mdsc_handle_fsmap(mdsc, msg);
5120 break;
5121 case CEPH_MSG_CLIENT_SESSION:
5122 handle_session(s, msg);
5123 break;
5124 case CEPH_MSG_CLIENT_REPLY:
5125 handle_reply(s, msg);
5126 break;
5127 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5128 handle_forward(mdsc, s, msg);
5129 break;
5130 case CEPH_MSG_CLIENT_CAPS:
5131 ceph_handle_caps(s, msg);
5132 break;
5133 case CEPH_MSG_CLIENT_SNAP:
5134 ceph_handle_snap(mdsc, s, msg);
5135 break;
5136 case CEPH_MSG_CLIENT_LEASE:
5137 handle_lease(mdsc, s, msg);
5138 break;
5139 case CEPH_MSG_CLIENT_QUOTA:
5140 ceph_handle_quota(mdsc, s, msg);
5141 break;
5142
5143 default:
5144 pr_err("received unknown message type %d %s\n", type,
5145 ceph_msg_type_name(type));
5146 }
5147 out:
5148 ceph_msg_put(msg);
5149 }
5150
5151 /*
5152 * authentication
5153 */
5154
5155 /*
5156 * Note: returned pointer is the address of a structure that's
5157 * managed separately. Caller must *not* attempt to free it.
5158 */
5159 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5160 int *proto, int force_new)
5161 {
5162 struct ceph_mds_session *s = con->private;
5163 struct ceph_mds_client *mdsc = s->s_mdsc;
5164 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5165 struct ceph_auth_handshake *auth = &s->s_auth;
5166
5167 if (force_new && auth->authorizer) {
5168 ceph_auth_destroy_authorizer(auth->authorizer);
5169 auth->authorizer = NULL;
5170 }
5171 if (!auth->authorizer) {
5172 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5173 auth);
5174 if (ret)
5175 return ERR_PTR(ret);
5176 } else {
5177 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5178 auth);
5179 if (ret)
5180 return ERR_PTR(ret);
5181 }
5182 *proto = ac->protocol;
5183
5184 return auth;
5185 }
5186
5187 static int add_authorizer_challenge(struct ceph_connection *con,
5188 void *challenge_buf, int challenge_buf_len)
5189 {
5190 struct ceph_mds_session *s = con->private;
5191 struct ceph_mds_client *mdsc = s->s_mdsc;
5192 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5193
5194 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5195 challenge_buf, challenge_buf_len);
5196 }
5197
5198 static int verify_authorizer_reply(struct ceph_connection *con)
5199 {
5200 struct ceph_mds_session *s = con->private;
5201 struct ceph_mds_client *mdsc = s->s_mdsc;
5202 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5203
5204 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
5205 }
5206
5207 static int invalidate_authorizer(struct ceph_connection *con)
5208 {
5209 struct ceph_mds_session *s = con->private;
5210 struct ceph_mds_client *mdsc = s->s_mdsc;
5211 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5212
5213 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5214
5215 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5216 }
5217
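/*
 * Allocate a buffer for an incoming MDS message.  If a message is already
 * staged on the connection, reuse it; otherwise allocate a front-only buffer
 * of the advertised size.
 */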
5218 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5219 struct ceph_msg_header *hdr, int *skip)
5220 {
5221 struct ceph_msg *msg;
5222 int type = (int) le16_to_cpu(hdr->type);
5223 int front_len = (int) le32_to_cpu(hdr->front_len);
5224
5225 if (con->in_msg)
5226 return con->in_msg;
5227
5228 *skip = 0;
5229 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5230 if (!msg) {
5231 pr_err("unable to allocate msg type %d len %d\n",
5232 type, front_len);
5233 return NULL;
5234 }
5235
5236 return msg;
5237 }
5238
5239 static int mds_sign_message(struct ceph_msg *msg)
5240 {
5241 struct ceph_mds_session *s = msg->con->private;
5242 struct ceph_auth_handshake *auth = &s->s_auth;
5243
5244 return ceph_auth_sign_message(auth, msg);
5245 }
5246
5247 static int mds_check_message_signature(struct ceph_msg *msg)
5248 {
5249 struct ceph_mds_session *s = msg->con->private;
5250 struct ceph_auth_handshake *auth = &s->s_auth;
5251
5252 return ceph_auth_check_message_signature(auth, msg);
5253 }
5254
5255 static const struct ceph_connection_operations mds_con_ops = {
5256 .get = con_get,
5257 .put = con_put,
5258 .dispatch = dispatch,
5259 .get_authorizer = get_authorizer,
5260 .add_authorizer_challenge = add_authorizer_challenge,
5261 .verify_authorizer_reply = verify_authorizer_reply,
5262 .invalidate_authorizer = invalidate_authorizer,
5263 .peer_reset = peer_reset,
5264 .alloc_msg = mds_alloc_msg,
5265 .sign_message = mds_sign_message,
5266 .check_message_signature = mds_check_message_signature,
5267 };
5268
5269 /* eof */
5270