• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "dir.h"
16 #include "ast.h"
17 #include "recover.h"
18 #include "lowcomms.h"
19 #include "lock.h"
20 #include "requestqueue.h"
21 #include "recoverd.h"
22 
23 
24 /* If the start for which we're re-enabling locking (seq) has been superseded
25    by a newer stop (ls_recover_seq), we need to leave locking disabled.
26 
27    We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
28    locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
29    enables locking and clears the requestqueue between a and b. */
30 
enable_locking(struct dlm_ls * ls,uint64_t seq)31 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
32 {
33 	int error = -EINTR;
34 
35 	down_write(&ls->ls_recv_active);
36 
37 	spin_lock(&ls->ls_recover_lock);
38 	if (ls->ls_recover_seq == seq) {
39 		set_bit(LSFL_RUNNING, &ls->ls_flags);
40 		/* unblocks processes waiting to enter the dlm */
41 		up_write(&ls->ls_in_recovery);
42 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
43 		error = 0;
44 	}
45 	spin_unlock(&ls->ls_recover_lock);
46 
47 	up_write(&ls->ls_recv_active);
48 	return error;
49 }
50 
ls_recover(struct dlm_ls * ls,struct dlm_recover * rv)51 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
52 {
53 	unsigned long start;
54 	int error, neg = 0;
55 
56 	log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
57 
58 	mutex_lock(&ls->ls_recoverd_active);
59 
60 	dlm_callback_suspend(ls);
61 
62 	dlm_clear_toss(ls);
63 
64 	/*
65 	 * This list of root rsb's will be the basis of most of the recovery
66 	 * routines.
67 	 */
68 
69 	dlm_create_root_list(ls);
70 
71 	/*
72 	 * Add or remove nodes from the lockspace's ls_nodes list.
73 	 */
74 
75 	error = dlm_recover_members(ls, rv, &neg);
76 	if (error) {
77 		log_rinfo(ls, "dlm_recover_members error %d", error);
78 		goto fail;
79 	}
80 
81 	dlm_recover_dir_nodeid(ls);
82 
83 	ls->ls_recover_dir_sent_res = 0;
84 	ls->ls_recover_dir_sent_msg = 0;
85 	ls->ls_recover_locks_in = 0;
86 
87 	dlm_set_recover_status(ls, DLM_RS_NODES);
88 
89 	error = dlm_recover_members_wait(ls);
90 	if (error) {
91 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
92 		goto fail;
93 	}
94 
95 	start = jiffies;
96 
97 	/*
98 	 * Rebuild our own share of the directory by collecting from all other
99 	 * nodes their master rsb names that hash to us.
100 	 */
101 
102 	error = dlm_recover_directory(ls);
103 	if (error) {
104 		log_rinfo(ls, "dlm_recover_directory error %d", error);
105 		goto fail;
106 	}
107 
108 	dlm_set_recover_status(ls, DLM_RS_DIR);
109 
110 	error = dlm_recover_directory_wait(ls);
111 	if (error) {
112 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
113 		goto fail;
114 	}
115 
116 	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
117 		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
118 
119 	/*
120 	 * We may have outstanding operations that are waiting for a reply from
121 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
122 	 * cancel ops can just be completed.
123 	 */
124 
125 	dlm_recover_waiters_pre(ls);
126 
127 	error = dlm_recovery_stopped(ls);
128 	if (error)
129 		goto fail;
130 
131 	if (neg || dlm_no_directory(ls)) {
132 		/*
133 		 * Clear lkb's for departed nodes.
134 		 */
135 
136 		dlm_recover_purge(ls);
137 
138 		/*
139 		 * Get new master nodeid's for rsb's that were mastered on
140 		 * departed nodes.
141 		 */
142 
143 		error = dlm_recover_masters(ls);
144 		if (error) {
145 			log_rinfo(ls, "dlm_recover_masters error %d", error);
146 			goto fail;
147 		}
148 
149 		/*
150 		 * Send our locks on remastered rsb's to the new masters.
151 		 */
152 
153 		error = dlm_recover_locks(ls);
154 		if (error) {
155 			log_rinfo(ls, "dlm_recover_locks error %d", error);
156 			goto fail;
157 		}
158 
159 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
160 
161 		error = dlm_recover_locks_wait(ls);
162 		if (error) {
163 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
164 			goto fail;
165 		}
166 
167 		log_rinfo(ls, "dlm_recover_locks %u in",
168 			  ls->ls_recover_locks_in);
169 
170 		/*
171 		 * Finalize state in master rsb's now that all locks can be
172 		 * checked.  This includes conversion resolution and lvb
173 		 * settings.
174 		 */
175 
176 		dlm_recover_rsbs(ls);
177 	} else {
178 		/*
179 		 * Other lockspace members may be going through the "neg" steps
180 		 * while also adding us to the lockspace, in which case they'll
181 		 * be doing the recover_locks (RS_LOCKS) barrier.
182 		 */
183 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
184 
185 		error = dlm_recover_locks_wait(ls);
186 		if (error) {
187 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
188 			goto fail;
189 		}
190 	}
191 
192 	dlm_release_root_list(ls);
193 
194 	/*
195 	 * Purge directory-related requests that are saved in requestqueue.
196 	 * All dir requests from before recovery are invalid now due to the dir
197 	 * rebuild and will be resent by the requesting nodes.
198 	 */
199 
200 	dlm_purge_requestqueue(ls);
201 
202 	dlm_set_recover_status(ls, DLM_RS_DONE);
203 
204 	error = dlm_recover_done_wait(ls);
205 	if (error) {
206 		log_rinfo(ls, "dlm_recover_done_wait error %d", error);
207 		goto fail;
208 	}
209 
210 	dlm_clear_members_gone(ls);
211 
212 	dlm_adjust_timeouts(ls);
213 
214 	dlm_callback_resume(ls);
215 
216 	error = enable_locking(ls, rv->seq);
217 	if (error) {
218 		log_rinfo(ls, "enable_locking error %d", error);
219 		goto fail;
220 	}
221 
222 	error = dlm_process_requestqueue(ls);
223 	if (error) {
224 		log_rinfo(ls, "dlm_process_requestqueue error %d", error);
225 		goto fail;
226 	}
227 
228 	error = dlm_recover_waiters_post(ls);
229 	if (error) {
230 		log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
231 		goto fail;
232 	}
233 
234 	dlm_recover_grant(ls);
235 
236 	log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
237 		  (unsigned long long)rv->seq, ls->ls_generation,
238 		  jiffies_to_msecs(jiffies - start));
239 	mutex_unlock(&ls->ls_recoverd_active);
240 
241 	dlm_lsop_recover_done(ls);
242 	return 0;
243 
244  fail:
245 	dlm_release_root_list(ls);
246 	log_rinfo(ls, "dlm_recover %llu error %d",
247 		  (unsigned long long)rv->seq, error);
248 	mutex_unlock(&ls->ls_recoverd_active);
249 	return error;
250 }
251 
252 /* The dlm_ls_start() that created the rv we take here may already have been
253    stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
254    flag set. */
255 
do_ls_recovery(struct dlm_ls * ls)256 static void do_ls_recovery(struct dlm_ls *ls)
257 {
258 	struct dlm_recover *rv = NULL;
259 
260 	spin_lock(&ls->ls_recover_lock);
261 	rv = ls->ls_recover_args;
262 	ls->ls_recover_args = NULL;
263 	if (rv && ls->ls_recover_seq == rv->seq)
264 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
265 	spin_unlock(&ls->ls_recover_lock);
266 
267 	if (rv) {
268 		ls_recover(ls, rv);
269 		kfree(rv->nodes);
270 		kfree(rv);
271 	}
272 }
273 
dlm_recoverd(void * arg)274 static int dlm_recoverd(void *arg)
275 {
276 	struct dlm_ls *ls;
277 
278 	ls = dlm_find_lockspace_local(arg);
279 	if (!ls) {
280 		log_print("dlm_recoverd: no lockspace %p", arg);
281 		return -1;
282 	}
283 
284 	down_write(&ls->ls_in_recovery);
285 	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
286 	wake_up(&ls->ls_recover_lock_wait);
287 
288 	while (1) {
289 		/*
290 		 * We call kthread_should_stop() after set_current_state().
291 		 * This is because it works correctly if kthread_stop() is
292 		 * called just before set_current_state().
293 		 */
294 		set_current_state(TASK_INTERRUPTIBLE);
295 		if (kthread_should_stop()) {
296 			set_current_state(TASK_RUNNING);
297 			break;
298 		}
299 		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
300 		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
301 			if (kthread_should_stop())
302 				break;
303 			schedule();
304 		}
305 		set_current_state(TASK_RUNNING);
306 
307 		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
308 			down_write(&ls->ls_in_recovery);
309 			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
310 			wake_up(&ls->ls_recover_lock_wait);
311 		}
312 
313 		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
314 			do_ls_recovery(ls);
315 	}
316 
317 	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
318 		up_write(&ls->ls_in_recovery);
319 
320 	dlm_put_lockspace(ls);
321 	return 0;
322 }
323 
dlm_recoverd_start(struct dlm_ls * ls)324 int dlm_recoverd_start(struct dlm_ls *ls)
325 {
326 	struct task_struct *p;
327 	int error = 0;
328 
329 	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
330 	if (IS_ERR(p))
331 		error = PTR_ERR(p);
332 	else
333                 ls->ls_recoverd_task = p;
334 	return error;
335 }
336 
dlm_recoverd_stop(struct dlm_ls * ls)337 void dlm_recoverd_stop(struct dlm_ls *ls)
338 {
339 	kthread_stop(ls->ls_recoverd_task);
340 }
341 
/*
 * Keep recovery from running until dlm_recoverd_resume() is called.
 *
 * ls_recover() holds ls_recoverd_active for its whole run, so acquiring the
 * mutex here waits for a pass that is in progress and blocks any new pass
 * from starting.  The caller returns holding the mutex.
 */
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
	/*
	 * Wake ls_wait_general before blocking on the mutex.
	 * NOTE(review): presumably this lets a recovery pass sleeping on that
	 * queue re-check its wait condition; the waiters are not in this file.
	 */
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}
347 
/*
 * Undo dlm_recoverd_suspend(): release ls_recoverd_active so that
 * ls_recover() can take it again.  Must only be called by the holder of
 * the mutex, i.e. after a matching dlm_recoverd_suspend().
 */
void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}
352 
353