Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index bcb0eac..7450c1e 100644 (file)
@@ -1,30 +1,45 @@
-/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  mdd/mdd_handler.c
- *  Lustre Metadata Server (mdd) routines
+ * GPL HEADER START
  *
- *  Copyright (C) 2006 Cluster File Systems, Inc.
- *   Author: Wang Di <wangdi@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdd/mdd_dir.c
+ *
+ * Lustre Metadata Server (mdd) routines
+ *
+ * Author: Wang Di <wangdi@clusterfs.com>
  */
+
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
@@ -65,7 +80,7 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
         struct dynlock_handle *dlh;
         int rc;
 
-        dlh = mdd_pdo_read_lock(env, mdd_obj, name);
+        dlh = mdd_pdo_read_lock(env, mdd_obj, name, MOR_TGT_PARENT);
         if (unlikely(dlh == NULL))
                 return -ENOMEM;
         rc = __mdd_lookup(env, pobj, lname, fid, mask);
@@ -208,7 +223,7 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 {
         struct dt_it     *it;
         struct dt_object *obj;
-        struct dt_it_ops *iops;
+        const struct dt_it_ops *iops;
         int result;
         ENTRY;
 
@@ -252,7 +267,11 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
         if (rc)
                 RETURN(rc);
 
-        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
+        /*
+         * Directories are exempt: the subdir count may exceed ddp_max_nlink.
+         */
+        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
+            !S_ISDIR(la->la_mode))
                 RETURN(-EMLINK);
         else
                 RETURN(0);
@@ -276,7 +295,8 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
 
         if (check_perm)
                 rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                                    MAY_WRITE | MAY_EXEC);
+                                                    MAY_WRITE | MAY_EXEC,
+                                                    MOR_TGT_PARENT);
 
         if (!rc && check_nlink)
                 rc = __mdd_may_link(env, pobj);
@@ -301,7 +321,8 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
                 RETURN(-EPERM);
 
         rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                            MAY_WRITE | MAY_EXEC);
+                                            MAY_WRITE | MAY_EXEC,
+                                            MOR_TGT_PARENT);
         if (rc)
                 RETURN(rc);
 
@@ -327,20 +348,20 @@ static inline int mdd_is_sticky(const struct lu_env *env,
                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
                 if (rc)
                         return rc;
-        
+
                 if (!(tmp_la->la_mode & S_ISVTX) ||
                      (tmp_la->la_uid == uc->mu_fsuid))
                         return 0;
         }
 
         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
-        if (rc) 
+        if (rc)
                 return rc;
-        
+
         if (tmp_la->la_uid == uc->mu_fsuid)
                 return 0;
-        
-        return !mdd_capable(uc, CAP_FOWNER);
+
+        return !mdd_capable(uc, CFS_CAP_FOWNER);
 }
 
 /*
@@ -364,7 +385,8 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
 
                 if (check_perm) {
                         rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                                    MAY_WRITE | MAY_EXEC);
+                                                    MAY_WRITE | MAY_EXEC,
+                                                    MOR_TGT_PARENT);
                         if (rc)
                                 RETURN(rc);
                 }
@@ -445,6 +467,42 @@ const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
         return (const struct dt_rec *)pack;
 }
 
+/**
+ * If the subdir count reaches ddp_max_nlink, set the MNLINK_OBJ flag and
+ * set i_nlink to 1, which means i_nlink no longer reflects the subdir count
+ * (it may be too large to be represented). This is a trick to break through
+ * the "i_nlink" limitation on the subdir count.
+ */
+void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle)
+{
+        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+        struct mdd_device *m = mdd_obj2mdd_dev(obj);
+
+        if (!mdd_is_mnlink(obj)) {
+                if (S_ISDIR(mdd_object_type(obj))) {
+                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
+                                return;
+
+                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
+                                obj->mod_flags |= MNLINK_OBJ;
+                                tmp_la->la_nlink = 1;
+                                tmp_la->la_valid = LA_NLINK;
+                                mdd_attr_set_internal(env, obj, tmp_la, handle,
+                                                      0);
+                                return;
+                        }
+                }
+                mdo_ref_add(env, obj, handle);
+        }
+}
+
+void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle, int is_dot)
+{
+        if (!mdd_is_mnlink(obj) || is_dot)
+                mdo_ref_del(env, obj, handle);
+}
 
 /* insert named index, add reference if isdir */
 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
