Whamcloud - gitweb
b=16776
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index dc24003..1c76e3d 100644 (file)
@@ -1,30 +1,45 @@
-/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  mdd/mdd_handler.c
- *  Lustre Metadata Server (mdd) routines
+ * GPL HEADER START
  *
- *  Copyright (C) 2006 Cluster File Systems, Inc.
- *   Author: Wang Di <wangdi@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdd/mdd_dir.c
+ *
+ * Lustre Metadata Server (mdd) routines
+ *
+ * Author: Wang Di <wangdi@clusterfs.com>
  */
+
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
@@ -252,7 +267,11 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
         if (rc)
                 RETURN(rc);
 
-        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
+        /*
+         * Subdir count limitation can be broken through.
+         */ 
+        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
+            !S_ISDIR(la->la_mode))
                 RETURN(-EMLINK);
         else
                 RETURN(0);
@@ -445,6 +464,42 @@ const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
         return (const struct dt_rec *)pack;
 }
 
+/**
+ * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
+ * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
+ * (maybe too large to be represented). It is a trick to break through the
+ * "i_nlink" limitation for subdir count.
+ */
+void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle)
+{
+        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+        struct mdd_device *m = mdd_obj2mdd_dev(obj);
+
+        if (!mdd_is_mnlink(obj)) {
+                if (S_ISDIR(mdd_object_type(obj))) {
+                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
+                                return;
+
+                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
+                                obj->mod_flags |= MNLINK_OBJ;
+                                tmp_la->la_nlink = 1;
+                                tmp_la->la_valid = LA_NLINK;
+                                mdd_attr_set_internal(env, obj, tmp_la, handle,
+                                                      0);
+                                return;
+                        }
+                }
+                mdo_ref_add(env, obj, handle);
+        }
+}
+
+void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle, int is_dot)
+{
+        if (!mdd_is_mnlink(obj) || is_dot)
+                mdo_ref_del(env, obj, handle);
+}
 
 /* insert named index, add reference if isdir */
 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
@@ -467,7 +522,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
         if (rc == 0) {
                 if (is_dir) {
                         mdd_write_lock(env, pobj);
-                        mdo_ref_add(env, pobj, handle);
+                        __mdd_ref_add(env, pobj, handle);
                         mdd_write_unlock(env, pobj);
                 }
         }
@@ -488,8 +543,12 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
                                                     (struct dt_key *)name,
                                                     handle, capa);
                 if (rc == 0 && is_dir) {
+                        int is_dot = 0;
+
+                        if (name != NULL && name[0] == '.' && name[1] == 0)
+                                is_dot = 1;
                         mdd_write_lock(env, pobj);
-                        mdo_ref_del(env, pobj, handle);
+                        __mdd_ref_del(env, pobj, handle, is_dot);
                         mdd_write_unlock(env, pobj);
                 }
         } else
@@ -552,18 +611,18 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        mdo_ref_add(env, mdd_sobj, handle);
+        __mdd_ref_add(env, mdd_sobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
         la->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal_locked(env, mdd_tobj, la, handle, 0);
         if (rc)
                 GOTO(out_unlock, rc);
 
         la->la_valid = LA_CTIME;
-        rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
         EXIT;
 out_unlock:
         mdd_write_unlock(env, mdd_sobj);
@@ -657,21 +716,21 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        mdo_ref_del(env, mdd_cobj, handle);
+        __mdd_ref_del(env, mdd_cobj, handle, 0);
         if (is_dir)
                 /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+                __mdd_ref_del(env, mdd_cobj, handle, 1);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
         la->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         la->la_valid = LA_CTIME;
-        rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -679,7 +738,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
 
         if (rc == 0)
                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
-                                   strlen("unlinked"), "unlinked", 0,
+                                   sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
                                    NULL, NULL);
         EXIT;
 cleanup:
@@ -753,7 +812,8 @@ static int mdd_name_insert(const struct lu_env *env,
         if (ma->ma_attr.la_valid & LA_CTIME) {
                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
                 la->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
+                                                        handle, 0);
         }
         EXIT;
 out_unlock:
