• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "dir.h"
16 #include "ast.h"
17 #include "recover.h"
18 #include "lowcomms.h"
19 #include "lock.h"
20 #include "requestqueue.h"
21 #include "recoverd.h"
22 
23 
24 /* If the start for which we're re-enabling locking (seq) has been superseded
25    by a newer stop (ls_recover_seq), we need to leave locking disabled.
26 
27    We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
28    locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
29    enables locking and clears the requestqueue between a and b. */
30 
enable_locking(struct dlm_ls * ls,uint64_t seq)31 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
32 {
33 	int error = -EINTR;
34 
35 	down_write(&ls->ls_recv_active);
36 
37 	spin_lock(&ls->ls_recover_lock);
38 	if (ls->ls_recover_seq == seq) {
39 		set_bit(LSFL_RUNNING, &ls->ls_flags);
40 		/* unblocks processes waiting to enter the dlm */
41 		up_write(&ls->ls_in_recovery);
42 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
43 		error = 0;
44 	}
45 	spin_unlock(&ls->ls_recover_lock);
46 
47 	up_write(&ls->ls_recv_active);
48 	return error;
49 }
50 
ls_recover(struct dlm_ls * ls,struct dlm_recover * rv)51 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
52 {
53 	unsigned long start;
54 	int error, neg = 0;
55 
56 	log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
57 
58 	mutex_lock(&ls->ls_recoverd_active);
59 
60 	dlm_callback_suspend(ls);
61 
62 	dlm_clear_toss(ls);
63 
64 	/*
65 	 * This list of root rsb's will be the basis of most of the recovery
66 	 * routines.
67 	 */
68 
69 	dlm_create_root_list(ls);
70 
71 	/*
72 	 * Add or remove nodes from the lockspace's ls_nodes list.
73 	 */
74 
75 	error = dlm_recover_members(ls, rv, &neg);
76 	if (error) {
77 		log_rinfo(ls, "dlm_recover_members error %d", error);
78 		goto fail;
79 	}
80 
81 	dlm_recover_dir_nodeid(ls);
82 
83 	ls->ls_recover_dir_sent_res = 0;
84 	ls->ls_recover_dir_sent_msg = 0;
85 	ls->ls_recover_locks_in = 0;
86 
87 	dlm_set_recover_status(ls, DLM_RS_NODES);
88 
89 	error = dlm_recover_members_wait(ls);
90 	if (error) {
91 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
92 		goto fail;
93 	}
94 
95 	start = jiffies;
96 
97 	/*
98 	 * Rebuild our own share of the directory by collecting from all other
99 	 * nodes their master rsb names that hash to us.
100 	 */
101 
102 	error = dlm_recover_directory(ls);
103 	if (error) {
104 		log_rinfo(ls, "dlm_recover_directory error %d", error);
105 		goto fail;
106 	}
107 
108 	dlm_set_recover_status(ls, DLM_RS_DIR);
109 
110 	error = dlm_recover_directory_wait(ls);
111 	if (error) {
112 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
113 		goto fail;
114 	}
115 
116 	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
117 		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
118 
119 	/*
120 	 * We may have outstanding operations that are waiting for a reply from
121 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
122 	 * cancel ops can just be completed.
123 	 */
124 
125 	dlm_recover_waiters_pre(ls);
126 
127 	error = dlm_recovery_stopped(ls);
128 	if (error) {
129 		error = -EINTR;
130 		goto fail;
131 	}
132 
133 	if (neg || dlm_no_directory(ls)) {
134 		/*
135 		 * Clear lkb's for departed nodes.
136 		 */
137 
138 		dlm_recover_purge(ls);
139 
140 		/*
141 		 * Get new master nodeid's for rsb's that were mastered on
142 		 * departed nodes.
143 		 */
144 
145 		error = dlm_recover_masters(ls);
146 		if (error) {
147 			log_rinfo(ls, "dlm_recover_masters error %d", error);
148 			goto fail;
149 		}
150 
151 		/*
152 		 * Send our locks on remastered rsb's to the new masters.
153 		 */
154 
155 		error = dlm_recover_locks(ls);
156 		if (error) {
157 			log_rinfo(ls, "dlm_recover_locks error %d", error);
158 			goto fail;
159 		}
160 
161 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
162 
163 		error = dlm_recover_locks_wait(ls);
164 		if (error) {
165 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
166 			goto fail;
167 		}
168 
169 		log_rinfo(ls, "dlm_recover_locks %u in",
170 			  ls->ls_recover_locks_in);
171 
172 		/*
173 		 * Finalize state in master rsb's now that all locks can be
174 		 * checked.  This includes conversion resolution and lvb
175 		 * settings.
176 		 */
177 
178 		dlm_recover_rsbs(ls);
179 	} else {
180 		/*
181 		 * Other lockspace members may be going through the "neg" steps
182 		 * while also adding us to the lockspace, in which case they'll
183 		 * be doing the recover_locks (RS_LOCKS) barrier.
184 		 */
185 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
186 
187 		error = dlm_recover_locks_wait(ls);
188 		if (error) {
189 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
190 			goto fail;
191 		}
192 	}
193 
194 	dlm_release_root_list(ls);
195 
196 	/*
197 	 * Purge directory-related requests that are saved in requestqueue.
198 	 * All dir requests from before recovery are invalid now due to the dir
199 	 * rebuild and will be resent by the requesting nodes.
200 	 */
201 
202 	dlm_purge_requestqueue(ls);
203 
204 	dlm_set_recover_status(ls, DLM_RS_DONE);
205 
206 	error = dlm_recover_done_wait(ls);
207 	if (error) {
208 		log_rinfo(ls, "dlm_recover_done_wait error %d", error);
209 		goto fail;
210 	}
211 
212 	dlm_clear_members_gone(ls);
213 
214 	dlm_adjust_timeouts(ls);
215 
216 	dlm_callback_resume(ls);
217 
218 	error = enable_locking(ls, rv->seq);
219 	if (error) {
220 		log_rinfo(ls, "enable_locking error %d", error);
221 		goto fail;
222 	}
223 
224 	error = dlm_process_requestqueue(ls);
225 	if (error) {
226 		log_rinfo(ls, "dlm_process_requestqueue error %d", error);
227 		goto fail;
228 	}
229 
230 	error = dlm_recover_waiters_post(ls);
231 	if (error) {
232 		log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
233 		goto fail;
234 	}
235 
236 	dlm_recover_grant(ls);
237 
238 	log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
239 		  (unsigned long long)rv->seq, ls->ls_generation,
240 		  jiffies_to_msecs(jiffies - start));
241 	mutex_unlock(&ls->ls_recoverd_active);
242 
243 	dlm_lsop_recover_done(ls);
244 	return 0;
245 
246  fail:
247 	dlm_release_root_list(ls);
248 	log_rinfo(ls, "dlm_recover %llu error %d",
249 		  (unsigned long long)rv->seq, error);
250 	mutex_unlock(&ls->ls_recoverd_active);
251 	return error;
252 }
253 
254 /* The dlm_ls_start() that created the rv we take here may already have been
255    stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
256    flag set. */
257 
do_ls_recovery(struct dlm_ls * ls)258 static void do_ls_recovery(struct dlm_ls *ls)
259 {
260 	struct dlm_recover *rv = NULL;
261 
262 	spin_lock(&ls->ls_recover_lock);
263 	rv = ls->ls_recover_args;
264 	ls->ls_recover_args = NULL;
265 	if (rv && ls->ls_recover_seq == rv->seq)
266 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
267 	spin_unlock(&ls->ls_recover_lock);
268 
269 	if (rv) {
270 		ls_recover(ls, rv);
271 		kfree(rv->nodes);
272 		kfree(rv);
273 	}
274 }
275 
dlm_recoverd(void * arg)276 static int dlm_recoverd(void *arg)
277 {
278 	struct dlm_ls *ls;
279 
280 	ls = dlm_find_lockspace_local(arg);
281 	if (!ls) {
282 		log_print("dlm_recoverd: no lockspace %p", arg);
283 		return -1;
284 	}
285 
286 	down_write(&ls->ls_in_recovery);
287 	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
288 	wake_up(&ls->ls_recover_lock_wait);
289 
290 	while (1) {
291 		/*
292 		 * We call kthread_should_stop() after set_current_state().
293 		 * This is because it works correctly if kthread_stop() is
294 		 * called just before set_current_state().
295 		 */
296 		set_current_state(TASK_INTERRUPTIBLE);
297 		if (kthread_should_stop()) {
298 			set_current_state(TASK_RUNNING);
299 			break;
300 		}
301 		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
302 		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
303 			if (kthread_should_stop())
304 				break;
305 			schedule();
306 		}
307 		set_current_state(TASK_RUNNING);
308 
309 		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
310 			down_write(&ls->ls_in_recovery);
311 			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
312 			wake_up(&ls->ls_recover_lock_wait);
313 		}
314 
315 		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
316 			do_ls_recovery(ls);
317 	}
318 
319 	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
320 		up_write(&ls->ls_in_recovery);
321 
322 	dlm_put_lockspace(ls);
323 	return 0;
324 }
325 
dlm_recoverd_start(struct dlm_ls * ls)326 int dlm_recoverd_start(struct dlm_ls *ls)
327 {
328 	struct task_struct *p;
329 	int error = 0;
330 
331 	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
332 	if (IS_ERR(p))
333 		error = PTR_ERR(p);
334 	else
335                 ls->ls_recoverd_task = p;
336 	return error;
337 }
338 
dlm_recoverd_stop(struct dlm_ls * ls)339 void dlm_recoverd_stop(struct dlm_ls *ls)
340 {
341 	kthread_stop(ls->ls_recoverd_task);
342 }
343 
dlm_recoverd_suspend(struct dlm_ls * ls)344 void dlm_recoverd_suspend(struct dlm_ls *ls)
345 {
346 	wake_up(&ls->ls_wait_general);
347 	mutex_lock(&ls->ls_recoverd_active);
348 }
349 
dlm_recoverd_resume(struct dlm_ls * ls)350 void dlm_recoverd_resume(struct dlm_ls *ls)
351 {
352 	mutex_unlock(&ls->ls_recoverd_active);
353 }
354 
355