@@ -456,18 +514,21 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
         ENTRY;
 
         if (dt_try_as_dir(env, next)) {
+                struct md_ucred  *uc = md_ucred(env);
+
                 rc = next->do_index_ops->dio_insert(env, next,
                                                     __mdd_fid_rec(env, lf),
                                                     (const struct dt_key *)name,
-                                                    handle, capa);
+                                                    handle, capa, uc->mu_cap &
+                                                    CFS_CAP_SYS_RESOURCE_MASK);
         } else {
                 rc = -ENOTDIR;
         }
 
         if (rc == 0) {
                 if (is_dir) {
-                        mdd_write_lock(env, pobj);
-                        mdo_ref_add(env, pobj, handle);
+                        mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+                        __mdd_ref_add(env, pobj, handle);
                         mdd_write_unlock(env, pobj);
                 }
         }
@@ -488,8 +549,12 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
                                                     (struct dt_key *)name,
                                                     handle, capa);
                 if (rc == 0 && is_dir) {
-                        mdd_write_lock(env, pobj);
-                        mdo_ref_del(env, pobj, handle);
+                        int is_dot = 0;
+
+                        if (name != NULL && name[0] == '.' && name[1] == 0)
+                                is_dot = 1;
+                        mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+                        __mdd_ref_del(env, pobj, handle, is_dot);
                         mdd_write_unlock(env, pobj);
                 }
         } else
@@ -508,10 +573,13 @@ __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
         ENTRY;
 
         if (dt_try_as_dir(env, next)) {
+                struct md_ucred  *uc = md_ucred(env);
+
                 rc = next->do_index_ops->dio_insert(env, next,
                                                     __mdd_fid_rec(env, lf),
                                                     (const struct dt_key *)name,
-                                                    handle, capa);
+                                                    handle, capa, uc->mu_cap &
+                                                    CFS_CAP_SYS_RESOURCE_MASK);
         } else {
                 rc = -ENOTDIR;
         }
@@ -529,18 +597,40 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         struct mdd_device *mdd = mdo2mdd(src_obj);
         struct dynlock_handle *dlh;
         struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, rec_pending = 0;
+#endif
         int rc;
         ENTRY;
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        quota_opc = FSFILT_OP_LINK;
+                        mdd_quota_wrapper(la_tmp, qids);
+                        /* get block quota for parent */
+                        lquota_chkquota(mds_quota_interface_ref, obd,
+                                        qids[USRQUOTA], qids[GRPQUOTA], 1,
+                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+                }
+        }
+#endif
+
         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
-        dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
-        mdd_write_lock(env, mdd_sobj);
+        mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
 
         rc = mdd_link_sanity_check(env, mdd_tobj, lname, mdd_sobj);
         if (rc)
@@ -552,7 +642,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        mdo_ref_add(env, mdd_sobj, handle);
+        __mdd_ref_add(env, mdd_sobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -570,6 +660,19 @@ out_unlock:
         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc) {
+                if (rec_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qids[USRQUOTA], qids[GRPQUOTA],
+                                              1, 1);
+                /* Trigger dqacq for the parent owner. If failed,
+                 * the next call for lquota_chkquota will process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
+                              quota_opc);
+        }
+#endif
         return rc;
 }
 
@@ -627,6 +730,13 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle    *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0;
+#endif
         int rc, is_dir;
         ENTRY;
 
@@ -642,10 +752,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                 RETURN(PTR_ERR(handle));
 
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
-        mdd_write_lock(env, mdd_cobj);
+        mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
         is_dir = S_ISDIR(ma->ma_attr.la_mode);
         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
@@ -657,10 +767,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        mdo_ref_del(env, mdd_cobj, handle);
+        __mdd_ref_del(env, mdd_cobj, handle, 0);
         if (is_dir)
                 /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+                __mdd_ref_del(env, mdd_cobj, handle, 1);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -676,6 +786,23 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                 GOTO(cleanup, rc);
 
         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+            ma->ma_attr.la_nlink == 0) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        mdd_quota_wrapper(la_tmp, qpids);
+                        if (mdd_cobj->mod_count == 0) {
+                                quota_opc = FSFILT_OP_UNLINK;
+                                mdd_quota_wrapper(&ma->ma_attr, qcids);
+                        } else {
+                                quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
+                        }
+                }
+        }
+#endif
 
         if (rc == 0)
                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
@@ -687,6 +814,13 @@ cleanup:
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc)
+                /* Trigger dqrel on the owner of child and parent. If failed,
+                 * the next call for lquota_chkquota will process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
+                              quota_opc);
+#endif
         return rc;
 }
 
@@ -723,15 +857,43 @@ static int mdd_name_insert(const struct lu_env *env,
         struct dynlock_handle *dlh;
         struct thandle *handle;
         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+#ifdef HAVE_QUOTA_SUPPORT
+        struct md_ucred *uc = md_ucred(env);
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, rec_pending = 0;
+        cfs_cap_t save = uc->mu_cap;
+#endif
         int rc;
         ENTRY;
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
+                        struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                        rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
+                        if (!rc) {
+                                quota_opc = FSFILT_OP_LINK;
+                                mdd_quota_wrapper(la_tmp, qids);
+                                /* get block quota for parent */
+                                lquota_chkquota(mds_quota_interface_ref, obd,
+                                                qids[USRQUOTA], qids[GRPQUOTA],
+                                                1, &rec_pending, NULL,
+                                                LQUOTA_FLAGS_BLK);
+                        }
+                } else {
+                        uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
+                }
+        }
+#endif
         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
         handle = mdd_trans_start(env, mdo2mdd(pobj));
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
-        dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
 
