Whamcloud - gitweb
- implement pdirops in mdd. It uses dynlocks for directory operations.
authortappro <tappro>
Sat, 21 Oct 2006 10:17:27 +0000 (10:17 +0000)
committertappro <tappro>
Sat, 21 Oct 2006 10:17:27 +0000 (10:17 +0000)
- use -EREMOTE in mdd_is_parent instead of EREMOTE.

lustre/mdd/Makefile.in
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lock.c [new file with mode: 0644]
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_permission.c
lustre/mdt/mdt_reint.c

index 0213a6a..45d4e1b 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := mdd
 mdd-objs := mdd_object.o mdd_lov.o mdd_orphans.o mdd_lproc.o mdd_dir.o
-mdd-objs += mdd_device.o mdd_trans.o mdd_permission.o
+mdd-objs += mdd_device.o mdd_trans.o mdd_permission.o mdd_lock.o
 
 EXTRA_PRE_CFLAGS := -I@LINUX@/fs -I@LUSTRE@ -I@LUSTRE@/ldiskfs
 
index 6c1fbd4..906a352 100644 (file)
@@ -54,11 +54,12 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
                     const char *name, const struct lu_fid* fid, int mask)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+        struct dynlock_handle *dlh;
         int rc;
 
-        mdd_read_lock(env, mdd_obj);
+        dlh = mdd_pdo_read_lock(env, mdd_obj, name);
         rc = __mdd_lookup(env, pobj, name, fid, mask);
-        mdd_read_unlock(env, mdd_obj);
+        mdd_pdo_read_unlock(env, mdd_obj, dlh);
 
        return rc;
 }
@@ -123,7 +124,7 @@ static int mdd_is_parent(const struct lu_env *env,
                 if (parent == NULL) {
                         if (pf != NULL)
                                 *pf = *pfid;
-                        GOTO(out, rc = EREMOTE);
+                        GOTO(out, rc = -EREMOTE);
                 } else if (IS_ERR(parent))
                         GOTO(out, rc = PTR_ERR(parent));
                 p1 = parent;
@@ -178,7 +179,7 @@ static int mdd_may_create(const struct lu_env *env,
 
         /*check pobj may create or not*/
         if (need_check)
-                rc = mdd_permission_internal(env, pobj,
+                rc = mdd_permission_internal_locked(env, pobj,
                                              MAY_WRITE | MAY_EXEC);
 
         RETURN(rc);
@@ -202,7 +203,9 @@ static inline int mdd_is_sticky(const struct lu_env *env,
         } else if (tmp_la->la_uid == uc->mu_fsuid) {
                 return 0;
         } else {
+                mdd_read_lock(env, pobj);
                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
+                mdd_read_unlock(env, pobj);
                 if (rc)
                         return rc;
                 else if (!(tmp_la->la_mode & S_ISVTX))
@@ -251,8 +254,9 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EPERM);
 
                 if (need_check)
-                        rc = mdd_permission_internal(env, pobj,
-                                                     MAY_WRITE | MAY_EXEC);
+                        rc = mdd_permission_internal_locked(env, pobj,
+                                                            MAY_WRITE |
+                                                            MAY_EXEC);
         }
         RETURN(rc);
 }
@@ -278,20 +282,6 @@ int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
         RETURN(rc);
 }
 
