4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2012, 2013, Intel Corporation.
26 * lustre/lfsck/lfsck_engine.c
28 * Author: Fan, Yong <fan.yong@intel.com>
32 # define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_LFSCK
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <lustre_net.h>
39 #include <lustre_fid.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
43 #include "lfsck_internal.h"
/*
 * Convert a directory entry from little-endian byte order to CPU byte order,
 * in place, and NUL-terminate its name.
 *
 * ent:    entry to convert; lde_fid, lde_reclen, lde_namelen and lde_attrs
 *         are rewritten with their CPU-order values.
 * cookie: output; receives the CPU-order value of ent->lde_hash (the
 *         lde_hash field inside ent itself is not rewritten here).
 */
45 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
47 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
48 *cookie = le64_to_cpu(ent->lde_hash);
49 ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
50 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
51 ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
53 /* Make sure the name is terminated with '\0'.
54 * The data (type) after ent::lde_name may be
55 * broken, but we do not care. */
/* The terminator is written one byte past the name, i.e. at index
 * lde_namelen, using the length that was just converted above.
 * NOTE(review): assumes the buffer holding ent has room for that extra
 * byte - TODO confirm against the callers. */
56 ent->lde_name[ent->lde_namelen] = 0;
/*
 * Detach the "oit" iterator handle (lfsck->li_di_oit) from the LFSCK
 * instance.
 *
 * While holding lfsck->li_lock, the iterator operations of li_obj_oit and
 * the current li_di_oit handle are captured into locals and li_di_oit is
 * reset to NULL, so the instance no longer points at the handle once the
 * lock is released.
 *
 * NOTE(review): the captured iops/di are presumably passed to iops->put()
 * after the unlock - confirm against the complete function body.
 */
59 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
61 const struct dt_it_ops *iops;
64 spin_lock(&lfsck->li_lock);
65 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
66 di = lfsck->li_di_oit;
67 lfsck->li_di_oit = NULL;
68 spin_unlock(&lfsck->li_lock);
/*
 * Detach the directory iterator handle (lfsck->li_di_dir) from the LFSCK
 * instance.
 *
 * While holding lfsck->li_lock, the iterator operations of li_obj_dir and
 * the current li_di_dir handle are captured into locals, then li_di_dir is
 * reset to NULL and the saved directory cookie (li_cookie_dir) is reset
 * to 0.
 *
 * NOTE(review): the captured iops/di are presumably passed to iops->put()
 * after the unlock - confirm against the complete function body.
 */
72 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
74 const struct dt_it_ops *iops;
77 spin_lock(&lfsck->li_lock);
78 iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
79 di = lfsck->li_di_dir;
80 lfsck->li_di_dir = NULL;
81 lfsck->li_cookie_dir = 0;
82 spin_unlock(&lfsck->li_lock);
/*
 * Stop iterating the directory currently attached to the LFSCK instance and
 * drop the instance's reference on it.
 *
 * Sequence: detach the iterator via lfsck_di_dir_put(), finalize it with the
 * directory's iterator ->fini(), clear lfsck->li_obj_dir, and finally
 * release the directory object with lfsck_object_put().
 */
86 static void lfsck_close_dir(const struct lu_env *env,
87 struct lfsck_instance *lfsck)
/* Snapshot the object, its iterator ops and the iterator handle first:
 * lfsck_di_dir_put() below resets lfsck->li_di_dir to NULL, and
 * li_obj_dir is cleared further down, so neither can be re-read later. */