@@ -761,6 +923,23 @@ out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
 out_trans:
         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                if (quota_opc) {
+                        if (rec_pending)
+                                lquota_pending_commit(mds_quota_interface_ref,
+                                                      obd, qids[USRQUOTA],
+                                                      qids[GRPQUOTA], 1, 1);
+                        /* Trigger dqacq for the parent owner. If failed,
+                         * the next call to lquota_chkquota will process it. */
+                        lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
+                                      rc, quota_opc);
+                } else {
+                        uc->mu_cap = save;
+                }
+        }
+#endif
         return rc;
 }
 
@@ -796,15 +975,32 @@ static int mdd_name_remove(const struct lu_env *env,
         struct dynlock_handle *dlh;
         struct thandle *handle;
         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0;
+#endif
         int rc;
         ENTRY;
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_PARENT;
+                        mdd_quota_wrapper(la_tmp, qids);
+                }
+        }
+#endif
         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
-        dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
 
@@ -834,6 +1030,14 @@ out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        /* Trigger dqrel for the parent owner.
+         * If failed, the next call for lquota_chkquota will process it. */
+        if (quota_opc)
+                lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
+                              quota_opc);
+#endif
         return rc;
 }
 
@@ -877,19 +1081,41 @@ static int mdd_rename_tgt(const struct lu_env *env,
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, rec_pending = 0;
+#endif
         int rc;
         ENTRY;
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota && !tobj) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        quota_opc = FSFILT_OP_LINK;
+                        mdd_quota_wrapper(la_tmp, qpids);
+                        /* get block quota for target parent */
+                        lquota_chkquota(mds_quota_interface_ref, obd,
+                                        qpids[USRQUOTA], qpids[GRPQUOTA], 1,
+                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+                }
+        }
+#endif
         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
-        dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
         if (tobj)
-                mdd_write_lock(env, mdd_tobj);
+                mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
 
         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, ma);
         if (rc)
@@ -916,17 +1142,17 @@ static int mdd_rename_tgt(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* 
+        /*
          * For tobj is remote case cmm layer has processed
          * and pass NULL tobj to here. So when tobj is NOT NULL,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdo_ref_del(env, mdd_tobj, handle);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -936,6 +1162,14 @@ static int mdd_rename_tgt(const struct lu_env *env,
                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
                 if (rc)
                         GOTO(cleanup, rc);
+
+#ifdef HAVE_QUOTA_SUPPORT
+                if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+                    ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
+                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                        mdd_quota_wrapper(&ma->ma_attr, qcids);
+                }
+#endif
         }
         EXIT;
 cleanup:
@@ -944,6 +1178,22 @@ cleanup:
         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                if (rec_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qpids[USRQUOTA],
+                                              qpids[GRPQUOTA],
+                                              1, 1);
+                if (quota_opc)
+                        /* Trigger dqrel/dqacq on the target owner of child and
+                         * parent. If failed, the next call for lquota_chkquota
+                         * will process it. */
+                        lquota_adjust(mds_quota_interface_ref, obd, qcids,
+                                      qpids, rc, quota_opc);
+        }
+#endif
         return rc;
 }
 
@@ -1055,7 +1305,8 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
                 RETURN(-ENAMETOOLONG);
 
-        rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
+        rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask,
+                                            MOR_TGT_PARENT);
         if (rc)
                 RETURN(rc);
 