-static void mdd_lock2(const struct lu_env *env,
-                      struct mdd_object *o0, struct mdd_object *o1)
-{
-        mdd_write_lock(env, o0);
-        mdd_write_lock(env, o1);
-}
-
-static void mdd_unlock2(const struct lu_env *env,
-                        struct mdd_object *o0, struct mdd_object *o1)
-{
-        mdd_write_unlock(env, o1);
-        mdd_write_unlock(env, o0);
-}
-
 /* insert new index, add reference if isdir, update times */
 static int __mdd_index_insert(const struct lu_env *env,
                              struct mdd_object *pobj, const struct lu_fid *lf,
@@ -318,8 +308,11 @@ static int __mdd_index_insert(const struct lu_env *env,
                 rc = -ENOTDIR;
 
         if (rc == 0) {
-                if (isdir)
+                if (isdir) {
+                        mdd_write_lock(env, pobj);
                         mdd_ref_add_internal(env, pobj, th);
+                        mdd_write_unlock(env, pobj);                
+                }
 #if 0
                 la->la_valid = LA_MTIME|LA_CTIME;
                 la->la_atime = ma->ma_attr.la_atime;
@@ -348,8 +341,11 @@ static int __mdd_index_delete(const struct lu_env *env,
                 rc = next->do_index_ops->dio_delete(env, next,
                                                     (struct dt_key *)name,
                                                     handle, capa);
-                if (rc == 0 && is_dir)
+                if (rc == 0 && is_dir) {
+                        mdd_write_lock(env, pobj);
                         mdd_ref_del_internal(env, pobj, handle);
+                        mdd_write_unlock(env, pobj);
+                }
         } else
                 rc = -ENOTDIR;
         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start, 
@@ -385,6 +381,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         struct mdd_device *mdd = mdo2mdd(src_obj);
         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct thandle *handle;
+        struct dynlock_handle *dlh;
         int rc;
         ENTRY;
 
@@ -393,7 +390,8 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        mdd_lock2(env, mdd_tobj, mdd_sobj);
+        dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+        mdd_write_lock(env, mdd_sobj);
 
         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
         if (rc)
@@ -412,10 +410,11 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                 GOTO(out, rc);
 
         la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
 
 out:
-        mdd_unlock2(env, mdd_tobj, mdd_sobj);
+        mdd_write_unlock(env, mdd_sobj);
+        mdd_pdo_write_unlock(env, mdd_tobj, dlh);
         mdd_trans_stop(env, mdd, rc, handle);
         RETURN(rc);
 }
@@ -518,6 +517,7 @@ static int mdd_unlink(const struct lu_env *env,
         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct thandle    *handle;
+        struct dynlock_handle *dlh;
         int rc, is_dir;
         ENTRY;
 
@@ -529,7 +529,8 @@ static int mdd_unlink(const struct lu_env *env,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        mdd_lock2(env, mdd_pobj, mdd_cobj);
+        dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+        mdd_write_lock(env, mdd_cobj);
 
         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
         if (rc)
@@ -554,7 +555,7 @@ static int mdd_unlink(const struct lu_env *env,
         }
 
         la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -565,7 +566,8 @@ static int mdd_unlink(const struct lu_env *env,
                                    strlen("unlinked"), "unlinked", 0,
                                    NULL, NULL);
 cleanup:
-        mdd_unlock2(env, mdd_pobj, mdd_cobj);
+        mdd_write_unlock(env, mdd_cobj);
+        mdd_pdo_write_unlock(env, mdd_pobj, dlh);
         mdd_trans_stop(env, mdd, rc, handle);
         RETURN(rc);
 }
@@ -597,7 +599,8 @@ static int mdd_ni_sanity_check(const struct lu_env *env,
         else
                 RETURN(0);
 #endif
-        RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+        RETURN(mdd_permission_internal_locked(env, obj,
+                                              MAY_WRITE | MAY_EXEC));
 }
 
 static int mdd_name_insert(const struct lu_env *env,
@@ -608,6 +611,7 @@ static int mdd_name_insert(const struct lu_env *env,
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct thandle *handle;
+        struct dynlock_handle *dlh;
         int rc;
         ENTRY;
 
@@ -616,7 +620,7 @@ static int mdd_name_insert(const struct lu_env *env,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        mdd_write_lock(env, mdd_obj);
+        dlh = mdd_pdo_write_lock(env, mdd_obj, name);
         rc = mdd_ni_sanity_check(env, pobj, name, fid);
         if (rc)
                 GOTO(out_unlock, rc);
@@ -625,8 +629,7 @@ static int mdd_name_insert(const struct lu_env *env,
                                 BYPASS_CAPA);
 
 out_unlock:
-        mdd_write_unlock(env, mdd_obj);
-
+        mdd_pdo_write_unlock(env, mdd_obj, dlh);
         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
         RETURN(rc);
 }
@@ -656,7 +659,8 @@ static int mdd_nr_sanity_check(const struct lu_env *env,
         rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
         RETURN(rc);
 #endif
-        RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+        RETURN(mdd_permission_internal_locked(env, obj,
+                                              MAY_WRITE | MAY_EXEC));
 }
 
 static int mdd_name_remove(const struct lu_env *env,
@@ -666,6 +670,7 @@ static int mdd_name_remove(const struct lu_env *env,
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct thandle *handle;
+        struct dynlock_handle *dlh;
         int rc;
         ENTRY;
 
@@ -674,7 +679,7 @@ static int mdd_name_remove(const struct lu_env *env,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        mdd_write_lock(env, mdd_obj);
+        dlh = mdd_pdo_write_lock(env, mdd_obj, name);
         rc = mdd_nr_sanity_check(env, pobj, name);
         if (rc)
                 GOTO(out_unlock, rc);
@@ -683,8 +688,7 @@ static int mdd_name_remove(const struct lu_env *env,
                                 BYPASS_CAPA);
 
 out_unlock:
-        mdd_write_unlock(env, mdd_obj);
-
+        mdd_pdo_write_unlock(env, mdd_obj, dlh);
         mdd_trans_stop(env, mdd, rc, handle);
         RETURN(rc);
 }
@@ -723,6 +727,7 @@ static int mdd_rename_tgt(const struct lu_env *env,
         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
         struct thandle *handle;
+        struct dynlock_handle *dlh;
         int rc;
         ENTRY;
 
@@ -731,10 +736,9 @@ static int mdd_rename_tgt(const struct lu_env *env,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
         if (mdd_tobj)
-                mdd_lock2(env, mdd_tpobj, mdd_tobj);
-        else
-                mdd_write_lock(env, mdd_tpobj);
+                mdd_write_lock(env, mdd_tobj);
 
         /*TODO rename sanity checking*/
         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
@@ -756,9 +760,8 @@ static int mdd_rename_tgt(const struct lu_env *env,
                 mdd_ref_del_internal(env, mdd_tobj, handle);
 cleanup:
         if (tobj)
-                mdd_unlock2(env, mdd_tpobj, mdd_tobj);
-        else
-                mdd_write_unlock(env, mdd_tpobj);
+                mdd_write_unlock(env, mdd_tobj);
+        mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
         mdd_trans_stop(env, mdd, rc, handle);
         RETURN(rc);
 }
@@ -874,7 +877,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                 rc = mdd_exec_permission_lite(env, mdd_obj);
         else
 #endif
-        rc = mdd_permission_internal(env, mdd_obj, mask);
+        rc = mdd_permission_internal_locked(env, mdd_obj, mask);
         if (rc)
                 RETURN(rc);
 
@@ -1004,6 +1007,7 @@ static int mdd_create(const struct lu_env *env,
         struct lov_mds_md *lmm = NULL;
         struct thandle    *handle;
         int rc, created = 0, inserted = 0, lmm_size = 0;
+        struct dynlock_handle *dlh;
         struct timeval  start;
         ENTRY;
 
@@ -1063,7 +1067,7 @@ static int mdd_create(const struct lu_env *env,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        mdd_write_lock(env, mdd_pobj);
+        dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
 
         /*
          * XXX check that link can be added to the parent in mkdir case.
@@ -1079,7 +1083,9 @@ static int mdd_create(const struct lu_env *env,
         created = 1;
 
 #ifdef CONFIG_FS_POSIX_ACL
+        mdd_read_lock(env, mdd_pobj);
         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
+        mdd_read_unlock(env, mdd_pobj);
         if (rc) {
                 mdd_write_unlock(env, son);
                 GOTO(cleanup, rc);
@@ -1143,7 +1149,7 @@ static int mdd_create(const struct lu_env *env,
 
         *la_copy = ma->ma_attr;
         la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1171,54 +1177,50 @@ cleanup:
         mdd_lov_create_finish(env, mdd, rc);
         if (lmm)
                 OBD_FREE(lmm, lmm_size);
-        mdd_write_unlock(env, mdd_pobj);
+        mdd_pdo_write_unlock(env, mdd_pobj, dlh);
         mdd_trans_stop(env, mdd, rc, handle);
         mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
         RETURN(rc);
 }
 
-static int mdd_rename_lock(const struct lu_env *env,
-                           struct mdd_device *mdd,
-                           struct mdd_object *src_pobj,
-                           struct mdd_object *tgt_pobj)
+/* 
+ * Get locks on parents in proper order
+ * RETURN: < 0 - error, rename_order if successful 
+ */
+enum rename_order {
+        MDD_RN_SAME,
+        MDD_RN_SRCTGT,
+        MDD_RN_TGTSRC
+};
+
+static int mdd_rename_order(const struct lu_env *env, struct mdd_device *mdd,
+                            struct mdd_object *src_pobj,
+                            struct mdd_object *tgt_pobj)
 {
+        /* order of locking, 1 - tgt-src, 0 - src-tgt*/
         int rc;
         ENTRY;
 
-        if (src_pobj == tgt_pobj) {
-                mdd_write_lock(env, src_pobj);
-                RETURN(0);
-        }
+        if (src_pobj == tgt_pobj)
+                RETURN(MDD_RN_SAME);
 
         /* compared the parent child relationship of src_p&tgt_p */
         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
-                mdd_lock2(env, src_pobj, tgt_pobj);
-                RETURN(0);
+                rc = MDD_RN_SRCTGT;
         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
-                mdd_lock2(env, tgt_pobj, src_pobj);
-                RETURN(0);
-        }
-
-        rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
-        if (rc < 0)
-                RETURN(rc);
-
-        if (rc == 1) {
-                mdd_lock2(env, tgt_pobj, src_pobj);
-                RETURN(0);
+                rc = MDD_RN_TGTSRC;
+        } else {
+                rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
+                if (rc == -EREMOTE)
+                        rc == 0;
+
+                if (rc == 1)
+                        rc = MDD_RN_TGTSRC;
+                else if (rc == 0)
+                        rc = MDD_RN_SRCTGT;
         }
 
-        mdd_lock2(env, src_pobj, tgt_pobj);
-        RETURN(0);
-}
-
-static void mdd_rename_unlock(const struct lu_env *env,
-                              struct mdd_object *src_pobj,
-                              struct mdd_object *tgt_pobj)
-{
-        mdd_write_unlock(env, src_pobj);
-        if (src_pobj != tgt_pobj)
-                mdd_write_unlock(env, tgt_pobj);
+        RETURN(rc);
 }
 
 static int mdd_rename_sanity_check(const struct lu_env *env,
@@ -1235,7 +1237,8 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         /* The sobj maybe on the remote, check parent permission only here */
-        rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
+        rc = mdd_permission_internal_locked(env, src_pobj,
+                                            MAY_WRITE | MAY_EXEC);
         if (rc)
                 RETURN(rc);
 
@@ -1268,9 +1271,10 @@ static int mdd_rename(const struct lu_env *env,
         struct mdd_object *mdd_sobj = NULL;
         struct mdd_object *mdd_tobj = NULL;
         struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+        struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
         int is_dir;
-        int rc;
+        int rc, one_lock;
         ENTRY;
 
         LASSERT(ma->ma_attr.la_mode & S_IFMT);
@@ -1288,10 +1292,26 @@ static int mdd_rename(const struct lu_env *env,
                 RETURN(PTR_ERR(handle));
 
         /* FIXME: Should consider tobj and sobj too in rename_lock. */
-        rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
-        if (rc)
+        rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
+        if (rc < 0)
                 GOTO(cleanup_unlocked, rc);
 
+        /* get locks in determined order */
+        if (rc == MDD_RN_SAME) {
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+                /* check hashes to determine do we need one lock or two */
+                if (mdd_name2hash(sname) != mdd_name2hash(tname))
+                        tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+                else
+                        tdlh = NULL;
+        } else if (rc == MDD_RN_SRCTGT) {
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+        } else {
+                tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+                sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+        }
+        
         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
                                      lf, is_dir, mdd_tobj);
         if (rc)
@@ -1321,7 +1341,8 @@ static int mdd_rename(const struct lu_env *env,
         la_copy->la_valid = LA_CTIME;
         if (mdd_sobj) {
                 /*XXX: how to update ctime for remote sobj? */
-                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
+                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
+                                                  handle, 1);
                 if (rc)
                         GOTO(cleanup, rc);
         }
@@ -1344,17 +1365,20 @@ static int mdd_rename(const struct lu_env *env,
         }
 
         la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
+        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         if (mdd_spobj != mdd_tpobj) {
                 la_copy->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
+                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
+                                                  handle, 0);
         }
 
 cleanup:
-        mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
+        mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
+        if (tdlh)
+                mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
 cleanup_unlocked:
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
index b84ba5d..4beb446 100644 (file)
 #include <asm/semaphore.h>
 
 #include <linux/lustre_acl.h>
+#include <obd.h>
 #include <md_object.h>
 #include <dt_object.h>
 #include <linux/sched.h>
 #include <linux/capability.h>
+#include <linux/dynlocks.h>
 
 enum mdd_txn_op {
         MDD_TXN_OBJECT_DESTROY_OP = 0,
@@ -85,6 +87,7 @@ struct mdd_object {
         __u32             mod_count;
         __u32             mod_valid;
         unsigned long     mod_flags;
+        struct dynlock    mod_pdlock;
 };
 
 struct orph_key {
@@ -146,9 +149,27 @@ int mdd_object_create_internal(const struct lu_env *env,
 int mdd_attr_set_internal_locked(const struct lu_env *env,
                                  struct mdd_object *o,
                                  const struct lu_attr *attr,
-                                 struct thandle *handle);
+                                 struct thandle *handle, const int needacl);
 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                        struct md_attr *ma);
+/* mdd_lock.c */
+void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj);
+void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj);
+void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj);
+void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj);
+
+void mdd_pdlock_init(struct mdd_object *obj);
+unsigned long mdd_name2hash(const char *name);
+struct dynlock_handle *mdd_pdo_write_lock(const struct lu_env *env,
+                                          struct mdd_object *obj,
+                                          const char *name);
+struct dynlock_handle *mdd_pdo_read_lock(const struct lu_env *env,
+                                         struct mdd_object *obj,
+                                         const char *name);
+void mdd_pdo_write_unlock(const struct lu_env *env, struct mdd_object *obj,
+                          struct dynlock_handle *dlh);
+void mdd_pdo_read_unlock(const struct lu_env *env, struct mdd_object *obj,
+                         struct dynlock_handle *dlh);
 /* mdd_dir.c */
 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
                             struct mdd_object *cobj, struct md_attr *ma);
@@ -179,11 +200,6 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len);
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                        const void *area, ssize_t len);
 
-void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj);
-void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj);
-void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj);
-void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj);
-
 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d);
 int __mdd_orphan_add(const struct lu_env *, struct mdd_object *,
                      struct thandle *);
