X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=03a9e86eb6c16c46c62f402263015dadd1ce2737;hp=bcb0eaca95c427d24327b46e2fd8f052f3ab61ed;hb=c159c408293fbebf71a948e630aa9f637f3c8ffe;hpb=bc310f7889a95597962ae4ff9414b02847b75034;ds=sidebyside diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index bcb0eac..03a9e86 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -1,30 +1,45 @@ -/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * mdd/mdd_handler.c - * Lustre Metadata Server (mdd) routines + * GPL HEADER START * - * Copyright (C) 2006 Cluster File Systems, Inc. - * Author: Wang Di + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/mdd/mdd_dir.c + * + * Lustre Metadata Server (mdd) routines + * + * Author: Wang Di */ + #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif @@ -56,16 +71,29 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle); +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle); + static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct dynlock_handle *dlh; int rc; - dlh = mdd_pdo_read_lock(env, mdd_obj, name); + dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (unlikely(dlh == NULL)) return -ENOMEM; rc = __mdd_lookup(env, pobj, lname, fid, mask); @@ -74,9 +102,9 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, return rc; } -static int mdd_lookup(const struct lu_env *env, - struct md_object *pobj, const struct lu_name *lname, - struct lu_fid* fid, struct md_op_spec *spec) +int mdd_lookup(const struct lu_env *env, + struct md_object *pobj, const struct lu_name *lname, + struct lu_fid* fid, struct md_op_spec *spec) { int rc; ENTRY; @@ -84,7 +112,6 @@ static int mdd_lookup(const struct lu_env *env, RETURN(rc); } - static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, struct lu_fid *fid) { @@ -92,10 +119,10 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, } /* - * For root fid use special function, whcih does not compare version component - * of fid. Vresion component is different for root fids on all MDTs. + * For root fid use special function, which does not compare version component + * of fid. Version component is different for root fids on all MDTs. */ -static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) +int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) { return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) && fid_oid(&mdd->mdd_root_fid) == fid_oid(fid); @@ -208,7 +235,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, { struct dt_it *it; struct dt_object *obj; - struct dt_it_ops *iops; + const struct dt_it_ops *iops; int result; ENTRY; @@ -217,7 +244,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, 0, BYPASS_CAPA); + it = iops->init(env, obj, BYPASS_CAPA); if (it != NULL) { result = iops->get(env, it, (const void *)""); if (result > 0) { @@ -252,7 +279,11 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj) if (rc) RETURN(rc); - if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) + /* + * Subdir count limitation can be broken through. + */ + if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink && + !S_ISDIR(la->la_mode)) RETURN(-EMLINK); else RETURN(0); @@ -276,7 +307,8 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj, if (check_perm) rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (!rc && check_nlink) rc = __mdd_may_link(env, pobj); @@ -301,7 +333,8 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj, RETURN(-EPERM); rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (rc) RETURN(rc); @@ -327,20 +360,20 @@ static inline int mdd_is_sticky(const struct lu_env *env, rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); if (rc) return rc; - + if (!(tmp_la->la_mode & S_ISVTX) || (tmp_la->la_uid == uc->mu_fsuid)) return 0; } rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); - if (rc) + if (rc) return rc; - + if (tmp_la->la_uid == uc->mu_fsuid) return 0; - - return !mdd_capable(uc, CAP_FOWNER); + + return !mdd_capable(uc, CFS_CAP_FOWNER); } /* @@ -364,7 +397,8 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, if (check_perm) { rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC); + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); if (rc) RETURN(rc); } @@ -436,15 +470,42 @@ int mdd_link_sanity_check(const struct lu_env *env, RETURN(rc); } -const struct dt_rec *__mdd_fid_rec(const struct lu_env *env, - const struct lu_fid *fid) +/** + * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and + * assign i_nlink to 1 which means the i_nlink for subdir count is incredible + * (maybe too large to be represented). It is a trick to break through the + * "i_nlink" limitation for subdir count. + */ +void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle) { - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; + struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; + struct mdd_device *m = mdd_obj2mdd_dev(obj); - fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2); - return (const struct dt_rec *)pack; + if (!mdd_is_mnlink(obj)) { + if (S_ISDIR(mdd_object_type(obj))) { + if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA)) + return; + + if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) { + obj->mod_flags |= MNLINK_OBJ; + tmp_la->la_nlink = 1; + tmp_la->la_valid = LA_NLINK; + mdd_attr_set_internal(env, obj, tmp_la, handle, + 0); + return; + } + } + mdo_ref_add(env, obj, handle); + } } +void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, + struct thandle *handle, int is_dot) +{ + if (!mdd_is_mnlink(obj) || is_dot) + mdo_ref_del(env, obj, handle); +} /* insert named index, add reference if isdir */ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, @@ -456,18 +517,21 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, ENTRY; if (dt_try_as_dir(env, next)) { + struct md_ucred *uc = md_ucred(env); + rc = next->do_index_ops->dio_insert(env, next, __mdd_fid_rec(env, lf), (const struct dt_key *)name, - handle, capa); + handle, capa, uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } if (rc == 0) { if (is_dir) { - mdd_write_lock(env, pobj); - mdo_ref_add(env, pobj, handle); + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_add(env, pobj, handle); mdd_write_unlock(env, pobj); } } @@ -488,8 +552,12 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, (struct dt_key *)name, handle, capa); if (rc == 0 && is_dir) { - mdd_write_lock(env, pobj); - mdo_ref_del(env, pobj, handle); + int is_dot = 0; + + if (name != NULL && name[0] == '.' && name[1] == 0) + is_dot = 1; + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + __mdd_ref_del(env, pobj, handle, is_dot); mdd_write_unlock(env, pobj); } } else @@ -508,39 +576,127 @@ __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, ENTRY; if (dt_try_as_dir(env, next)) { + struct md_ucred *uc = md_ucred(env); + rc = next->do_index_ops->dio_insert(env, next, __mdd_fid_rec(env, lf), (const struct dt_key *)name, - handle, capa); + handle, capa, uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); } else { rc = -ENOTDIR; } RETURN(rc); } +/** Store a namespace change changelog record + * If this fails, we must fail the whole transaction; we don't + * want the change to commit without the log entry. + * \param target - mdd_object of change + * \param parent - parent dir/object + * \param tf - target lu_fid, overrides fid of \a target if this is non-null + * \param tname - target name string + * \param handle - transacion handle + */ +static int mdd_changelog_ns_store(const struct lu_env *env, + struct mdd_device *mdd, + enum changelog_rec_type type, + struct mdd_object *target, + struct mdd_object *parent, + const struct lu_fid *tf, + const struct lu_name *tname, + struct thandle *handle) +{ + const struct lu_fid *tfid; + const struct lu_fid *tpfid = mdo2fid(parent); + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; + ENTRY; + + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + RETURN(0); + + LASSERT(parent != NULL); + LASSERT(tname != NULL); + LASSERT(handle != NULL); + + /* target */ + reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + rec = (struct llog_changelog_rec *)buf->lb_buf; + + rec->cr_flags = CLF_VERSION; + rec->cr_type = (__u32)type; + tfid = tf ? tf : mdo2fid(target); + rec->cr_tfid = *tfid; + rec->cr_pfid = *tpfid; + rec->cr_namelen = tname->ln_namelen; + memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen); + if (likely(target)) + target->mod_cltime = cfs_time_current_64(); + + rc = mdd_changelog_llog_write(mdd, rec, handle); + if (rc < 0) { + CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", + rc, type, tname->ln_name, PFID(tfid), PFID(tpfid)); + return -EFAULT; + } + + return 0; +} + static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); struct mdd_device *mdd = mdo2mdd(src_obj); struct dynlock_handle *dlh; struct thandle *handle; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qids); + /* get block quota for parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], 1, + &rec_pending, NULL, LQUOTA_FLAGS_BLK, + data, 1); + } + } +#endif + mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_tobj, name); + dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - mdd_write_lock(env, mdd_sobj); + mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD); rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj); if (rc) @@ -552,7 +708,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, if (rc) GOTO(out_unlock, rc); - mdo_ref_add(env, mdd_sobj, handle); + __mdd_ref_add(env, mdd_sobj, handle); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -564,12 +720,31 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0); + if (rc == 0) + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle); + EXIT; out_unlock: mdd_write_unlock(env, mdd_sobj); mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj, + mdd_tobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqacq for the parent owner. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, + quota_opc); + } +#endif return rc; } @@ -579,23 +754,28 @@ int mdd_finish_unlink(const struct lu_env *env, struct thandle *th) { int rc; + int reset = 1; ENTRY; rc = mdd_iattr_get(env, obj, ma); if (rc == 0 && ma->ma_attr.la_nlink == 0) { /* add new orphan and the object - * will be deleted during the object_put() */ - if (__mdd_orphan_add(env, obj, th) == 0) - obj->mod_flags |= ORPHAN_OBJ; + * will be deleted during mdd_close() */ + if (obj->mod_count) { + rc = __mdd_orphan_add(env, obj, th); + if (rc == 0) + obj->mod_flags |= ORPHAN_OBJ; + } obj->mod_flags |= DEAD_OBJ; - if (obj->mod_count == 0) + if (!(obj->mod_flags & ORPHAN_OBJ)) { rc = mdd_object_kill(env, obj, ma); - else - /* clear MA_LOV | MA_COOKIE, if we do not - * unlink it in case we get it somewhere */ - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); - } else + if (rc == 0) + reset = 0; + } + + } + if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); RETURN(rc); @@ -620,14 +800,22 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *mdd_cobj = md2mdd_obj(cobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; - int rc, is_dir; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif + int is_dir = S_ISDIR(ma->ma_attr.la_mode); + int rc; ENTRY; LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", @@ -641,13 +829,11 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - - dlh = mdd_pdo_write_lock(env, mdd_pobj, name); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - mdd_write_lock(env, mdd_cobj); + mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - is_dir = S_ISDIR(ma->ma_attr.la_mode); rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); if (rc) GOTO(cleanup, rc); @@ -657,10 +843,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - mdo_ref_del(env, mdd_cobj, handle); + __mdd_ref_del(env, mdd_cobj, handle, 0); if (is_dir) /* unlink dot */ - mdo_ref_del(env, mdd_cobj, handle); + __mdd_ref_del(env, mdd_cobj, handle, 1); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -676,17 +862,52 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, GOTO(cleanup, rc); rc = mdd_finish_unlink(env, mdd_cobj, ma, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA); + if (!rc) { + mdd_quota_wrapper(la_tmp, qpids); + if (mdd_cobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK; + mdd_quota_wrapper(&ma->ma_attr, qcids); + } else { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT; + } + } + } +#endif if (rc == 0) obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, NULL, NULL); + if (!is_dir) + /* old files may not have link ea; ignore errors */ + mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), + lname, NULL, NULL, handle); + EXIT; cleanup: mdd_write_unlock(env, mdd_cobj); mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + is_dir ? CL_RMDIR : CL_UNLINK, + mdd_cobj, mdd_pobj, NULL, lname, + handle); + mdd_trans_stop(env, mdd, rc, handle); +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) + /* Trigger dqrel on the owner of child and parent. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, + quota_opc); +#endif return rc; } @@ -716,22 +937,52 @@ static int mdd_name_insert(const struct lu_env *env, const struct lu_fid *fid, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; int is_dir = S_ISDIR(ma->ma_attr.la_mode); +#ifdef HAVE_QUOTA_SUPPORT + struct md_ucred *uc = md_ucred(env); + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; + cfs_cap_t save = uc->mu_cap; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qids); + /* get block quota for parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qids[USRQUOTA], qids[GRPQUOTA], + 1, &rec_pending, NULL, + LQUOTA_FLAGS_BLK, data, 1); + } + } else { + uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK; + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP); handle = mdd_trans_start(env, mdo2mdd(pobj)); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_obj, name); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -761,6 +1012,24 @@ out_unlock: mdd_pdo_write_unlock(env, mdd_obj, dlh); out_trans: mdd_trans_stop(env, mdo2mdd(pobj), rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (quota_opc) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, + obd, qids[USRQUOTA], + qids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqacq for the parent owner. If failed, + * the next call for lquota_chkquota will process it*/ + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, + rc, quota_opc); + } else { + uc->mu_cap = save; + } + } +#endif return rc; } @@ -789,22 +1058,39 @@ static int mdd_name_remove(const struct lu_env *env, const struct lu_name *lname, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; int is_dir = S_ISDIR(ma->ma_attr.la_mode); +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); + if (!rc) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT; + mdd_quota_wrapper(la_tmp, qids); + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_obj, name); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -834,6 +1120,14 @@ out_unlock: mdd_pdo_write_unlock(env, mdd_obj, dlh); out_trans: mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + /* Trigger dqrel for the parent owner. + * If failed, the next call for lquota_chkquota will process it. */ + if (quota_opc) + lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, + quota_opc); +#endif return rc; } @@ -865,31 +1159,57 @@ static int mdd_rt_sanity_check(const struct lu_env *env, RETURN(rc); } +/* Partial rename op on slave MDD */ static int mdd_rename_tgt(const struct lu_env *env, struct md_object *pobj, struct md_object *tobj, const struct lu_fid *lf, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tpobj = md2mdd_obj(pobj); struct mdd_object *mdd_tobj = md2mdd_obj(tobj); struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif int rc; ENTRY; +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && !tobj) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qpids); + /* get block quota for target parent */ + lquota_chkquota(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], 1, + &rec_pending, NULL, LQUOTA_FLAGS_BLK, + data, 1); + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_tpobj, name); + dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); if (tobj) - mdd_write_lock(env, mdd_tobj); + mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma); if (rc) @@ -916,17 +1236,17 @@ static int mdd_rename_tgt(const struct lu_env *env, if (rc) GOTO(cleanup, rc); - /* + /* * For tobj is remote case cmm layer has processed * and pass NULL tobj to here. So when tobj is NOT NULL, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { - mdo_ref_del(env, mdd_tobj, handle); + __mdd_ref_del(env, mdd_tobj, handle, 0); /* Remove dot reference. */ if (S_ISDIR(ma->ma_attr.la_mode)) - mdo_ref_del(env, mdd_tobj, handle); + __mdd_ref_del(env, mdd_tobj, handle, 1); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); @@ -936,6 +1256,14 @@ static int mdd_rename_tgt(const struct lu_env *env, rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); if (rc) GOTO(cleanup, rc); + +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qcids); + } +#endif } EXIT; cleanup: @@ -943,7 +1271,29 @@ cleanup: mdd_write_unlock(env, mdd_tobj); mdd_pdo_write_unlock(env, mdd_tpobj, dlh); out_trans: + if (rc == 0) + /* Bare EXT record with no RENAME in front of it signifies + a partial slave op */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, NULL, lname, handle); + mdd_trans_stop(env, mdd, rc, handle); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qpids[USRQUOTA], + qpids[GRPQUOTA], + rec_pending, 1); + if (quota_opc) + /* Trigger dqrel/dqacq on the target owner of child and + * parent. If failed, the next call for lquota_chkquota + * will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, + qpids, rc, quota_opc); + } +#endif return rc; } @@ -1001,7 +1351,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, /* Replay creates has objects already */ #if 0 - if (spec->u.sp_ea.no_lov_create) { + if (spec->no_create) { CDEBUG(D_INFO, "we already have lov ea\n"); rc = mdd_lov_set_md(env, mdd_pobj, son, (struct lov_mds_md *)spec->u.sp_ea.eadata, @@ -1026,11 +1376,12 @@ out_free: RETURN(rc); } +/* Get fid from name and parent */ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; + const char *name = lname->ln_name; const struct dt_key *key = (const struct dt_key *)name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = mdo2mdd(pobj); @@ -1055,17 +1406,21 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)) RETURN(-ENAMETOOLONG); - rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask); + rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask, + MOR_TGT_PARENT); if (rc) RETURN(rc); if (likely(S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))) { + rc = dir->do_index_ops->dio_lookup(env, dir, (struct dt_rec *)pack, key, mdd_object_capa(env, mdd_obj)); - if (rc == 0) + if (rc > 0) rc = fid_unpack(pack, fid); + else if (rc == 0) + rc = -ENOENT; } else rc = -ENOTDIR; @@ -1073,8 +1428,9 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, } int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, - struct mdd_object *child, struct md_attr *ma, - struct thandle *handle) + const struct lu_name *lname, struct mdd_object *child, + struct md_attr *ma, struct thandle *handle, + const struct md_op_spec *spec) { int rc; ENTRY; @@ -1093,7 +1449,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, if (S_ISDIR(ma->ma_attr.la_mode)) { /* Add "." and ".." for newly created dir */ - mdo_ref_add(env, child, handle); + __mdd_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), dot, handle, BYPASS_CAPA); if (rc == 0) { @@ -1103,16 +1459,17 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, if (rc != 0) { int rc2; - rc2 = __mdd_index_delete(env, child, dot, 0, + rc2 = __mdd_index_delete(env, child, dot, 1, handle, BYPASS_CAPA); if (rc2 != 0) CERROR("Failure to cleanup after dotdot" " creation: %d (%d)\n", rc2, rc); - else - mdo_ref_del(env, child, handle); } } } + if (rc == 0) + mdd_links_add(env, child, pfid, lname, handle); + RETURN(rc); } @@ -1157,7 +1514,8 @@ static int mdd_create_sanity_check(const struct lu_env *env, * EXEC permission have been checked * when lookup before create already. */ - rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE); + rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE, + MOR_TGT_PARENT); if (rc) RETURN(rc); } @@ -1176,12 +1534,6 @@ static int mdd_create_sanity_check(const struct lu_env *env, } switch (ma->ma_attr.la_mode & S_IFMT) { - case S_IFDIR: { - if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) - RETURN(-EMLINK); - else - RETURN(0); - } case S_IFLNK: { unsigned int symlen = strlen(spec->u.sp_symname) + 1; @@ -1190,6 +1542,7 @@ static int mdd_create_sanity_check(const struct lu_env *env, else RETURN(0); } + case S_IFDIR: case S_IFREG: case S_IFCHR: case S_IFBLK: @@ -1214,24 +1567,35 @@ static int mdd_create(const struct lu_env *env, struct md_op_spec *spec, struct md_attr* ma) { - char *name = lname->ln_name; - struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *son = md2mdd_obj(child); - struct mdd_device *mdd = mdo2mdd(pobj); - struct lu_attr *attr = &ma->ma_attr; - struct lov_mds_md *lmm = NULL; - struct thandle *handle; - int rc, created = 0, inserted = 0, lmm_size = 0; - struct dynlock_handle *dlh; + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_attr *la = &info->mti_la_for_fix; + struct md_attr *ma_acl = &info->mti_ma; + struct mdd_object *mdd_pobj = md2mdd_obj(pobj); + struct mdd_object *son = md2mdd_obj(child); + struct mdd_device *mdd = mdo2mdd(pobj); + struct lu_attr *attr = &ma->ma_attr; + struct lov_mds_md *lmm = NULL; + struct thandle *handle; + struct dynlock_handle *dlh; + const char *name = lname->ln_name; + int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0; + int got_def_acl = 0; +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, block_count = 0; + int inode_pending = 0, block_pending = 0, parent_pending = 0; +#endif ENTRY; /* * Two operations have to be performed: * - * - allocation of new object (->do_create()), and + * - an allocation of a new object (->do_create()), and * - * - insertion into parent index (->dio_insert()). + * - an insertion into a parent index (->dio_insert()). * * Due to locking, operation order is not important, when both are * successful, *but* error handling cases are quite different: @@ -1267,6 +1631,51 @@ static int mdd_create(const struct lu_env *env, if (rc) RETURN(rc); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA); + if (!rc) { + int same = 0; + + quota_opc = FSFILT_OP_CREATE; + mdd_quota_wrapper(&ma->ma_attr, qcids); + mdd_quota_wrapper(la_tmp, qpids); + /* get file quota for child */ + lquota_chkquota(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], 1, + &inode_pending, NULL, 0, NULL, 0); + switch (ma->ma_attr.la_mode & S_IFMT) { + case S_IFLNK: + case S_IFDIR: + block_count = 2; + break; + case S_IFREG: + block_count = 1; + break; + } + if (qcids[USRQUOTA] == qpids[USRQUOTA] && + qcids[GRPQUOTA] == qpids[GRPQUOTA]) { + block_count += 1; + same = 1; + } + /* get block quota for child and parent */ + if (block_count) + lquota_chkquota(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + block_count, + &block_pending, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); + if (!same) + lquota_chkquota(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], 1, + &parent_pending, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); + } + } +#endif + /* * No RPC inside the transaction, so OST objects should be created at * first. @@ -1275,7 +1684,22 @@ static int mdd_create(const struct lu_env *env, rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, attr); if (rc) - RETURN(rc); + GOTO(out_pending, rc); + } + + if (!S_ISLNK(attr->la_mode)) { + ma_acl->ma_acl_size = sizeof info->mti_xattr_buf; + ma_acl->ma_acl = info->mti_xattr_buf; + ma_acl->ma_need = MA_ACL_DEF; + ma_acl->ma_valid = 0; + + mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT); + rc = mdd_def_acl_get(env, mdd_pobj, ma_acl); + mdd_read_unlock(env, mdd_pobj); + if (rc) + GOTO(out_free, rc); + else if (ma_acl->ma_valid & MA_ACL_DEF) + got_def_acl = 1; } mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP); @@ -1283,16 +1707,12 @@ static int mdd_create(const struct lu_env *env, if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); - dlh = mdd_pdo_write_lock(env, mdd_pobj, name); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); - /* - * XXX: Check that link can be added to the parent in mkdir case. - */ - - mdd_write_lock(env, son); - rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle); + mdd_write_lock(env, son, MOR_TGT_CHILD); + rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec); if (rc) { mdd_write_unlock(env, son); GOTO(cleanup, rc); @@ -1301,19 +1721,23 @@ static int mdd_create(const struct lu_env *env, created = 1; #ifdef CONFIG_FS_POSIX_ACL - mdd_read_lock(env, mdd_pobj); - rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle); - mdd_read_unlock(env, mdd_pobj); - if (rc) { - mdd_write_unlock(env, son); - GOTO(cleanup, rc); - } else { - ma->ma_attr.la_valid |= LA_MODE; + if (got_def_acl) { + struct lu_buf *acl_buf = &info->mti_buf; + acl_buf->lb_buf = ma_acl->ma_acl; + acl_buf->lb_len = ma_acl->ma_acl_size; + + rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle); + if (rc) { + mdd_write_unlock(env, son); + GOTO(cleanup, rc); + } else { + ma->ma_attr.la_valid |= LA_MODE; + } } #endif - rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), - son, ma, handle); + rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, + son, ma, handle, spec); mdd_write_unlock(env, son); if (rc) /* @@ -1322,6 +1746,8 @@ static int mdd_create(const struct lu_env *env, */ GOTO(cleanup, rc); + initialized = 1; + rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), name, S_ISDIR(attr->la_mode), handle, mdd_object_capa(env, mdd_pobj)); @@ -1345,6 +1771,7 @@ static int mdd_create(const struct lu_env *env, } if (S_ISLNK(attr->la_mode)) { + struct md_ucred *uc = md_ucred(env); struct dt_object *dt = mdd_object_child(son); const char *target_name = spec->u.sp_symname; int sym_len = strlen(target_name); @@ -1353,7 +1780,9 @@ static int mdd_create(const struct lu_env *env, buf = mdd_buf_get_const(env, target_name, sym_len); rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, - mdd_object_capa(env, son)); + mdd_object_capa(env, son), + uc->mu_cap & + CFS_CAP_SYS_RESOURCE_MASK); if (rc == sym_len) rc = 0; @@ -1382,9 +1811,12 @@ cleanup: CERROR("error can not cleanup destroy %d\n", rc2); } + if (rc2 == 0) { - mdd_write_lock(env, son); - mdo_ref_del(env, son, handle); + mdd_write_lock(env, son, MOR_TGT_CHILD); + __mdd_ref_del(env, son, handle, 0); + if (initialized && S_ISDIR(attr->la_mode)) + __mdd_ref_del(env, son, handle, 1); mdd_write_unlock(env, son); } } @@ -1395,10 +1827,37 @@ cleanup: mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + S_ISDIR(attr->la_mode) ? CL_MKDIR : + S_ISREG(attr->la_mode) ? CL_CREATE : + S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD, + son, mdd_pobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); out_free: /* finis lov_create stuff, free all temporary data */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (quota_opc) { + if (inode_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + inode_pending, 0); + if (block_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qcids[USRQUOTA], qcids[GRPQUOTA], + block_pending, 1); + if (parent_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qpids[USRQUOTA], qpids[GRPQUOTA], + parent_pending, 1); + /* Trigger dqacq on the owner of child and parent. If failed, + * the next call for lquota_chkquota will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, + quota_opc); + } +#endif return rc; } @@ -1491,18 +1950,28 @@ static int mdd_rename(const struct lu_env *env, struct md_object *tobj, const struct lu_name *ltname, struct md_attr *ma) { - char *sname = lsname->ln_name; - char *tname = ltname->ln_name; + const char *sname = lsname->ln_name; + const char *tname = ltname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); + struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */ struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); struct mdd_device *mdd = mdo2mdd(src_pobj); - struct mdd_object *mdd_sobj = NULL; + struct mdd_object *mdd_sobj = NULL; /* source object */ struct mdd_object *mdd_tobj = NULL; struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; + const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); int is_dir; int rc; + +#ifdef HAVE_QUOTA_SUPPORT + struct obd_device *obd = mdd->mdd_obd_dev; + struct mds_obd *mds = &obd->u.mds; + unsigned int qspids[MAXQUOTAS] = { 0, 0 }; + unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; + unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; + int quota_opc = 0, rec_pending = 0; +#endif ENTRY; LASSERT(ma->ma_attr.la_mode & S_IFMT); @@ -1511,10 +1980,37 @@ static int mdd_rename(const struct lu_env *env, if (tobj) mdd_tobj = md2mdd_obj(tobj); +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; + + rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA); + if (!rc) { + mdd_quota_wrapper(la_tmp, qspids); + if (!tobj) { + rc = mdd_la_get(env, mdd_tpobj, la_tmp, + BYPASS_CAPA); + if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_opc = FSFILT_OP_LINK; + mdd_quota_wrapper(la_tmp, qtpids); + /* get block quota for target parent */ + lquota_chkquota(mds_quota_interface_ref, + obd, qtpids[USRQUOTA], + qtpids[GRPQUOTA], 1, + &rec_pending, NULL, + LQUOTA_FLAGS_BLK, + data, 1); + } + } + } + } +#endif mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); + GOTO(out_pending, rc = PTR_ERR(handle)); /* FIXME: Should consider tobj and sobj too in rename_lock. */ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj); @@ -1523,18 +2019,20 @@ static int mdd_rename(const struct lu_env *env, /* Get locks in determined order */ if (rc == MDD_RN_SAME) { - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, + sname, MOR_SRC_PARENT); /* check hashes to determine do we need one lock or two */ if (mdd_name2hash(sname) != mdd_name2hash(tname)) - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname, + MOR_TGT_PARENT); else tdlh = sdlh; } else if (rc == MDD_RN_SRCTGT) { - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT); } else { - tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname); - sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname); + tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT); + sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT); } if (sdlh == NULL || tdlh == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -1545,12 +2043,27 @@ static int mdd_rename(const struct lu_env *env, if (rc) GOTO(cleanup, rc); + /* Remove source name from source directory */ rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, mdd_object_capa(env, mdd_spobj)); if (rc) GOTO(cleanup, rc); - /* + /* "mv dir1 dir2" needs "dir1/.." link update */ + if (is_dir && mdd_sobj) { + rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, + mdd_object_capa(env, mdd_spobj)); + if (rc) + GOTO(cleanup, rc); + + rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, + is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); + if (rc) + GOTO(cleanup, rc); + } + + /* Remove target name from target directory * Here tobj can be remote one, so we do index_delete unconditionally * and -ENOENT is allowed. */ @@ -1559,6 +2072,7 @@ static int mdd_rename(const struct lu_env *env, if (rc != 0 && rc != -ENOENT) GOTO(cleanup, rc); + /* Insert new fid with target name into target dir */ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); if (rc) @@ -1576,18 +2090,18 @@ static int mdd_rename(const struct lu_env *env, GOTO(cleanup, rc); } - /* + /* Remove old target object * For tobj is remote case cmm layer has processed * and set tobj to NULL then. So when tobj is NOT NULL, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { - mdd_write_lock(env, mdd_tobj); - mdo_ref_del(env, mdd_tobj, handle); + mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); + __mdd_ref_del(env, mdd_tobj, handle, 0); /* Remove dot reference. */ if (is_dir) - mdo_ref_del(env, mdd_tobj, handle); + __mdd_ref_del(env, mdd_tobj, handle, 1); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); @@ -1598,6 +2112,14 @@ static int mdd_rename(const struct lu_env *env, mdd_write_unlock(env, mdd_tobj); if (rc) GOTO(cleanup, rc); + +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota && ma->ma_valid & MA_INODE && + ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qtcids); + } +#endif } la->la_valid = LA_CTIME | LA_MTIME; @@ -1611,6 +2133,20 @@ static int mdd_rename(const struct lu_env *env, handle, 0); } + if (rc == 0 && mdd_sobj) { + mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD); + rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, + mdo2fid(mdd_tpobj), ltname, handle); + if (rc == -ENOENT) + /* Old files might not have EA entry */ + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), + lsname, handle); + mdd_write_unlock(env, mdd_sobj); + /* We don't fail the transaction if the link ea can't be + updated -- fid2path will use alternate lookup method. */ + rc = 0; + } + EXIT; cleanup: if (likely(tdlh) && sdlh != tdlh) @@ -1618,13 +2154,301 @@ cleanup: if (likely(sdlh)) mdd_pdo_write_unlock(env, mdd_spobj, sdlh); cleanup_unlocked: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj, + mdd_spobj, lf, lsname, handle); + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + mdd_tpobj, lf, ltname, handle); + mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); +out_pending: +#ifdef HAVE_QUOTA_SUPPORT + if (mds->mds_quota) { + if (rec_pending) + lquota_pending_commit(mds_quota_interface_ref, obd, + qtpids[USRQUOTA], + qtpids[GRPQUOTA], + rec_pending, 1); + /* Trigger dqrel on the source owner of parent. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, + FSFILT_OP_UNLINK_PARTIAL_PARENT); + if (quota_opc) + /* Trigger dqrel/dqacq on the target owner of child and + * parent. If failed, the next call for lquota_chkquota + * will process it. */ + lquota_adjust(mds_quota_interface_ref, obd, qtcids, + qtpids, rc, quota_opc); + } +#endif return rc; } -struct md_dir_operations mdd_dir_ops = { +/** enable/disable storing of hardlink info */ +int mdd_linkea_enable = 1; +CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, + "record hardlink info in EAs"); + +/** Read the link EA into a temp buffer. + * Uses the name_buf since it is generally large. + * \retval IS_ERR err + * \retval ptr to \a lu_buf (always \a mti_big_buf) + */ +struct lu_buf *mdd_links_get(const struct lu_env *env, + struct mdd_object *mdd_obj) +{ + struct lu_buf *buf; + struct lustre_capa *capa; + struct link_ea_header *leh; + int rc; + + /* First try a small buf */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + + capa = mdd_object_capa(env, mdd_obj); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc == -ERANGE) { + /* Buf was too small, figure out what we need. */ + buf->lb_buf = NULL; + buf->lb_len = 0; + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc < 0) + return ERR_PTR(rc); + buf = mdd_buf_alloc(env, rc); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + } + if (rc < 0) + return ERR_PTR(rc); + + leh = buf->lb_buf; + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + /* entries are swabbed by mdd_lee_unpack */ + } + if (leh->leh_magic != LINK_EA_MAGIC) + return ERR_PTR(-EINVAL); + if (leh->leh_reccount == 0) + return ERR_PTR(-ENODATA); + + return buf; +} + +/** Pack a link_ea_entry. + * All elements are stored as chars to avoid alignment issues. + * Numbers are always big-endian + * \param packbuf is a temp fid buffer + * \retval record length + */ +static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, + const struct lu_fid *pfid, struct lu_fid* packbuf) +{ + char *ptr; + int reclen; + + fid_pack(&lee->lee_parent_fid, pfid, packbuf); + ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len; + strncpy(ptr, lname->ln_name, lname->ln_namelen); + reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen + + sizeof(lee->lee_reclen); + lee->lee_reclen[0] = (reclen >> 8) & 0xff; + lee->lee_reclen[1] = reclen & 0xff; + return reclen; +} + +void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, + struct lu_name *lname, struct lu_fid *pfid) +{ + *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; + fid_unpack(&lee->lee_parent_fid, pfid); + lname->ln_name = (char *)&lee->lee_parent_fid + + lee->lee_parent_fid.fp_len; + lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len - + sizeof(lee->lee_reclen); +} + +/** Add a record to the end of link ea buf */ +static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, + const struct lu_fid *pfid, + const struct lu_name *lname) +{ + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + + if (lname == NULL || pfid == NULL) + return -EINVAL; + + /* Make sure our buf is big enough for the new one */ + leh = buf->lb_buf; + reclen = lname->ln_namelen + sizeof(struct link_ea_entry); + if (leh->leh_len + reclen > buf->lb_len) { + if (mdd_buf_grow(env, leh->leh_len + reclen) < 0) + return -ENOMEM; + } + + leh = buf->lb_buf; + lee = buf->lb_buf + leh->leh_len; + reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2); + leh->leh_len += reclen; + leh->leh_reccount++; + return 0; +} + +/* For pathologic linkers, we don't want to spend lots of time scanning the + * link ea. Limit ourseleves to something reasonable; links not in the EA + * can be looked up via (slower) parent lookup. + */ +#define LINKEA_MAX_COUNT 128 + +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + int rc; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + if (rc != -ENODATA) { + CERROR("link_ea read failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + RETURN (rc); + } + /* empty EA; start one */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + leh = buf->lb_buf; + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_len = sizeof(struct link_ea_header); + leh->leh_reccount = 0; + } + + leh = buf->lb_buf; + if (leh->leh_reccount > LINKEA_MAX_COUNT) + RETURN(-EOVERFLOW); + + rc = __mdd_links_add(env, buf, pfid, lname); + if (rc) + RETURN(rc); + + leh = buf->lb_buf; + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + if (rc) + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + int reclen = 0; + int count; + int rc, rc2 = 0; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + if (mdd_obj->mod_flags & DEAD_OBJ) + /* No more links, don't bother */ + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + CERROR("link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + RETURN(rc); + } + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ + + /* Find the old record */ + for(count = 0; count <= leh->leh_reccount; count++) { + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + if (tmpname->ln_namelen == oldlname->ln_namelen && + lu_fid_eq(tmpfid, oldpfid) && + (strncmp(tmpname->ln_name, oldlname->ln_name, + tmpname->ln_namelen) == 0)) + break; + lee = (struct link_ea_entry *)((char *)lee + reclen); + } + if (count > leh->leh_reccount) { + CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", + oldlname->ln_namelen, oldlname->ln_name); + GOTO(out, rc = -ENOENT); + } + + /* Remove the old record */ + leh->leh_reccount--; + leh->leh_len -= reclen; + memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len - + (char *)lee); + + /* If renaming, add the new record */ + if (newpfid != NULL) { + /* if the add fails, we still delete the out-of-date old link */ + rc2 = __mdd_links_add(env, buf, newpfid, newlname); + leh = buf->lb_buf; + } + + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + +out: + if (rc == 0) + rc = rc2; + if (rc) + CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_vmalloc) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +const struct md_dir_operations mdd_dir_ops = { .mdo_is_subdir = mdd_is_subdir, .mdo_lookup = mdd_lookup, .mdo_create = mdd_create,