@@ -825,7 +885,8 @@ static int mdd_name_remove(const struct lu_env *env,
         if (ma->ma_attr.la_valid & LA_CTIME) {
                 la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
                 la->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+                rc = mdd_attr_check_set_internal_locked(env, mdd_obj, la,
+                                                        handle, 0);
         }
         EXIT;
 out_unlock:
@@ -910,7 +971,7 @@ static int mdd_rename_tgt(const struct lu_env *env,
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
         la->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -920,14 +981,14 @@ static int mdd_rename_tgt(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdo_ref_del(env, mdd_tobj, handle);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
+                rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
                 if (rc)
                         GOTO(cleanup, rc);
 
@@ -1015,7 +1076,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
 
         /* update lov_objid data, must be before transaction stop! */
         if (rc == 0)
-                mdd_lov_objid_update(env, mdd);
+                mdd_lov_objid_update(mdd, lmm);
 
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
@@ -1091,7 +1152,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
-                mdo_ref_add(env, child, handle);
+                __mdd_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0) {
@@ -1101,13 +1162,11 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                         if (rc != 0) {
                                 int rc2;
 
-                                rc2 = __mdd_index_delete(env, child, dot, 0,
+                                rc2 = __mdd_index_delete(env, child, dot, 1,
                                                          handle, BYPASS_CAPA);
                                 if (rc2 != 0)
                                         CERROR("Failure to cleanup after dotdot"
                                                " creation: %d (%d)\n", rc2, rc);
-                                else
-                                        mdo_ref_del(env, child, handle);
                         }
                 }
         }
@@ -1174,12 +1233,6 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         }
 
         switch (ma->ma_attr.la_mode & S_IFMT) {
-        case S_IFDIR: {
-                if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
-                        RETURN(-EMLINK);
-                else
-                        RETURN(0);
-        }
         case S_IFLNK: {
                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
 
@@ -1188,6 +1241,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                 else
                         RETURN(0);
         }
+        case S_IFDIR:
         case S_IFREG:
         case S_IFCHR:
         case S_IFBLK:
@@ -1212,24 +1266,27 @@ static int mdd_create(const struct lu_env *env,
                       struct md_op_spec *spec,
                       struct md_attr* ma)
 {
-        char *name = lname->ln_name;
-        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object *son = md2mdd_obj(child);
-        struct mdd_device *mdd = mdo2mdd(pobj);
-        struct lu_attr    *attr = &ma->ma_attr;
-        struct lov_mds_md *lmm = NULL;
-        struct thandle    *handle;
-        int rc, created = 0, inserted = 0, lmm_size = 0;
-        struct dynlock_handle *dlh;
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lu_attr         *la = &info->mti_la_for_fix;
+        struct md_attr         *ma_acl = &info->mti_ma;
+        struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
+        struct mdd_object      *son = md2mdd_obj(child);
+        struct mdd_device      *mdd = mdo2mdd(pobj);
+        struct lu_attr         *attr = &ma->ma_attr;
+        struct lov_mds_md      *lmm = NULL;
+        struct thandle         *handle;
+        struct dynlock_handle  *dlh;
+        char                   *name = lname->ln_name;
+        int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
+        int got_def_acl = 0;
         ENTRY;
 
         /*
          * Two operations have to be performed:
          *
-         *  - allocation of new object (->do_create()), and
+         *  - an allocation of a new object (->do_create()), and
          *
-         *  - insertion into parent index (->dio_insert()).
+         *  - an insertion into a parent index (->dio_insert()).
          *
          * Due to locking, operation order is not important, when both are
          * successful, *but* error handling cases are quite different:
@@ -1276,6 +1333,21 @@ static int mdd_create(const struct lu_env *env,
                         RETURN(rc);
         }
 
+        if (!S_ISLNK(attr->la_mode)) {
+                ma_acl->ma_acl_size = sizeof info->mti_xattr_buf;
+                ma_acl->ma_acl = info->mti_xattr_buf;
+                ma_acl->ma_need = MA_ACL_DEF;
+                ma_acl->ma_valid = 0;
+
+                mdd_read_lock(env, mdd_pobj);
+                rc = mdd_def_acl_get(env, mdd_pobj, ma_acl);
+                mdd_read_unlock(env, mdd_pobj);
+                if (rc)
+                        GOTO(out_free, rc);
+                else if (ma_acl->ma_valid & MA_ACL_DEF)
+                        got_def_acl = 1;
+        }
+
         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
@@ -1285,10 +1357,6 @@ static int mdd_create(const struct lu_env *env,
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
 
-        /*
-         * XXX: Check that link can be added to the parent in mkdir case.
-         */
-
         mdd_write_lock(env, son);
         rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
         if (rc) {
@@ -1299,14 +1367,18 @@ static int mdd_create(const struct lu_env *env,
         created = 1;
 
 #ifdef CONFIG_FS_POSIX_ACL
-        mdd_read_lock(env, mdd_pobj);
-        rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
-        mdd_read_unlock(env, mdd_pobj);
-        if (rc) {
-                mdd_write_unlock(env, son);
-                GOTO(cleanup, rc);
-        } else {
-                ma->ma_attr.la_valid |= LA_MODE;
+        if (got_def_acl) {
+                struct lu_buf *acl_buf = &info->mti_buf;
+                acl_buf->lb_buf = ma_acl->ma_acl;
+                acl_buf->lb_len = ma_acl->ma_acl_size;
+
+                rc = __mdd_acl_init(env, son, acl_buf, &attr->la_mode, handle);
+                if (rc) {
+                        mdd_write_unlock(env, son);
+                        GOTO(cleanup, rc);
+                } else {
+                        ma->ma_attr.la_valid |= LA_MODE;
+                }
         }
 #endif
 
@@ -1320,6 +1392,8 @@ static int mdd_create(const struct lu_env *env,
                  */
                 GOTO(cleanup, rc);
 
+        initialized = 1;
+
         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
                                 name, S_ISDIR(attr->la_mode), handle,
                                 mdd_object_capa(env, mdd_pobj));
@@ -1361,7 +1435,7 @@ static int mdd_create(const struct lu_env *env,
 
         *la = ma->ma_attr;
         la->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal_locked(env, mdd_pobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1380,16 +1454,19 @@ cleanup:
                                 CERROR("error can not cleanup destroy %d\n",
                                        rc2);
                 }
+
                 if (rc2 == 0) {
                         mdd_write_lock(env, son);
-                        mdo_ref_del(env, son, handle);
+                        __mdd_ref_del(env, son, handle, 0);
+                        if (initialized && S_ISDIR(attr->la_mode))
+                                __mdd_ref_del(env, son, handle, 1);
                         mdd_write_unlock(env, son);
                 }
         }
 
         /* update lov_objid data, must be before transaction stop! */
         if (rc == 0)
-                mdd_lov_objid_update(env, mdd);
+                mdd_lov_objid_update(mdd, lmm);
 
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
@@ -1568,7 +1645,8 @@ static int mdd_rename(const struct lu_env *env,
         /* XXX: mdd_sobj must be local one if it is NOT NULL. */
         if (mdd_sobj) {
                 la->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 0);
+                rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la,
+                                                        handle, 0);
                 if (rc)
                         GOTO(cleanup, rc);
         }
@@ -1580,14 +1658,14 @@ static int mdd_rename(const struct lu_env *env,
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
                 mdd_write_lock(env, mdd_tobj);
-                mdo_ref_del(env, mdd_tobj, handle);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (is_dir)
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
+                rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
                 if (rc)
                         GOTO(cleanup, rc);
 
@@ -1598,13 +1676,13 @@ static int mdd_rename(const struct lu_env *env,
         }
 
         la->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
+        rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         if (mdd_spobj != mdd_tpobj) {
                 la->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
+                rc = mdd_attr_check_set_internal_locked(env, mdd_tpobj, la,
                                                   handle, 0);
         }