@@ -270,6 +286,9 @@ int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
                               int mask, int getattr);
 int mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
                             int mask);
+int mdd_permission_internal_locked(const struct lu_env *env,
+                                   struct mdd_object *obj, int mask);
+
 int mdd_permission(const struct lu_env *env, struct md_object *obj, int mask);
 int mdd_capa_get(const struct lu_env *env, struct md_object *obj,
                  struct lustre_capa *capa, int renewal);
diff --git a/lustre/mdd/mdd_lock.c b/lustre/mdd/mdd_lock.c
new file mode 100644 (file)
index 0000000..581cdc4
--- /dev/null
@@ -0,0 +1,113 @@
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  mdd/mdd_handler.c
+ *  Lustre Metadata Server (mdd) routines
+ *
+ *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *   Author: Mike Pershin <tappro@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+#include <lustre_ver.h>
+#include "mdd_internal.h"
+
+void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
+{
+        struct dt_object  *next = mdd_object_child(obj);
+
+        next->do_ops->do_write_lock(env, next);
+}
+
+void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
+{
+        struct dt_object  *next = mdd_object_child(obj);
+
+        next->do_ops->do_read_lock(env, next);
+}
+
+void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
+{
+        struct dt_object  *next = mdd_object_child(obj);
+
+        next->do_ops->do_write_unlock(env, next);
+}
+
+void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
+{
+        struct dt_object  *next = mdd_object_child(obj);
+
+        next->do_ops->do_read_unlock(env, next);
+}
+
+
+/* Methods for parallel directory locking */
+
+void mdd_pdlock_init(struct mdd_object *obj)
+{
+        dynlock_init(&obj->mod_pdlock);
+
+}
+
+unsigned long mdd_name2hash(const char *name)
+{
+        unsigned long value = 0;
+        int namelen = strlen(name);
+        int i = 0;
+        while (namelen > i) {
+                value += name[i] * (i << 7);
+                i++;
+        }
+        return value;
+}
+
+struct dynlock_handle *mdd_pdo_write_lock(const struct lu_env *env,
+                                          struct mdd_object *obj,
+                                          const char *name)
+{
+        unsigned long value = mdd_name2hash(name);
+        return dynlock_lock(&obj->mod_pdlock, value, DLT_WRITE, GFP_NOFS);
+}
+
+struct dynlock_handle *mdd_pdo_read_lock(const struct lu_env *env,
+                                         struct mdd_object *obj,
+                                         const char *name)
+{
+        unsigned long value = mdd_name2hash(name);
+        return dynlock_lock(&obj->mod_pdlock, value, DLT_READ, GFP_NOFS);
+}
+
+void mdd_pdo_write_unlock(const struct lu_env *env, struct mdd_object *obj,
+                          struct dynlock_handle *dlh)
+{
+        return dynlock_unlock(&obj->mod_pdlock, dlh);
+}
+
+void mdd_pdo_read_unlock(const struct lu_env *env, struct mdd_object *obj,
+                         struct dynlock_handle *dlh)
+{
+        return dynlock_unlock(&obj->mod_pdlock, dlh);
+}
+
index 9f3b5ab..0ab9c6d 100644 (file)
@@ -303,8 +303,8 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
 
                         /* Get parent dir stripe and set */
                         if (pobj != NULL)
