X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=06561d92f51208118f4ed46dc16c0dae721375b0;hb=8901d15087b224610a26c829635d5d305b02307b;hp=cb8423901c52020c9b605656e9d5f73f16477eb4;hpb=d8d64a27f52303a411a41bf04e44783d66a63264;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index cb84239..06561d9 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -46,14 +46,21 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include #include #include - +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include @@ -71,6 +78,19 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle); +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle); + static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) @@ -89,9 +109,9 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, return rc; } -static int mdd_lookup(const struct lu_env *env, - struct md_object *pobj, const struct lu_name *lname, - struct lu_fid* fid, struct md_op_spec *spec) +int mdd_lookup(const struct lu_env *env, + struct md_object *pobj, const struct lu_name *lname, + struct lu_fid* fid, struct md_op_spec *spec) { int rc; ENTRY; @@ -99,7 +119,6 @@ static int mdd_lookup(const struct lu_env *env, RETURN(rc); } - static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, struct lu_fid *fid) { @@ -107,10 +126,10 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, } /* - * For root fid use special function, whcih does not compare version component - * of fid. Vresion component is different for root fids on all MDTs. + * For root fid use special function, which does not compare version component + * of fid. Version component is different for root fids on all MDTs. */ -static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) +int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) { return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) && fid_oid(&mdd->mdd_root_fid) == fid_oid(fid); @@ -184,9 +203,8 @@ out: * * returns < 0: if error */ -static int mdd_is_subdir(const struct lu_env *env, - struct md_object *mo, const struct lu_fid *fid, - struct lu_fid *sfid) +int mdd_is_subdir(const struct lu_env *env, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) { struct mdd_device *mdd = mdo2mdd(mo); int rc; @@ -379,7 +397,13 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, if (!mdd_object_exists(cobj)) RETURN(-ENOENT); + if (mdd_is_dead_obj(cobj)) + RETURN(-ESTALE); + if (pobj) { + if (!mdd_object_exists(pobj)) + RETURN(-ENOENT); + if (mdd_is_dead_obj(pobj)) RETURN(-ENOENT); @@ -436,6 +460,12 @@ int mdd_link_sanity_check(const struct lu_env *env, int rc = 0; ENTRY; + if (!mdd_object_exists(src_obj)) + RETURN(-ENOENT); + + if (mdd_is_dead_obj(src_obj)) + RETURN(-ESTALE); + /* Local ops, no lookup before link, check filename length here. */ if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)) RETURN(-ENAMETOOLONG); @@ -495,10 +525,29 @@ void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, mdo_ref_del(env, obj, handle); } -/* insert named index, add reference if isdir */ -static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, int is_dir, - struct thandle *handle, struct lustre_capa *capa) +static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj, + const char *name, struct thandle *handle, + struct lustre_capa *capa) +{ + struct dt_object *next = mdd_object_child(pobj); + int rc; + ENTRY; + + if (dt_try_as_dir(env, next)) { + rc = next->do_index_ops->dio_delete(env, next, + (struct dt_key *)name, + handle, capa); + } else + rc = -ENOTDIR; + + RETURN(rc); +} + +static int __mdd_index_insert_only(const struct lu_env *env, + struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, + struct thandle *handle, + struct lustre_capa *capa) { struct dt_object *next = mdd_object_child(pobj); int rc; @@ -508,20 +557,29 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, struct md_ucred *uc = md_ucred(env); rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), + (struct dt_rec*)lf, (const struct dt_key *)name, handle, capa, uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } + RETURN(rc); +} - if (rc == 0) { - if (is_dir) { - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_add(env, pobj, handle); - mdd_write_unlock(env, pobj); - } +/* insert named index, add reference if isdir */ +static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, int is_dir, + struct thandle *handle, struct lustre_capa *capa) +{ + int rc; + ENTRY; + + rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa); + if (rc == 0 && is_dir) { + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_add(env, pobj, handle); + mdd_write_unlock(env, pobj); } RETURN(rc); } @@ -531,50 +589,82 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, const char *name, int is_dir, struct thandle *handle, struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(pobj); int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - rc = next->do_index_ops->dio_delete(env, next, - (struct dt_key *)name, - handle, capa); - if (rc == 0 && is_dir) { - int is_dot = 0; - - if (name != NULL && name[0] == '.' && name[1] == 0) - is_dot = 1; - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_del(env, pobj, handle, is_dot); - mdd_write_unlock(env, pobj); - } - } else - rc = -ENOTDIR; + rc = __mdd_index_delete_only(env, pobj, name, handle, capa); + if (rc == 0 && is_dir) { + int is_dot = 0; + + if (name != NULL && name[0] == '.' && name[1] == 0) + is_dot = 1; + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_del(env, pobj, handle, is_dot); + mdd_write_unlock(env, pobj); + } RETURN(rc); } -static int -__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, - struct thandle *handle, struct lustre_capa *capa) + +/** Store a namespace change changelog record + * If this fails, we must fail the whole transaction; we don't + * want the change to commit without the log entry. + * \param target - mdd_object of change + * \param parent - parent dir/object + * \param tf - target lu_fid, overrides fid of \a target if this is non-null + * \param tname - target name string + * \param handle - transacion handle + */ +static int mdd_changelog_ns_store(const struct lu_env *env, + struct mdd_device *mdd, + enum changelog_rec_type type, + struct mdd_object *target, + struct mdd_object *parent, + const struct lu_fid *tf, + const struct lu_name *tname, + struct thandle *handle) { - struct dt_object *next = mdd_object_child(pobj); - int rc; + const struct lu_fid *tfid; + const struct lu_fid *tpfid = mdo2fid(parent); + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - struct md_ucred *uc = md_ucred(env); + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + RETURN(0); - rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), - (const struct dt_key *)name, - handle, capa, uc->mu_cap & - CFS_CAP_SYS_RESOURCE_MASK); - } else { - rc = -ENOTDIR; + LASSERT(parent != NULL); + LASSERT(tname != NULL); + LASSERT(handle != NULL); + + /* target */ + reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + rec = (struct llog_changelog_rec *)buf->lb_buf; + + rec->cr.cr_flags = CLF_VERSION; + rec->cr.cr_type = (__u32)type; + tfid = tf ? tf : mdo2fid(target); + rec->cr.cr_tfid = *tfid; + rec->cr.cr_pfid = *tpfid; + rec->cr.cr_namelen = tname->ln_namelen; + memcpy(rec->cr.cr_name, tname->ln_name, rec->cr.cr_namelen); + if (likely(target)) + target->mod_cltime = cfs_time_current_64(); + + rc = mdd_changelog_llog_write(mdd, rec, handle); + if (rc < 0) { + CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", + rc, type, tname->ln_name, PFID(tfid), PFID(tpfid)); + return -EFAULT; } - RETURN(rc); + + return 0; } static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, @@ -592,7 +682,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct obd_device *obd = mdd->mdd_obd_dev; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; @@ -603,12 +693,14 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tobj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK); + qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif @@ -645,19 +737,23 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0); + if (rc == 0) + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle); + EXIT; out_unlock: mdd_write_unlock(env, mdd_sobj); mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj, + mdd_tobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - 1, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, + qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, @@ -676,18 +772,27 @@ int mdd_finish_unlink(const struct lu_env *env, int reset = 1; ENTRY; + LASSERT(mdd_write_locked(env, obj) != 0); + rc = mdd_iattr_get(env, obj, ma); if (rc == 0 && ma->ma_attr.la_nlink == 0) { + obj->mod_flags |= DEAD_OBJ; /* add new orphan and the object * will be deleted during mdd_close() */ if (obj->mod_count) { rc = __mdd_orphan_add(env, obj, th); if (rc == 0) - obj->mod_flags |= ORPHAN_OBJ; - } - - obj->mod_flags |= DEAD_OBJ; - if (!(obj->mod_flags & ORPHAN_OBJ)) { + CDEBUG(D_HA, "Object "DFID" is inserted into " + "orphan list, open count = %d\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + else + CERROR("Object "DFID" fail to be an orphan, " + "open count = %d, maybe cause failed " + "open replay\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + } else { rc = mdd_object_kill(env, obj, ma); if (rc == 0) reset = 0; @@ -733,7 +838,8 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, unsigned int qpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0; #endif - int rc, is_dir; + int is_dir = S_ISDIR(ma->ma_attr.la_mode); + int rc; ENTRY; LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", @@ -747,13 +853,11 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - is_dir = S_ISDIR(ma->ma_attr.la_mode); rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); if (rc) GOTO(cleanup, rc); @@ -804,11 +908,22 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, NULL, NULL); + if (!is_dir) + /* old files may not have link ea; ignore errors */ + mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), + lname, NULL, NULL, handle); + EXIT; cleanup: mdd_write_unlock(env, mdd_cobj); mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + is_dir ? CL_RMDIR : CL_UNLINK, + mdd_cobj, mdd_pobj, NULL, lname, + handle); + mdd_trans_stop(env, mdd, rc, handle); #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) @@ -858,7 +973,7 @@ static int mdd_name_insert(const struct lu_env *env, struct obd_device *obd = mdd->mdd_obd_dev; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; cfs_cap_t save = uc->mu_cap; #endif int rc; @@ -871,13 +986,14 @@ static int mdd_name_insert(const struct lu_env *env, rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - 1, &rec_pending, NULL, - LQUOTA_FLAGS_BLK); + qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } else { uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK; @@ -923,10 +1039,8 @@ out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, - obd, qids[USRQUOTA], - qids[GRPQUOTA], 1, 1); + lquota_pending_commit(mds_quota_interface_ref, + obd, qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it*/ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, @@ -1059,12 +1173,13 @@ static int mdd_rt_sanity_check(const struct lu_env *env, * processed in cmr_rename_tgt before mdd_rename_tgt and enable * MDS_PERM_BYPASS. * So check may_delete, but not check nlink of tgt_pobj. */ - LASSERT(tobj); + rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1); RETURN(rc); } +/* Partial rename op on slave MDD */ static int mdd_rename_tgt(const struct lu_env *env, struct md_object *pobj, struct md_object *tobj, const struct lu_fid *lf, const struct lu_name *lname, @@ -1082,7 +1197,8 @@ static int mdd_rename_tgt(const struct lu_env *env, struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; @@ -1093,12 +1209,14 @@ static int mdd_rename_tgt(const struct lu_env *env, rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qpids); /* get block quota for target parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK); + qpids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif @@ -1162,7 +1280,7 @@ static int mdd_rename_tgt(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qcids); } #endif @@ -1173,21 +1291,26 @@ cleanup: mdd_write_unlock(env, mdd_tobj); mdd_pdo_write_unlock(env, mdd_tpobj, dlh); out_trans: + if (rc == 0) + /* Bare EXT record with no RENAME in front of it signifies + a partial slave op */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, NULL, lname, handle); + mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], - qpids[GRPQUOTA], - 1, 1); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qpids, rec_pending, 1); + + if (quota_copc) + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. */ - lquota_adjust(mds_quota_interface_ref, obd, qcids, - qpids, rc, quota_opc); + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, + rc, quota_copc); } #endif return rc; @@ -1229,7 +1352,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (!md_should_create(spec->sp_cr_flags)) RETURN(0); - + lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) @@ -1272,6 +1395,7 @@ out_free: RETURN(rc); } +/* Get fid from name and parent */ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) @@ -1281,7 +1405,6 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = mdo2mdd(pobj); struct dt_object *dir = mdd_object_child(mdd_obj); - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; int rc; ENTRY; @@ -1308,11 +1431,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, if (likely(S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))) { + rc = dir->do_index_ops->dio_lookup(env, dir, - (struct dt_rec *)pack, key, + (struct dt_rec *)fid, key, mdd_object_capa(env, mdd_obj)); if (rc > 0) - rc = fid_unpack(pack, fid); + rc = 0; else if (rc == 0) rc = -ENOENT; } else @@ -1322,8 +1446,9 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, } int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, - struct mdd_object *child, struct md_attr *ma, - struct thandle *handle, const struct md_op_spec *spec) + const struct lu_name *lname, struct mdd_object *child, + struct md_attr *ma, struct thandle *handle, + const struct md_op_spec *spec) { int rc; ENTRY; @@ -1345,21 +1470,16 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, __mdd_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), dot, handle, BYPASS_CAPA); - if (rc == 0) { + if (rc == 0) rc = __mdd_index_insert_only(env, child, pfid, dotdot, handle, BYPASS_CAPA); - if (rc != 0) { - int rc2; - - rc2 = __mdd_index_delete(env, child, dot, 1, - handle, BYPASS_CAPA); - if (rc2 != 0) - CERROR("Failure to cleanup after dotdot" - " creation: %d (%d)\n", rc2, rc); - } - } + if (rc != 0) + __mdd_ref_del(env, child, handle, 1); } + if (rc == 0) + mdd_links_add(env, child, pfid, lname, handle); + RETURN(rc); } @@ -1476,7 +1596,9 @@ static int mdd_create(const struct lu_env *env, unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0, parent_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; + int parent_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1533,9 +1655,8 @@ static int mdd_create(const struct lu_env *env, mdd_quota_wrapper(&ma->ma_attr, qcids); mdd_quota_wrapper(la_tmp, qpids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], 1, - &inode_pending, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, qcids, + inode_pending, 1, NULL, 0, NULL, 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1553,15 +1674,13 @@ static int mdd_create(const struct lu_env *env, /* get block quota for child and parent */ if (block_count) lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_count, - &block_pending, NULL, - LQUOTA_FLAGS_BLK); + qcids, block_pending, + block_count, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); if (!same) lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &parent_pending, NULL, - LQUOTA_FLAGS_BLK); + qpids, parent_pending, 1, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); } } #endif @@ -1571,6 +1690,7 @@ static int mdd_create(const struct lu_env *env, * first. */ if (S_ISREG(attr->la_mode)) { + lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) @@ -1626,7 +1746,7 @@ static int mdd_create(const struct lu_env *env, } #endif - rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), + rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, son, ma, handle, spec); mdd_write_unlock(env, son); if (rc) @@ -1717,6 +1837,12 @@ cleanup: mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + S_ISDIR(attr->la_mode) ? CL_MKDIR : + S_ISREG(attr->la_mode) ? CL_CREATE : + S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD, + son, mdd_pobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); out_free: /* finis lov_create stuff, free all temporary data */ @@ -1724,18 +1850,12 @@ out_free: out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - 1, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_count, 1); - if (parent_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], - 1, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qpids, + parent_pending, 1); /* Trigger dqacq on the owner of child and parent. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, @@ -1804,6 +1924,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env, * the other case has been processed in cml_rename * before mdd_rename and enable MDS_PERM_BYPASS. */ LASSERT(sobj); + rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0); if (rc) RETURN(rc); @@ -1837,16 +1958,17 @@ static int mdd_rename(const struct lu_env *env, const char *sname = lsname->ln_name; const char *tname = ltname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); + struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */ struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); struct mdd_device *mdd = mdo2mdd(src_pobj); - struct mdd_object *mdd_sobj = NULL; + struct mdd_object *mdd_sobj = NULL; /* source object */ struct mdd_object *mdd_tobj = NULL; struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); + const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj); int is_dir; - int rc; + int rc, rc2; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; @@ -1854,7 +1976,8 @@ static int mdd_rename(const struct lu_env *env, unsigned int qspids[MAXQUOTAS] = { 0, 0 }; unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1875,14 +1998,16 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qtpids); /* get block quota for target parent */ lquota_chkquota(mds_quota_interface_ref, - obd, qtpids[USRQUOTA], - qtpids[GRPQUOTA], 1, - &rec_pending, NULL, - LQUOTA_FLAGS_BLK); + obd, qtpids, + rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, + data, 1); } } } @@ -1924,6 +2049,7 @@ static int mdd_rename(const struct lu_env *env, if (rc) GOTO(cleanup, rc); + /* Remove source name from source directory */ rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, mdd_object_capa(env, mdd_spobj)); if (rc) @@ -1931,31 +2057,38 @@ static int mdd_rename(const struct lu_env *env, /* "mv dir1 dir2" needs "dir1/.." link update */ if (is_dir && mdd_sobj) { - rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, - mdd_object_capa(env, mdd_spobj)); + rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + mdd_object_capa(env, mdd_sobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_spobj2, rc); - rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, - is_dir, handle, - mdd_object_capa(env, mdd_tpobj)); - if (rc) - GOTO(cleanup, rc); + rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot, + handle, mdd_object_capa(env, mdd_sobj)); + if (rc) { + GOTO(fixup_spobj, rc); + } } - /* + /* Remove target name from target directory * Here tobj can be remote one, so we do index_delete unconditionally * and -ENOENT is allowed. */ rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); - if (rc != 0 && rc != -ENOENT) - GOTO(cleanup, rc); + if (rc != 0) { + if (mdd_tobj) { + /* tname might been renamed to something else */ + GOTO(fixup_spobj, rc); + } + if (rc != -ENOENT) + GOTO(fixup_spobj, rc); + } + /* Insert new fid with target name into target dir */ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -1966,16 +2099,23 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); } - /* + /* Remove old target object * For tobj is remote case cmm layer has processed * and set tobj to NULL then. So when tobj is NOT NULL, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); + if (mdd_is_dead_obj(mdd_tobj)) { + mdd_write_unlock(env, mdd_tobj); + /* shld not be dead, something is wrong */ + CERROR("tobj is dead, something is wrong\n"); + rc = -EINVAL; + goto cleanup; + } __mdd_ref_del(env, mdd_tobj, handle, 0); /* Remove dot reference. */ @@ -1985,17 +2125,17 @@ static int mdd_rename(const struct lu_env *env, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); mdd_write_unlock(env, mdd_tobj); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qtcids); } #endif @@ -2004,7 +2144,7 @@ static int mdd_rename(const struct lu_env *env, la->la_valid = LA_CTIME | LA_MTIME; rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); if (mdd_spobj != mdd_tpobj) { la->la_valid = LA_CTIME | LA_MTIME; @@ -2012,40 +2152,359 @@ static int mdd_rename(const struct lu_env *env, handle, 0); } + if (rc == 0 && mdd_sobj) { + mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD); + rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, + mdo2fid(mdd_tpobj), ltname, handle); + if (rc == -ENOENT) + /* Old files might not have EA entry */ + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), + lsname, handle); + mdd_write_unlock(env, mdd_sobj); + /* We don't fail the transaction if the link ea can't be + updated -- fid2path will use alternate lookup method. */ + rc = 0; + } + EXIT; + +fixup_tpobj: + if (rc) { + rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, + BYPASS_CAPA); + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + + if (mdd_tobj && mdd_object_exists(mdd_tobj) && + !mdd_is_dead_obj(mdd_tobj)) { + rc2 = __mdd_index_insert(env, mdd_tpobj, + mdo2fid(mdd_tobj), tname, + is_dir, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + } + } + +fixup_spobj: + if (rc && is_dir && mdd_sobj) { + rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("sp obj dotdot delete error %d\n",rc2); + + + rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, + dotdot, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj dotdot insert error %d\n",rc2); + } + +fixup_spobj2: + if (rc) { + rc2 = __mdd_index_insert(env, mdd_spobj, + lf, sname, is_dir, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj fix error %d\n",rc2); + } cleanup: if (likely(tdlh) && sdlh != tdlh) mdd_pdo_write_unlock(env, mdd_tpobj, tdlh); if (likely(sdlh)) mdd_pdo_write_unlock(env, mdd_spobj, sdlh); cleanup_unlocked: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj, + mdd_spobj, lf, lsname, handle); + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, lf, ltname, handle); + mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qtpids[USRQUOTA], - qtpids[GRPQUOTA], - 1, 1); - /* Trigger dqrel on the source owner of parent. - * If failed, the next call for lquota_chkquota will - * process it. */ - lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, - FSFILT_OP_UNLINK_PARTIAL_PARENT); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qtpids, rec_pending, 1); + + if (quota_copc) { + /* Trigger dqrel on the source owner of parent. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, + FSFILT_OP_UNLINK_PARTIAL_PARENT); + + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qtcids, - qtpids, rc, quota_opc); + qtpids, rc, quota_copc); + } } #endif return rc; } +/** enable/disable storing of hardlink info */ +int mdd_linkea_enable = 1; +CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, + "record hardlink info in EAs"); + +/** Read the link EA into a temp buffer. + * Uses the name_buf since it is generally large. + * \retval IS_ERR err + * \retval ptr to \a lu_buf (always \a mti_big_buf) + */ +struct lu_buf *mdd_links_get(const struct lu_env *env, + struct mdd_object *mdd_obj) +{ + struct lu_buf *buf; + struct lustre_capa *capa; + struct link_ea_header *leh; + int rc; + + /* First try a small buf */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + + capa = mdd_object_capa(env, mdd_obj); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc == -ERANGE) { + /* Buf was too small, figure out what we need. */ + buf->lb_buf = NULL; + buf->lb_len = 0; + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc < 0) + return ERR_PTR(rc); + buf = mdd_buf_alloc(env, rc); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + } + if (rc < 0) + return ERR_PTR(rc); + + leh = buf->lb_buf; + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + /* entries are swabbed by mdd_lee_unpack */ + } + if (leh->leh_magic != LINK_EA_MAGIC) + return ERR_PTR(-EINVAL); + if (leh->leh_reccount == 0) + return ERR_PTR(-ENODATA); + + return buf; +} + +/** Pack a link_ea_entry. + * All elements are stored as chars to avoid alignment issues. + * Numbers are always big-endian + * \retval record length + */ +static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, + const struct lu_fid *pfid) +{ + int reclen; + + fid_cpu_to_be(&lee->lee_parent_fid, pfid); + strncpy(lee->lee_name, lname->ln_name, lname->ln_namelen); + reclen = sizeof(struct link_ea_entry) + lname->ln_namelen; + + lee->lee_reclen[0] = (reclen >> 8) & 0xff; + lee->lee_reclen[1] = reclen & 0xff; + return reclen; +} + +void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, + struct lu_name *lname, struct lu_fid *pfid) +{ + *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; + fid_be_to_cpu(pfid, &lee->lee_parent_fid); + lname->ln_name = lee->lee_name; + lname->ln_namelen = *reclen - sizeof(struct link_ea_entry); +} + +/** Add a record to the end of link ea buf */ +static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, + const struct lu_fid *pfid, + const struct lu_name *lname) +{ + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + + if (lname == NULL || pfid == NULL) + return -EINVAL; + + /* Make sure our buf is big enough for the new one */ + leh = buf->lb_buf; + reclen = lname->ln_namelen + sizeof(struct link_ea_entry); + if (leh->leh_len + reclen > buf->lb_len) { + if (mdd_buf_grow(env, leh->leh_len + reclen) < 0) + return -ENOMEM; + } + + leh = buf->lb_buf; + lee = buf->lb_buf + leh->leh_len; + reclen = mdd_lee_pack(lee, lname, pfid); + leh->leh_len += reclen; + leh->leh_reccount++; + return 0; +} + +/* For pathologic linkers, we don't want to spend lots of time scanning the + * link ea. Limit ourseleves to something reasonable; links not in the EA + * can be looked up via (slower) parent lookup. + */ +#define LINKEA_MAX_COUNT 128 + +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + int rc; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + if (rc != -ENODATA) { + CERROR("link_ea read failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + RETURN (rc); + } + /* empty EA; start one */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + leh = buf->lb_buf; + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_len = sizeof(struct link_ea_header); + leh->leh_reccount = 0; + } + + leh = buf->lb_buf; + if (leh->leh_reccount > LINKEA_MAX_COUNT) + RETURN(-EOVERFLOW); + + rc = __mdd_links_add(env, buf, pfid, lname); + if (rc) + RETURN(rc); + + leh = buf->lb_buf; + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + if (rc) + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + int reclen = 0; + int count; + int rc, rc2 = 0; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + if (mdd_obj->mod_flags & DEAD_OBJ) + /* No more links, don't bother */ + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + CERROR("link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + RETURN(rc); + } + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ + + /* Find the old record */ + for(count = 0; count <= leh->leh_reccount; count++) { + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + if (tmpname->ln_namelen == oldlname->ln_namelen && + lu_fid_eq(tmpfid, oldpfid) && + (strncmp(tmpname->ln_name, oldlname->ln_name, + tmpname->ln_namelen) == 0)) + break; + lee = (struct link_ea_entry *)((char *)lee + reclen); + } + if (count > leh->leh_reccount) { + CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", + oldlname->ln_namelen, oldlname->ln_name); + GOTO(out, rc = -ENOENT); + } + + /* Remove the old record */ + leh->leh_reccount--; + leh->leh_len -= reclen; + memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len - + (char *)lee); + + /* If renaming, add the new record */ + if (newpfid != NULL) { + /* if the add fails, we still delete the out-of-date old link */ + rc2 = __mdd_links_add(env, buf, newpfid, newlname); + leh = buf->lb_buf; + } + + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + +out: + if (rc == 0) + rc = rc2; + if (rc) + CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + const struct md_dir_operations mdd_dir_ops = { .mdo_is_subdir = mdd_is_subdir, .mdo_lookup = mdd_lookup,