@@ -1093,7 +1344,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
-                mdo_ref_add(env, child, handle);
+                __mdd_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0) {
@@ -1103,13 +1354,11 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                         if (rc != 0) {
                                 int rc2;
 
-                                rc2 = __mdd_index_delete(env, child, dot, 0,
+                                rc2 = __mdd_index_delete(env, child, dot, 1,
                                                          handle, BYPASS_CAPA);
                                 if (rc2 != 0)
                                         CERROR("Failure to cleanup after dotdot"
                                                " creation: %d (%d)\n", rc2, rc);
-                                else
-                                        mdo_ref_del(env, child, handle);
                         }
                 }
         }
@@ -1157,7 +1406,8 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                  * EXEC permission have been checked
                  * when lookup before create already.
                  */
-                rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
+                rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE,
+                                                    MOR_TGT_PARENT);
                 if (rc)
                         RETURN(rc);
         }
@@ -1176,12 +1426,6 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         }
 
         switch (ma->ma_attr.la_mode & S_IFMT) {
-        case S_IFDIR: {
-                if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
-                        RETURN(-EMLINK);
-                else
-                        RETURN(0);
-        }
         case S_IFLNK: {
                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
 
@@ -1190,6 +1434,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                 else
                         RETURN(0);
         }
+        case S_IFDIR:
         case S_IFREG:
         case S_IFCHR:
         case S_IFBLK:
@@ -1214,24 +1459,35 @@ static int mdd_create(const struct lu_env *env,
                       struct md_op_spec *spec,
                       struct md_attr* ma)
 {
-        char *name = lname->ln_name;
-        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object *son = md2mdd_obj(child);
-        struct mdd_device *mdd = mdo2mdd(pobj);
-        struct lu_attr    *attr = &ma->ma_attr;
-        struct lov_mds_md *lmm = NULL;
-        struct thandle    *handle;
-        int rc, created = 0, inserted = 0, lmm_size = 0;
-        struct dynlock_handle *dlh;
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lu_attr         *la = &info->mti_la_for_fix;
+        struct md_attr         *ma_acl = &info->mti_ma;
+        struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
+        struct mdd_object      *son = md2mdd_obj(child);
+        struct mdd_device      *mdd = mdo2mdd(pobj);
+        struct lu_attr         *attr = &ma->ma_attr;
+        struct lov_mds_md      *lmm = NULL;
+        struct thandle         *handle;
+        struct dynlock_handle  *dlh;
+        char                   *name = lname->ln_name;
+        int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
+        int got_def_acl = 0;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qcids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qpids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, block_count = 0;
+        int inode_pending = 0, block_pending = 0, parent_pending = 0;
+#endif
         ENTRY;
 
         /*
          * Two operations have to be performed:
          *
-         *  - allocation of new object (->do_create()), and
+         *  - an allocation of a new object (->do_create()), and
          *
-         *  - insertion into parent index (->dio_insert()).
+         *  - an insertion into a parent index (->dio_insert()).
          *
          * Due to locking, operation order is not important, when both are
          * successful, *but* error handling cases are quite different:
@@ -1267,6 +1523,51 @@ static int mdd_create(const struct lu_env *env,
         if (rc)
                 RETURN(rc);
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_pobj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        int same = 0;
+
+                        quota_opc = FSFILT_OP_CREATE;
+                        mdd_quota_wrapper(&ma->ma_attr, qcids);
+                        mdd_quota_wrapper(la_tmp, qpids);
+                        /* get file quota for child */
+                        lquota_chkquota(mds_quota_interface_ref, obd,
+                                        qcids[USRQUOTA], qcids[GRPQUOTA], 1,
+                                        &inode_pending, NULL, 0);
+                        switch (ma->ma_attr.la_mode & S_IFMT) {
+                        case S_IFLNK:
+                        case S_IFDIR:
+                                block_count = 2;
+                                break;
+                        case S_IFREG:
+                                block_count = 1;
+                                break;
+                        }
+                        if (qcids[USRQUOTA] == qpids[USRQUOTA] &&
+                            qcids[GRPQUOTA] == qpids[GRPQUOTA]) {
+                                block_count += 1;
+                                same = 1;
+                        }
+                        /* get block quota for child and parent */
+                        if (block_count)
+                                lquota_chkquota(mds_quota_interface_ref, obd,
+                                                qcids[USRQUOTA], qcids[GRPQUOTA],
+                                                block_count,
+                                                &block_pending, NULL,
+                                                LQUOTA_FLAGS_BLK);
+                        if (!same)
+                                lquota_chkquota(mds_quota_interface_ref, obd,
+                                                qpids[USRQUOTA], qpids[GRPQUOTA], 1,
+                                                &parent_pending, NULL,
+                                                LQUOTA_FLAGS_BLK);
+                }
+        }
+#endif
+
         /*
          * No RPC inside the transaction, so OST objects should be created at
          * first.
@@ -1275,7 +1576,22 @@ static int mdd_create(const struct lu_env *env,
                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
                                     spec, attr);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(out_pending, rc);
+        }
+
+        if (!S_ISLNK(attr->la_mode)) {
+                ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
+                ma_acl->ma_acl = info->mti_xattr_buf;
+                ma_acl->ma_need = MA_ACL_DEF;
+                ma_acl->ma_valid = 0;
+
+                mdd_read_lock(env, mdd_pobj, MOR_TGT_PARENT);
+                rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
+                mdd_read_unlock(env, mdd_pobj);
+                if (rc)
+                        GOTO(out_free, rc);
+                else if (ma_acl->ma_valid & MA_ACL_DEF)
+                        got_def_acl = 1;
         }
 
         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
@@ -1283,15 +1599,11 @@ static int mdd_create(const struct lu_env *env,
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
 
-        /*
-         * XXX: Check that link can be added to the parent in mkdir case.
-         */
-
-        mdd_write_lock(env, son);
+        mdd_write_lock(env, son, MOR_TGT_CHILD);
         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
         if (rc) {
                 mdd_write_unlock(env, son);
@@ -1301,14 +1613,18 @@ static int mdd_create(const struct lu_env *env,
         created = 1;
 
 #ifdef CONFIG_FS_POSIX_ACL
-        mdd_read_lock(env, mdd_pobj);
-        rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
-        mdd_read_unlock(env, mdd_pobj);
-        if (rc) {
-                mdd_write_unlock(env, son);
-                GOTO(cleanup, rc);
-        } else {
-                ma->ma_attr.la_valid |= LA_MODE;
+        if (got_def_acl) {
+                struct lu_buf *acl_buf = &info->mti_buf;
+                acl_buf->lb_buf = ma_acl->ma_acl;
+                acl_buf->lb_len = ma_acl->ma_acl_size;
+
+                rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
+                if (rc) {
+                        mdd_write_unlock(env, son);
+                        GOTO(cleanup, rc);
+                } else {
+                        ma->ma_attr.la_valid |= LA_MODE;
+                }
         }
 #endif
 
@@ -1322,6 +1638,8 @@ static int mdd_create(const struct lu_env *env,
                  */
                 GOTO(cleanup, rc);
 
+        initialized = 1;
+
         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
                                 name, S_ISDIR(attr->la_mode), handle,
                                 mdd_object_capa(env, mdd_pobj));
@@ -1345,6 +1663,7 @@ static int mdd_create(const struct lu_env *env,
         }
 
         if (S_ISLNK(attr->la_mode)) {
+                struct md_ucred  *uc = md_ucred(env);
                 struct dt_object *dt = mdd_object_child(son);
                 const char *target_name = spec->u.sp_symname;
                 int sym_len = strlen(target_name);
@@ -1353,7 +1672,9 @@ static int mdd_create(const struct lu_env *env,
 
                 buf = mdd_buf_get_const(env, target_name, sym_len);
                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
-                                                mdd_object_capa(env, son));
+                                                mdd_object_capa(env, son),
+                                                uc->mu_cap &
+                                                CFS_CAP_SYS_RESOURCE_MASK);
 
                 if (rc == sym_len)
                         rc = 0;
@@ -1382,9 +1703,12 @@ cleanup:
                                 CERROR("error can not cleanup destroy %d\n",
                                        rc2);
                 }
+
                 if (rc2 == 0) {
-                        mdd_write_lock(env, son);
-                        mdo_ref_del(env, son, handle);
+                        mdd_write_lock(env, son, MOR_TGT_CHILD);
+                        __mdd_ref_del(env, son, handle, 0);
+                        if (initialized && S_ISDIR(attr->la_mode))
+                                __mdd_ref_del(env, son, handle, 1);
                         mdd_write_unlock(env, son);
                 }
         }
@@ -1399,6 +1723,27 @@ out_trans:
 out_free:
         /* finis lov_create stuff, free all temporary data */
         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc) {
+                if (inode_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qcids[USRQUOTA], qcids[GRPQUOTA],
+                                              1, 0);
+                if (block_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qcids[USRQUOTA], qcids[GRPQUOTA],
+                                              block_count, 1);
+                if (parent_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qpids[USRQUOTA], qpids[GRPQUOTA],
+                                              1, 1);
+                /* Trigger dqacq on the owner of child and parent. If this
+                 * fails, the next call to lquota_chkquota will process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
+                              quota_opc);
+        }
+#endif
         return rc;
 }
 
@@ -1501,8 +1846,15 @@ static int mdd_rename(const struct lu_env *env,
         struct mdd_object *mdd_tobj = NULL;
         struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
-        int is_dir;
-        int rc;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qspids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qtcids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, rec_pending = 0;
+#endif
+        int rc, is_dir;
         ENTRY;
 
         LASSERT(ma->ma_attr.la_mode & S_IFMT);
@@ -1511,10 +1863,34 @@ static int mdd_rename(const struct lu_env *env,
         if (tobj)
                 mdd_tobj = md2mdd_obj(tobj);
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_spobj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        mdd_quota_wrapper(la_tmp, qspids);
+                        if (!tobj) {
+                                rc = mdd_la_get(env, mdd_tpobj, la_tmp,
+                                                BYPASS_CAPA);
+                                if (!rc) {
+                                        quota_opc = FSFILT_OP_LINK;
+                                        mdd_quota_wrapper(la_tmp, qtpids);
+                                        /* get block quota for target parent */
+                                        lquota_chkquota(mds_quota_interface_ref,
+                                                        obd, qtpids[USRQUOTA],
+                                                        qtpids[GRPQUOTA], 1,
+                                                        &rec_pending, NULL,
+                                                        LQUOTA_FLAGS_BLK);
+                                }
+                        }
+                }
+        }
+#endif
         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
         /* FIXME: Should consider tobj and sobj too in rename_lock. */
         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