-                                rc = mdd_get_md(env, pobj, lmm, &size,
-                                                MDS_LOV_MD_NAME);
+                                rc = mdd_get_md_locked(env, pobj, lmm, &size,
+                                                       MDS_LOV_MD_NAME);
                         if (rc > 0) {
                                 buf = mdd_buf_get(env, lmm, size);
                                 rc = mdd_xattr_set_txn(env, child, buf,
index f2a57bc..127bfea 100644 (file)
@@ -576,7 +576,7 @@ int mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
         return __mdd_permission_internal(env, obj, mask, 1);
 }
 
-static inline int mdd_permission_internal_locked(const struct lu_env *env,
+inline int mdd_permission_internal_locked(const struct lu_env *env,
                                                  struct mdd_object *obj,
                                                  int mask)
 {
index 2182ccd..abe6158 100644 (file)
@@ -322,13 +322,13 @@ static int mdt_reint_create(struct mdt_thread_info *info,
                 RETURN(err_serious(-ESTALE));
 
         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
-        case S_IFREG:
         case S_IFDIR:{
                 if (info->mti_rr.rr_name[0] == 0) {
                         rc = mdt_md_mkobj(info);
                         break;
                 }
         }
+        case S_IFREG:
         case S_IFLNK:
         case S_IFCHR:
         case S_IFBLK:
@@ -658,16 +658,15 @@ static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
                         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
                                            fid, &dst_fid);
                         mdt_object_put(info->mti_env, dst);
-                        if (rc < 0) {
-                                CERROR("Error while doing mdo_is_subdir(), rc %d\n",
-                                       rc);
+                        if (rc < 0 && rc != -EREMOTE) {
+                                CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
                         } else if (rc == 1) {
                                 rc = -EINVAL;
                         }
                 } else {
                         rc = PTR_ERR(dst);
                 }
-        } while (rc == EREMOTE);
+        } while (rc == -EREMOTE);
 
         RETURN(rc);
 }
@@ -802,7 +801,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
         ma->ma_valid = 0;
-
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_RENAME_WRITE);