X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=03a9e86eb6c16c46c62f402263015dadd1ce2737;hp=221df4f0934bbf0079e40796e8bb900e02afdb14;hb=c159c408293fbebf71a948e630aa9f637f3c8ffe;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 221df4f..03a9e86 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -71,16 +71,29 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle); +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle); + static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct dynlock_handle *dlh; int rc; - dlh = mdd_pdo_read_lock(env, mdd_obj, name); + dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (unlikely(dlh == NULL)) return -ENOMEM; rc = __mdd_lookup(env, pobj, lname, fid, mask); @@ -89,9 +102,9 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, return rc; } -static int mdd_lookup(const struct lu_env *env, - struct md_object *pobj, const struct lu_name *lname, - struct lu_fid* fid, struct md_op_spec *spec) +int mdd_lookup(const struct lu_env *env, + struct md_object *pobj, const struct lu_name *lname, + struct lu_fid* fid, struct md_op_spec *spec) { int rc; ENTRY; @@ -99,7 +112,6 @@ static int mdd_lookup(const struct lu_env *env, RETURN(rc); } - static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, struct lu_fid *fid) { @@ -107,10 +119,10 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, } /* - * For root fid use special function, whcih does not compare version component - * of fid. Vresion component is different for root fids on all MDTs. + * For root fid use special function, which does not compare version component + * of fid. Version component is different for root fids on all MDTs. */ -static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) +int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) { return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) && fid_oid(&mdd->mdd_root_fid) == fid_oid(fid); @@ -223,7 +235,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, { struct dt_it *it; struct dt_object *obj; - struct dt_it_ops *iops; + const struct dt_it_ops *iops; int result; ENTRY; @@ -232,7 +244,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, 0, BYPASS_CAPA); + it = iops->init(env, obj, BYPASS_CAPA); if (it != NULL) { result = iops->get(env, it, (const void *)""); if (result > 0) { @@ -269,7 +281,7 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj) /* * Subdir count limitation can be broken through. - */ + */ if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink && !S_ISDIR(la->la_mode)) RETURN(-EMLINK); @@ -295,7 +307,8 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj, if (check_perm) rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (!rc && check_nlink) rc = __mdd_may_link(env, pobj); @@ -320,7 +333,8 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj, RETURN(-EPERM); rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (rc) RETURN(rc); @@ -346,20 +360,20 @@ static inline int mdd_is_sticky(const struct lu_env *env, rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); if (rc) return rc; - + if (!(tmp_la->la_mode & S_ISVTX) || (tmp_la->la_uid == uc->mu_fsuid)) return 0; } rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); - if (rc) + if (rc) return rc; - + if (tmp_la->la_uid == uc->mu_fsuid) return 0; - - return !mdd_capable(uc, CAP_FOWNER); + + return !mdd_capable(uc, CFS_CAP_FOWNER); } /* @@ -383,7 +397,8 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, if (check_perm) { rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (rc) RETURN(rc); } @@ -455,15 +470,6 @@ int mdd_link_sanity_check(const struct lu_env *env, RETURN(rc); } -const struct dt_rec *__mdd_fid_rec(const struct lu_env *env, - const struct lu_fid *fid) -{ - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; - - fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2); - return (const struct dt_rec *)pack; -} - /** * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and * assign i_nlink to 1 which means the i_nlink for subdir count is incredible @@ -511,17 +517,20 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, ENTRY; if (dt_try_as_dir(env, next)) { + struct md_ucred *uc = md_ucred(env); + rc = next->do_index_ops->dio_insert(env, next, __mdd_fid_rec(env, lf), (const struct dt_key *)name, - handle, capa); + handle, capa, uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } if (rc == 0) { if (is_dir) { - mdd_write_lock(env, pobj); + mdd_write_lock(env, pobj, MOR_TGT_PARENT); __mdd_ref_add(env, pobj, handle); mdd_write_unlock(env, pobj); } @@ -547,7 +556,7 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, if (name != NULL && name[0] == '.' && name[1] == 0) is_dot = 1; - mdd_write_lock(env, pobj); + mdd_write_lock(env, pobj, MOR_TGT_PARENT); __mdd_ref_del(env, pobj, handle, is_dot); mdd_write_unlock(env, pobj); } @@ -567,39 +576,127 @@ __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, ENTRY; if (dt_try_as_dir(env, next)) { + struct md_ucred *uc = md_ucred(env); + rc = next->do_index_ops->dio_insert(env, next, __mdd_fid_rec(env, lf), (const struct dt_key *)name, - handle, capa); + handle, capa, uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } RETURN(rc); } +/** Store a namespace change changelog record + * If this fails, we must fail the whole transaction; we don't + * want the change to commit without the log entry. + * \param target - mdd_object of change + * \param parent - parent dir/object + * \param tf - target lu_fid, overrides fid of \a target if this is non-null + * \param tname - target name string + * \param handle - transacion handle + */ +static int mdd_changelog_ns_store(const struct lu_env *env, + struct mdd_device *mdd, + enum changelog_rec_type type, + struct mdd_object *target, + struct mdd_object *parent, + const struct lu_fid *tf, + const struct lu_name *tname, + struct thandle *handle) +{ + const struct lu_fid *tfid; + const struct lu_fid *tpfid = mdo2fid(parent); + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; + ENTRY; + + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + RETURN(0); + + LASSERT(parent != NULL); + LASSERT(tname != NULL); + LASSERT(handle != NULL); + + /* target */ + reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + rec = (struct llog_changelog_rec *)buf->lb_buf; + + rec->cr_flags = CLF_VERSION; + rec->cr_type = (__u32)type; + tfid = tf ? tf : mdo2fid(target); + rec->cr_tfid = *tfid; + rec->cr_pfid = *tpfid; + rec->cr_namelen = tname->ln_namelen; + memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen); + if (likely(target)) + target->mod_cltime = cfs_time_current_64(); + + rc = mdd_changelog_llog_write(mdd, rec, handle); + if (rc < 0) { + CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", + rc, type, tname->ln_name, PFID(tfid), PFID(tpfid)); + return -EFAULT; + } + + return 0; +} + static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); struct mdd_device *mdd = mdo2mdd(src_obj); struct dynlock_handle *dlh; struct thandle *handle; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qids); + /* get block quota for parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], 1, + &rec_pending, NULL, LQUOTA_FLAGS_BLK, + data, 1); + } + } +#endif + mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_tobj, name); + dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - mdd_write_lock(env, mdd_sobj); + mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD); rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj); if (rc) @@ -623,12 +720,31 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0); + if (rc == 0) + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle); + EXIT; out_unlock: mdd_write_unlock(env, mdd_sobj); mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj, + mdd_tobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqacq for the parent owner. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, + quota_opc); + } +#endif return rc; } @@ -638,23 +754,28 @@ int mdd_finish_unlink(const struct lu_env *env, struct thandle *th) { int rc; + int reset = 1; ENTRY; rc = mdd_iattr_get(env, obj, ma); if (rc == 0 && ma->ma_attr.la_nlink == 0) { /* add new orphan and the object - * will be deleted during the object_put() */ - if (__mdd_orphan_add(env, obj, th) == 0) - obj->mod_flags |= ORPHAN_OBJ; + * will be deleted during mdd_close() */ + if (obj->mod_count) { + rc = __mdd_orphan_add(env, obj, th); + if (rc == 0) + obj->mod_flags |= ORPHAN_OBJ; + } obj->mod_flags |= DEAD_OBJ; - if (obj->mod_count == 0) + if (!(obj->mod_flags & ORPHAN_OBJ)) { rc = mdd_object_kill(env, obj, ma); - else - /* clear MA_LOV | MA_COOKIE, if we do not - * unlink it in case we get it somewhere */ - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); - } else + if (rc == 0) + reset = 0; + } + + } + if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); RETURN(rc); @@ -679,14 +800,22 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *mdd_cobj = md2mdd_obj(cobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; - int rc, is_dir; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif + int is_dir = S_ISDIR(ma->ma_attr.la_mode); + int rc; ENTRY; LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", @@ -700,13 +829,11 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - - dlh = mdd_pdo_write_lock(env, mdd_pobj, name); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - mdd_write_lock(env, mdd_cobj); + mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - is_dir = S_ISDIR(ma->ma_attr.la_mode); rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); if (rc) GOTO(cleanup, rc); @@ -735,17 +862,52 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, GOTO(cleanup, rc); rc = mdd_finish_unlink(env, mdd_cobj, ma, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA); + if (!rc) { + mdd_quota_wrapper(la_tmp, qpids); + if (mdd_cobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK; + mdd_quota_wrapper(&ma->ma_attr, qcids); + } else { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT; + } + } + } +#endif if (rc == 0) obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, NULL, NULL); + if (!is_dir) + /* old files may not have link ea; ignore errors */ + mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), + lname, NULL, NULL, handle); + EXIT; cleanup: mdd_write_unlock(env, mdd_cobj); mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + is_dir ? CL_RMDIR : CL_UNLINK, + mdd_cobj, mdd_pobj, NULL, lname, + handle); + mdd_trans_stop(env, mdd, rc, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) + /* Trigger dqrel on the owner of child and parent. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, + quota_opc); +#endif return rc; } @@ -775,22 +937,52 @@ static int mdd_name_insert(const struct lu_env *env, const struct lu_fid *fid, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; int is_dir = S_ISDIR(ma->ma_attr.la_mode); +#ifdef HAVE_QUOTA_SUPPORT + struct md_ucred *uc = md_ucred(env); + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; + cfs_cap_t save = uc->mu_cap; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qids); + /* get block quota for parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + 1, &rec_pending, NULL, + LQUOTA_FLAGS_BLK, data, 1); + } + } else { + uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK; + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP); handle = mdd_trans_start(env, mdo2mdd(pobj)); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_obj, name); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -820,6 +1012,24 @@ out_unlock: mdd_pdo_write_unlock(env, mdd_obj, dlh); out_trans: mdd_trans_stop(env, mdo2mdd(pobj), rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (quota_opc) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, + obd, qids[USRQUOTA], + qids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqacq for the parent owner. If failed, + * the next call for lquota_chkquota will process it*/ + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, + rc, quota_opc); + } else { + uc->mu_cap = save; + } + } +#endif return rc; } @@ -848,22 +1058,39 @@ static int mdd_name_remove(const struct lu_env *env, const struct lu_name *lname, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; int is_dir = S_ISDIR(ma->ma_attr.la_mode); +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); + if (!rc) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT; + mdd_quota_wrapper(la_tmp, qids); + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_obj, name); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -893,6 +1120,14 @@ out_unlock: mdd_pdo_write_unlock(env, mdd_obj, dlh); out_trans: mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + /* Trigger dqrel for the parent owner. + * If failed, the next call for lquota_chkquota will process it. */ + if (quota_opc) + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, + quota_opc); +#endif return rc; } @@ -924,31 +1159,57 @@ static int mdd_rt_sanity_check(const struct lu_env *env, RETURN(rc); } +/* Partial rename op on slave MDD */ static int mdd_rename_tgt(const struct lu_env *env, struct md_object *pobj, struct md_object *tobj, const struct lu_fid *lf, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tpobj = md2mdd_obj(pobj); struct mdd_object *mdd_tobj = md2mdd_obj(tobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && !tobj) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qpids); + /* get block quota for target parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], 1, + &rec_pending, NULL, LQUOTA_FLAGS_BLK, + data, 1); + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_tpobj, name); + dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); if (tobj) - mdd_write_lock(env, mdd_tobj); + mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma); if (rc) @@ -975,7 +1236,7 @@ static int mdd_rename_tgt(const struct lu_env *env, if (rc) GOTO(cleanup, rc); - /* + /* * For tobj is remote case cmm layer has processed * and pass NULL tobj to here. So when tobj is NOT NULL, * it must be local one. @@ -995,6 +1256,14 @@ static int mdd_rename_tgt(const struct lu_env *env, rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); if (rc) GOTO(cleanup, rc); + +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qcids); + } +#endif } EXIT; cleanup: @@ -1002,7 +1271,29 @@ cleanup: mdd_write_unlock(env, mdd_tobj); mdd_pdo_write_unlock(env, mdd_tpobj, dlh); out_trans: + if (rc == 0) + /* Bare EXT record with no RENAME in front of it signifies + a partial slave op */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, NULL, lname, handle); + mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qpids[USRQUOTA], + qpids[GRPQUOTA], + rec_pending, 1); + if (quota_opc) + /* Trigger dqrel/dqacq on the target owner of child and + * parent. If failed, the next call for lquota_chkquota + * will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, + qpids, rc, quota_opc); + } +#endif return rc; } @@ -1060,7 +1351,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, /* Replay creates has objects already */ #if 0 - if (spec->u.sp_ea.no_lov_create) { + if (spec->no_create) { CDEBUG(D_INFO, "we already have lov ea\n"); rc = mdd_lov_set_md(env, mdd_pobj, son, (struct lov_mds_md *)spec->u.sp_ea.eadata, @@ -1085,11 +1376,12 @@ out_free: RETURN(rc); } +/* Get fid from name and parent */ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; + const char *name = lname->ln_name; const struct dt_key *key = (const struct dt_key *)name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = mdo2mdd(pobj); @@ -1114,17 +1406,21 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)) RETURN(-ENAMETOOLONG); - rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask); + rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask, + MOR_TGT_PARENT); if (rc) RETURN(rc); if (likely(S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))) { + rc = dir->do_index_ops->dio_lookup(env, dir, (struct dt_rec *)pack, key, mdd_object_capa(env, mdd_obj)); - if (rc == 0) + if (rc > 0) rc = fid_unpack(pack, fid); + else if (rc == 0) + rc = -ENOENT; } else rc = -ENOTDIR; @@ -1132,8 +1428,9 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, } int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, - struct mdd_object *child, struct md_attr *ma, - struct thandle *handle) + const struct lu_name *lname, struct mdd_object *child, + struct md_attr *ma, struct thandle *handle, + const struct md_op_spec *spec) { int rc; ENTRY; @@ -1170,6 +1467,9 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, } } } + if (rc == 0) + mdd_links_add(env, child, pfid, lname, handle); + RETURN(rc); } @@ -1214,7 +1514,8 @@ static int mdd_create_sanity_check(const struct lu_env *env, * EXEC permission have been checked * when lookup before create already. */ - rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE); + rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE, + MOR_TGT_PARENT); if (rc) RETURN(rc); } @@ -1266,24 +1567,35 @@ static int mdd_create(const struct lu_env *env, struct md_op_spec *spec, struct md_attr* ma) { - char *name = lname->ln_name; - struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *son = md2mdd_obj(child); - struct mdd_device *mdd = mdo2mdd(pobj); - struct lu_attr *attr = &ma->ma_attr; - struct lov_mds_md *lmm = NULL; - struct thandle *handle; + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_attr *la = &info->mti_la_for_fix; + struct md_attr *ma_acl = &info->mti_ma; + struct mdd_object *mdd_pobj = md2mdd_obj(pobj); + struct mdd_object *son = md2mdd_obj(child); + struct mdd_device *mdd = mdo2mdd(pobj); + struct lu_attr *attr = &ma->ma_attr; + struct lov_mds_md *lmm = NULL; + struct thandle *handle; + struct dynlock_handle *dlh; + const char *name = lname->ln_name; int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0; - struct dynlock_handle *dlh; + int got_def_acl = 0; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, block_count = 0; + int inode_pending = 0, block_pending = 0, parent_pending = 0; +#endif ENTRY; /* * Two operations have to be performed: * - * - allocation of new object (->do_create()), and + * - an allocation of a new object (->do_create()), and * - * - insertion into parent index (->dio_insert()). + * - an insertion into a parent index (->dio_insert()). * * Due to locking, operation order is not important, when both are * successful, *but* error handling cases are quite different: @@ -1319,6 +1631,51 @@ static int mdd_create(const struct lu_env *env, if (rc) RETURN(rc); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA); + if (!rc) { + int same = 0; + + quota_opc = FSFILT_OP_CREATE; + mdd_quota_wrapper(&ma->ma_attr, qcids); + mdd_quota_wrapper(la_tmp, qpids); + /* get file quota for child */ + lquota_chkquota(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], 1, + &inode_pending, NULL, 0, NULL, 0); + switch (ma->ma_attr.la_mode & S_IFMT) { + case S_IFLNK: + case S_IFDIR: + block_count = 2; + break; + case S_IFREG: + block_count = 1; + break; + } + if (qcids[USRQUOTA] == qpids[USRQUOTA] && + qcids[GRPQUOTA] == qpids[GRPQUOTA]) { + block_count += 1; + same = 1; + } + /* get block quota for child and parent */ + if (block_count) + lquota_chkquota(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + block_count, + &block_pending, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); + if (!same) + lquota_chkquota(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], 1, + &parent_pending, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); + } + } +#endif + /* * No RPC inside the transaction, so OST objects should be created at * first. @@ -1327,7 +1684,22 @@ static int mdd_create(const struct lu_env *env, rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) - RETURN(rc); + GOTO(out_pending, rc); + } + + if (!S_ISLNK(attr->la_mode)) { + ma_acl->ma_acl_size = sizeof info->mti_xattr_buf; + ma_acl->ma_acl = info->mti_xattr_buf; + ma_acl->ma_need = MA_ACL_DEF; + ma_acl->ma_valid = 0; + + mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT); + rc = mdd_def_acl_get(env, mdd_pobj, ma_acl); + mdd_read_unlock(env, mdd_pobj); + if (rc) + GOTO(out_free, rc); + else if (ma_acl->ma_valid & MA_ACL_DEF) + got_def_acl = 1; } mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP); @@ -1335,16 +1707,12 @@ static int mdd_create(const struct lu_env *env, if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_pobj, name); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - /* - * XXX: Check that link can be added to the parent in mkdir case. - */ - - mdd_write_lock(env, son); - rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle); + mdd_write_lock(env, son, MOR_TGT_CHILD); + rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec); if (rc) { mdd_write_unlock(env, son); GOTO(cleanup, rc); @@ -1353,19 +1721,23 @@ static int mdd_create(const struct lu_env *env, created = 1; #ifdef CONFIG_FS_POSIX_ACL - mdd_read_lock(env, mdd_pobj); - rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle); - mdd_read_unlock(env, mdd_pobj); - if (rc) { - mdd_write_unlock(env, son); - GOTO(cleanup, rc); - } else { - ma->ma_attr.la_valid |= LA_MODE; + if (got_def_acl) { + struct lu_buf *acl_buf = &info->mti_buf; + acl_buf->lb_buf = ma_acl->ma_acl; + acl_buf->lb_len = ma_acl->ma_acl_size; + + rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle); + if (rc) { + mdd_write_unlock(env, son); + GOTO(cleanup, rc); + } else { + ma->ma_attr.la_valid |= LA_MODE; + } } #endif - rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), - son, ma, handle); + rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, + son, ma, handle, spec); mdd_write_unlock(env, son); if (rc) /* @@ -1399,6 +1771,7 @@ static int mdd_create(const struct lu_env *env, } if (S_ISLNK(attr->la_mode)) { + struct md_ucred *uc = md_ucred(env); struct dt_object *dt = mdd_object_child(son); const char *target_name = spec->u.sp_symname; int sym_len = strlen(target_name); @@ -1407,7 +1780,9 @@ static int mdd_create(const struct lu_env *env, buf = mdd_buf_get_const(env, target_name, sym_len); rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, - mdd_object_capa(env, son)); + mdd_object_capa(env, son), + uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); if (rc == sym_len) rc = 0; @@ -1438,7 +1813,7 @@ cleanup: } if (rc2 == 0) { - mdd_write_lock(env, son); + mdd_write_lock(env, son, MOR_TGT_CHILD); __mdd_ref_del(env, son, handle, 0); if (initialized && S_ISDIR(attr->la_mode)) __mdd_ref_del(env, son, handle, 1); @@ -1452,10 +1827,37 @@ cleanup: mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + S_ISDIR(attr->la_mode) ? CL_MKDIR : + S_ISREG(attr->la_mode) ? CL_CREATE : + S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD, + son, mdd_pobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); out_free: /* finis lov_create stuff, free all temporary data */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (inode_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + inode_pending, 0); + if (block_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + block_pending, 1); + if (parent_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], + parent_pending, 1); + /* Trigger dqacq on the owner of child and parent. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, + quota_opc); + } +#endif return rc; } @@ -1548,18 +1950,28 @@ static int mdd_rename(const struct lu_env *env, struct md_object *tobj, const struct lu_name *ltname, struct md_attr *ma) { - char *sname = lsname->ln_name; - char *tname = ltname->ln_name; + const char *sname = lsname->ln_name; + const char *tname = ltname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); + struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */ struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); struct mdd_device *mdd = mdo2mdd(src_pobj); - struct mdd_object *mdd_sobj = NULL; + struct mdd_object *mdd_sobj = NULL; /* source object */ struct mdd_object *mdd_tobj = NULL; struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; + const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); int is_dir; int rc; + +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qspids[MAXQUOTAS] = { 0, 0 }; + unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif ENTRY; LASSERT(ma->ma_attr.la_mode & S_IFMT); @@ -1568,10 +1980,37 @@ static int mdd_rename(const struct lu_env *env, if (tobj) mdd_tobj = md2mdd_obj(tobj); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA); + if (!rc) { + mdd_quota_wrapper(la_tmp, qspids); + if (!tobj) { + rc = mdd_la_get(env, mdd_tpobj, la_tmp, + BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qtpids); + /* get block quota for target parent */ + lquota_chkquota(mds_quota_interface_ref, + obd, qtpids[USRQUOTA], + qtpids[GRPQUOTA], 1, + &rec_pending, NULL, + LQUOTA_FLAGS_BLK, + data, 1); + } + } + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); /* FIXME: Should consider tobj and sobj too in rename_lock. */ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj); @@ -1580,18 +2019,20 @@ static int mdd_rename(const struct lu_env *env, /* Get locks in determined order */ if (rc == MDD_RN_SAME) { - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, + sname, MOR_SRC_PARENT); /* check hashes to determine do we need one lock or two */ if (mdd_name2hash(sname) != mdd_name2hash(tname)) - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname, + MOR_TGT_PARENT); else tdlh = sdlh; } else if (rc == MDD_RN_SRCTGT) { - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT); } else { - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT); } if (sdlh == NULL || tdlh == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -1602,12 +2043,27 @@ static int mdd_rename(const struct lu_env *env, if (rc) GOTO(cleanup, rc); + /* Remove source name from source directory */ rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, mdd_object_capa(env, mdd_spobj)); if (rc) GOTO(cleanup, rc); - /* + /* "mv dir1 dir2" needs "dir1/.." link update */ + if (is_dir && mdd_sobj) { + rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, + mdd_object_capa(env, mdd_spobj)); + if (rc) + GOTO(cleanup, rc); + + rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, + is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); + if (rc) + GOTO(cleanup, rc); + } + + /* Remove target name from target directory * Here tobj can be remote one, so we do index_delete unconditionally * and -ENOENT is allowed. */ @@ -1616,6 +2072,7 @@ static int mdd_rename(const struct lu_env *env, if (rc != 0 && rc != -ENOENT) GOTO(cleanup, rc); + /* Insert new fid with target name into target dir */ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); if (rc) @@ -1633,13 +2090,13 @@ static int mdd_rename(const struct lu_env *env, GOTO(cleanup, rc); } - /* + /* Remove old target object * For tobj is remote case cmm layer has processed * and set tobj to NULL then. So when tobj is NOT NULL, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { - mdd_write_lock(env, mdd_tobj); + mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); __mdd_ref_del(env, mdd_tobj, handle, 0); /* Remove dot reference. */ @@ -1655,6 +2112,14 @@ static int mdd_rename(const struct lu_env *env, mdd_write_unlock(env, mdd_tobj); if (rc) GOTO(cleanup, rc); + +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qtcids); + } +#endif } la->la_valid = LA_CTIME | LA_MTIME; @@ -1668,6 +2133,20 @@ static int mdd_rename(const struct lu_env *env, handle, 0); } + if (rc == 0 && mdd_sobj) { + mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD); + rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, + mdo2fid(mdd_tpobj), ltname, handle); + if (rc == -ENOENT) + /* Old files might not have EA entry */ + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), + lsname, handle); + mdd_write_unlock(env, mdd_sobj); + /* We don't fail the transaction if the link ea can't be + updated -- fid2path will use alternate lookup method. */ + rc = 0; + } + EXIT; cleanup: if (likely(tdlh) && sdlh != tdlh) @@ -1675,13 +2154,301 @@ cleanup: if (likely(sdlh)) mdd_pdo_write_unlock(env, mdd_spobj, sdlh); cleanup_unlocked: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj, + mdd_spobj, lf, lsname, handle); + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, lf, ltname, handle); + mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qtpids[USRQUOTA], + qtpids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqrel on the source owner of parent. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, + FSFILT_OP_UNLINK_PARTIAL_PARENT); + if (quota_opc) + /* Trigger dqrel/dqacq on the target owner of child and + * parent. If failed, the next call for lquota_chkquota + * will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qtcids, + qtpids, rc, quota_opc); + } +#endif return rc; } -struct md_dir_operations mdd_dir_ops = { +/** enable/disable storing of hardlink info */ +int mdd_linkea_enable = 1; +CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, + "record hardlink info in EAs"); + +/** Read the link EA into a temp buffer. + * Uses the name_buf since it is generally large. + * \retval IS_ERR err + * \retval ptr to \a lu_buf (always \a mti_big_buf) + */ +struct lu_buf *mdd_links_get(const struct lu_env *env, + struct mdd_object *mdd_obj) +{ + struct lu_buf *buf; + struct lustre_capa *capa; + struct link_ea_header *leh; + int rc; + + /* First try a small buf */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + + capa = mdd_object_capa(env, mdd_obj); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc == -ERANGE) { + /* Buf was too small, figure out what we need. */ + buf->lb_buf = NULL; + buf->lb_len = 0; + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc < 0) + return ERR_PTR(rc); + buf = mdd_buf_alloc(env, rc); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + } + if (rc < 0) + return ERR_PTR(rc); + + leh = buf->lb_buf; + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + /* entries are swabbed by mdd_lee_unpack */ + } + if (leh->leh_magic != LINK_EA_MAGIC) + return ERR_PTR(-EINVAL); + if (leh->leh_reccount == 0) + return ERR_PTR(-ENODATA); + + return buf; +} + +/** Pack a link_ea_entry. + * All elements are stored as chars to avoid alignment issues. + * Numbers are always big-endian + * \param packbuf is a temp fid buffer + * \retval record length + */ +static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, + const struct lu_fid *pfid, struct lu_fid* packbuf) +{ + char *ptr; + int reclen; + + fid_pack(&lee->lee_parent_fid, pfid, packbuf); + ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len; + strncpy(ptr, lname->ln_name, lname->ln_namelen); + reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen + + sizeof(lee->lee_reclen); + lee->lee_reclen[0] = (reclen >> 8) & 0xff; + lee->lee_reclen[1] = reclen & 0xff; + return reclen; +} + +void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, + struct lu_name *lname, struct lu_fid *pfid) +{ + *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; + fid_unpack(&lee->lee_parent_fid, pfid); + lname->ln_name = (char *)&lee->lee_parent_fid + + lee->lee_parent_fid.fp_len; + lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len - + sizeof(lee->lee_reclen); +} + +/** Add a record to the end of link ea buf */ +static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, + const struct lu_fid *pfid, + const struct lu_name *lname) +{ + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + + if (lname == NULL || pfid == NULL) + return -EINVAL; + + /* Make sure our buf is big enough for the new one */ + leh = buf->lb_buf; + reclen = lname->ln_namelen + sizeof(struct link_ea_entry); + if (leh->leh_len + reclen > buf->lb_len) { + if (mdd_buf_grow(env, leh->leh_len + reclen) < 0) + return -ENOMEM; + } + + leh = buf->lb_buf; + lee = buf->lb_buf + leh->leh_len; + reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2); + leh->leh_len += reclen; + leh->leh_reccount++; + return 0; +} + +/* For pathologic linkers, we don't want to spend lots of time scanning the + * link ea. Limit ourseleves to something reasonable; links not in the EA + * can be looked up via (slower) parent lookup. + */ +#define LINKEA_MAX_COUNT 128 + +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + int rc; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + if (rc != -ENODATA) { + CERROR("link_ea read failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + RETURN (rc); + } + /* empty EA; start one */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + leh = buf->lb_buf; + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_len = sizeof(struct link_ea_header); + leh->leh_reccount = 0; + } + + leh = buf->lb_buf; + if (leh->leh_reccount > LINKEA_MAX_COUNT) + RETURN(-EOVERFLOW); + + rc = __mdd_links_add(env, buf, pfid, lname); + if (rc) + RETURN(rc); + + leh = buf->lb_buf; + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + if (rc) + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + int reclen = 0; + int count; + int rc, rc2 = 0; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + if (mdd_obj->mod_flags & DEAD_OBJ) + /* No more links, don't bother */ + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + CERROR("link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + RETURN(rc); + } + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ + + /* Find the old record */ + for(count = 0; count <= leh->leh_reccount; count++) { + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + if (tmpname->ln_namelen == oldlname->ln_namelen && + lu_fid_eq(tmpfid, oldpfid) && + (strncmp(tmpname->ln_name, oldlname->ln_name, + tmpname->ln_namelen) == 0)) + break; + lee = (struct link_ea_entry *)((char *)lee + reclen); + } + if (count > leh->leh_reccount) { + CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", + oldlname->ln_namelen, oldlname->ln_name); + GOTO(out, rc = -ENOENT); + } + + /* Remove the old record */ + leh->leh_reccount--; + leh->leh_len -= reclen; + memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len - + (char *)lee); + + /* If renaming, add the new record */ + if (newpfid != NULL) { + /* if the add fails, we still delete the out-of-date old link */ + rc2 = __mdd_links_add(env, buf, newpfid, newlname); + leh = buf->lb_buf; + } + + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + +out: + if (rc == 0) + rc = rc2; + if (rc) + CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +const struct md_dir_operations mdd_dir_ops = { .mdo_is_subdir = mdd_is_subdir, .mdo_lookup = mdd_lookup, .mdo_create = mdd_create,