@@ -1523,18 +1899,20 @@ static int mdd_rename(const struct lu_env *env,
 
         /* Get locks in determined order */
         if (rc == MDD_RN_SAME) {
-                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj,
+                                          sname, MOR_SRC_PARENT);
                 /* check hashes to determine do we need one lock or two */
                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
-                        tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+                        tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,
+                                MOR_TGT_PARENT);
                 else
                         tdlh = sdlh;
         } else if (rc == MDD_RN_SRCTGT) {
-                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
-                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_SRC_PARENT);
+                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_TGT_PARENT);
         } else {
-                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
-                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname,MOR_SRC_PARENT);
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname,MOR_TGT_PARENT);
         }
         if (sdlh == NULL || tdlh == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
@@ -1576,18 +1954,18 @@ static int mdd_rename(const struct lu_env *env,
                         GOTO(cleanup, rc);
         }
 
-        /* 
+        /*
          * For tobj is remote case cmm layer has processed
          * and set tobj to NULL then. So when tobj is NOT NULL,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdd_write_lock(env, mdd_tobj);
-                mdo_ref_del(env, mdd_tobj, handle);
+                mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (is_dir)
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -1598,6 +1976,14 @@ static int mdd_rename(const struct lu_env *env,
                 mdd_write_unlock(env, mdd_tobj);
                 if (rc)
                         GOTO(cleanup, rc);
+
+#ifdef HAVE_QUOTA_SUPPORT
+                if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+                    ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) {
+                        quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                        mdd_quota_wrapper(&ma->ma_attr, qtcids);
+                }
+#endif
         }
 
         la->la_valid = LA_CTIME | LA_MTIME;
@@ -1621,10 +2007,31 @@ cleanup_unlocked:
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
                 mdd_object_put(env, mdd_sobj);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                if (rec_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qtpids[USRQUOTA],
+                                              qtpids[GRPQUOTA],
+                                              1, 1);
+                /* Trigger dqrel on the owner of the source parent.
+                 * If this fails, the next call to lquota_chkquota will
+                 * process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc,
+                              FSFILT_OP_UNLINK_PARTIAL_PARENT);
+                if (quota_opc)
+                        /* Trigger dqrel/dqacq on the owners of the target child
+                         * and parent. If this fails, the next call to
+                         * lquota_chkquota will process it. */
+                        lquota_adjust(mds_quota_interface_ref, obd, qtcids,
+                                      qtpids, rc, quota_opc);
+        }
+#endif
         return rc;
 }
 
-struct md_dir_operations mdd_dir_ops = {
+const struct md_dir_operations mdd_dir_ops = {
         .mdo_is_subdir     = mdd_is_subdir,
         .mdo_lookup        = mdd_lookup,
         .mdo_create        = mdd_create,