X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flfsck%2Flfsck_lib.c;h=6ea74b8764e83fc2649aa4bb66f99534fa67fb3c;hp=de7b849644b40398f39093ccbae7c4e62311a68b;hb=76f0977b7ea5d46836cb459deb7b9ad9e781d585;hpb=32a11dbd310b44d2a6e084c48310f21ef6649145

diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c
index de7b849..6ea74b8 100644
--- a/lustre/lfsck/lfsck_lib.c
+++ b/lustre/lfsck/lfsck_lib.c
@@ -322,6 +322,559 @@ void lfsck_component_cleanup(const struct lu_env *env,
 	lfsck_component_put(env, com);
 }
 
+/**
+ * Allocate a FID from lfsck->li_seq and remember it in the bookmark.
+ *
+ * When the allocation succeeds, the FID is saved as lb_last_fid in the RAM
+ * bookmark and the bookmark is stored via lfsck_bookmark_store().
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] lfsck	pointer to the lfsck instance
+ * \param[out] fid	where the allocated FID is returned
+ * \param[in] locked	true if the caller already holds lfsck->li_mutex;
+ *			otherwise the mutex is taken and released here
+ *
+ * \retval		result of lfsck_bookmark_store() if a FID was allocated
+ * \retval		negative value from seq_client_alloc_fid() otherwise
+ */
+int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
+		    struct lu_fid *fid, bool locked)
+{
+	struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+	int rc = 0;
+	ENTRY;
+
+	if (!locked)
+		mutex_lock(&lfsck->li_mutex);
+
+	rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
+	if (rc >= 0) {
+		bk->lb_last_fid = *fid;
+		/* We do not care about whether the subsequent sub-operations
+		 * failed or not. The worst case is that one FID is lost that
+		 * is not a big issue for the LFSCK since it is relatively rare
+		 * for LFSCK create. */
+		rc = lfsck_bookmark_store(env, lfsck);
+	}
+
+	if (!locked)
+		mutex_unlock(&lfsck->li_mutex);
+
+	RETURN(rc);
+}
+
+/* Entry names used for the self and parent links of a new directory. */
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+
+/**
+ * Create \a child and link it into \a parent within one local transaction.
+ *
+ * Using a single transaction on lfsck->li_bottom, create \a child, insert
+ * "." and ".." (the latter pointing to LU_LPF_FID) into it, insert \a name
+ * into \a parent, increase both link counts, then record the child FID as
+ * lb_lpf_fid and write the bookmark.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] lfsck	pointer to the lfsck instance
+ * \param[in] parent	directory that receives the \a name entry
+ * \param[in] child	the object to create
+ * \param[in] la	attributes for the new object
+ * \param[in] dof	object format for the new object
+ * \param[in] name	entry name to insert into \a parent
+ *
+ * \return		result of the final bookmark write if all earlier
+ *			steps succeeded, otherwise the result of the first
+ *			step that failed
+ */
+static int lfsck_create_lpf_local(const struct lu_env *env,
+				  struct lfsck_instance *lfsck,
+				  struct dt_object *parent,
+				  struct dt_object *child,
+				  struct lu_attr *la,
+				  struct dt_object_format *dof,
+				  const char *name)
+{
+	struct dt_device *dev = lfsck->li_bottom;
+	struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+	struct dt_object *bk_obj = lfsck->li_bookmark_obj;
+	const struct lu_fid *cfid = lu_object_fid(&child->do_lu);
+	struct thandle *th = NULL;
+	loff_t pos = 0;
+	int len = sizeof(struct lfsck_bookmark);
+	int rc = 0;
+	ENTRY;
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		RETURN(PTR_ERR(th));
+
+	/* 1a. create child */
+	rc = dt_declare_create(env, child, la, NULL, dof, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 2a. increase child nlink */
+	rc = dt_declare_ref_add(env, child, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 3a. insert name into parent dir */
+	rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
+			       (const struct dt_key *)name, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 4a. increase parent nlink */
+	rc = dt_declare_ref_add(env, parent, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 5a. update bookmark */
+	rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, child, 0);
+	/* 1b.1 create child */
+	rc = dt_create(env, child, la, NULL, dof, th);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	if (unlikely(!dt_try_as_dir(env, child)))
+		GOTO(unlock, rc = -ENOTDIR);
+
+	/* 1b.2 insert dot into child dir */
+	rc = dt_insert(env, child, (const struct dt_rec *)cfid,
+		       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	/* 1b.3 insert dotdot into child dir */
+	rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
+		       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	/* 2b. increase child nlink */
+	rc = dt_ref_add(env, child, th);
+	dt_write_unlock(env, child);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 3b. insert name into parent dir */
+	rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
+		       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, parent, 0);
+	/* 4b. increase parent nlink */
+	rc = dt_ref_add(env, parent, th);
+	dt_write_unlock(env, parent);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	bk->lb_lpf_fid = *cfid;
+	lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+	/* 5b. update bookmark: write the little-endian image prepared above,
+	 * not the CPU-endian RAM copy. */
+	rc = dt_record_write(env, bk_obj,
+			     lfsck_buf_get(env, &lfsck->li_bookmark_disk, len),
+			     &pos, th);
+
+	GOTO(stop, rc);
+
+unlock:
+	dt_write_unlock(env, child);
+
+stop:
+	dt_trans_stop(env, dev, th);
+
+	return rc;
+}
+
+/**
+ * Create \a child locally and link it into a remote \a parent.
+ *
+ * Two transactions are used (see the comment in the body): the first one,
+ * on lfsck->li_bottom, creates \a child with its "." and ".." entries; the
+ * second one, on lfsck->li_next, inserts \a name into \a parent and writes
+ * the bookmark. If the insert or the parent link-count update fails, the
+ * second transaction drops the child's links and destroys it.
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] lfsck	pointer to the lfsck instance
+ * \param[in] parent	directory that receives the \a name entry
+ * \param[in] child	the object to create
+ * \param[in] la	attributes for the new object
+ * \param[in] dof	object format for the new object
+ * \param[in] name	entry name to insert into \a parent
+ *
+ * \return		result of the final bookmark write if all earlier
+ *			steps succeeded, otherwise the result of the first
+ *			step that failed
+ */
+static int lfsck_create_lpf_remote(const struct lu_env *env,
+				   struct lfsck_instance *lfsck,
+				   struct dt_object *parent,
+				   struct dt_object *child,
+				   struct lu_attr *la,
+				   struct dt_object_format *dof,
+				   const char *name)
+{
+	struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+	struct dt_object *bk_obj = lfsck->li_bookmark_obj;
+	const struct lu_fid *cfid = lu_object_fid(&child->do_lu);
+	struct thandle *th = NULL;
+	struct dt_device *dev;
+	loff_t pos = 0;
+	int len = sizeof(struct lfsck_bookmark);
+	int rc = 0;
+	ENTRY;
+
+	/* Create .lustre/lost+found/MDTxxxx. */
+
+	/* XXX: Currently, cross-MDT create operation needs to create the child
+	 *	object firstly, then insert name into the parent directory. For
+	 *	this case, the child object resides on current MDT (local), but
+	 *	the parent ".lustre/lost+found" may be on remote MDT. It is not
+	 *	easy to contain all the sub-modifications orderly within single
+	 *	transaction.
+	 *
+	 *	To avoid more inconsistency, we split the create operation into
+	 *	two transactions:
+	 *
+	 *	1) create the child locally.
+	 *	2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
+	 *	   remotely and update the lfsck_bookmark::lb_lpf_fid locally.
+	 *
+	 *	If 1) done but 2) failed, then the worst case is that we lose
+	 *	one object locally, which is not a big issue. (can be repaired
+	 *	by LFSCK phase III) */
+
+	/* Transaction I: */
+
+	dev = lfsck->li_bottom;
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		RETURN(PTR_ERR(th));
+
+	/* 1a. create child locally. */
+	rc = dt_declare_create(env, child, la, NULL, dof, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 2a. increase child nlink locally. */
+	rc = dt_declare_ref_add(env, child, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, child, 0);
+	/* 1b. create child locally. */
+	rc = dt_create(env, child, la, NULL, dof, th);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	if (unlikely(!dt_try_as_dir(env, child)))
+		GOTO(unlock, rc = -ENOTDIR);
+
+	/* 2b.1 insert dot into child dir locally. */
+	rc = dt_insert(env, child, (const struct dt_rec *)cfid,
+		       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	/* 2b.2 insert dotdot into child dir locally. */
+	rc = dt_insert(env, child, (const struct dt_rec *)&LU_LPF_FID,
+		       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
+	if (rc != 0)
+		GOTO(unlock, rc);
+
+	/* 2b.3 increase child nlink locally. */
+	rc = dt_ref_add(env, child, th);
+	dt_write_unlock(env, child);
+	dt_trans_stop(env, dev, th);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* Transaction II: */
+
+	dev = lfsck->li_next;
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		RETURN(PTR_ERR(th));
+
+	/* 3a. insert name into parent dir remotely. */
+	rc = dt_declare_insert(env, parent, (const struct dt_rec *)cfid,
+			       (const struct dt_key *)name, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 4a. increase parent nlink remotely. */
+	rc = dt_declare_ref_add(env, parent, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 5a. decrease child nlink for dotdot locally if former remote
+	 *     update failed. */
+	rc = dt_declare_ref_del(env, child, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 6a. decrease child nlink for dot locally if former remote
+	 *     update failed. */
+	rc = dt_declare_ref_del(env, child, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 7a. destroy child locally if former remote update failed. */
+	rc = dt_declare_destroy(env, child, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 8a. update bookmark locally. */
+	rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start(env, dev, th);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* 3b. insert name into parent dir remotely. */
+	rc = dt_insert(env, parent, (const struct dt_rec *)cfid,
+		       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
+	if (rc == 0) {
+		dt_write_lock(env, parent, 0);
+		/* 4b. increase parent nlink remotely. */
+		rc = dt_ref_add(env, parent, th);
+		dt_write_unlock(env, parent);
+	}
+	if (rc != 0) {
+		/* 5b. decrease child nlink for dotdot locally. */
+		dt_ref_del(env, child, th);
+		/* 6b. decrease child nlink for dot locally. */
+		dt_ref_del(env, child, th);
+		/* 7b. destroy child locally. */
+		dt_destroy(env, child, th);
+		GOTO(stop, rc);
+	}
+
+	bk->lb_lpf_fid = *cfid;
+	lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
+
+	/* 8b. update bookmark locally: write the little-endian image prepared
+	 * above, not the CPU-endian RAM copy. */
+	rc = dt_record_write(env, bk_obj,
+			     lfsck_buf_get(env, &lfsck->li_bookmark_disk, len),
+			     &pos, th);
+
+	GOTO(stop, rc);
+
+unlock:
+	dt_write_unlock(env, child);
+stop:
+	dt_trans_stop(env, dev, th);
+
+	return rc;
+}
+
+/* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
+ * instance, because the MDT0 may be not ready for sequence allocation yet.
+ * We do that only when it is required, such as orphan OST-objects repairing.
+ *
+ * Find or create the directory "MDT%04x" (named after the local device
+ * index) under LU_LPF_FID and cache it in lfsck->li_lpf_obj. The child FID
+ * comes from the bookmark's lb_lpf_fid, else from a lookup in the parent,
+ * else from lfsck_fid_alloc().
+ *
+ * \retval		0 for success
+ * \retval		non-zero error from the first step that failed
+ */
+int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+	struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+	struct lfsck_thread_info *info = lfsck_env_info(env);
+	struct lu_fid *cfid = &info->lti_fid2;
+	struct lu_attr *la = &info->lti_la;
+	struct dt_object_format *dof = &info->lti_dof;
+	struct dt_object *parent = NULL;
+	struct dt_object *child = NULL;
+	char name[8];
+	int node = lfsck_dev_idx(lfsck->li_bottom);
+	int rc = 0;
+	ENTRY;
+
+	LASSERT(lfsck->li_master);
+
+	/* "MDT" plus four hex digits plus NUL exactly fills name[]; bound the
+	 * write so that a larger index cannot overrun the buffer. */
+	snprintf(name, sizeof(name), "MDT%04x", node);
+	if (node == 0) {
+		parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
+						  &LU_LPF_FID);
+	} else {
+		struct lfsck_tgt_desc *ltd;
+
+		ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
+		if (unlikely(ltd == NULL))
+			RETURN(-ENODEV);
+
+		parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
+						  &LU_LPF_FID);
+		lfsck_tgt_put(ltd);
+	}
+	if (IS_ERR(parent))
+		RETURN(PTR_ERR(parent));
+
+	if (unlikely(!dt_try_as_dir(env, parent)))
+		GOTO(out, rc = -ENOTDIR);
+
+	mutex_lock(&lfsck->li_mutex);
+	if (lfsck->li_lpf_obj != NULL)
+		GOTO(unlock, rc = 0);
+
+	if (fid_is_zero(&bk->lb_lpf_fid)) {
+		/* There is corner case that: in former LFSCK scanning we have
+		 * created the .lustre/lost+found/MDTxxxx but failed to update
+		 * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
+		 * it from MDT0 firstly. */
+		rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+			       (const struct dt_key *)name, BYPASS_CAPA);
+		if (rc != 0 && rc != -ENOENT)
+			GOTO(unlock, rc);
+
+		if (rc == 0) {
+			bk->lb_lpf_fid = *cfid;
+			rc = lfsck_bookmark_store(env, lfsck);
+		} else {
+			rc = lfsck_fid_alloc(env, lfsck, cfid, true);
+		}
+		if (rc != 0)
+			GOTO(unlock, rc);
+	} else {
+		*cfid = bk->lb_lpf_fid;
+	}
+
+	child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
+	if (IS_ERR(child))
+		GOTO(unlock, rc = PTR_ERR(child));
+
+	if (dt_object_exists(child) != 0) {
+		if (unlikely(!dt_try_as_dir(env, child)))
+			GOTO(unlock, rc = -ENOTDIR);
+
+		lfsck->li_lpf_obj = child;
+		GOTO(unlock, rc = 0);
+	}
+
+	memset(la, 0, sizeof(*la));
+	la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
+	la->la_mode = S_IFDIR | S_IRWXU;
+	la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
+		       LA_UID | LA_GID;
+	memset(dof, 0, sizeof(*dof));
+	dof->dof_type = dt_mode_to_dft(S_IFDIR);
+
+	if (node == 0)
+		rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
+					    dof, name);
+	else
+		rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
+					     dof, name);
+	if (rc == 0)
+		lfsck->li_lpf_obj = child;
+
+	GOTO(unlock, rc);
+
+unlock:
+	mutex_unlock(&lfsck->li_mutex);
+	if (rc != 0 && child != NULL && !IS_ERR(child))
+		lu_object_put(env, &child->do_lu);
+out:
+	if (parent != NULL && !IS_ERR(parent))
+		lu_object_put(env, &parent->do_lu);
+
+	return rc;
+}
+
+/**
+ * Set up the sequence client used by lfsck_fid_alloc().
+ *
+ * Allocate lfsck->li_seq, initialise it as a LUSTRE_SEQ_METADATA client
+ * named "lfsck-<name>" on top of the site's ss_server_seq, and resume from
+ * the bookmark's lb_last_fid when that FID is sane.
+ *
+ * \param[in] lfsck	pointer to the lfsck instance
+ *
+ * \retval		0 for success
+ * \retval		-ENODEV if lu_site2seq() returns NULL
+ * \retval		-ENOMEM if an allocation fails
+ * \retval		non-zero error from seq_client_init()
+ */
+static int lfsck_fid_init(struct lfsck_instance *lfsck)
+{
+	struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+	struct seq_server_site *ss;
+	char *prefix;
+	int rc = 0;
+	ENTRY;
+
+	ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
+	if (unlikely(ss == NULL))
+		RETURN(-ENODEV);
+
+	OBD_ALLOC_PTR(lfsck->li_seq);
+	if (lfsck->li_seq == NULL)
+		RETURN(-ENOMEM);
+
+	OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
+	if (prefix == NULL)
+		GOTO(out, rc = -ENOMEM);
+
+	snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
+	rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
+			     ss->ss_server_seq);
+	OBD_FREE(prefix, MAX_OBD_NAME + 7);
+	if (rc != 0)
+		GOTO(out, rc);
+
+	if (fid_is_sane(&bk->lb_last_fid))
+		lfsck->li_seq->lcs_fid = bk->lb_last_fid;
+
+	RETURN(0);
+
+out:
+	OBD_FREE_PTR(lfsck->li_seq);
+	lfsck->li_seq = NULL;
+
+	return rc;
+}
+
+/**
+ * Release the sequence client set up by lfsck_fid_init(), if any.
+ *
+ * \param[in] lfsck	pointer to the lfsck instance
+ */
+static void lfsck_fid_fini(struct lfsck_instance *lfsck)
+{
+	if (lfsck->li_seq != NULL) {
+		seq_client_fini(lfsck->li_seq);
+		OBD_FREE_PTR(lfsck->li_seq);
+		lfsck->li_seq = NULL;
+	}
+}
+
 void lfsck_instance_cleanup(const struct lu_env *env,
 			    struct lfsck_instance *lfsck)
 {
@@ -370,11 +923,18 @@ void lfsck_instance_cleanup(const struct lu_env *env,
 		lfsck->li_bookmark_obj = NULL;
 	}
 
+	if (lfsck->li_lpf_obj != NULL) {
+		lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
+		lfsck->li_lpf_obj = NULL;
+	}
+
 	if (lfsck->li_los != NULL) {
 		local_oid_storage_fini(env, lfsck->li_los);
 		lfsck->li_los = NULL;
 	}
 
+	lfsck_fid_fini(lfsck);
+
 	OBD_FREE_PTR(lfsck);
 }
 
@@ -2031,6 +2591,10 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
 		GOTO(out, rc);
 
 	if (master) {
+		rc = lfsck_fid_init(lfsck);
+		if (rc < 0)
+			GOTO(out, rc);
+
 		rc = lfsck_namespace_setup(env, lfsck);
 		if (rc < 0)
 			GOTO(out, rc);