-/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * mdd/mdd_handler.c
- * Lustre Metadata Server (mdd) routines
+ * GPL HEADER START
*
- * Copyright (C) 2006 Cluster File Systems, Inc.
- * Author: Wang Di <wangdi@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdd/mdd_dir.c
+ *
+ * Lustre Metadata Server (mdd) routines
+ *
+ * Author: Wang Di <wangdi@clusterfs.com>
*/
+
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
struct dynlock_handle *dlh;
int rc;
- dlh = mdd_pdo_read_lock(env, mdd_obj, name);
+ dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
if (unlikely(dlh == NULL))
return -ENOMEM;
rc = __mdd_lookup(env, pobj, lname, fid, mask);
{
struct dt_it *it;
struct dt_object *obj;
- struct dt_it_ops *iops;
+ const struct dt_it_ops *iops;
int result;
ENTRY;
if (rc)
RETURN(rc);
- if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
+ /*
+ * Subdir count limitation can be broken through.
+ */
+ if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
+ !S_ISDIR(la->la_mode))
RETURN(-EMLINK);
else
RETURN(0);
if (check_perm)
rc = mdd_permission_internal_locked(env, pobj, NULL,
- MAY_WRITE | MAY_EXEC);
+ MAY_WRITE | MAY_EXEC,
+ MOR_TGT_PARENT);
if (!rc && check_nlink)
rc = __mdd_may_link(env, pobj);
RETURN(-EPERM);
rc = mdd_permission_internal_locked(env, pobj, NULL,
- MAY_WRITE | MAY_EXEC);
+ MAY_WRITE | MAY_EXEC,
+ MOR_TGT_PARENT);
if (rc)
RETURN(rc);
rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
if (rc)
return rc;
-
+
if (!(tmp_la->la_mode & S_ISVTX) ||
(tmp_la->la_uid == uc->mu_fsuid))
return 0;
}
rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
- if (rc)
+ if (rc)
return rc;
-
+
if (tmp_la->la_uid == uc->mu_fsuid)
return 0;
-
- return !mdd_capable(uc, CAP_FOWNER);
+
+ return !mdd_capable(uc, CFS_CAP_FOWNER);
}
/*
if (check_perm) {
rc = mdd_permission_internal_locked(env, pobj, NULL,
- MAY_WRITE | MAY_EXEC);
+ MAY_WRITE | MAY_EXEC,
+ MOR_TGT_PARENT);
if (rc)
RETURN(rc);
}
return (const struct dt_rec *)pack;
}
+/**
+ * If the subdir count has reached ddp_max_nlink, set the MNLINK_OBJ flag
+ * and pin i_nlink at 1, meaning the stored nlink no longer reflects the
+ * real subdir count (which may be too large to represent). This is a
+ * trick to break through the "i_nlink" limitation on the number of
+ * subdirectories.
+ */
+void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle)
+{
+        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+        struct mdd_device *m = mdd_obj2mdd_dev(obj);
+
+        if (!mdd_is_mnlink(obj)) { /* MNLINK objects keep nlink pinned at 1 */
+                if (S_ISDIR(mdd_object_type(obj))) {
+                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
+                                return; /* cannot read attrs; leave nlink untouched */
+
+                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
+                                obj->mod_flags |= MNLINK_OBJ; /* limit hit: pin nlink */
+                                tmp_la->la_nlink = 1;
+                                tmp_la->la_valid = LA_NLINK;
+                                mdd_attr_set_internal(env, obj, tmp_la, handle,
+                                                      0);
+                                return;
+                        }
+                }
+                mdo_ref_add(env, obj, handle);
+        }
+}
+
+void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle, int is_dot)
+{
+        /* MNLINK dirs have nlink pinned; only the "." self-reference is real */
+        if (!mdd_is_mnlink(obj) || is_dot)
+                mdo_ref_del(env, obj, handle);
+}
/* insert named index, add reference if isdir */
static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
ENTRY;
if (dt_try_as_dir(env, next)) {
+ struct md_ucred *uc = md_ucred(env);
+
rc = next->do_index_ops->dio_insert(env, next,
__mdd_fid_rec(env, lf),
(const struct dt_key *)name,
- handle, capa);
+ handle, capa, uc->mu_cap &
+ CFS_CAP_SYS_RESOURCE_MASK);
} else {
rc = -ENOTDIR;
}
if (rc == 0) {
if (is_dir) {
- mdd_write_lock(env, pobj);
- mdo_ref_add(env, pobj, handle);
+ mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ __mdd_ref_add(env, pobj, handle);
mdd_write_unlock(env, pobj);
}
}
(struct dt_key *)name,
handle, capa);
if (rc == 0 && is_dir) {
- mdd_write_lock(env, pobj);
- mdo_ref_del(env, pobj, handle);
+ int is_dot = 0;
+
+ if (name != NULL && name[0] == '.' && name[1] == 0)
+ is_dot = 1;
+ mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ __mdd_ref_del(env, pobj, handle, is_dot);
mdd_write_unlock(env, pobj);
}
} else
ENTRY;
if (dt_try_as_dir(env, next)) {
+ struct md_ucred *uc = md_ucred(env);
+
rc = next->do_index_ops->dio_insert(env, next,
__mdd_fid_rec(env, lf),
(const struct dt_key *)name,
- handle, capa);
+ handle, capa, uc->mu_cap &
+ CFS_CAP_SYS_RESOURCE_MASK);
} else {
rc = -ENOTDIR;
}
struct mdd_device *mdd = mdo2mdd(src_obj);
struct dynlock_handle *dlh;
struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0, rec_pending = 0;
+#endif
int rc;
ENTRY;
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ quota_opc = FSFILT_OP_LINK;
+ mdd_quota_wrapper(la_tmp, qids);
+ /* get block quota for parent */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qids[USRQUOTA], qids[GRPQUOTA], 1,
+ &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+ }
+ }
+#endif
+
mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_pending, rc = PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
- mdd_write_lock(env, mdd_sobj);
+ mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
if (rc)
if (rc)
GOTO(out_unlock, rc);
- mdo_ref_add(env, mdd_sobj, handle);
+ __mdd_ref_add(env, mdd_sobj, handle);
LASSERT(ma->ma_attr.la_valid & LA_CTIME);
la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
mdd_pdo_write_unlock(env, mdd_tobj, dlh);
out_trans:
mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ if (quota_opc) {
+ if (rec_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qids[USRQUOTA], qids[GRPQUOTA],
+ 1, 1);
+ /* Trigger dqacq for the parent owner. If failed,
+ * the next call for lquota_chkquota will process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
+ quota_opc);
+ }
+#endif
return rc;
}
struct mdd_device *mdd = mdo2mdd(pobj);
struct dynlock_handle *dlh;
struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+ unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0;
+#endif
int rc, is_dir;
ENTRY;
RETURN(PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
- mdd_write_lock(env, mdd_cobj);
+ mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
is_dir = S_ISDIR(ma->ma_attr.la_mode);
rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
if (rc)
GOTO(cleanup, rc);
- mdo_ref_del(env, mdd_cobj, handle);
+ __mdd_ref_del(env, mdd_cobj, handle, 0);
if (is_dir)
/* unlink dot */
- mdo_ref_del(env, mdd_cobj, handle);
+ __mdd_ref_del(env, mdd_cobj, handle, 1);
LASSERT(ma->ma_attr.la_valid & LA_CTIME);
la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
GOTO(cleanup, rc);
rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_nlink == 0) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ mdd_quota_wrapper(la_tmp, qpids);
+ if (mdd_cobj->mod_count == 0) {
+ quota_opc = FSFILT_OP_UNLINK;
+ mdd_quota_wrapper(&ma->ma_attr, qcids);
+ } else {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
+ }
+ }
+ }
+#endif
if (rc == 0)
obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
mdd_pdo_write_unlock(env, mdd_pobj, dlh);
out_trans:
mdd_trans_stop(env, mdd, rc, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+ if (quota_opc)
+ /* Trigger dqrel on the owner of child and parent. If failed,
+ * the next call for lquota_chkquota will process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
+ quota_opc);
+#endif
return rc;
}
struct dynlock_handle *dlh;
struct thandle *handle;
int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+#ifdef HAVE_QUOTA_SUPPORT
+ struct md_ucred *uc = md_ucred(env);
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0, rec_pending = 0;
+ cfs_cap_t save = uc->mu_cap;
+#endif
int rc;
ENTRY;
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ quota_opc = FSFILT_OP_LINK;
+ mdd_quota_wrapper(la_tmp, qids);
+ /* get block quota for parent */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qids[USRQUOTA], qids[GRPQUOTA],
+ 1, &rec_pending, NULL,
+ LQUOTA_FLAGS_BLK);
+ }
+ } else {
+ uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
+ }
+ }
+#endif
mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
handle = mdd_trans_start(env, mdo2mdd(pobj));
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_pending, rc = PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
mdd_pdo_write_unlock(env, mdd_obj, dlh);
out_trans:
mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ if (quota_opc) {
+ if (rec_pending)
+ lquota_pending_commit(mds_quota_interface_ref,
+ obd, qids[USRQUOTA],
+ qids[GRPQUOTA], 1, 1);
+ /* Trigger dqacq for the parent owner. If failed,
+ * the next call for lquota_chkquota will process it*/
+ lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
+ rc, quota_opc);
+ } else {
+ uc->mu_cap = save;
+ }
+ }
+#endif
return rc;
}
struct dynlock_handle *dlh;
struct thandle *handle;
int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0;
+#endif
int rc;
ENTRY;
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
+ mdd_quota_wrapper(la_tmp, qids);
+ }
+ }
+#endif
mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_pending, rc = PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
mdd_pdo_write_unlock(env, mdd_obj, dlh);
out_trans:
mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ /* Trigger dqrel for the parent owner.
+ * If failed, the next call for lquota_chkquota will process it. */
+ if (quota_opc)
+ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
+ quota_opc);
+#endif
return rc;
}
struct mdd_device *mdd = mdo2mdd(pobj);
struct dynlock_handle *dlh;
struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+ unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0, rec_pending = 0;
+#endif
int rc;
ENTRY;
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota && !tobj) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ quota_opc = FSFILT_OP_LINK;
+ mdd_quota_wrapper(la_tmp, qpids);
+ /* get block quota for target parent */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qpids[USRQUOTA], qpids[GRPQUOTA], 1,
+ &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+ }
+ }
+#endif
mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_pending, rc = PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
if (tobj)
- mdd_write_lock(env, mdd_tobj);
+ mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
if (rc)
if (rc)
GOTO(cleanup, rc);
- /*
+ /*
* For tobj is remote case cmm layer has processed
* and pass NULL tobj to here. So when tobj is NOT NULL,
* it must be local one.
*/
if (tobj && mdd_object_exists(mdd_tobj)) {
- mdo_ref_del(env, mdd_tobj, handle);
+ __mdd_ref_del(env, mdd_tobj, handle, 0);
/* Remove dot reference. */
if (S_ISDIR(ma->ma_attr.la_mode))
- mdo_ref_del(env, mdd_tobj, handle);
+ __mdd_ref_del(env, mdd_tobj, handle, 1);
la->la_valid = LA_CTIME;
rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
if (rc)
GOTO(cleanup, rc);
+
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qcids);
+ }
+#endif
}
EXIT;
cleanup:
mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
out_trans:
mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ if (rec_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qpids[USRQUOTA],
+ qpids[GRPQUOTA],
+ 1, 1);
+ if (quota_opc)
+ /* Trigger dqrel/dqacq on the target owner of child and
+ * parent. If failed, the next call for lquota_chkquota
+ * will process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, qcids,
+ qpids, rc, quota_opc);
+ }
+#endif
return rc;
}
if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
RETURN(-ENAMETOOLONG);
- rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
+ rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
+ MOR_TGT_PARENT);
if (rc)
RETURN(rc);
if (S_ISDIR(ma->ma_attr.la_mode)) {
/* Add "." and ".." for newly created dir */
- mdo_ref_add(env, child, handle);
+ __mdd_ref_add(env, child, handle);
rc = __mdd_index_insert_only(env, child, mdo2fid(child),
dot, handle, BYPASS_CAPA);
if (rc == 0) {
if (rc != 0) {
int rc2;
- rc2 = __mdd_index_delete(env, child, dot, 0,
+ rc2 = __mdd_index_delete(env, child, dot, 1,
handle, BYPASS_CAPA);
if (rc2 != 0)
CERROR("Failure to cleanup after dotdot"
" creation: %d (%d)\n", rc2, rc);
- else
- mdo_ref_del(env, child, handle);
}
}
}
* EXEC permission have been checked
* when lookup before create already.
*/
- rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
+ rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
+ MOR_TGT_PARENT);
if (rc)
RETURN(rc);
}
}
switch (ma->ma_attr.la_mode & S_IFMT) {
- case S_IFDIR: {
- if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
- RETURN(-EMLINK);
- else
- RETURN(0);
- }
case S_IFLNK: {
unsigned int symlen = strlen(spec->u.sp_symname) + 1;
else
RETURN(0);
}
+ case S_IFDIR:
case S_IFREG:
case S_IFCHR:
case S_IFBLK:
struct md_op_spec *spec,
struct md_attr* ma)
{
- char *name = lname->ln_name;
- struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
- struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
- struct mdd_object *son = md2mdd_obj(child);
- struct mdd_device *mdd = mdo2mdd(pobj);
- struct lu_attr *attr = &ma->ma_attr;
- struct lov_mds_md *lmm = NULL;
- struct thandle *handle;
- int rc, created = 0, inserted = 0, lmm_size = 0;
- struct dynlock_handle *dlh;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lu_attr *la = &info->mti_la_for_fix;
+ struct md_attr *ma_acl = &info->mti_ma;
+ struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
+ struct mdd_object *son = md2mdd_obj(child);
+ struct mdd_device *mdd = mdo2mdd(pobj);
+ struct lu_attr *attr = &ma->ma_attr;
+ struct lov_mds_md *lmm = NULL;
+ struct thandle *handle;
+ struct dynlock_handle *dlh;
+ char *name = lname->ln_name;
+ int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
+ int got_def_acl = 0;
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+ unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0, block_count = 0;
+ int inode_pending = 0, block_pending = 0, parent_pending = 0;
+#endif
ENTRY;
/*
* Two operations have to be performed:
*
- * - allocation of new object (->do_create()), and
+ * - an allocation of a new object (->do_create()), and
*
- * - insertion into parent index (->dio_insert()).
+ * - an insertion into a parent index (->dio_insert()).
*
* Due to locking, operation order is not important, when both are
* successful, *but* error handling cases are quite different:
if (rc)
RETURN(rc);
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ int same = 0;
+
+ quota_opc = FSFILT_OP_CREATE;
+ mdd_quota_wrapper(&ma->ma_attr, qcids);
+ mdd_quota_wrapper(la_tmp, qpids);
+ /* get file quota for child */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qcids[USRQUOTA], qcids[GRPQUOTA], 1,
+ &inode_pending, NULL, 0);
+ switch (ma->ma_attr.la_mode & S_IFMT) {
+ case S_IFLNK:
+ case S_IFDIR:
+ block_count = 2;
+ break;
+ case S_IFREG:
+ block_count = 1;
+ break;
+ }
+ if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
+ qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
+ block_count += 1;
+ same = 1;
+ }
+ /* get block quota for child and parent */
+ if (block_count)
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qcids[USRQUOTA], qcids[GRPQUOTA],
+ block_count,
+ &block_pending, NULL,
+ LQUOTA_FLAGS_BLK);
+ if (!same)
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ qpids[USRQUOTA], qpids[GRPQUOTA], 1,
+ &parent_pending, NULL,
+ LQUOTA_FLAGS_BLK);
+ }
+ }
+#endif
+
/*
* No RPC inside the transaction, so OST objects should be created at
* first.
rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
spec, attr);
if (rc)
- RETURN(rc);
+ GOTO(out_pending, rc);
+ }
+
+ if (!S_ISLNK(attr->la_mode)) {
+ ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
+ ma_acl->ma_acl = info->mti_xattr_buf;
+ ma_acl->ma_need = MA_ACL_DEF;
+ ma_acl->ma_valid = 0;
+
+ mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
+ rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
+ mdd_read_unlock(env, mdd_pobj);
+ if (rc)
+ GOTO(out_free, rc);
+ else if (ma_acl->ma_valid & MA_ACL_DEF)
+ got_def_acl = 1;
}
mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
- dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
- /*
- * XXX: Check that link can be added to the parent in mkdir case.
- */
-
- mdd_write_lock(env, son);
+ mdd_write_lock(env, son, MOR_TGT_CHILD);
rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
if (rc) {
mdd_write_unlock(env, son);
created = 1;
#ifdef CONFIG_FS_POSIX_ACL
- mdd_read_lock(env, mdd_pobj);
- rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
- mdd_read_unlock(env, mdd_pobj);
- if (rc) {
- mdd_write_unlock(env, son);
- GOTO(cleanup, rc);
- } else {
- ma->ma_attr.la_valid |= LA_MODE;
+ if (got_def_acl) {
+ struct lu_buf *acl_buf = &info->mti_buf;
+ acl_buf->lb_buf = ma_acl->ma_acl;
+ acl_buf->lb_len = ma_acl->ma_acl_size;
+
+ rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
+ if (rc) {
+ mdd_write_unlock(env, son);
+ GOTO(cleanup, rc);
+ } else {
+ ma->ma_attr.la_valid |= LA_MODE;
+ }
}
#endif
*/
GOTO(cleanup, rc);
+ initialized = 1;
+
rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
name, S_ISDIR(attr->la_mode), handle,
mdd_object_capa(env, mdd_pobj));
}
if (S_ISLNK(attr->la_mode)) {
+ struct md_ucred *uc = md_ucred(env);
struct dt_object *dt = mdd_object_child(son);
const char *target_name = spec->u.sp_symname;
int sym_len = strlen(target_name);
buf = mdd_buf_get_const(env, target_name, sym_len);
rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
- mdd_object_capa(env, son));
+ mdd_object_capa(env, son),
+ uc->mu_cap &
+ CFS_CAP_SYS_RESOURCE_MASK);
if (rc == sym_len)
rc = 0;
CERROR("error can not cleanup destroy %d\n",
rc2);
}
+
if (rc2 == 0) {
- mdd_write_lock(env, son);
- mdo_ref_del(env, son, handle);
+ mdd_write_lock(env, son, MOR_TGT_CHILD);
+ __mdd_ref_del(env, son, handle, 0);
+ if (initialized && S_ISDIR(attr->la_mode))
+ __mdd_ref_del(env, son, handle, 1);
mdd_write_unlock(env, son);
}
}
out_free:
/* finis lov_create stuff, free all temporary data */
mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ if (quota_opc) {
+ if (inode_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qcids[USRQUOTA], qcids[GRPQUOTA],
+ 1, 0);
+ if (block_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qcids[USRQUOTA], qcids[GRPQUOTA],
+ block_count, 1);
+ if (parent_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qpids[USRQUOTA], qpids[GRPQUOTA],
+ 1, 1);
+ /* Trigger dqacq on the owner of child and parent. If failed,
+ * the next call for lquota_chkquota will process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
+ quota_opc);
+ }
+#endif
return rc;
}
struct mdd_object *mdd_tobj = NULL;
struct dynlock_handle *sdlh, *tdlh;
struct thandle *handle;
- int is_dir;
- int rc;
+#ifdef HAVE_QUOTA_SUPPORT
+ struct obd_device *obd = mdd->mdd_obd_dev;
+ struct mds_obd *mds = &obd->u.mds;
+ unsigned int qspids[MAXQUOTAS] = { 0, 0 };
+ unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
+ unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
+ int quota_opc = 0, rec_pending = 0;
+#endif
+ int rc, is_dir;
ENTRY;
LASSERT(ma->ma_attr.la_mode & S_IFMT);
if (tobj)
mdd_tobj = md2mdd_obj(tobj);
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+ rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
+ if (!rc) {
+ mdd_quota_wrapper(la_tmp, qspids);
+ if (!tobj) {
+ rc = mdd_la_get(env, mdd_tpobj, la_tmp,
+ BYPASS_CAPA);
+ if (!rc) {
+ quota_opc = FSFILT_OP_LINK;
+ mdd_quota_wrapper(la_tmp, qtpids);
+ /* get block quota for target parent */
+ lquota_chkquota(mds_quota_interface_ref,
+ obd, qtpids[USRQUOTA],
+ qtpids[GRPQUOTA], 1,
+ &rec_pending, NULL,
+ LQUOTA_FLAGS_BLK);
+ }
+ }
+ }
+ }
+#endif
mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_pending, rc = PTR_ERR(handle));
/* FIXME: Should consider tobj and sobj too in rename_lock. */
rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
/* Get locks in determined order */
if (rc == MDD_RN_SAME) {
- sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj,
+ sname, MOR_SRC_PARENT);
/* check hashes to determine do we need one lock or two */
if (mdd_name2hash(sname) != mdd_name2hash(tname))
- tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
+ MOR_TGT_PARENT);
else
tdlh = sdlh;
} else if (rc == MDD_RN_SRCTGT) {
- sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
- tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
} else {
- tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
- sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
}
if (sdlh == NULL || tdlh == NULL)
GOTO(cleanup, rc = -ENOMEM);
GOTO(cleanup, rc);
}
- /*
+ /*
* For tobj is remote case cmm layer has processed
* and set tobj to NULL then. So when tobj is NOT NULL,
* it must be local one.
*/
if (tobj && mdd_object_exists(mdd_tobj)) {
- mdd_write_lock(env, mdd_tobj);
- mdo_ref_del(env, mdd_tobj, handle);
+ mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+ __mdd_ref_del(env, mdd_tobj, handle, 0);
/* Remove dot reference. */
if (is_dir)
- mdo_ref_del(env, mdd_tobj, handle);
+ __mdd_ref_del(env, mdd_tobj, handle, 1);
la->la_valid = LA_CTIME;
rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
mdd_write_unlock(env, mdd_tobj);
if (rc)
GOTO(cleanup, rc);
+
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qtcids);
+ }
+#endif
}
la->la_valid = LA_CTIME | LA_MTIME;
mdd_trans_stop(env, mdd, rc, handle);
if (mdd_sobj)
mdd_object_put(env, mdd_sobj);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+ if (mds->mds_quota) {
+ if (rec_pending)
+ lquota_pending_commit(mds_quota_interface_ref, obd,
+ qtpids[USRQUOTA],
+ qtpids[GRPQUOTA],
+ 1, 1);
+ /* Trigger dqrel on the source owner of parent.
+ * If failed, the next call for lquota_chkquota will
+ * process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
+ FSFILT_OP_UNLINK_PARTIAL_PARENT);
+ if (quota_opc)
+ /* Trigger dqrel/dqacq on the target owner of child and
+ * parent. If failed, the next call for lquota_chkquota
+ * will process it. */
+ lquota_adjust(mds_quota_interface_ref, obd, qtcids,
+ qtpids, rc, quota_opc);
+ }
+#endif
return rc;
}
-struct md_dir_operations mdd_dir_ops = {
+const struct md_dir_operations mdd_dir_ops = {
.mdo_is_subdir = mdd_is_subdir,
.mdo_lookup = mdd_lookup,
.mdo_create = mdd_create,