• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47 
48 #define DEBUG_SUBSYSTEM S_LOG
49 
50 #include "../include/obd_class.h"
51 #include "../include/lustre_log.h"
52 #include "llog_internal.h"
53 
54 /*
55  * Allocate a new log or catalog handle
56  * Used inside llog_open().
57  */
llog_alloc_handle(void)58 static struct llog_handle *llog_alloc_handle(void)
59 {
60 	struct llog_handle *loghandle;
61 
62 	loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
63 	if (!loghandle)
64 		return NULL;
65 
66 	init_rwsem(&loghandle->lgh_lock);
67 	spin_lock_init(&loghandle->lgh_hdr_lock);
68 	INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69 	atomic_set(&loghandle->lgh_refcount, 1);
70 
71 	return loghandle;
72 }
73 
74 /*
75  * Free llog handle and header data if exists. Used in llog_close() only
76  */
llog_free_handle(struct llog_handle * loghandle)77 static void llog_free_handle(struct llog_handle *loghandle)
78 {
79 	LASSERT(loghandle != NULL);
80 
81 	/* failed llog_init_handle */
82 	if (!loghandle->lgh_hdr)
83 		goto out;
84 
85 	if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86 		LASSERT(list_empty(&loghandle->u.phd.phd_entry));
87 	else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88 		LASSERT(list_empty(&loghandle->u.chd.chd_head));
89 	LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
90 	kfree(loghandle->lgh_hdr);
91 out:
92 	kfree(loghandle);
93 }
94 
llog_handle_get(struct llog_handle * loghandle)95 void llog_handle_get(struct llog_handle *loghandle)
96 {
97 	atomic_inc(&loghandle->lgh_refcount);
98 }
99 
llog_handle_put(struct llog_handle * loghandle)100 void llog_handle_put(struct llog_handle *loghandle)
101 {
102 	LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103 	if (atomic_dec_and_test(&loghandle->lgh_refcount))
104 		llog_free_handle(loghandle);
105 }
106 
llog_read_header(const struct lu_env * env,struct llog_handle * handle,struct obd_uuid * uuid)107 static int llog_read_header(const struct lu_env *env,
108 			    struct llog_handle *handle,
109 			    struct obd_uuid *uuid)
110 {
111 	struct llog_operations *lop;
112 	int rc;
113 
114 	rc = llog_handle2ops(handle, &lop);
115 	if (rc)
116 		return rc;
117 
118 	if (lop->lop_read_header == NULL)
119 		return -EOPNOTSUPP;
120 
121 	rc = lop->lop_read_header(env, handle);
122 	if (rc == LLOG_EEMPTY) {
123 		struct llog_log_hdr *llh = handle->lgh_hdr;
124 
125 		handle->lgh_last_idx = 0; /* header is record with index 0 */
126 		llh->llh_count = 1;	 /* for the header record */
127 		llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
128 		llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
129 		llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
130 		llh->llh_timestamp = ktime_get_real_seconds();
131 		if (uuid)
132 			memcpy(&llh->llh_tgtuuid, uuid,
133 			       sizeof(llh->llh_tgtuuid));
134 		llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
135 		ext2_set_bit(0, llh->llh_bitmap);
136 		rc = 0;
137 	}
138 	return rc;
139 }
140 
llog_init_handle(const struct lu_env * env,struct llog_handle * handle,int flags,struct obd_uuid * uuid)141 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
142 		     int flags, struct obd_uuid *uuid)
143 {
144 	struct llog_log_hdr	*llh;
145 	int			 rc;
146 
147 	LASSERT(handle->lgh_hdr == NULL);
148 
149 	llh = kzalloc(sizeof(*llh), GFP_NOFS);
150 	if (!llh)
151 		return -ENOMEM;
152 	handle->lgh_hdr = llh;
153 	/* first assign flags to use llog_client_ops */
154 	llh->llh_flags = flags;
155 	rc = llog_read_header(env, handle, uuid);
156 	if (rc == 0) {
157 		if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
158 			      flags & LLOG_F_IS_CAT) ||
159 			     (llh->llh_flags & LLOG_F_IS_CAT &&
160 			      flags & LLOG_F_IS_PLAIN))) {
161 			CERROR("%s: llog type is %s but initializing %s\n",
162 			       handle->lgh_ctxt->loc_obd->obd_name,
163 			       llh->llh_flags & LLOG_F_IS_CAT ?
164 			       "catalog" : "plain",
165 			       flags & LLOG_F_IS_CAT ? "catalog" : "plain");
166 			rc = -EINVAL;
167 			goto out;
168 		} else if (llh->llh_flags &
169 			   (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
170 			/*
171 			 * it is possible to open llog without specifying llog
172 			 * type so it is taken from llh_flags
173 			 */
174 			flags = llh->llh_flags;
175 		} else {
176 			/* for some reason the llh_flags has no type set */
177 			CERROR("llog type is not specified!\n");
178 			rc = -EINVAL;
179 			goto out;
180 		}
181 		if (unlikely(uuid &&
182 			     !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
183 			CERROR("%s: llog uuid mismatch: %s/%s\n",
184 			       handle->lgh_ctxt->loc_obd->obd_name,
185 			       (char *)uuid->uuid,
186 			       (char *)llh->llh_tgtuuid.uuid);
187 			rc = -EEXIST;
188 			goto out;
189 		}
190 	}
191 	if (flags & LLOG_F_IS_CAT) {
192 		LASSERT(list_empty(&handle->u.chd.chd_head));
193 		INIT_LIST_HEAD(&handle->u.chd.chd_head);
194 		llh->llh_size = sizeof(struct llog_logid_rec);
195 	} else if (!(flags & LLOG_F_IS_PLAIN)) {
196 		CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
197 		       handle->lgh_ctxt->loc_obd->obd_name,
198 		       flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
199 		rc = -EINVAL;
200 	}
201 out:
202 	if (rc) {
203 		kfree(llh);
204 		handle->lgh_hdr = NULL;
205 	}
206 	return rc;
207 }
208 EXPORT_SYMBOL(llog_init_handle);
209 
llog_process_thread(void * arg)210 static int llog_process_thread(void *arg)
211 {
212 	struct llog_process_info	*lpi = arg;
213 	struct llog_handle		*loghandle = lpi->lpi_loghandle;
214 	struct llog_log_hdr		*llh = loghandle->lgh_hdr;
215 	struct llog_process_cat_data	*cd  = lpi->lpi_catdata;
216 	char				*buf;
217 	__u64				 cur_offset = LLOG_CHUNK_SIZE;
218 	__u64				 last_offset;
219 	int				 rc = 0, index = 1, last_index;
220 	int				 saved_index = 0;
221 	int				 last_called_index = 0;
222 
223 	LASSERT(llh);
224 
225 	buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
226 	if (!buf) {
227 		lpi->lpi_rc = -ENOMEM;
228 		return 0;
229 	}
230 
231 	if (cd != NULL) {
232 		last_called_index = cd->lpcd_first_idx;
233 		index = cd->lpcd_first_idx + 1;
234 	}
235 	if (cd != NULL && cd->lpcd_last_idx)
236 		last_index = cd->lpcd_last_idx;
237 	else
238 		last_index = LLOG_BITMAP_BYTES * 8 - 1;
239 
240 	while (rc == 0) {
241 		struct llog_rec_hdr *rec;
242 
243 		/* skip records not set in bitmap */
244 		while (index <= last_index &&
245 		       !ext2_test_bit(index, llh->llh_bitmap))
246 			++index;
247 
248 		LASSERT(index <= last_index + 1);
249 		if (index == last_index + 1)
250 			break;
251 repeat:
252 		CDEBUG(D_OTHER, "index: %d last_index %d\n",
253 		       index, last_index);
254 
255 		/* get the buf with our target record; avoid old garbage */
256 		memset(buf, 0, LLOG_CHUNK_SIZE);
257 		last_offset = cur_offset;
258 		rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
259 				     index, &cur_offset, buf, LLOG_CHUNK_SIZE);
260 		if (rc)
261 			goto out;
262 
263 		/* NB: when rec->lrh_len is accessed it is already swabbed
264 		 * since it is used at the "end" of the loop and the rec
265 		 * swabbing is done at the beginning of the loop. */
266 		for (rec = (struct llog_rec_hdr *)buf;
267 		     (char *)rec < buf + LLOG_CHUNK_SIZE;
268 		     rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)) {
269 
270 			CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
271 			       rec, rec->lrh_type);
272 
273 			if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
274 				lustre_swab_llog_rec(rec);
275 
276 			CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
277 			       rec->lrh_type, rec->lrh_index);
278 
279 			if (rec->lrh_index == 0) {
280 				/* probably another rec just got added? */
281 				rc = 0;
282 				if (index <= loghandle->lgh_last_idx)
283 					goto repeat;
284 				goto out; /* no more records */
285 			}
286 			if (rec->lrh_len == 0 ||
287 			    rec->lrh_len > LLOG_CHUNK_SIZE) {
288 				CWARN("invalid length %d in llog record for index %d/%d\n",
289 				      rec->lrh_len,
290 				      rec->lrh_index, index);
291 				rc = -EINVAL;
292 				goto out;
293 			}
294 
295 			if (rec->lrh_index < index) {
296 				CDEBUG(D_OTHER, "skipping lrh_index %d\n",
297 				       rec->lrh_index);
298 				continue;
299 			}
300 
301 			CDEBUG(D_OTHER,
302 			       "lrh_index: %d lrh_len: %d (%d remains)\n",
303 			       rec->lrh_index, rec->lrh_len,
304 			       (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
305 
306 			loghandle->lgh_cur_idx = rec->lrh_index;
307 			loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
308 						    last_offset;
309 
310 			/* if set, process the callback on this record */
311 			if (ext2_test_bit(index, llh->llh_bitmap)) {
312 				rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
313 						 lpi->lpi_cbdata);
314 				last_called_index = index;
315 				if (rc)
316 					goto out;
317 			} else {
318 				CDEBUG(D_OTHER, "Skipped index %d\n", index);
319 			}
320 
321 			/* next record, still in buffer? */
322 			++index;
323 			if (index > last_index) {
324 				rc = 0;
325 				goto out;
326 			}
327 		}
328 	}
329 
330 out:
331 	if (cd != NULL)
332 		cd->lpcd_last_idx = last_called_index;
333 
334 	kfree(buf);
335 	lpi->lpi_rc = rc;
336 	return 0;
337 }
338 
llog_process_thread_daemonize(void * arg)339 static int llog_process_thread_daemonize(void *arg)
340 {
341 	struct llog_process_info	*lpi = arg;
342 	struct lu_env			 env;
343 	int				 rc;
344 
345 	unshare_fs_struct();
346 
347 	/* client env has no keys, tags is just 0 */
348 	rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
349 	if (rc)
350 		goto out;
351 	lpi->lpi_env = &env;
352 
353 	rc = llog_process_thread(arg);
354 
355 	lu_env_fini(&env);
356 out:
357 	complete(&lpi->lpi_completion);
358 	return rc;
359 }
360 
/*
 * Process the records of @loghandle, either in the calling context or in a
 * freshly started kernel thread.
 *
 * @env       environment used when processing inline (ignored when forking;
 *            the thread builds its own)
 * @loghandle log to walk
 * @cb        callback invoked by llog_process_thread() for each live record
 * @data      opaque argument passed to @cb
 * @catdata   optional struct llog_process_cat_data limiting the index range
 * @fork      true to run in a new kthread and wait for it to finish
 *
 * Returns the result stored in lpi_rc by the processing code, -ENOMEM if
 * the bookkeeping structure cannot be allocated, or the kthread_run()
 * error if the thread cannot be started.
 */
