1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26 /*
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2012, Intel Corporation.
31 */
32 /*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/llog.c
37 *
38 * OST<->MDS recovery logging infrastructure.
39 * Invariants in implementation:
40 * - we do not share logs among different OST<->MDS connections, so that
41 * if an OST or MDS fails it need only look at log(s) relevant to itself
42 *
43 * Author: Andreas Dilger <adilger@clusterfs.com>
44 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45 * Author: Mikhail Pershin <tappro@whamcloud.com>
46 */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50 #include "../include/obd_class.h"
51 #include "../include/lustre_log.h"
52 #include "llog_internal.h"
53
54 /*
55 * Allocate a new log or catalog handle
56 * Used inside llog_open().
57 */
llog_alloc_handle(void)58 static struct llog_handle *llog_alloc_handle(void)
59 {
60 struct llog_handle *loghandle;
61
62 loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
63 if (!loghandle)
64 return NULL;
65
66 init_rwsem(&loghandle->lgh_lock);
67 spin_lock_init(&loghandle->lgh_hdr_lock);
68 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69 atomic_set(&loghandle->lgh_refcount, 1);
70
71 return loghandle;
72 }
73
74 /*
75 * Free llog handle and header data if exists. Used in llog_close() only
76 */
llog_free_handle(struct llog_handle * loghandle)77 static void llog_free_handle(struct llog_handle *loghandle)
78 {
79 LASSERT(loghandle != NULL);
80
81 /* failed llog_init_handle */
82 if (!loghandle->lgh_hdr)
83 goto out;
84
85 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
87 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88 LASSERT(list_empty(&loghandle->u.chd.chd_head));
89 LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
90 kfree(loghandle->lgh_hdr);
91 out:
92 kfree(loghandle);
93 }
94
llog_handle_get(struct llog_handle * loghandle)95 void llog_handle_get(struct llog_handle *loghandle)
96 {
97 atomic_inc(&loghandle->lgh_refcount);
98 }
99
llog_handle_put(struct llog_handle * loghandle)100 void llog_handle_put(struct llog_handle *loghandle)
101 {
102 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103 if (atomic_dec_and_test(&loghandle->lgh_refcount))
104 llog_free_handle(loghandle);
105 }
106
llog_read_header(const struct lu_env * env,struct llog_handle * handle,struct obd_uuid * uuid)107 static int llog_read_header(const struct lu_env *env,
108 struct llog_handle *handle,
109 struct obd_uuid *uuid)
110 {
111 struct llog_operations *lop;
112 int rc;
113
114 rc = llog_handle2ops(handle, &lop);
115 if (rc)
116 return rc;
117
118 if (lop->lop_read_header == NULL)
119 return -EOPNOTSUPP;
120
121 rc = lop->lop_read_header(env, handle);
122 if (rc == LLOG_EEMPTY) {
123 struct llog_log_hdr *llh = handle->lgh_hdr;
124
125 handle->lgh_last_idx = 0; /* header is record with index 0 */
126 llh->llh_count = 1; /* for the header record */
127 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
128 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
129 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
130 llh->llh_timestamp = ktime_get_real_seconds();
131 if (uuid)
132 memcpy(&llh->llh_tgtuuid, uuid,
133 sizeof(llh->llh_tgtuuid));
134 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
135 ext2_set_bit(0, llh->llh_bitmap);
136 rc = 0;
137 }
138 return rc;
139 }
140
llog_init_handle(const struct lu_env * env,struct llog_handle * handle,int flags,struct obd_uuid * uuid)141 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
142 int flags, struct obd_uuid *uuid)
143 {
144 struct llog_log_hdr *llh;
145 int rc;
146
147 LASSERT(handle->lgh_hdr == NULL);
148
149 llh = kzalloc(sizeof(*llh), GFP_NOFS);
150 if (!llh)
151 return -ENOMEM;
152 handle->lgh_hdr = llh;
153 /* first assign flags to use llog_client_ops */
154 llh->llh_flags = flags;
155 rc = llog_read_header(env, handle, uuid);
156 if (rc == 0) {
157 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
158 flags & LLOG_F_IS_CAT) ||
159 (llh->llh_flags & LLOG_F_IS_CAT &&
160 flags & LLOG_F_IS_PLAIN))) {
161 CERROR("%s: llog type is %s but initializing %s\n",
162 handle->lgh_ctxt->loc_obd->obd_name,
163 llh->llh_flags & LLOG_F_IS_CAT ?
164 "catalog" : "plain",
165 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
166 rc = -EINVAL;
167 goto out;
168 } else if (llh->llh_flags &
169 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
170 /*
171 * it is possible to open llog without specifying llog
172 * type so it is taken from llh_flags
173 */
174 flags = llh->llh_flags;
175 } else {
176 /* for some reason the llh_flags has no type set */
177 CERROR("llog type is not specified!\n");
178 rc = -EINVAL;
179 goto out;
180 }
181 if (unlikely(uuid &&
182 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
183 CERROR("%s: llog uuid mismatch: %s/%s\n",
184 handle->lgh_ctxt->loc_obd->obd_name,
185 (char *)uuid->uuid,
186 (char *)llh->llh_tgtuuid.uuid);
187 rc = -EEXIST;
188 goto out;
189 }
190 }
191 if (flags & LLOG_F_IS_CAT) {
192 LASSERT(list_empty(&handle->u.chd.chd_head));
193 INIT_LIST_HEAD(&handle->u.chd.chd_head);
194 llh->llh_size = sizeof(struct llog_logid_rec);
195 } else if (!(flags & LLOG_F_IS_PLAIN)) {
196 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
197 handle->lgh_ctxt->loc_obd->obd_name,
198 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
199 rc = -EINVAL;
200 }
201 out:
202 if (rc) {
203 kfree(llh);
204 handle->lgh_hdr = NULL;
205 }
206 return rc;
207 }
208 EXPORT_SYMBOL(llog_init_handle);
209
llog_process_thread(void * arg)210 static int llog_process_thread(void *arg)
211 {
212 struct llog_process_info *lpi = arg;
213 struct llog_handle *loghandle = lpi->lpi_loghandle;
214 struct llog_log_hdr *llh = loghandle->lgh_hdr;
215 struct llog_process_cat_data *cd = lpi->lpi_catdata;
216 char *buf;
217 __u64 cur_offset = LLOG_CHUNK_SIZE;
218 __u64 last_offset;
219 int rc = 0, index = 1, last_index;
220 int saved_index = 0;
221 int last_called_index = 0;
222
223 LASSERT(llh);
224
225 buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
226 if (!buf) {
227 lpi->lpi_rc = -ENOMEM;
228 return 0;
229 }
230
231 if (cd != NULL) {
232 last_called_index = cd->lpcd_first_idx;
233 index = cd->lpcd_first_idx + 1;
234 }
235 if (cd != NULL && cd->lpcd_last_idx)
236 last_index = cd->lpcd_last_idx;
237 else
238 last_index = LLOG_BITMAP_BYTES * 8 - 1;
239
240 while (rc == 0) {
241 struct llog_rec_hdr *rec;
242
243 /* skip records not set in bitmap */
244 while (index <= last_index &&
245 !ext2_test_bit(index, llh->llh_bitmap))
246 ++index;
247
248 LASSERT(index <= last_index + 1);
249 if (index == last_index + 1)
250 break;
251 repeat:
252 CDEBUG(D_OTHER, "index: %d last_index %d\n",
253 index, last_index);
254
255 /* get the buf with our target record; avoid old garbage */
256 memset(buf, 0, LLOG_CHUNK_SIZE);
257 last_offset = cur_offset;
258 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
259 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
260 if (rc)
261 goto out;
262
263 /* NB: when rec->lrh_len is accessed it is already swabbed
264 * since it is used at the "end" of the loop and the rec
265 * swabbing is done at the beginning of the loop. */
266 for (rec = (struct llog_rec_hdr *)buf;
267 (char *)rec < buf + LLOG_CHUNK_SIZE;
268 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)) {
269
270 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
271 rec, rec->lrh_type);
272
273 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
274 lustre_swab_llog_rec(rec);
275
276 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
277 rec->lrh_type, rec->lrh_index);
278
279 if (rec->lrh_index == 0) {
280 /* probably another rec just got added? */
281 rc = 0;
282 if (index <= loghandle->lgh_last_idx)
283 goto repeat;
284 goto out; /* no more records */
285 }
286 if (rec->lrh_len == 0 ||
287 rec->lrh_len > LLOG_CHUNK_SIZE) {
288 CWARN("invalid length %d in llog record for index %d/%d\n",
289 rec->lrh_len,
290 rec->lrh_index, index);
291 rc = -EINVAL;
292 goto out;
293 }
294
295 if (rec->lrh_index < index) {
296 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
297 rec->lrh_index);
298 continue;
299 }
300
301 CDEBUG(D_OTHER,
302 "lrh_index: %d lrh_len: %d (%d remains)\n",
303 rec->lrh_index, rec->lrh_len,
304 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
305
306 loghandle->lgh_cur_idx = rec->lrh_index;
307 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
308 last_offset;
309
310 /* if set, process the callback on this record */
311 if (ext2_test_bit(index, llh->llh_bitmap)) {
312 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
313 lpi->lpi_cbdata);
314 last_called_index = index;
315 if (rc)
316 goto out;
317 } else {
318 CDEBUG(D_OTHER, "Skipped index %d\n", index);
319 }
320
321 /* next record, still in buffer? */
322 ++index;
323 if (index > last_index) {
324 rc = 0;
325 goto out;
326 }
327 }
328 }
329
330 out:
331 if (cd != NULL)
332 cd->lpcd_last_idx = last_called_index;
333
334 kfree(buf);
335 lpi->lpi_rc = rc;
336 return 0;
337 }
338
llog_process_thread_daemonize(void * arg)339 static int llog_process_thread_daemonize(void *arg)
340 {
341 struct llog_process_info *lpi = arg;
342 struct lu_env env;
343 int rc;
344
345 unshare_fs_struct();
346
347 /* client env has no keys, tags is just 0 */
348 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
349 if (rc)
350 goto out;
351 lpi->lpi_env = &env;
352
353 rc = llog_process_thread(arg);
354
355 lu_env_fini(&env);
356 out:
357 complete(&lpi->lpi_completion);
358 return rc;
359 }
360
llog_process_or_fork(const struct lu_env * env,struct llog_handle * loghandle,llog_cb_t cb,void * data,void * catdata,bool fork)361 int llog_process_or_fork(const struct lu_env *env,
362 struct llog_handle *loghandle,
363 llog_cb_t cb, void *data, void *catdata, bool fork)
364 {
365 struct llog_process_info *lpi;
366 int rc;
367
368 lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
369 if (!lpi) {
370 CERROR("cannot alloc pointer\n");
371 return -ENOMEM;
372 }
373 lpi->lpi_loghandle = loghandle;
374 lpi->lpi_cb = cb;
375 lpi->lpi_cbdata = data;
376 lpi->lpi_catdata = catdata;
377
378 if (fork) {
379 /* The new thread can't use parent env,
380 * init the new one in llog_process_thread_daemonize. */
381 lpi->lpi_env = NULL;
382 init_completion(&lpi->lpi_completion);
383 rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi,
384 "llog_process_thread"));
385 if (IS_ERR_VALUE(rc)) {
386 CERROR("%s: cannot start thread: rc = %d\n",
387 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
388 kfree(lpi);
389 return rc;
390 }
391 wait_for_completion(&lpi->lpi_completion);
392 } else {
393 lpi->lpi_env = env;
394 llog_process_thread(lpi);
395 }
396 rc = lpi->lpi_rc;
397 kfree(lpi);
398 return rc;
399 }
400 EXPORT_SYMBOL(llog_process_or_fork);
401
llog_process(const struct lu_env * env,struct llog_handle * loghandle,llog_cb_t cb,void * data,void * catdata)402 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
403 llog_cb_t cb, void *data, void *catdata)
404 {
405 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
406 }
407 EXPORT_SYMBOL(llog_process);
408
llog_open(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** lgh,struct llog_logid * logid,char * name,enum llog_open_param open_param)409 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
410 struct llog_handle **lgh, struct llog_logid *logid,
411 char *name, enum llog_open_param open_param)
412 {
413 int raised;
414 int rc;
415
416 LASSERT(ctxt);
417 LASSERT(ctxt->loc_logops);
418
419 if (ctxt->loc_logops->lop_open == NULL) {
420 *lgh = NULL;
421 return -EOPNOTSUPP;
422 }
423
424 *lgh = llog_alloc_handle();
425 if (*lgh == NULL)
426 return -ENOMEM;
427 (*lgh)->lgh_ctxt = ctxt;
428 (*lgh)->lgh_logops = ctxt->loc_logops;
429
430 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
431 if (!raised)
432 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
433 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
434 if (!raised)
435 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
436 if (rc) {
437 llog_free_handle(*lgh);
438 *lgh = NULL;
439 }
440 return rc;
441 }
442 EXPORT_SYMBOL(llog_open);
443
llog_close(const struct lu_env * env,struct llog_handle * loghandle)444 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
445 {
446 struct llog_operations *lop;
447 int rc;
448
449 rc = llog_handle2ops(loghandle, &lop);
450 if (rc)
451 goto out;
452 if (lop->lop_close == NULL) {
453 rc = -EOPNOTSUPP;
454 goto out;
455 }
456 rc = lop->lop_close(env, loghandle);
457 out:
458 llog_handle_put(loghandle);
459 return rc;
460 }
461 EXPORT_SYMBOL(llog_close);
462