4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2012, 2013, Intel Corporation.
26 * lustre/lfsck/lfsck_engine.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
40 #include "lfsck_internal.h"
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
44 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45 *cookie = le64_to_cpu(ent->lde_hash);
46 ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48 ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
50 /* Make sure the name is terminated with '0'.
51 * The data (type) after ent::lde_name maybe
52 * broken, but we do not care. */
53 ent->lde_name[ent->lde_namelen] = 0;
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
58 const struct dt_it_ops *iops;
61 spin_lock(&lfsck->li_lock);
62 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63 di = lfsck->li_di_oit;
64 lfsck->li_di_oit = NULL;
65 spin_unlock(&lfsck->li_lock);
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
71 const struct dt_it_ops *iops;
74 spin_lock(&lfsck->li_lock);
75 iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76 di = lfsck->li_di_dir;
77 lfsck->li_di_dir = NULL;
78 lfsck->li_cookie_dir = 0;
79 spin_unlock(&lfsck->li_lock);
83 static void lfsck_close_dir(const struct lu_env *env,
84 struct lfsck_instance *lfsck)
86 struct dt_object *dir_obj = lfsck->li_obj_dir;
87 const struct dt_it_ops *dir_iops = &dir_obj->do_index_ops->dio_it;
88 struct dt_it *dir_di = lfsck->li_di_dir;
90 lfsck_di_dir_put(env, lfsck);
91 dir_iops->fini(env, dir_di);
92 lfsck->li_obj_dir = NULL;
93 lfsck_object_put(env, dir_obj);
96 static int lfsck_update_lma(const struct lu_env *env,
97 struct lfsck_instance *lfsck, struct dt_object *obj)
99 struct lfsck_thread_info *info = lfsck_env_info(env);
100 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
101 struct dt_device *dt = lfsck->li_bottom;
102 struct lustre_mdt_attrs *lma = &info->lti_lma;
109 if (bk->lb_param & LPF_DRYRUN)
112 buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113 rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
118 fl = LU_XATTR_CREATE;
119 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
121 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
124 fl = LU_XATTR_REPLACE;
125 lustre_lma_swab(lma);
126 lustre_lma_init(lma, lfsck_dto2fid(obj),
127 lma->lma_compat | LMAC_FID_ON_OST,
130 lustre_lma_swab(lma);
132 th = dt_trans_create(env, dt);
136 buf = lfsck_buf_get(env, lma, sizeof(*lma));
137 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
141 rc = dt_trans_start(env, dt, th);
145 rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
150 dt_trans_stop(env, dt, th);
154 static int lfsck_master_dir_engine(const struct lu_env *env,
155 struct lfsck_instance *lfsck)
157 struct lfsck_thread_info *info = lfsck_env_info(env);
158 struct dt_object *dir = lfsck->li_obj_dir;
159 const struct dt_it_ops *iops = &dir->do_index_ops->dio_it;
160 struct dt_it *di = lfsck->li_di_dir;
161 struct lu_dirent *ent =
162 (struct lu_dirent *)info->lti_key;
163 struct lu_fid *fid = &info->lti_fid;
164 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
165 struct ptlrpc_thread *thread = &lfsck->li_thread;
170 struct dt_object *child;
172 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
174 struct l_wait_info lwi;
176 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
178 l_wait_event(thread->t_ctl_waitq,
179 !thread_is_running(thread),
183 lfsck->li_new_scanned++;
184 rc = iops->rec(env, di, (struct dt_rec *)ent,
186 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
188 CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
189 "parent "DFID", cookie "LPX64": rc = %d\n",
190 lfsck_lfsck2name(lfsck),
191 PFID(lfsck_dto2fid(dir)),
192 lfsck->li_cookie_dir, rc);
193 lfsck_fail(env, lfsck, true);
194 if (bk->lb_param & LPF_FAILOUT)
200 if (ent->lde_attrs & LUDA_IGNORE)
204 child = lfsck_object_find(env, lfsck, fid);
207 } else if (IS_ERR(child)) {
208 CDEBUG(D_LFSCK, "%s: scan dir failed at find target, "
209 "parent "DFID", child %.*s "DFID": rc = %d\n",
210 lfsck_lfsck2name(lfsck),
211 PFID(lfsck_dto2fid(dir)),
212 ent->lde_namelen, ent->lde_name,
213 PFID(&ent->lde_fid), rc);
214 lfsck_fail(env, lfsck, true);
215 if (bk->lb_param & LPF_FAILOUT)
216 RETURN(PTR_ERR(child));
221 /* XXX: Currently, skip remote object, the consistency for
222 * remote object will be processed in LFSCK phase III. */
223 if (dt_object_exists(child) && !dt_object_remote(child))
224 rc = lfsck_exec_dir(env, lfsck, child, ent);
225 lfsck_object_put(env, child);
226 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
230 rc = lfsck_checkpoint(env, lfsck);
231 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
235 lfsck_control_speed(lfsck);
236 if (unlikely(!thread_is_running(thread))) {
237 CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
238 "parent "DFID", cookie "LPX64"\n",
239 lfsck_lfsck2name(lfsck),
240 PFID(lfsck_dto2fid(dir)),
241 lfsck->li_cookie_dir);
245 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
246 spin_lock(&lfsck->li_lock);
247 thread_set_flags(thread, SVC_STOPPING);
248 spin_unlock(&lfsck->li_lock);
252 rc = iops->next(env, di);
255 if (rc > 0 && !lfsck->li_oit_over)
256 lfsck_close_dir(env, lfsck);
261 static int lfsck_master_oit_engine(const struct lu_env *env,
262 struct lfsck_instance *lfsck)
264 struct lfsck_thread_info *info = lfsck_env_info(env);
265 const struct dt_it_ops *iops =
266 &lfsck->li_obj_oit->do_index_ops->dio_it;
267 struct dt_it *di = lfsck->li_di_oit;
268 struct lu_fid *fid = &info->lti_fid;
269 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
270 struct ptlrpc_thread *thread = &lfsck->li_thread;
272 lfsck_dev_idx(lfsck->li_bottom);
277 struct dt_object *target;
278 bool update_lma = false;
280 if (lfsck->li_di_dir != NULL) {
281 rc = lfsck_master_dir_engine(env, lfsck);
286 if (unlikely(lfsck->li_oit_over))
289 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
291 struct l_wait_info lwi;
293 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
295 l_wait_event(thread->t_ctl_waitq,
296 !thread_is_running(thread),
300 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
303 lfsck->li_current_oit_processed = 1;
304 lfsck->li_new_scanned++;
305 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
307 CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
308 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
309 lfsck_fail(env, lfsck, true);
310 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
316 if (fid_is_idif(fid)) {
317 __u32 idx1 = fid_idif_ost_idx(fid);
319 LASSERT(!lfsck->li_master);
321 /* It is an old format device, update the LMA. */
323 struct ost_id *oi = &info->lti_oi;
325 fid_to_ostid(fid, oi);
326 ostid_to_fid(fid, oi, idx);
329 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
330 !fid_is_last_id(fid) && !fid_is_root(fid) &&
331 !fid_seq_is_dot(fid_seq(fid))) {
332 /* If the FID/object is only used locally and invisible
333 * to external nodes, then LFSCK will not handle it. */
337 target = lfsck_object_find(env, lfsck, fid);
338 if (target == NULL) {
340 } else if (IS_ERR(target)) {
341 CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
342 DFID", cookie "LPU64": rc = %d\n",
343 lfsck_lfsck2name(lfsck), PFID(fid),
344 iops->store(env, di), rc);
345 lfsck_fail(env, lfsck, true);
346 if (bk->lb_param & LPF_FAILOUT)
347 RETURN(PTR_ERR(target));
352 /* XXX: Currently, skip remote object, the consistency for
353 * remote object will be processed in LFSCK phase III. */
354 if (dt_object_exists(target) && !dt_object_remote(target)) {
356 rc = lfsck_update_lma(env, lfsck, target);
358 CDEBUG(D_LFSCK, "%s: fail to update "
359 "LMA for "DFID": rc = %d\n",
360 lfsck_lfsck2name(lfsck),
361 PFID(lfsck_dto2fid(target)), rc);
364 rc = lfsck_exec_oit(env, lfsck, target);
366 lfsck_object_put(env, target);
367 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
371 rc = lfsck_checkpoint(env, lfsck);
372 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
376 lfsck_control_speed(lfsck);
378 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
379 spin_lock(&lfsck->li_lock);
380 thread_set_flags(thread, SVC_STOPPING);
381 spin_unlock(&lfsck->li_lock);
385 rc = iops->next(env, di);
386 if (unlikely(rc > 0))
387 lfsck->li_oit_over = 1;
388 else if (likely(rc == 0))
389 lfsck->li_current_oit_processed = 0;
391 if (unlikely(!thread_is_running(thread))) {
392 CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
393 "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
394 iops->store(env, di));
397 } while (rc == 0 || lfsck->li_di_dir != NULL);
402 int lfsck_master_engine(void *args)
404 struct lfsck_thread_args *lta = args;
405 struct lu_env *env = <a->lta_env;
406 struct lfsck_instance *lfsck = lta->lta_lfsck;
407 struct ptlrpc_thread *thread = &lfsck->li_thread;
408 struct dt_object *oit_obj = lfsck->li_obj_oit;
409 const struct dt_it_ops *oit_iops = &oit_obj->do_index_ops->dio_it;
410 struct dt_it *oit_di;
411 struct l_wait_info lwi = { 0 };
415 oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
416 if (IS_ERR(oit_di)) {
417 rc = PTR_ERR(oit_di);
418 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
419 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
424 spin_lock(&lfsck->li_lock);
425 lfsck->li_di_oit = oit_di;
426 spin_unlock(&lfsck->li_lock);
427 rc = lfsck_prep(env, lfsck, lta->lta_lsp);
431 CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
432 "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
433 ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
434 lfsck->li_pos_current.lp_oit_cookie,
435 lfsck->li_pos_current.lp_dir_cookie,
436 PFID(&lfsck->li_pos_current.lp_dir_parent),
439 spin_lock(&lfsck->li_lock);
440 thread_set_flags(thread, SVC_RUNNING);
441 spin_unlock(&lfsck->li_lock);
442 wake_up_all(&thread->t_ctl_waitq);
444 l_wait_event(thread->t_ctl_waitq,
445 lfsck->li_start_unplug ||
446 !thread_is_running(thread),
448 if (!thread_is_running(thread))
449 GOTO(fini_oit, rc = 0);
451 if (!cfs_list_empty(&lfsck->li_list_scan) ||
452 cfs_list_empty(&lfsck->li_list_double_scan))
453 rc = lfsck_master_oit_engine(env, lfsck);
457 CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
458 "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
459 ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
460 lfsck->li_pos_current.lp_oit_cookie,
461 lfsck->li_pos_current.lp_dir_cookie,
462 PFID(&lfsck->li_pos_current.lp_dir_parent),
465 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
466 rc = lfsck_post(env, lfsck, rc);
468 if (lfsck->li_di_dir != NULL)
469 lfsck_close_dir(env, lfsck);
472 lfsck_di_oit_put(env, lfsck);
473 oit_iops->fini(env, oit_di);
475 if (!cfs_list_empty(&lfsck->li_list_double_scan))
476 rc = lfsck_double_scan(env, lfsck);
480 lfsck_quit(env, lfsck);
483 /* XXX: Purge the pinned objects in the future. */
486 spin_lock(&lfsck->li_lock);
487 thread_set_flags(thread, SVC_STOPPED);
488 spin_unlock(&lfsck->li_lock);
489 wake_up_all(&thread->t_ctl_waitq);
490 lfsck_thread_args_fini(lta);