int llog_process_or_fork(const struct lu_env *env,
			 struct llog_handle *loghandle,
			 llog_cb_t cb, void *data, void *catdata, bool fork)
{
	struct llog_process_info *lpi;
	int rc;

	lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
	if (!lpi) {
		CERROR("cannot alloc pointer\n");
		return -ENOMEM;
	}
	lpi->lpi_loghandle = loghandle;
	lpi->lpi_cb = cb;
	lpi->lpi_cbdata = data;
	lpi->lpi_catdata = catdata;

	if (fork) {
		struct task_struct *task;

		/*
		 * The new thread can't use parent env,
		 * init the new one in llog_process_thread_daemonize.
		 */
		lpi->lpi_env = NULL;
		init_completion(&lpi->lpi_completion);
		task = kthread_run(llog_process_thread_daemonize, lpi,
				   "llog_process_thread");
		/*
		 * Test the pointer itself: narrowing it to int first could
		 * make a valid task pointer look like an errno, and lpi
		 * would then be freed under the running thread.
		 */
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			CERROR("%s: cannot start thread: rc = %d\n",
			       loghandle->lgh_ctxt->loc_obd->obd_name, rc);
			goto out_free;
		}
		wait_for_completion(&lpi->lpi_completion);
	} else {
		lpi->lpi_env = env;
		llog_process_thread(lpi);
	}
	rc = lpi->lpi_rc;
