X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=d11ffea5956eff3b091d884bab5b51658d45c9b0;hb=90dfed8469d46f4b79ebdff8121eb109b5db6746;hp=d3edabc85beaed0e7c2be951ed952dbe365bd02d;hpb=04452dfa7a13b22de0f16b51bf93a1199b62a0ae;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index d3edabc..d11ffea 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -46,14 +46,21 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include #include #include - +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include @@ -196,9 +203,8 @@ out: * * returns < 0: if error */ -static int mdd_is_subdir(const struct lu_env *env, - struct md_object *mo, const struct lu_fid *fid, - struct lu_fid *sfid) +int mdd_is_subdir(const struct lu_env *env, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) { struct mdd_device *mdd = mdo2mdd(mo); int rc; @@ -391,7 +397,13 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, if (!mdd_object_exists(cobj)) RETURN(-ENOENT); + if (mdd_is_dead_obj(cobj)) + RETURN(-ESTALE); + if (pobj) { + if (!mdd_object_exists(pobj)) + RETURN(-ENOENT); + if (mdd_is_dead_obj(pobj)) RETURN(-ENOENT); @@ -448,6 +460,12 @@ int mdd_link_sanity_check(const struct lu_env *env, int rc = 0; ENTRY; + if (!mdd_object_exists(src_obj)) + RETURN(-ENOENT); + + if (mdd_is_dead_obj(src_obj)) + RETURN(-ESTALE); + /* Local ops, no lookup before link, check filename length here. */ if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)) RETURN(-ENAMETOOLONG); @@ -507,10 +525,29 @@ void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, mdo_ref_del(env, obj, handle); } -/* insert named index, add reference if isdir */ -static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, int is_dir, - struct thandle *handle, struct lustre_capa *capa) +static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj, + const char *name, struct thandle *handle, + struct lustre_capa *capa) +{ + struct dt_object *next = mdd_object_child(pobj); + int rc; + ENTRY; + + if (dt_try_as_dir(env, next)) { + rc = next->do_index_ops->dio_delete(env, next, + (struct dt_key *)name, + handle, capa); + } else + rc = -ENOTDIR; + + RETURN(rc); +} + +static int __mdd_index_insert_only(const struct lu_env *env, + struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, + struct thandle *handle, + struct lustre_capa *capa) { struct dt_object *next = mdd_object_child(pobj); int rc; @@ -520,75 +557,56 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, struct md_ucred *uc = md_ucred(env); rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), + (struct dt_rec*)lf, (const struct dt_key *)name, handle, capa, uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } - - if (rc == 0) { - if (is_dir) { - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_add(env, pobj, handle); - mdd_write_unlock(env, pobj); - } - } RETURN(rc); } -/* delete named index, drop reference if isdir */ -static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, - const char *name, int is_dir, struct thandle *handle, - struct lustre_capa *capa) +/* insert named index, add reference if isdir */ +static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, int is_dir, + struct thandle *handle, struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(pobj); int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - rc = next->do_index_ops->dio_delete(env, next, - (struct dt_key *)name, - handle, capa); - if (rc == 0 && is_dir) { - int is_dot = 0; - - if (name != NULL && name[0] == '.' && name[1] == 0) - is_dot = 1; - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_del(env, pobj, handle, is_dot); - mdd_write_unlock(env, pobj); - } - } else - rc = -ENOTDIR; - + rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa); + if (rc == 0 && is_dir) { + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_add(env, pobj, handle); + mdd_write_unlock(env, pobj); + } RETURN(rc); } -static int -__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, - struct thandle *handle, struct lustre_capa *capa) +/* delete named index, drop reference if isdir */ +static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, + const char *name, int is_dir, struct thandle *handle, + struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(pobj); int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - struct md_ucred *uc = md_ucred(env); + rc = __mdd_index_delete_only(env, pobj, name, handle, capa); + if (rc == 0 && is_dir) { + int is_dot = 0; - rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), - (const struct dt_key *)name, - handle, capa, uc->mu_cap & - CFS_CAP_SYS_RESOURCE_MASK); - } else { - rc = -ENOTDIR; + if (name != NULL && name[0] == '.' && name[1] == 0) + is_dot = 1; + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_del(env, pobj, handle, is_dot); + mdd_write_unlock(env, pobj); } + RETURN(rc); } + /** Store a namespace change changelog record * If this fails, we must fail the whole transaction; we don't * want the change to commit without the log entry. @@ -629,13 +647,13 @@ static int mdd_changelog_ns_store(const struct lu_env *env, RETURN(-ENOMEM); rec = (struct llog_changelog_rec *)buf->lb_buf; - rec->cr_flags = CLF_VERSION; - rec->cr_type = (__u32)type; + rec->cr.cr_flags = CLF_VERSION; + rec->cr.cr_type = (__u32)type; tfid = tf ? tf : mdo2fid(target); - rec->cr_tfid = *tfid; - rec->cr_pfid = *tpfid; - rec->cr_namelen = tname->ln_namelen; - memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen); + rec->cr.cr_tfid = *tfid; + rec->cr.cr_pfid = *tpfid; + rec->cr.cr_namelen = tname->ln_namelen; + memcpy(rec->cr.cr_name, tname->ln_name, rec->cr.cr_namelen); if (likely(target)) target->mod_cltime = cfs_time_current_64(); @@ -662,9 +680,10 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; @@ -675,13 +694,14 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tobj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ - lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK, - NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif @@ -733,10 +753,8 @@ out_trans: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - rec_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, + qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, @@ -755,18 +773,27 @@ int mdd_finish_unlink(const struct lu_env *env, int reset = 1; ENTRY; + LASSERT(mdd_write_locked(env, obj) != 0); + rc = mdd_iattr_get(env, obj, ma); if (rc == 0 && ma->ma_attr.la_nlink == 0) { + obj->mod_flags |= DEAD_OBJ; /* add new orphan and the object * will be deleted during mdd_close() */ if (obj->mod_count) { rc = __mdd_orphan_add(env, obj, th); if (rc == 0) - obj->mod_flags |= ORPHAN_OBJ; - } - - obj->mod_flags |= DEAD_OBJ; - if (!(obj->mod_flags & ORPHAN_OBJ)) { + CDEBUG(D_HA, "Object "DFID" is inserted into " + "orphan list, open count = %d\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + else + CERROR("Object "DFID" fail to be an orphan, " + "open count = %d, maybe cause failed " + "open replay\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + } else { rc = mdd_object_kill(env, obj, ma); if (rc == 0) reset = 0; @@ -877,11 +904,6 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, } } #endif - - if (rc == 0) - obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, - sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, - NULL, NULL); if (!is_dir) /* old files may not have link ea; ignore errors */ mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), @@ -945,9 +967,10 @@ static int mdd_name_insert(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT struct md_ucred *uc = md_ucred(env); struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; cfs_cap_t save = uc->mu_cap; #endif int rc; @@ -960,13 +983,14 @@ static int mdd_name_insert(const struct lu_env *env, rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - 1, &rec_pending, NULL, - LQUOTA_FLAGS_BLK, NULL, 0); + exp, qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } else { uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK; @@ -1012,11 +1036,8 @@ out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, - obd, qids[USRQUOTA], - qids[GRPQUOTA], - rec_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, + obd, qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it*/ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, @@ -1149,7 +1170,7 @@ static int mdd_rt_sanity_check(const struct lu_env *env, * processed in cmr_rename_tgt before mdd_rename_tgt and enable * MDS_PERM_BYPASS. * So check may_delete, but not check nlink of tgt_pobj. */ - LASSERT(tobj); + rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1); RETURN(rc); @@ -1170,10 +1191,12 @@ static int mdd_rename_tgt(const struct lu_env *env, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; @@ -1184,13 +1207,14 @@ static int mdd_rename_tgt(const struct lu_env *env, rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qpids); /* get block quota for target parent */ - lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK, - NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qpids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif @@ -1254,7 +1278,7 @@ static int mdd_rename_tgt(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qcids); } #endif @@ -1275,17 +1299,16 @@ out_trans: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], - qpids[GRPQUOTA], - rec_pending, 1); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qpids, rec_pending, 1); + + if (quota_copc) + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. */ - lquota_adjust(mds_quota_interface_ref, obd, qcids, - qpids, rc, quota_opc); + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, + rc, quota_copc); } #endif return rc; @@ -1327,7 +1350,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (!md_should_create(spec->sp_cr_flags)) RETURN(0); - + lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) @@ -1380,7 +1403,6 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = mdo2mdd(pobj); struct dt_object *dir = mdd_object_child(mdd_obj); - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; int rc; ENTRY; @@ -1409,10 +1431,10 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, dt_try_as_dir(env, dir))) { rc = dir->do_index_ops->dio_lookup(env, dir, - (struct dt_rec *)pack, key, + (struct dt_rec *)fid, key, mdd_object_capa(env, mdd_obj)); if (rc > 0) - rc = fid_unpack(pack, fid); + rc = 0; else if (rc == 0) rc = -ENOENT; } else @@ -1446,20 +1468,12 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, __mdd_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), dot, handle, BYPASS_CAPA); - if (rc == 0) { + if (rc == 0) rc = __mdd_index_insert_only(env, child, pfid, dotdot, handle, BYPASS_CAPA); - if (rc != 0) { - int rc2; - - rc2 = __mdd_index_delete(env, child, dot, 1, - handle, BYPASS_CAPA); - if (rc2 != 0) - CERROR("Failure to cleanup after dotdot" - " creation: %d (%d)\n", rc2, rc); - } - } + if (rc != 0) + __mdd_ref_del(env, child, handle, 1); } if (rc == 0) mdd_links_add(env, child, pfid, lname, handle); @@ -1576,11 +1590,14 @@ static int mdd_create(const struct lu_env *env, int got_def_acl = 0; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0, parent_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; + int parent_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1637,9 +1654,9 @@ static int mdd_create(const struct lu_env *env, mdd_quota_wrapper(&ma->ma_attr, qcids); mdd_quota_wrapper(la_tmp, qpids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], 1, - &inode_pending, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qcids, inode_pending, 1, NULL, 0, NULL, + 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1657,15 +1674,14 @@ static int mdd_create(const struct lu_env *env, /* get block quota for child and parent */ if (block_count) lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_count, - &block_pending, NULL, + exp, qcids, block_pending, + block_count, NULL, LQUOTA_FLAGS_BLK, NULL, 0); if (!same) lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &parent_pending, NULL, - LQUOTA_FLAGS_BLK, NULL, 0); + exp, qpids, parent_pending, 1, + NULL, LQUOTA_FLAGS_BLK, NULL, + 0); } } #endif @@ -1675,6 +1691,7 @@ static int mdd_create(const struct lu_env *env, * first. */ if (S_ISREG(attr->la_mode)) { + lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) @@ -1834,18 +1851,12 @@ out_free: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - inode_pending, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_pending, 1); - if (parent_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], - parent_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qpids, + parent_pending, 1); /* Trigger dqacq on the owner of child and parent. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, @@ -1914,6 +1925,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env, * the other case has been processed in cml_rename * before mdd_rename and enable MDS_PERM_BYPASS. */ LASSERT(sobj); + rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0); if (rc) RETURN(rc); @@ -1955,16 +1967,19 @@ static int mdd_rename(const struct lu_env *env, struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); + const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj); int is_dir; - int rc; + int rc, rc2; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qspids[MAXQUOTAS] = { 0, 0 }; unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1985,15 +2000,16 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qtpids); /* get block quota for target parent */ lquota_chkquota(mds_quota_interface_ref, - obd, qtpids[USRQUOTA], - qtpids[GRPQUOTA], 1, - &rec_pending, NULL, + obd, exp, qtpids, + rec_pending, 1, NULL, LQUOTA_FLAGS_BLK, - NULL, 0); + data, 1); } } } @@ -2042,17 +2058,16 @@ static int mdd_rename(const struct lu_env *env, GOTO(cleanup, rc); /* "mv dir1 dir2" needs "dir1/.." link update */ - if (is_dir && mdd_sobj) { - rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, - mdd_object_capa(env, mdd_spobj)); + if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) { + rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + mdd_object_capa(env, mdd_sobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_spobj2, rc); - rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, - is_dir, handle, - mdd_object_capa(env, mdd_tpobj)); + rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot, + handle, mdd_object_capa(env, mdd_sobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_spobj, rc); } /* Remove target name from target directory @@ -2061,14 +2076,20 @@ static int mdd_rename(const struct lu_env *env, */ rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); - if (rc != 0 && rc != -ENOENT) - GOTO(cleanup, rc); + if (rc != 0) { + if (mdd_tobj) { + /* tname might been renamed to something else */ + GOTO(fixup_spobj, rc); + } + if (rc != -ENOENT) + GOTO(fixup_spobj, rc); + } /* Insert new fid with target name into target dir */ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -2079,7 +2100,7 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); } /* Remove old target object @@ -2089,6 +2110,13 @@ static int mdd_rename(const struct lu_env *env, */ if (tobj && mdd_object_exists(mdd_tobj)) { mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); + if (mdd_is_dead_obj(mdd_tobj)) { + mdd_write_unlock(env, mdd_tobj); + /* shld not be dead, something is wrong */ + CERROR("tobj is dead, something is wrong\n"); + rc = -EINVAL; + goto cleanup; + } __mdd_ref_del(env, mdd_tobj, handle, 0); /* Remove dot reference. */ @@ -2098,17 +2126,17 @@ static int mdd_rename(const struct lu_env *env, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); mdd_write_unlock(env, mdd_tobj); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qtcids); } #endif @@ -2117,7 +2145,7 @@ static int mdd_rename(const struct lu_env *env, la->la_valid = LA_CTIME | LA_MTIME; rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); if (mdd_spobj != mdd_tpobj) { la->la_valid = LA_CTIME | LA_MTIME; @@ -2140,6 +2168,48 @@ static int mdd_rename(const struct lu_env *env, } EXIT; + +fixup_tpobj: + if (rc) { + rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, + BYPASS_CAPA); + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + + if (mdd_tobj && mdd_object_exists(mdd_tobj) && + !mdd_is_dead_obj(mdd_tobj)) { + rc2 = __mdd_index_insert(env, mdd_tpobj, + mdo2fid(mdd_tobj), tname, + is_dir, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + } + } + +fixup_spobj: + if (rc && is_dir && mdd_sobj) { + rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("sp obj dotdot delete error %d\n",rc2); + + + rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, + dotdot, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj dotdot insert error %d\n",rc2); + } + +fixup_spobj2: + if (rc) { + rc2 = __mdd_index_insert(env, mdd_spobj, + lf, sname, is_dir, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj fix error %d\n",rc2); + } cleanup: if (likely(tdlh) && sdlh != tdlh) mdd_pdo_write_unlock(env, mdd_tpobj, tdlh); @@ -2159,22 +2229,23 @@ cleanup_unlocked: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qtpids[USRQUOTA], - qtpids[GRPQUOTA], - rec_pending, 1); - /* Trigger dqrel on the source owner of parent. - * If failed, the next call for lquota_chkquota will - * process it. */ - lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, - FSFILT_OP_UNLINK_PARTIAL_PARENT); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qtpids, rec_pending, 1); + + if (quota_copc) { + /* Trigger dqrel on the source owner of parent. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, + FSFILT_OP_UNLINK_PARTIAL_PARENT); + + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qtcids, - qtpids, rc, quota_opc); + qtpids, rc, quota_copc); + } } #endif return rc; @@ -2207,8 +2278,7 @@ struct lu_buf *mdd_links_get(const struct lu_env *env, rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); if (rc == -ERANGE) { /* Buf was too small, figure out what we need. */ - buf->lb_buf = NULL; - buf->lb_len = 0; + mdd_buf_put(buf); rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); if (rc < 0) return ERR_PTR(rc); @@ -2238,20 +2308,19 @@ struct lu_buf *mdd_links_get(const struct lu_env *env, /** Pack a link_ea_entry. * All elements are stored as chars to avoid alignment issues. * Numbers are always big-endian - * \param packbuf is a temp fid buffer * \retval record length */ static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, - const struct lu_fid *pfid, struct lu_fid* packbuf) + const struct lu_fid *pfid) { - char *ptr; - int reclen; + struct lu_fid tmpfid; + int reclen; + + fid_cpu_to_be(&tmpfid, pfid); + memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid)); + memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen); + reclen = sizeof(struct link_ea_entry) + lname->ln_namelen; - fid_pack(&lee->lee_parent_fid, pfid, packbuf); - ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len; - strncpy(ptr, lname->ln_name, lname->ln_namelen); - reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen + - sizeof(lee->lee_reclen); lee->lee_reclen[0] = (reclen >> 8) & 0xff; lee->lee_reclen[1] = reclen & 0xff; return reclen; @@ -2261,11 +2330,10 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, struct lu_name *lname, struct lu_fid *pfid) { *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; - fid_unpack(&lee->lee_parent_fid, pfid); - lname->ln_name = (char *)&lee->lee_parent_fid + - lee->lee_parent_fid.fp_len; - lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len - - sizeof(lee->lee_reclen); + memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid)); + fid_be_to_cpu(pfid, pfid); + lname->ln_name = lee->lee_name; + lname->ln_namelen = *reclen - sizeof(struct link_ea_entry); } /** Add a record to the end of link ea buf */ @@ -2290,7 +2358,7 @@ static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, leh = buf->lb_buf; lee = buf->lb_buf + leh->leh_len; - reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2); + reclen = mdd_lee_pack(lee, lname, pfid); leh->leh_len += reclen; leh->leh_reccount++; return 0; @@ -2346,9 +2414,14 @@ static int mdd_links_add(const struct lu_env *env, rc = __mdd_xattr_set(env, mdd_obj, mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), XATTR_NAME_LINK, 0, handle); - if (rc) - CERROR("link_ea add failed %d "DFID"\n", rc, - PFID(mdd_object_fid(mdd_obj))); + if (rc) { + if (rc == -ENOSPC) + CDEBUG(D_INODE, "link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + } if (buf->lb_vmalloc) /* if we vmalloced a large buffer drop it */ @@ -2385,15 +2458,19 @@ static int mdd_links_rename(const struct lu_env *env, buf = mdd_links_get(env, mdd_obj); if (IS_ERR(buf)) { rc = PTR_ERR(buf); - CERROR("link_ea read failed %d "DFID"\n", - rc, PFID(mdd_object_fid(mdd_obj))); + if (rc == -ENODATA) + CDEBUG(D_INODE, "link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); RETURN(rc); } leh = buf->lb_buf; lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ /* Find the old record */ - for(count = 0; count <= leh->leh_reccount; count++) { + for(count = 0; count < leh->leh_reccount; count++) { mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); if (tmpname->ln_namelen == oldlname->ln_namelen && lu_fid_eq(tmpfid, oldpfid) && @@ -2402,7 +2479,7 @@ static int mdd_links_rename(const struct lu_env *env, break; lee = (struct link_ea_entry *)((char *)lee + reclen); } - if (count > leh->leh_reccount) { + if ((count + 1) > leh->leh_reccount) { CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", oldlname->ln_namelen, oldlname->ln_name); GOTO(out, rc = -ENOENT);