89 struct dt_object *dir_obj = lfsck->li_obj_dir;
90 const struct dt_it_ops *dir_iops = &dir_obj->do_index_ops->dio_it;
91 struct dt_it *dir_di = lfsck->li_di_dir;
93 lfsck_di_dir_put(env, lfsck);
94 dir_iops->fini(env, dir_di);
95 lfsck->li_obj_dir = NULL;
96 lfsck_object_put(env, dir_obj);
/*
 * Process entries of the directory currently attached to the LFSCK instance
 * (lfsck->li_obj_dir, iterated through lfsck->li_di_dir).
 *
 * For an entry, the code reads the record with iops->rec(), converts it with
 * lfsck_unpack_ent() (which also stores the entry cookie in
 * lfsck->li_cookie_dir), looks the child up with lfsck_object_find(), runs
 * lfsck_exec_dir() on a child that exists and is not remote, then calls
 * lfsck_checkpoint() and lfsck_control_speed() before advancing the
 * iterator with iops->next().
 *
 * A failed child lookup is returned to the caller as PTR_ERR(child) when
 * LPF_FAILOUT is set in the in-RAM bookmark (lfsck->li_bookmark_ram).
 * NOTE(review): presumably the other LPF_FAILOUT checks also return the
 * error, and without LPF_FAILOUT the scan continues with the next entry -
 * confirm against the complete function body.
 */
99 static int lfsck_master_dir_engine(const struct lu_env *env,
100 struct lfsck_instance *lfsck)
102 struct lfsck_thread_info *info = lfsck_env_info(env);
103 const struct dt_it_ops *iops =
104 &lfsck->li_obj_dir->do_index_ops->dio_it;
105 struct dt_it *di = lfsck->li_di_dir;
106 struct lu_dirent *ent = &info->lti_ent;
107 struct lu_fid *fid = &info->lti_fid;
108 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
109 struct ptlrpc_thread *thread = &lfsck->li_thread;
114 struct dt_object *child;
/* Fault injection (OBD_FAIL_LFSCK_DELAY2): wait on the thread's control
 * waitqueue, with a timeout derived from cfs_fail_val seconds, until the
 * thread is no longer running. */
