X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_lib.c;h=178aa3ad98dfdb0ff44c26ac61f79cf5b7e315ba;hp=28827000862f08f1b2cfe1406fe5e57c8149b06c;hb=8a11cb6282cfbdc8617b809344e6a11223e86a38;hpb=34bdd30ec6a2010dfd98c5110487df210d03815d diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c index 2882700..178aa3a 100644 --- a/lustre/lfsck/lfsck_lib.c +++ b/lustre/lfsck/lfsck_lib.c @@ -30,6 +30,7 @@ #define DEBUG_SUBSYSTEM S_LFSCK +#include #include #include #include @@ -148,7 +149,7 @@ static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds) } cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) { - ltd = LTD_TGT(ltds, idx); + ltd = lfsck_ltd2tgt(ltds, idx); if (likely(ltd != NULL)) { LASSERT(list_empty(<d->ltd_layout_list)); LASSERT(list_empty(<d->ltd_layout_phase_list)); @@ -157,7 +158,7 @@ static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds) ltds->ltd_tgtnr--; cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx); - LTD_TGT(ltds, idx) = NULL; + lfsck_assign_tgt(ltds, NULL, idx); lfsck_tgt_put(ltd); } } @@ -229,7 +230,7 @@ static int __lfsck_add_target(const struct lu_env *env, GOTO(unlock, rc = -ENOMEM); } - LTD_TGT(ltds, index) = ltd; + lfsck_assign_tgt(ltds, ltd, index); cfs_bitmap_set(ltds->ltd_tgts_bitmap, index); ltds->ltd_tgtnr++; @@ -361,29 +362,15 @@ int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck, RETURN(rc); } -/** - * Request the specified ibits lock for the given object. - * - * Before the LFSCK modifying on the namespace visible object, - * it needs to acquire related ibits ldlm lock. - * - * \param[in] env pointer to the thread context - * \param[in] lfsck pointer to the lfsck instance - * \param[in] obj pointer to the dt_object to be locked - * \param[out] lh pointer to the lock handle - * \param[in] ibits the bits for the ldlm lock to be acquired - * \param[in] mode the mode for the ldlm lock to be acquired - * - * \retval 0 for success - * \retval negative error number on failure - */ -int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, - struct dt_object *obj, struct lustre_handle *lh, - __u64 bits, ldlm_mode_t mode) +static int __lfsck_ibits_lock(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct dt_object *obj, + struct ldlm_res_id *resid, + struct lustre_handle *lh, + __u64 bits, ldlm_mode_t mode) { struct lfsck_thread_info *info = lfsck_env_info(env); ldlm_policy_data_t *policy = &info->lti_policy; - struct ldlm_res_id *resid = &info->lti_resid; __u64 flags = LDLM_FL_ATOMIC_CB; int rc; @@ -391,7 +378,6 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = bits; - fid_build_reg_res_name(lfsck_dto2fid(obj), resid); if (dt_object_remote(obj)) { struct ldlm_enqueue_info *einfo = &info->lti_einfo; @@ -422,6 +408,34 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, } /** + * Request the specified ibits lock for the given object. + * + * Before the LFSCK modifying on the namespace visible object, + * it needs to acquire related ibits ldlm lock. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[in] obj pointer to the dt_object to be locked + * \param[out] lh pointer to the lock handle + * \param[in] bits the bits for the ldlm lock to be acquired + * \param[in] mode the mode for the ldlm lock to be acquired + * + * \retval 0 for success + * \retval negative error number on failure + */ +int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, struct lustre_handle *lh, + __u64 bits, ldlm_mode_t mode) +{ + struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid; + + LASSERT(!lustre_handle_is_used(lh)); + + fid_build_reg_res_name(lfsck_dto2fid(obj), resid); + return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode); +} + +/** * Release the the specified ibits lock. * * If the lock has been acquired before, release it @@ -438,12 +452,91 @@ void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode) } } +/** + * Request compound ibits locks for the given pairs. + * + * Before the LFSCK modifying on the namespace visible object, it needs to + * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for + * the lock purpose. But the simple lfsck_ibits_lock for directory-based + * modificationis (such as insert name entry to the directory) may be too + * coarse-grained and not efficient. + * + * The lfsck_lock() will request compound ibits locks on the specified + * pairs: the PDO (Parallel Directory Operations) ibits (UPDATE) + * lock on the directory object, and the regular ibits lock on the name hash. + * + * \param[in] env pointer to the thread context + * \param[in] lfsck pointer to the lfsck instance + * \param[in] obj pointer to the dt_object to be locked + * \param[in] name used for building the PDO lock resource + * \param[out] llh pointer to the lfsck_lock_handle + * \param[in] bits the bits for the ldlm lock to be acquired + * \param[in] mode the mode for the ldlm lock to be acquired + * + * \retval 0 for success + * \retval negative error number on failure + */ +int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck, + struct dt_object *obj, const char *name, + struct lfsck_lock_handle *llh, __u64 bits, ldlm_mode_t mode) +{ + struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid; + int rc; + + LASSERT(S_ISDIR(lfsck_object_type(obj))); + LASSERT(name != NULL); + LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh)); + LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh)); + + switch (mode) { + case LCK_EX: + llh->llh_pdo_mode = LCK_EX; + break; + case LCK_PW: + llh->llh_pdo_mode = LCK_CW; + break; + case LCK_PR: + llh->llh_pdo_mode = LCK_CR; + break; + default: + CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj " + DFID"\n", lfsck_lfsck2name(lfsck), mode, + PFID(lfsck_dto2fid(obj))); + LBUG(); + } + + fid_build_reg_res_name(lfsck_dto2fid(obj), resid); + rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh, + MDS_INODELOCK_UPDATE, llh->llh_pdo_mode); + if (rc != 0) + return rc; + + llh->llh_reg_mode = mode; + resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name)); + rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh, + bits, llh->llh_reg_mode); + if (rc != 0) + lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode); + + return rc; +} + +/** + * Release the the compound ibits locks. + * + * \param[in] llh pointer to the lfsck_lock_handle to be released + */ +void lfsck_unlock(struct lfsck_lock_handle *llh) +{ + lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode); + lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode); +} + int lfsck_find_mdt_idx_by_fid(const struct lu_env *env, struct lfsck_instance *lfsck, const struct lu_fid *fid) { - struct seq_server_site *ss = - lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); + struct seq_server_site *ss = lfsck_dev_site(lfsck); struct lu_seq_range *range = &lfsck_env_info(env)->lti_range; int rc; @@ -479,14 +572,14 @@ static int lfsck_lpf_remove_name_entry(const struct lu_env *env, const char *name) { struct dt_object *parent = lfsck->li_lpf_root_obj; - struct dt_device *dev = lfsck->li_next; + struct dt_device *dev = lfsck_obj2dev(parent); struct thandle *th; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &lfsck_env_info(env)->lti_llh; int rc; ENTRY; - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -502,12 +595,11 @@ static int lfsck_lpf_remove_name_entry(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc); - rc = dt_delete(env, parent, (const struct dt_key *)name, th, - BYPASS_CAPA); + rc = dt_delete(env, parent, (const struct dt_key *)name, th); if (rc != 0) GOTO(stop, rc); @@ -521,7 +613,7 @@ stop: dt_trans_stop(env, dev, th); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n", lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc); @@ -538,7 +630,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env, { struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec; struct dt_object *parent = lfsck->li_lpf_root_obj; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev = lfsck_obj2dev(child); struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; struct dt_object *bk_obj = lfsck->li_bookmark_obj; const struct lu_fid *cfid = lfsck_dto2fid(child); @@ -618,14 +710,14 @@ static int lfsck_create_lpf_local(const struct lu_env *env, /* 1b.2. insert dot into child dir */ rec->rec_fid = cfid; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, BYPASS_CAPA, 1); + (const struct dt_key *)dot, th, 1); if (rc != 0) GOTO(unlock, rc); /* 1b.3. insert dotdot into child dir */ rec->rec_fid = &LU_LPF_FID; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1); + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock, rc); @@ -636,7 +728,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env, /* 3b. insert linkEA for child. */ rc = dt_xattr_set(env, child, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + XATTR_NAME_LINK, 0, th); dt_write_unlock(env, child); if (rc != 0) GOTO(stop, rc); @@ -644,7 +736,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env, /* 4b. insert name into parent dir */ rec->rec_fid = cfid; rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + (const struct dt_key *)name, th, 1); if (rc != 0) GOTO(stop, rc); @@ -727,7 +819,7 @@ static int lfsck_create_lpf_remote(const struct lu_env *env, /* Transaction I: locally */ - dev = lfsck->li_bottom; + dev = lfsck_obj2dev(child); th = dt_trans_create(env, dev); if (IS_ERR(th)) RETURN(PTR_ERR(th)); @@ -773,14 +865,14 @@ static int lfsck_create_lpf_remote(const struct lu_env *env, rec->rec_type = S_IFDIR; rec->rec_fid = cfid; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, BYPASS_CAPA, 1); + (const struct dt_key *)dot, th, 1); if (rc != 0) GOTO(unlock, rc); /* 1b.3. insert dotdot into child dir */ rec->rec_fid = &LU_LPF_FID; rc = dt_insert(env, child, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1); + (const struct dt_key *)dotdot, th, 1); if (rc != 0) GOTO(unlock, rc); @@ -791,7 +883,7 @@ static int lfsck_create_lpf_remote(const struct lu_env *env, /* 3b. insert linkEA for child */ rc = dt_xattr_set(env, child, &linkea_buf, - XATTR_NAME_LINK, 0, th, BYPASS_CAPA); + XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(unlock, rc); @@ -809,11 +901,12 @@ static int lfsck_create_lpf_remote(const struct lu_env *env, /* Transaction II: remotely */ - dev = lfsck->li_next; + dev = lfsck_obj2dev(parent); th = dt_trans_create(env, dev); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + th->th_sync = 1; /* 5a. insert name into parent dir */ rec->rec_fid = cfid; rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec, @@ -826,13 +919,13 @@ static int lfsck_create_lpf_remote(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start(env, dev, th); + rc = dt_trans_start_local(env, dev, th); if (rc != 0) GOTO(stop, rc); /* 5b. insert name into parent dir */ rc = dt_insert(env, parent, (const struct dt_rec *)rec, - (const struct dt_key *)name, th, BYPASS_CAPA, 1); + (const struct dt_key *)name, th, 1); if (rc != 0) GOTO(stop, rc); @@ -848,7 +941,7 @@ unlock: stop: dt_trans_stop(env, dev, th); - if (rc != 0 && dev == lfsck->li_next) + if (rc != 0 && dev == lfsck_obj2dev(parent)) CDEBUG(D_LFSCK, "%s: partially created the object "DFID "for orphans, but failed to insert the name %s " "to the .lustre/lost+found/. Such inconsistency " @@ -882,9 +975,9 @@ static int lfsck_create_lpf(const struct lu_env *env, struct dt_object_format *dof = &info->lti_dof; struct dt_object *parent = lfsck->li_lpf_root_obj; struct dt_object *child = NULL; - struct lustre_handle lh = { 0 }; + struct lfsck_lock_handle *llh = &info->lti_llh; char name[8]; - int node = lfsck_dev_idx(lfsck->li_bottom); + int node = lfsck_dev_idx(lfsck); int rc = 0; ENTRY; @@ -892,8 +985,8 @@ static int lfsck_create_lpf(const struct lu_env *env, LASSERT(parent != NULL); LASSERT(lfsck->li_lpf_obj == NULL); - rc = lfsck_ibits_lock(env, lfsck, parent, &lh, - MDS_INODELOCK_UPDATE, LCK_EX); + rc = lfsck_lock(env, lfsck, parent, name, llh, + MDS_INODELOCK_UPDATE, LCK_PW); if (rc != 0) RETURN(rc); @@ -904,7 +997,7 @@ static int lfsck_create_lpf(const struct lu_env *env, * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup * it from MDT0 firstly. */ rc = dt_lookup(env, parent, (struct dt_rec *)cfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc != 0 && rc != -ENOENT) GOTO(unlock, rc); @@ -920,7 +1013,7 @@ static int lfsck_create_lpf(const struct lu_env *env, *cfid = bk->lb_lpf_fid; } - child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid); + child = lfsck_object_find_bottom(env, lfsck, cfid); if (IS_ERR(child)) GOTO(unlock, rc = PTR_ERR(child)); @@ -951,9 +1044,9 @@ static int lfsck_create_lpf(const struct lu_env *env, GOTO(unlock, rc); unlock: - lfsck_ibits_unlock(&lh, LCK_EX); + lfsck_unlock(llh); if (rc != 0 && child != NULL && !IS_ERR(child)) - lu_object_put(env, &child->do_lu); + lfsck_object_put(env, child); return rc; } @@ -982,7 +1075,7 @@ static int lfsck_scan_lpf_bad_entries(const struct lu_env *env, int rc; ENTRY; - it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA); + it = iops->init(env, parent, LUDA_64BITHASH); if (IS_ERR(it)) RETURN(PTR_ERR(it)); @@ -1113,7 +1206,7 @@ static int lfsck_verify_lpf_pairs(const struct lu_env *env, fid_zero(fid); rc = dt_lookup(env, child, (struct dt_rec *)fid, - (const struct dt_key *)dotdot, BYPASS_CAPA); + (const struct dt_key *)dotdot); if (rc != 0) GOTO(linkea, rc); @@ -1129,8 +1222,7 @@ static int lfsck_verify_lpf_pairs(const struct lu_env *env, } cname = lfsck_name_get_const(env, name, strlen(name)); - rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname, - &LU_LPF_FID); + rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID); if (rc == 0) rc = lfsck_update_lpf_entry(env, lfsck, parent, child, name, type); @@ -1138,18 +1230,18 @@ static int lfsck_verify_lpf_pairs(const struct lu_env *env, GOTO(out_done, rc); } - parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid); + parent2 = lfsck_object_find_bottom(env, lfsck, fid); if (IS_ERR(parent2)) GOTO(linkea, parent2); if (!dt_object_exists(parent2)) { - lu_object_put(env, &parent2->do_lu); + lfsck_object_put(env, parent2); GOTO(linkea, parent2 = ERR_PTR(-ENOENT)); } if (!dt_try_as_dir(env, parent2)) { - lu_object_put(env, &parent2->do_lu); + lfsck_object_put(env, parent2); GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR)); } @@ -1200,7 +1292,7 @@ linkea: } rc = dt_lookup(env, parent2, (struct dt_rec *)fid, - (const struct dt_key *)name2, BYPASS_CAPA); + (const struct dt_key *)name2); dt_read_unlock(env, child); lfsck_ibits_unlock(&lh, LCK_PR); if (rc != 0 && rc != -ENOENT) @@ -1234,7 +1326,7 @@ linkea: out_put: if (parent2 != NULL && !IS_ERR(parent2)) - lu_object_put(env, &parent2->do_lu); + lfsck_object_put(env, parent2); out_done: return rc; @@ -1266,10 +1358,9 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) struct dt_object *child1 = NULL; /* child2's FID is in the name entry MDTxxxx. */ struct dt_object *child2 = NULL; - struct dt_device *dev = lfsck->li_bottom; const struct lu_name *cname; char name[8]; - int node = lfsck_dev_idx(dev); + int node = lfsck_dev_idx(lfsck); int rc = 0; ENTRY; @@ -1279,7 +1370,8 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) RETURN(0); if (node == 0) { - parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID); + parent = lfsck_object_find_by_dev(env, lfsck->li_bottom, + &LU_LPF_FID); } else { struct lfsck_tgt_desc *ltd; @@ -1298,7 +1390,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) LASSERT(dt_object_exists(parent)); if (unlikely(!dt_try_as_dir(env, parent))) { - lu_object_put(env, &parent->do_lu); + lfsck_object_put(env, parent); GOTO(put, rc = -ENOTDIR); } @@ -1327,7 +1419,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) if (rc != 0) GOTO(put, rc); } else { - child1 = lfsck_object_find_by_dev(env, dev, + child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid); if (IS_ERR(child1)) { child1 = NULL; @@ -1350,7 +1442,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) if (rc != 0) GOTO(put, rc); - lu_object_put(env, &child1->do_lu); + lfsck_object_put(env, child1); child1 = NULL; } else if (unlikely(!dt_try_as_dir(env, child1))) { GOTO(put, rc = -ENOTDIR); @@ -1361,7 +1453,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck) find_child2: snprintf(name, 8, "MDT%04x", node); rc = dt_lookup(env, parent, (struct dt_rec *)cfid, - (const struct dt_key *)name, BYPASS_CAPA); + (const struct dt_key *)name); if (rc == -ENOENT) { if (!fid_is_zero(&bk->lb_lpf_fid)) goto check_child1; @@ -1381,7 +1473,7 @@ find_child2: goto check_child1; } - child2 = lfsck_object_find_by_dev(env, dev, cfid); + child2 = lfsck_object_find_bottom(env, lfsck, cfid); if (IS_ERR(child2)) GOTO(put, rc = PTR_ERR(child2)); @@ -1415,7 +1507,7 @@ find_child2: } cname = lfsck_name_get_const(env, name, strlen(name)); - rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID); + rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID); } GOTO(put, rc); @@ -1430,7 +1522,7 @@ check_child1: put: if (lfsck->li_lpf_obj != NULL) { if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) { - lu_object_put(env, &lfsck->li_lpf_obj->do_lu); + lfsck_object_put(env, lfsck->li_lpf_obj); lfsck->li_lpf_obj = NULL; rc = -ENOTDIR; } @@ -1439,9 +1531,9 @@ put: } if (child2 != NULL && !IS_ERR(child2)) - lu_object_put(env, &child2->do_lu); + lfsck_object_put(env, child2); if (child1 != NULL && !IS_ERR(child1)) - lu_object_put(env, &child1->do_lu); + lfsck_object_put(env, child1); return rc; } @@ -1449,12 +1541,11 @@ put: static int lfsck_fid_init(struct lfsck_instance *lfsck) { struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram; - struct seq_server_site *ss; + struct seq_server_site *ss = lfsck_dev_site(lfsck); char *prefix; int rc = 0; ENTRY; - ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site); if (unlikely(ss == NULL)) RETURN(-ENXIO); @@ -1509,7 +1600,7 @@ void lfsck_instance_cleanup(const struct lu_env *env, LASSERT(thread_is_init(thread) || thread_is_stopped(thread)); if (lfsck->li_obj_oit != NULL) { - lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu); + lfsck_object_put(env, lfsck->li_obj_oit); lfsck->li_obj_oit = NULL; } @@ -1544,18 +1635,23 @@ void lfsck_instance_cleanup(const struct lu_env *env, lfsck_tgt_descs_fini(&lfsck->li_ost_descs); lfsck_tgt_descs_fini(&lfsck->li_mdt_descs); + if (lfsck->li_lfsck_dir != NULL) { + lfsck_object_put(env, lfsck->li_lfsck_dir); + lfsck->li_lfsck_dir = NULL; + } + if (lfsck->li_bookmark_obj != NULL) { - lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu); + lfsck_object_put(env, lfsck->li_bookmark_obj); lfsck->li_bookmark_obj = NULL; } if (lfsck->li_lpf_obj != NULL) { - lu_object_put(env, &lfsck->li_lpf_obj->do_lu); + lfsck_object_put(env, lfsck->li_lpf_obj); lfsck->li_lpf_obj = NULL; } if (lfsck->li_lpf_root_obj != NULL) { - lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu); + lfsck_object_put(env, lfsck->li_lpf_root_obj); lfsck->li_lpf_root_obj = NULL; } @@ -1623,8 +1719,11 @@ int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[], int flag; int i; bool newline = (bits != 0 ? false : true); + int rc; - seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n'); + rc = seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n'); + if (rc < 0) + return rc; for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) { if (flag & bits) { @@ -1633,25 +1732,38 @@ int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[], if (bits == 0) newline = true; - seq_printf(m, "%s%c", names[i], - newline ? '\n' : ','); + rc = seq_printf(m, "%s%c", names[i], + newline ? '\n' : ','); + if (rc < 0) + return rc; } } } if (!newline) - seq_printf(m, "\n"); - return 0; + rc = seq_printf(m, "\n"); + + return rc; } -int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix) +int lfsck_time_dump(struct seq_file *m, __u64 time, const char *name) { - if (time != 0) - seq_printf(m, "%s: "LPU64" seconds\n", prefix, - cfs_time_current_sec() - time); - else - seq_printf(m, "%s: N/A\n", prefix); - return 0; + int rc; + + if (time == 0) { + rc = seq_printf(m, "%s_time: N/A\n", name); + if (rc == 0) + rc = seq_printf(m, "time_since_%s: N/A\n", name); + + return rc; + } + + rc = seq_printf(m, "%s_time: "LPU64"\n", name, time); + if (rc == 0) + rc = seq_printf(m, "time_since_%s: "LPU64" seconds\n", + name, cfs_time_current_sec() - time); + + return rc; } int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos, @@ -1659,17 +1771,15 @@ int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos, { if (fid_is_zero(&pos->lp_dir_parent)) { if (pos->lp_oit_cookie == 0) - seq_printf(m, "%s: N/A, N/A, N/A\n", - prefix); - else - seq_printf(m, "%s: "LPU64", N/A, N/A\n", - prefix, pos->lp_oit_cookie); - } else { - seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n", - prefix, pos->lp_oit_cookie, - PFID(&pos->lp_dir_parent), pos->lp_dir_cookie); + return seq_printf(m, "%s: N/A, N/A, N/A\n", prefix); + + return seq_printf(m, "%s: "LPU64", N/A, N/A\n", + prefix, pos->lp_oit_cookie); } - return 0; + + return seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n", + prefix, pos->lp_oit_cookie, + PFID(&pos->lp_dir_parent), pos->lp_dir_cookie); } void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck, @@ -1834,6 +1944,56 @@ lfsck_assistant_data_init(struct lfsck_assistant_operations *lao, return lad; } +struct lfsck_assistant_object * +lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid, + const struct lu_attr *attr, __u64 cookie, + bool is_dir) +{ + struct lfsck_assistant_object *lso; + + OBD_ALLOC_PTR(lso); + if (lso == NULL) + return ERR_PTR(-ENOMEM); + + lso->lso_fid = *fid; + if (attr != NULL) + lso->lso_attr = *attr; + + atomic_set(&lso->lso_ref, 1); + lso->lso_oit_cookie = cookie; + if (is_dir) + lso->lso_is_dir = 1; + + return lso; +} + +struct dt_object * +lfsck_assistant_object_load(const struct lu_env *env, + struct lfsck_instance *lfsck, + struct lfsck_assistant_object *lso) +{ + struct dt_object *obj; + + obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid); + if (IS_ERR(obj)) + return obj; + + if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) { + lso->lso_dead = 1; + lfsck_object_put(env, obj); + + return ERR_PTR(-ENOENT); + } + + if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) { + lfsck_object_put(env, obj); + + return ERR_PTR(-ENOTDIR); + } + + return obj; +} + /** * Generic LFSCK asynchronous communication interpretor function. * The LFSCK RPC reply for both the event notification and status @@ -2120,7 +2280,7 @@ static int lfsck_stop_notify(const struct lu_env *env, spin_unlock(<ds->ltd_lock); memset(lr, 0, sizeof(*lr)); - lr->lr_index = lfsck_dev_idx(lfsck->li_bottom); + lr->lr_index = lfsck_dev_idx(lfsck); lr->lr_event = LE_PEER_EXIT; lr->lr_active = type; lr->lr_status = LS_CO_PAUSED; @@ -2240,6 +2400,7 @@ int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com, lad->lad_to_double_scan = 0; lad->lad_in_double_scan = 0; lad->lad_exit = 0; + lad->lad_advance_lock = false; thread_set_flags(athread, 0); lta = lfsck_thread_args_init(lfsck, com, lsp); @@ -2276,9 +2437,6 @@ int lfsck_checkpoint_generic(const struct lu_env *env, struct ptlrpc_thread *athread = &lad->lad_thread; struct l_wait_info lwi = { 0 }; - if (com->lc_new_checked == 0) - return LFSCK_CHECKPOINT_SKIP; - l_wait_event(mthread->t_ctl_waitq, list_empty(&lad->lad_req_list) || !thread_is_running(mthread) || @@ -2448,12 +2606,10 @@ int lfsck_set_windows(struct dt_device *key, int val) lfsck = lfsck_instance_find(key, true, false); if (likely(lfsck != NULL)) { - if (val > LFSCK_ASYNC_WIN_MAX) { - CWARN("%s: Too large async window size, which " - "may cause memory issues. The valid range " - "is [0 - %u]. If you do not want to restrict " - "the window size for async requests pipeline, " - "just set it as 0.\n", + if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) { + CWARN("%s: invalid async windows size that may " + "cause memory issues. The valid range is " + "[1 - %u].\n", lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX); rc = -EINVAL; } else if (lfsck->li_bookmark_ram.lb_async_windows != val) { @@ -2530,7 +2686,7 @@ static int lfsck_stop_all(const struct lu_env *env, memset(lr, 0, sizeof(*lr)); lr->lr_event = LE_STOP; - lr->lr_index = lfsck_dev_idx(lfsck->li_bottom); + lr->lr_index = lfsck_dev_idx(lfsck); lr->lr_status = stop->ls_status; lr->lr_version = bk->lb_version; lr->lr_active = LFSCK_TYPES_ALL; @@ -2600,7 +2756,7 @@ static int lfsck_start_all(const struct lu_env *env, memset(lr, 0, sizeof(*lr)); lr->lr_event = LE_START; - lr->lr_index = lfsck_dev_idx(lfsck->li_bottom); + lr->lr_index = lfsck_dev_idx(lfsck); lr->lr_speed = bk->lb_speed_limit; lr->lr_version = bk->lb_version; lr->lr_active = start->ls_active; @@ -2624,6 +2780,7 @@ static int lfsck_start_all(const struct lu_env *env, laia->laia_ltd = ltd; ltd->ltd_layout_done = 0; ltd->ltd_namespace_done = 0; + ltd->ltd_synced_failures = 0; rc = lfsck_async_request(env, ltd->ltd_exp, lr, set, lfsck_async_interpret, laia, LFSCK_NOTIFY); @@ -3143,13 +3300,13 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key, lfsck->li_local_root_fid = *fid; if (master) { lfsck->li_master = 1; - if (lfsck_dev_idx(key) == 0) { + if (lfsck_dev_idx(lfsck) == 0) { struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2; const struct lu_name *cname; rc = dt_lookup(env, root, (struct dt_rec *)(&lfsck->li_global_root_fid), - (const struct dt_key *)"ROOT", BYPASS_CAPA); + (const struct dt_key *)"ROOT"); if (rc != 0) GOTO(out, rc); @@ -3161,18 +3318,18 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key, GOTO(out, rc = -ENOTDIR); rc = dt_lookup(env, obj, (struct dt_rec *)fid, - (const struct dt_key *)dotlustre, BYPASS_CAPA); + (const struct dt_key *)dotlustre); if (rc != 0) GOTO(out, rc); - lu_object_put(env, &obj->do_lu); + lfsck_object_put(env, obj); obj = dt_locate(env, key, fid); if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); cname = lfsck_name_get_const(env, dotlustre, strlen(dotlustre)); - rc = lfsck_verify_linkea(env, key, obj, cname, + rc = lfsck_verify_linkea(env, obj, cname, &lfsck->li_global_root_fid); if (rc != 0) GOTO(out, rc); @@ -3182,23 +3339,22 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key, *pfid = *fid; rc = dt_lookup(env, obj, (struct dt_rec *)fid, - (const struct dt_key *)lostfound, - BYPASS_CAPA); + (const struct dt_key *)lostfound); if (rc != 0) GOTO(out, rc); - lu_object_put(env, &obj->do_lu); + lfsck_object_put(env, obj); obj = dt_locate(env, key, fid); if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); cname = lfsck_name_get_const(env, lostfound, strlen(lostfound)); - rc = lfsck_verify_linkea(env, key, obj, cname, pfid); + rc = lfsck_verify_linkea(env, obj, cname, pfid); if (rc != 0) GOTO(out, rc); - lu_object_put(env, &obj->do_lu); + lfsck_object_put(env, obj); obj = NULL; } } @@ -3210,12 +3366,18 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key, if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); - lu_object_get(&obj->do_lu); - lfsck->li_obj_oit = obj; rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features); if (rc != 0) GOTO(out, rc); + lfsck->li_obj_oit = obj; + obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR, + S_IFDIR | S_IRUGO | S_IWUSR); + if (IS_ERR(obj)) + GOTO(out, rc = PTR_ERR(obj)); + + lu_object_get(&obj->do_lu); + lfsck->li_lfsck_dir = obj; rc = lfsck_bookmark_setup(env, lfsck); if (rc != 0) GOTO(out, rc); @@ -3241,9 +3403,9 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key, rc = lfsck_add_target_from_orphan(env, lfsck); out: if (obj != NULL && !IS_ERR(obj)) - lu_object_put(env, &obj->do_lu); + lfsck_object_put(env, obj); if (root != NULL && !IS_ERR(root)) - lu_object_put(env, &root->do_lu); + lfsck_object_put(env, root); if (rc != 0) lfsck_instance_cleanup(env, lfsck); return rc; @@ -3350,7 +3512,7 @@ void lfsck_del_target(const struct lu_env *env, struct dt_device *key, if (unlikely(index >= ltds->ltd_tgts_bitmap->size)) goto unlock; - ltd = LTD_TGT(ltds, index); + ltd = lfsck_ltd2tgt(ltds, index); if (unlikely(ltd == NULL)) goto unlock; @@ -3358,7 +3520,7 @@ void lfsck_del_target(const struct lu_env *env, struct dt_device *key, ltds->ltd_tgtnr--; cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index); - LTD_TGT(ltds, index) = NULL; + lfsck_assign_tgt(ltds, NULL, index); unlock: if (ltd == NULL) { @@ -3430,6 +3592,8 @@ static void __exit lfsck_exit(void) MODULE_AUTHOR("Intel Corporation "); MODULE_DESCRIPTION("LFSCK"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit); +module_init(lfsck_init); +module_exit(lfsck_exit);