X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_orphans.c;h=e58c4d6a67965b15b5cd5af38445886eb41432ef;hb=83ddd179225821e5c2aee1adb72dab26150ab385;hp=4f6c32c6e9b94001db4c88e3f7f524c2afd1c86b;hpb=473a4adc4ec2bea393c5b438b38b86f2ada19288;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 4f6c32c..e58c4d6 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lustre/mdd/mdd_orphans.c * @@ -45,7 +44,7 @@ #include #include "mdd_internal.h" -static const char mdd_orphan_index_name[] = "PENDING"; +static const char mdd_orphan_index_name[] = MDT_ORPHAN_DIR; static const char dotdot[] = ".."; enum { @@ -61,8 +60,9 @@ static struct dt_key *mdd_orphan_key_fill(const struct lu_env *env, char *key = mdd_env_info(env)->mti_key; LASSERT(key); - snprintf(key, sizeof(mdd_env_info(env)->mti_key), - DFID_NOBRACE, PFID(lf)); + if (!(MTI_KEEP_KEY & mdd_env_info(env)->mti_flags)) + snprintf(key, sizeof(mdd_env_info(env)->mti_key), + DFID_NOBRACE, PFID(lf)); return (struct dt_key *)key; } @@ -74,9 +74,11 @@ static struct dt_key *mdd_orphan_key_fill_20(const struct lu_env *env, char *key = mdd_env_info(env)->mti_key; LASSERT(key); - snprintf(key, sizeof(mdd_env_info(env)->mti_key), - ORPHAN_FILE_NAME_FORMAT_20, - fid_seq(lf), fid_oid(lf), fid_ver(lf), ORPH_OP_UNLINK); + if (!(MTI_KEEP_KEY & mdd_env_info(env)->mti_flags)) + snprintf(key, sizeof(mdd_env_info(env)->mti_key), + ORPHAN_FILE_NAME_FORMAT_20, + fid_seq(lf), fid_oid(lf), fid_ver(lf), + ORPH_OP_UNLINK); return (struct dt_key *)key; } @@ -88,13 +90,13 @@ static inline int mdd_orphan_insert_obj(const struct lu_env *env, { struct dt_insert_rec *rec = &mdd_env_info(env)->mti_dt_rec; struct dt_object *dor = mdd->mdd_orphans; - const struct lu_fid *lf = mdo2fid(obj); + const struct lu_fid *lf = mdd_object_fid(obj); struct dt_key *key = mdd_orphan_key_fill(env, lf); rec->rec_fid = lf; rec->rec_type = mdd_object_type(obj); - return dt_insert(env, dor, (const struct dt_rec *)rec, key, th, 1); + return dt_insert(env, dor, (const struct dt_rec *)rec, key, th); } int mdd_orphan_declare_insert(const struct lu_env *env, struct mdd_object *obj, @@ -105,9 +107,9 @@ int mdd_orphan_declare_insert(const struct lu_env *env, struct mdd_object *obj, struct dt_key *key; int rc; - key = mdd_orphan_key_fill(env, mdo2fid(obj)); + key = mdd_orphan_key_fill(env, mdd_object_fid(obj)); - rec->rec_fid = mdo2fid(obj); + rec->rec_fid = mdd_object_fid(obj); rec->rec_type = mode; rc = dt_declare_insert(env, mdd->mdd_orphans, (const struct dt_rec *)rec, key, th); @@ -164,7 +166,7 @@ int mdd_orphan_insert(const struct lu_env *env, struct mdd_object *obj, LASSERT(mdd_write_locked(env, obj) != 0); LASSERT(!(obj->mod_flags & ORPHAN_OBJ)); - dt_write_lock(env, mdd->mdd_orphans, MOR_TGT_ORPHAN); + dt_write_lock(env, mdd->mdd_orphans, DT_TGT_ORPHAN); rc = mdd_orphan_insert_obj(env, mdd, obj, th); if (rc) @@ -186,7 +188,7 @@ int mdd_orphan_insert(const struct lu_env *env, struct mdd_object *obj, rec->rec_fid = lf_dor; rec->rec_type = S_IFDIR; dt_insert(env, next, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 1); + (const struct dt_key *)dotdot, th); out: if (rc == 0) @@ -204,12 +206,15 @@ int mdd_orphan_declare_delete(const struct lu_env *env, struct mdd_object *obj, struct dt_key *key; int rc; - key = mdd_orphan_key_fill(env, mdo2fid(obj)); + key = mdd_orphan_key_fill(env, mdd_object_fid(obj)); rc = dt_declare_delete(env, mdd->mdd_orphans, key, th); if (rc) return rc; + if (!mdd_object_exists(obj)) + return -ENOENT; + rc = mdo_declare_ref_del(env, obj, th); if (rc) return rc; @@ -241,7 +246,7 @@ int mdd_orphan_delete(const struct lu_env *env, struct mdd_object *obj, struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); struct dt_object *dor = mdd->mdd_orphans; struct dt_key *key; - int rc; + int rc = 0; ENTRY; @@ -251,15 +256,19 @@ int mdd_orphan_delete(const struct lu_env *env, struct mdd_object *obj, LASSERT(dor); - key = mdd_orphan_key_fill(env, mdo2fid(obj)); - dt_write_lock(env, mdd->mdd_orphans, MOR_TGT_ORPHAN); + key = mdd_orphan_key_fill(env, mdd_object_fid(obj)); + dt_write_lock(env, mdd->mdd_orphans, DT_TGT_ORPHAN); + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ORPHAN_DELETE)) + goto ref_del; rc = dt_delete(env, mdd->mdd_orphans, key, th); if (rc == -ENOENT) { - key = mdd_orphan_key_fill_20(env, mdo2fid(obj)); + key = mdd_orphan_key_fill_20(env, mdd_object_fid(obj)); rc = dt_delete(env, mdd->mdd_orphans, key, th); } +ref_del: if (!rc) { /* lov objects will be destroyed by caller */ mdo_ref_del(env, obj, th); @@ -270,7 +279,7 @@ int mdd_orphan_delete(const struct lu_env *env, struct mdd_object *obj, obj->mod_flags &= ~ORPHAN_OBJ; } else { CERROR("%s: could not delete orphan object "DFID": rc = %d\n", - mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(obj)), rc); + mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)), rc); } dt_write_unlock(env, mdd->mdd_orphans); @@ -283,7 +292,8 @@ static int mdd_orphan_destroy(const struct lu_env *env, struct mdd_object *obj, { struct thandle *th = NULL; struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - int rc = 0; + bool orphan_exists = true; + int rc = 0, rc1 = 0; ENTRY; th = mdd_trans_create(env, mdd); @@ -295,42 +305,42 @@ static int mdd_orphan_destroy(const struct lu_env *env, struct mdd_object *obj, RETURN(rc); } + mdd_write_lock(env, obj, DT_TGT_CHILD); rc = mdd_orphan_declare_delete(env, obj, th); - if (rc) - GOTO(stop, rc); - - rc = mdo_declare_destroy(env, obj, th); - if (rc) - GOTO(stop, rc); + if (rc == -ENOENT || lu_object_is_dying(obj->mod_obj.mo_lu.lo_header)) + orphan_exists = false; + else if (rc) + GOTO(unlock, rc); + + if (orphan_exists) { + rc = mdo_declare_destroy(env, obj, th); + if (rc) + GOTO(unlock, rc); + } rc = mdd_trans_start(env, mdd, th); if (rc) - GOTO(stop, rc); + GOTO(unlock, rc); - mdd_write_lock(env, obj, MOR_TGT_CHILD); if (likely(obj->mod_count == 0)) { - dt_write_lock(env, mdd->mdd_orphans, MOR_TGT_ORPHAN); + dt_write_lock(env, mdd->mdd_orphans, DT_TGT_ORPHAN); rc = dt_delete(env, mdd->mdd_orphans, key, th); - if (rc == 0) { + /* We should remove object even dt_delete failed */ + if (orphan_exists) { mdo_ref_del(env, obj, th); if (S_ISDIR(mdd_object_type(obj))) { mdo_ref_del(env, obj, th); dt_ref_del(env, mdd->mdd_orphans, th); } - rc = mdo_destroy(env, obj, th); - } else { - CERROR("%s: could not delete orphan "DFID": rc = %d\n", - mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(obj)), - rc); + rc1 = mdo_destroy(env, obj, th); } dt_write_unlock(env, mdd->mdd_orphans); } +unlock: mdd_write_unlock(env, obj); + mdd_trans_stop(env, mdd, 0, th); -stop: - rc = mdd_trans_stop(env, mdd, 0, th); - - RETURN(rc); + RETURN(rc ? rc : rc1); } /** @@ -359,11 +369,11 @@ static int mdd_orphan_key_test_and_delete(const struct lu_env *env, if (mdo->mod_count == 0) { CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf)); rc = mdd_orphan_destroy(env, mdo, key); - if (rc) /* so replay-single.sh test_37 works */ + if (rc) /* below message checked in replay-single.sh test_37 */ CERROR("%s: error unlinking orphan "DFID": rc = %d\n", mdd2obd_dev(mdd)->obd_name, PFID(lf), rc); } else { - mdd_write_lock(env, mdo, MOR_TGT_CHILD); + mdd_write_lock(env, mdo, DT_TGT_CHILD); if (likely(mdo->mod_count > 0)) { CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n", PFID(lf), mdo->mod_count); @@ -399,7 +409,6 @@ static int mdd_orphan_index_iterate(const struct lu_env *env, struct lu_fid fid; int key_sz = 0; int rc; - __u64 cookie; ENTRY; iops = &dor->do_index_ops->dio_it; @@ -421,6 +430,7 @@ static int mdd_orphan_index_iterate(const struct lu_env *env, GOTO(out_put, rc = -EIO); } + mdd_env_info(env)->mti_flags |= MTI_KEEP_KEY; do { if (thread->mgt_abort) break; @@ -446,16 +456,12 @@ static int mdd_orphan_index_iterate(const struct lu_env *env, } /* kill orphan object */ - cookie = iops->store(env, it); iops->put(env, it); rc = mdd_orphan_key_test_and_delete(env, mdd, &fid, (struct dt_key *)ent->lde_name); - /* after index delete reset iterator */ if (rc == 0) rc = iops->get(env, it, (const void *)""); - else - rc = iops->load(env, it, cookie); next: rc = iops->next(env, it); } while (rc == 0);