116 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
118 struct l_wait_info lwi;
120 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
122 l_wait_event(thread->t_ctl_waitq,
123 !thread_is_running(thread),
/* The entry is counted in li_new_scanned before its record is read. */
127 lfsck->li_new_scanned++;
128 rc = iops->rec(env, di, (struct dt_rec *)ent,
130 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
/* NOTE(review): presumably the error path of iops->rec() - confirm. */
132 lfsck_fail(env, lfsck, true);
133 if (bk->lb_param & LPF_FAILOUT)
/* NOTE(review): entries flagged LUDA_IGNORE are presumably skipped -
 * confirm. */
139 if (ent->lde_attrs & LUDA_IGNORE)
/* Look up the child object; an ERR_PTR result is recorded through
 * lfsck_fail() and, with LPF_FAILOUT, returned to the caller. */
143 child = lfsck_object_find(env, lfsck, fid);
146 } else if (IS_ERR(child)) {
147 lfsck_fail(env, lfsck, true);
148 if (bk->lb_param & LPF_FAILOUT)
149 RETURN(PTR_ERR(child));
154 /* XXX: Currently, skip remote object, the consistency for
155 * remote object will be processed in LFSCK phase III. */
156 if (dt_object_exists(child) && !dt_object_remote(child))
157 rc = lfsck_exec_dir(env, lfsck, child, ent);
/* The child reference is dropped whether or not lfsck_exec_dir() ran. */
158 lfsck_object_put(env, child);
159 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
163 rc = lfsck_checkpoint(env, lfsck);
164 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
/* Throttle, then re-check whether the thread is still running. */
168 lfsck_control_speed(lfsck);
169 if (unlikely(!thread_is_running(thread)))
/* Fault injection (OBD_FAIL_LFSCK_FATAL2): flag the thread as
 * SVC_STOPPING under li_lock. */
172 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
173 spin_lock(&lfsck->li_lock);
174 thread_set_flags(thread, SVC_STOPPING);
175 spin_unlock(&lfsck->li_lock);
179 rc = iops->next(env, di);
/* A positive rc while the oit scan is not yet over closes the directory.
 * NOTE(review): rc > 0 presumably means the directory iteration reached
 * its end - confirm. */
182 if (rc > 0 && !lfsck->li_oit_over)
183 lfsck_close_dir(env, lfsck);
/*
 * Main scanning loop over the "oit" iterator of the LFSCK instance
 * (lfsck->li_obj_oit, iterated through lfsck->li_di_oit).
 *
 * Each pass first lets lfsck_master_dir_engine() run if a directory
 * iterator is attached (lfsck->li_di_dir != NULL).  It then reads the
 * current FID with iops->rec(), looks the object up with
 * lfsck_object_find(), runs lfsck_exec_oit() on an object that exists and
 * is not remote, calls lfsck_checkpoint() and lfsck_control_speed(), and
 * advances with iops->next().  The loop repeats while iops->next()
 * returned 0 or a directory iterator is still attached.
 *
 * A failed object lookup is returned to the caller as PTR_ERR(target) when
 * LPF_FAILOUT is set in the in-RAM bookmark (lfsck->li_bookmark_ram).
 * NOTE(review): presumably the other LPF_FAILOUT checks also return the
 * error - confirm against the complete function body.
 */
188 static int lfsck_master_oit_engine(const struct lu_env *env,
189 struct lfsck_instance *lfsck)
191 struct lfsck_thread_info *info = lfsck_env_info(env);
192 const struct dt_it_ops *iops =
193 &lfsck->li_obj_oit->do_index_ops->dio_it;
194 struct dt_it *di = lfsck->li_di_oit;
195 struct lu_fid *fid = &info->lti_fid;
196 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
197 struct ptlrpc_thread *thread = &lfsck->li_thread;
202 struct dt_object *target;
/* A directory scan in progress is serviced before the next oit item. */
204 if (lfsck->li_di_dir != NULL) {
205 rc = lfsck_master_dir_engine(env, lfsck);
210 if (unlikely(lfsck->li_oit_over))
/* Fault injection (OBD_FAIL_LFSCK_DELAY1): wait on the thread's control
 * waitqueue, with a timeout derived from cfs_fail_val seconds, until the
 * thread is no longer running. */
213 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
215 struct l_wait_info lwi;
217 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
219 l_wait_event(thread->t_ctl_waitq,
220 !thread_is_running(thread),
224 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
/* Mark the current oit position as processed and count it before the
 * FID record is read into info->lti_fid. */
227 lfsck->li_current_oit_processed = 1;
228 lfsck->li_new_scanned++;
229 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
/* NOTE(review): presumably the error path of iops->rec() - confirm. */
231 lfsck_fail(env, lfsck, true);
232 if (bk->lb_param & LPF_FAILOUT)
/* Look up the object; an ERR_PTR result is recorded through lfsck_fail()
 * and, with LPF_FAILOUT, returned to the caller. */
238 target = lfsck_object_find(env, lfsck, fid);
239 if (target == NULL) {
241 } else if (IS_ERR(target)) {
242 lfsck_fail(env, lfsck, true);
243 if (bk->lb_param & LPF_FAILOUT)
244 RETURN(PTR_ERR(target));
249 /* XXX: Currently, skip remote object, the consistency for
250 * remote object will be processed in LFSCK phase III. */
251 if (dt_object_exists(target) && !dt_object_remote(target))
252 rc = lfsck_exec_oit(env, lfsck, target);
/* The object reference is dropped whether or not lfsck_exec_oit() ran. */
253 lfsck_object_put(env, target);
254 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
258 rc = lfsck_checkpoint(env, lfsck);
259 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
263 lfsck_control_speed(lfsck);
/* Fault injection (OBD_FAIL_LFSCK_FATAL1): flag the thread as
 * SVC_STOPPING under li_lock. */
265 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
266 spin_lock(&lfsck->li_lock);
267 thread_set_flags(thread, SVC_STOPPING);
268 spin_unlock(&lfsck->li_lock);
/* Advance: a positive rc records that the oit scan is over; rc == 0
 * clears the "processed" mark so it applies to the new position. */
272 rc = iops->next(env, di);
273 if (unlikely(rc > 0))
274 lfsck->li_oit_over = 1;
275 else if (likely(rc == 0))
276 lfsck->li_current_oit_processed = 0;
278 if (unlikely(!thread_is_running(thread)))
/* Keep looping after iops->next() stops returning 0 for as long as a
 * directory iterator is still attached. */
280 } while (rc == 0 || lfsck->li_di_dir != NULL);
/*
 * Entry point of the "lfsck_master" thread.
 *
 * args: the struct lfsck_instance to scan, passed as an untyped pointer.
 *
 * Visible flow: initialize an lu_env, create the oit iterator and publish
 * it in lfsck->li_di_oit, call lfsck_prep(), mark the thread SVC_RUNNING,
 * run lfsck_master_oit_engine() when applicable, call lfsck_post(), close
 * any directory still attached, release the oit iterator, optionally run
 * lfsck_double_scan(), and finally mark the thread SVC_STOPPED.
 *
 * NOTE(review): the error branches and the end of the function (return
 * value, environment teardown) are not covered by these comments - confirm
 * against the complete function body.
 */
285 int lfsck_master_engine(void *args)
288 struct lfsck_instance *lfsck = (struct lfsck_instance *)args;
289 struct ptlrpc_thread *thread = &lfsck->li_thread;
290 struct dt_object *oit_obj = lfsck->li_obj_oit;
291 const struct dt_it_ops *oit_iops = &oit_obj->do_index_ops->dio_it;
292 struct dt_it *oit_di;
296 cfs_daemonize("lfsck_master");
297 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
299 CERROR("%s: LFSCK, fail to init env, rc = %d\n",
300 lfsck_lfsck2name(lfsck), rc);
/* Create the oit iterator; ->init() reports failure as an ERR_PTR. */
304 oit_di = oit_iops->init(&env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
305 if (IS_ERR(oit_di)) {
306 rc = PTR_ERR(oit_di);
307 CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
308 lfsck_lfsck2name(lfsck), rc);
/* Publish the iterator handle on the instance under li_lock. */
312 spin_lock(&lfsck->li_lock);
313 lfsck->li_di_oit = oit_di;
314 spin_unlock(&lfsck->li_lock);
315 rc = lfsck_prep(&env, lfsck);
319 CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
320 "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
321 ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
322 lfsck->li_pos_current.lp_oit_cookie,
323 lfsck->li_pos_current.lp_dir_cookie,
324 PFID(&lfsck->li_pos_current.lp_dir_parent),
/* Flag the thread as running under li_lock, then wake everyone waiting
 * on its control waitqueue (here the broadcast follows the unlock). */
327 spin_lock(&lfsck->li_lock);
328 thread_set_flags(thread, SVC_RUNNING);
329 spin_unlock(&lfsck->li_lock);
330 cfs_waitq_broadcast(&thread->t_ctl_waitq);
/* The oit engine runs when li_list_scan is non-empty, or when
 * li_list_double_scan is empty. */
332 if (!cfs_list_empty(&lfsck->li_list_scan) ||
333 cfs_list_empty(&lfsck->li_list_double_scan))
334 rc = lfsck_master_oit_engine(&env, lfsck);
338 CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
339 "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
340 ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
341 lfsck->li_pos_current.lp_oit_cookie,
342 lfsck->li_pos_current.lp_dir_cookie,
343 PFID(&lfsck->li_pos_current.lp_dir_parent),
344 cfs_curproc_pid(), rc);
/* lfsck_post() is skipped when OBD_FAIL_LFSCK_CRASH is being injected. */
346 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
347 rc = lfsck_post(&env, lfsck, rc);
/* Close a directory that is still attached to the instance. */
348 if (lfsck->li_di_dir != NULL)
349 lfsck_close_dir(&env, lfsck);
/* Detach the oit iterator from the instance, then finalize it. */
352 lfsck_di_oit_put(&env, lfsck);
353 oit_iops->fini(&env, oit_di);
355 if (!cfs_list_empty(&lfsck->li_list_double_scan))
356 rc = lfsck_double_scan(&env, lfsck);
361 /* XXX: Purge the pinned objects in the future. */
/* Flag the thread as stopped; unlike the SVC_RUNNING transition above,
 * the waitqueue broadcast here is issued while li_lock is still held. */
367 spin_lock(&lfsck->li_lock);
368 thread_set_flags(thread, SVC_STOPPED);
369 cfs_waitq_broadcast(&thread->t_ctl_waitq);
370 spin_unlock(&lfsck->li_lock);