out_free:
	kfree(lpi);
	return rc;
}
EXPORT_SYMBOL(llog_process_or_fork);
401 
/*
 * Process @loghandle in a separate kernel thread and wait for the result.
 * Shorthand for llog_process_or_fork() with fork enabled.
 */
int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
		 llog_cb_t cb, void *data, void *catdata)
{
	int rc;

	rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true);
	return rc;
}
EXPORT_SYMBOL(llog_process);
408 
llog_open(const struct lu_env * env,struct llog_ctxt * ctxt,struct llog_handle ** lgh,struct llog_logid * logid,char * name,enum llog_open_param open_param)409 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
410 	      struct llog_handle **lgh, struct llog_logid *logid,
411 	      char *name, enum llog_open_param open_param)
412 {
413 	int	 raised;
414 	int	 rc;
415 
416 	LASSERT(ctxt);
417 	LASSERT(ctxt->loc_logops);
418 
419 	if (ctxt->loc_logops->lop_open == NULL) {
420 		*lgh = NULL;
421 		return -EOPNOTSUPP;
422 	}
423 
424 	*lgh = llog_alloc_handle();
425 	if (*lgh == NULL)
426 		return -ENOMEM;
427 	(*lgh)->lgh_ctxt = ctxt;
428 	(*lgh)->lgh_logops = ctxt->loc_logops;
429 
430 	raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
431 	if (!raised)
432 		cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
433 	rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
434 	if (!raised)
435 		cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
436 	if (rc) {
437 		llog_free_handle(*lgh);
438 		*lgh = NULL;
439 	}
440 	return rc;
441 }
442 EXPORT_SYMBOL(llog_open);
443 
llog_close(const struct lu_env * env,struct llog_handle * loghandle)444 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
445 {
446 	struct llog_operations	*lop;
447 	int			 rc;
448 
449 	rc = llog_handle2ops(loghandle, &lop);
450 	if (rc)
451 		goto out;
452 	if (lop->lop_close == NULL) {
453 		rc = -EOPNOTSUPP;
454 		goto out;
455 	}
456 	rc = lop->lop_close(env, loghandle);
457 out:
458 	llog_handle_put(loghandle);
459 	return rc;
460 }
461 EXPORT_SYMBOL(llog_close);
462