Whamcloud - gitweb
land b_inodebits
author nic <nic>
Mon, 12 Apr 2004 21:45:14 +0000 (21:45 +0000)
committer nic <nic>
Mon, 12 Apr 2004 21:45:14 +0000 (21:45 +0000)
25 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_mds.h
lustre/kernel_patches/patches/bproc-patch-2.4.20
lustre/ldlm/Makefile.am
lustre/ldlm/Makefile.mk
lustre/ldlm/ldlm_inodebits.c [new file with mode: 0644]
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/dir.c
lustre/liblustre/namei.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/Makefile.mk
lustre/ptlrpc/autoMakefile.am
lustre/tests/replay-single.sh

index cbdf3c2..3bd524d 100644 (file)
@@ -255,9 +255,10 @@ struct ldlm_lock {
 #define LDLM_PLAIN       10
 #define LDLM_EXTENT      11
 #define LDLM_FLOCK       12
+#define LDLM_IBITS       13
 
 #define LDLM_MIN_TYPE 10
-#define LDLM_MAX_TYPE 12
+#define LDLM_MAX_TYPE 13
 
 struct ldlm_resource {
         struct ldlm_namespace *lr_namespace;
@@ -364,11 +365,32 @@ do {                                                                          \
                        atomic_read(&lock->l_export->exp_refcount) : -99);     \
                 break;                                                        \
         }                                                                     \
+        if (lock->l_resource->lr_type == LDLM_IBITS) {                        \
+                CDEBUG(level, "### " format                                   \
+                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
+                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "   \
+                       "flags: %x remote: "LPX64" expref: %d\n" , ## a,       \
+                       lock->l_resource->lr_namespace->ns_name,               \
+                       lock, lock->l_handle.h_cookie,                         \
+                       atomic_read (&lock->l_refc),                           \
+                       lock->l_readers, lock->l_writers,                      \
+                       ldlm_lockname[lock->l_granted_mode],                   \
+                       ldlm_lockname[lock->l_req_mode],                       \
+                       lock->l_resource->lr_name.name[0],                     \
+                       lock->l_resource->lr_name.name[1],                     \
+                       lock->l_policy_data.l_inodebits.bits,                  \
+                       atomic_read(&lock->l_resource->lr_refcount),           \
+                       ldlm_typename[lock->l_resource->lr_type],              \
+                       lock->l_flags, lock->l_remote_handle.cookie,           \
+                       lock->l_export ?                                       \
+                       atomic_read(&lock->l_export->exp_refcount) : -99);     \
+                break;                                                        \
+        }                                                                     \
         {                                                                     \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
-                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "     \
-                       "remote: "LPX64" expref: %d\n" , ## a,                 \
+                       "res: "LPU64"/"LPU64" rrc: %d type: %s "               \
+                       "flags: %x remote: "LPX64" expref: %d\n" , ## a,       \
                        lock->l_resource->lr_namespace->ns_name,               \
                        lock, lock->l_handle.h_cookie,                         \
                        atomic_read (&lock->l_refc),                           \
index 0d98e42..202198c 100644 (file)
@@ -511,6 +511,12 @@ typedef enum {
 #define DISP_OPEN_OPEN    (1 << 5)
 #define DISP_ENQ_COMPLETE (1<<6)
 
+/* INODE LOCK PARTS */
+#define MDS_INODELOCK_LOOKUP 0x000001       /* dentry, mode, owner, group */
+#define MDS_INODELOCK_UPDATE 0x000002       /* size, links, timestamps */
+//#define MDS_INODELOCK_MAXSHIFT 1
+//#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
+
 struct ll_fid {
         __u64 id;
         __u32 generation;
@@ -733,6 +739,9 @@ struct ldlm_extent {
         __u64 end;
         __u64 gid;
 };
+struct ldlm_inodebits {
+        __u64 bits;
+};
 
 struct ldlm_flock {
         __u64 start;
@@ -751,6 +760,7 @@ struct ldlm_flock {
 typedef union {
         struct ldlm_extent l_extent;
         struct ldlm_flock  l_flock;
+        struct ldlm_inodebits l_inodebits;
 } ldlm_policy_data_t;
 
 extern void lustre_swab_ldlm_policy_data (ldlm_policy_data_t *d);
index a1f333a..c602bb6 100644 (file)
@@ -179,7 +179,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen);
+                                     char *name, int namelen, __u64 lockpart);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt);
 int mds_update_server_data(struct obd_device *, int force_sync);
index 3587bd8..f081eb6 100644 (file)
@@ -1,4 +1,4 @@
-$Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $
+$Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $
 
 Index: linux/fs/exec.c
 ===================================================================
@@ -764,7 +764,7 @@ Index: linux/kernel/bproc_hook.c
 + *  along with this program; if not, write to the Free Software
 + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 + *
-+ * $Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $
++ * $Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $
 + *-----------------------------------------------------------------------*/
 +#include <linux/kernel.h>
 +#include <linux/sched.h>
@@ -832,7 +832,7 @@ Index: linux/include/linux/bproc.h
 + *  along with this program; if not, write to the Free Software
 + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 + *
-+ * $Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $
++ * $Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $
 + *-----------------------------------------------------------------------*/
 +#ifndef _LINUX_BPROC_H
 +#define _LINUX_BPROC_H
index 779639a..7643aef 100644 (file)
@@ -10,4 +10,4 @@
 MOSTLYCLEANFILES = *.o *.ko *.mod.c
 DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \
        ldlm_lock.c ldlm_lockd.c ldlm_plain.c ldlm_request.c         \
-       ldlm_resource.c ldlm_test.c l_lock.c
+       ldlm_resource.c ldlm_test.c l_lock.c ldlm_inodebits.c
index 650331e..b20c77a 100644 (file)
@@ -7,4 +7,5 @@ include $(src)/../portals/Kernelenv
 
 obj-y += ldlm.o
 ldlm-objs := l_lock.o ldlm_lock.o ldlm_resource.o ldlm_extent.o ldlm_request.o \
-               ldlm_lockd.o ldlm_lib.o ldlm_flock.o ldlm_plain.o
+               ldlm_lockd.o ldlm_lib.o ldlm_flock.o ldlm_plain.o \
+               ldlm_inodebits.o
diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c
new file mode 100644 (file)
index 0000000..e3511dd
--- /dev/null
@@ -0,0 +1,135 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003, 2004 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+#ifndef __KERNEL__
+# include <liblustre.h>
+#endif
+
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+
+#include "ldlm_internal.h"
+
+/* Determine if the lock is compatible with all locks on the queue. */
+static int
+ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
+                            int send_cbs)
+{
+        struct list_head *tmp;
+        struct ldlm_lock *lock;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_bits = req->l_policy_data.l_inodebits.bits;
+        int compat = 1;
+        ENTRY;
+
+        LASSERT(req_bits); /* There is no sense in a lock with no bits set,
+                              I think. Also, such a lock would be compatible
+                              with any other bit lock. */
+        list_for_each(tmp, queue) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (req == lock)
+                        RETURN(compat);
+
+                /* locks are compatible, bits don't matter */
+                if (lockmode_compat(lock->l_req_mode, req_mode))
+                        continue;
+
+                /* if bits don't overlap skip it */
+                if (!(lock->l_policy_data.l_inodebits.bits & req_bits))
+                        continue;
+
+                if (!send_cbs)
+                        RETURN(0);
+
+                compat = 0;
+                if (lock->l_blocking_ast)
+                        ldlm_add_ast_work_item(lock, req, NULL, 0);
+        }
+
+        RETURN(compat);
+}
+
+/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
+  *   - blocking ASTs have already been sent
+  *   - the caller has already initialized res->lr_tmp
+  *   - must call this function with the ns lock held
+  *
+  * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
+  *   - blocking ASTs have not been sent
+  *   - the caller has NOT initialized res->lr_tmp, so we must
+  *   - must call this function with the ns lock held once */
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
+                                int first_enq, ldlm_error_t *err)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        int rc;
+        ENTRY;
+
+        LASSERT(list_empty(&res->lr_converting));
+
+        if (!first_enq) {
+                LASSERT(res->lr_tmp != NULL);
+                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0);
+                if (!rc)
+                        RETURN(LDLM_ITER_STOP);
+                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0);
+                if (!rc)
+                        RETURN(LDLM_ITER_STOP);
+
+                ldlm_resource_unlink_lock(lock);
+                ldlm_grant_lock(lock, NULL, 0, 1);
+                RETURN(LDLM_ITER_CONTINUE);
+        }
+
+ restart:
+        LASSERT(res->lr_tmp == NULL);
+        res->lr_tmp = &rpc_list;
+        rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1);
+        rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1);
+        res->lr_tmp = NULL;
+
+        if (rc != 2) {
+                /* If either of the compat_queue()s returned 0, then we
+                 * have ASTs to send and must go onto the waiting list.
+                 *
+                 * bug 2322: we used to unlink and re-add here, which was a
+                 * terrible folly -- if we goto restart, we could get
+                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
+                if (list_empty(&lock->l_res_link))
+                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+                l_unlock(&res->lr_namespace->ns_lock);
+                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
+                l_lock(&res->lr_namespace->ns_lock);
+                if (rc == -ERESTART)
+                        GOTO(restart, -ERESTART);
+                *flags |= LDLM_FL_BLOCK_GRANTED;
+        } else {
+                ldlm_resource_unlink_lock(lock);
+                ldlm_grant_lock(lock, NULL, 0, 0);
+        }
+        RETURN(0);
+}
index 742bcba..3a79a52 100644 (file)
@@ -69,6 +69,10 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err);
 
+/* ldlm_inodebits.c */
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
+                               int first_enq, ldlm_error_t *err);
+
 /* l_lock.c */
 void l_check_no_ns_lock(struct ldlm_namespace *ns);
 
index 756dd1c..e10ac64 100644 (file)
@@ -52,6 +52,7 @@ char *ldlm_typename[] = {
         [LDLM_PLAIN] "PLN",
         [LDLM_EXTENT] "EXT",
         [LDLM_FLOCK] "FLK",
+        [LDLM_IBITS] "IBT",
 };
 
 char *ldlm_it2str(int it)
@@ -88,6 +89,7 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = {
 #ifdef __KERNEL__
         [LDLM_FLOCK] ldlm_process_flock_lock,
 #endif
+        [LDLM_IBITS] ldlm_process_inodebits_lock,
 };
 
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
@@ -597,6 +599,14 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
 
+                /* We match if we have an existing lock with the same or a
+                   wider set of bits. */
+                if (lock->l_resource->lr_type == LDLM_IBITS &&
+                     ((lock->l_policy_data.l_inodebits.bits &
+                      policy->l_inodebits.bits) !=
+                      policy->l_inodebits.bits))
+                        continue;
+
                 if (lock->l_destroyed)
                         continue;
 
@@ -1191,6 +1201,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
                        lock->l_policy_data.l_flock.pid,
                        lock->l_policy_data.l_flock.start,
                        lock->l_policy_data.l_flock.end);
+        else if (lock->l_resource->lr_type == LDLM_IBITS)
+                CDEBUG(level, " Bits: "LPX64"\n",
+                       lock->l_policy_data.l_inodebits.bits);
 }
 
 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
index c236d73..a2966a2 100644 (file)
@@ -62,6 +62,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp);
         struct ldlm_res_id res_id =
                 { .name = {lli->lli_st_ino, (__u64)lli->lli_st_generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
         ENTRY;
 
         if ((lli->lli_st_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index) {
@@ -75,11 +76,11 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         }
 
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                             &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh);
         if (!rc) {
                 llu_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
 
-                rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
+                rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &it, LCK_PR,
                                  &data, &lockh, NULL, 0,
                                  ldlm_completion_ast, llu_mdc_blocking_ast,
                                  inode);
index af4a0dc..3339595 100644 (file)
@@ -140,6 +140,7 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock,
         case LDLM_CB_CANCELING: {
                 struct inode *inode = llu_inode_from_lock(lock);
                 struct llu_inode_info *lli;
+                __u64 bits = lock->l_policy_data.l_inodebits.bits;
 
                 /* Invalidate all dentries associated with this inode */
                 if (inode == NULL)
@@ -147,14 +148,16 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock,
 
                 lli =  llu_i2info(inode);
 
-                clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
+                if (bits & MDS_INODELOCK_UPDATE)
+                        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
 
                 if (lock->l_resource->lr_name.name[0] != lli->lli_st_ino ||
                     lock->l_resource->lr_name.name[1] != lli->lli_st_generation) {
                         LDLM_ERROR(lock, "data mismatch with ino %lu/%lu",
                                    lli->lli_st_ino, lli->lli_st_generation);
                 }
-                if (S_ISDIR(lli->lli_st_mode)) {
+                if (S_ISDIR(lli->lli_st_mode) &&
+                    (bits & MDS_INODELOCK_UPDATE)) {
                         CDEBUG(D_INODE, "invalidating inode %lu\n",
                                lli->lli_st_ino);
 
index 6971d87..6608922 100644 (file)
@@ -337,13 +337,14 @@ static struct inode* llu_new_inode(struct filesys *fs,
         return inode;
 }
 
-static int llu_have_md_lock(struct inode *inode)
+static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
 {
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         struct llu_inode_info *lli = llu_i2info(inode);
         struct lustre_handle lockh;
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obddev;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
         int flags;
         ENTRY;
 
@@ -357,14 +358,14 @@ static int llu_have_md_lock(struct inode *inode)
 
         /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PR, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PR, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PR);
                 RETURN(1);
         }
 
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PW, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PW, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PW);
                 RETURN(1);
         }
@@ -382,7 +383,7 @@ static int llu_inode_revalidate(struct inode *inode)
                 RETURN(0);
         }
 
-        if (!llu_have_md_lock(inode)) {
+        if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) {
                 struct lustre_md md;
                 struct ptlrpc_request *req = NULL;
                 struct llu_sb_info *sbi = llu_i2sbi(inode);
index 01a9c8a..a719ca1 100644 (file)
@@ -228,6 +228,29 @@ int ll_revalidate_it(struct dentry *de, int flags, struct lookup_intent *it)
 
         ll_i2uctxt(&ctxt, de->d_parent->d_inode, de->d_inode);
 
+        if (it->it_op == IT_GETATTR) { /* We need to check for LOOKUP lock
+                                          as well */
+                rc = mdc_intent_lock(exp, &ctxt, &pfid, de->d_name.name,
+                                     de->d_name.len, NULL, 0, &cfid, &lookup_it,
+                                     flags, &req, ll_mdc_blocking_ast);
+                /* If there was no lookup lock, no point in even checking for
+                   UPDATE lock */
+                if (!rc) {
+                        it = &lookup_it;
+                        GOTO(out, rc);
+                }
+                if (it_disposition(&lookup_it, DISP_LOOKUP_NEG)) {
+                        ll_intent_release(&lookup_it);
+                        it = &lookup_it;
+                        GOTO(out, rc = 0);
+                }
+                        
+                if (req)
+                        ptlrpc_req_finished(req);
+                req = NULL;
+                ll_lookup_finish_locks(&lookup_it, de);
+        }
+
         rc = mdc_intent_lock(exp, &ctxt, &pfid, de->d_name.name, de->d_name.len,
                              NULL, 0,
                              &cfid, it, flags, &req, ll_mdc_blocking_ast);
index f92652f..bca0445 100644 (file)
@@ -212,10 +212,11 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
         struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
         struct address_space *mapping = dir->i_mapping;
         struct page *page;
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
         int rc;
 
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                             &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh);
         if (!rc) {
                 struct lookup_intent it = { .it_op = IT_READDIR };
                 struct ptlrpc_request *request;
@@ -223,7 +224,7 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 
                 ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0);
 
-                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
+                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_IBITS, &it,
                                  LCK_PR, &data, &lockh, NULL, 0,
                                  ldlm_completion_ast, ll_mdc_blocking_ast, dir);
 
index a6d4f74..3ce798b 100644 (file)
@@ -131,7 +131,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
 
         ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR);
 
-        rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
+        rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, itp, LCK_PW, &data,
                          &lockh, lmm, lmmsize, ldlm_completion_ast,
                          ll_mdc_blocking_ast, parent->d_inode);
         if (rc < 0)
@@ -1297,13 +1297,14 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         RETURN(rc);
 }
 
-static int ll_have_md_lock(struct dentry *de)
+static int ll_have_md_lock(struct dentry *de, __u64 lockpart)
 {
         struct ll_sb_info *sbi = ll_s2sbi(de->d_sb);
         struct lustre_handle lockh;
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obddev;
         int flags;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
         ENTRY;
 
         if (!de->d_inode)
@@ -1317,14 +1318,14 @@ static int ll_have_md_lock(struct dentry *de)
 
         /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PR, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PR, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PR);
                 RETURN(1);
         }
 
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PW, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PW, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PW);
                 RETURN(1);
         }
@@ -1350,7 +1351,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
 #endif
 
-        if (!ll_have_md_lock(dentry)) {
+        if (!ll_have_md_lock(dentry, MDS_INODELOCK_UPDATE)) {
                 struct ptlrpc_request *req = NULL;
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                 struct ll_fid fid;
index 23b193c..3508777 100644 (file)
@@ -154,20 +154,33 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 break;
         case LDLM_CB_CANCELING: {
                 struct inode *inode = ll_inode_from_lock(lock);
+                __u64 bits = lock->l_policy_data.l_inodebits.bits;
 
-                /* Invalidate all dentries associated with this inode */
+                /* For lookup locks: Invalidate all dentries associated with
+                   this inode, for UPDATE locks - invalidate directory pages */
                 if (inode == NULL)
                         break;
 
-                clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
-                          &(ll_i2info(inode)->lli_flags));
+                if (bits & MDS_INODELOCK_UPDATE)
+                        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
+                                  &(ll_i2info(inode)->lli_flags));
+
 
                 if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
                     lock->l_resource->lr_name.name[1] != inode->i_generation) {
                         LDLM_ERROR(lock, "data mismatch with ino %lu/%u",
                                    inode->i_ino, inode->i_generation);
                 }
-                if (S_ISDIR(inode->i_mode)) {
+
+                /* If a lookup lock is cancelled, we just drop the dentry and
+                   this will cause us to reget data from MDS when we'd want to
+                   access this dentry/inode again. If it is a lock on other
+                   parts of the inode that is cancelled, we do not need to do
+                   much (but need to discard data from readdir, if any), since
+                   absence of lock will cause ll_revalidate_it (called from
+                   stat() and similar functions) to renew the data anyway */
+                if (S_ISDIR(inode->i_mode) &&
+                    (bits & MDS_INODELOCK_UPDATE)) {
                         CDEBUG(D_INODE, "invalidating inode %lu\n",
                                inode->i_ino);
 
@@ -175,7 +188,8 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 }
 
                 if (inode->i_sb->s_root &&
-                    inode != inode->i_sb->s_root->d_inode)
+                    inode != inode->i_sb->s_root->d_inode &&
+                    (bits & MDS_INODELOCK_LOOKUP))
                         ll_unhash_aliases(inode);
                 iput(inode);
                 break;
@@ -194,6 +208,7 @@ int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
                 { .name = {inode->i_ino, inode->i_generation} };
         struct obd_device *obddev = class_conn2obd(conn);
         ENTRY;
+        
         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
                                       opaque));
 }
@@ -313,8 +328,9 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
         dentry->d_op = &ll_d_ops;
         ll_set_dd(dentry);
 
-        if (dentry == saved)
+        if (dentry == saved) {
                 d_add(dentry, inode);
+        }
 
         RETURN(0);
 }
index 7b1aa8b..8f7b3b6 100644 (file)
@@ -166,6 +166,7 @@ int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
 
         ldlm_change_cbdata(class_exp2obd(exp)->obd_namespace, &res_id, it, 
                            data);
+
         EXIT;
         return 0;
 }
@@ -190,6 +191,7 @@ int mdc_enqueue(struct obd_export *exp,
         struct obd_device *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id =
                 { .name = {data->fid1.id, data->fid1.generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         int size[6] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
         int rc, flags = LDLM_FL_HAS_INTENT;
         int repsize[4] = {sizeof(struct ldlm_reply),
@@ -255,6 +257,9 @@ int mdc_enqueue(struct obd_export *exp,
                 size[2] = sizeof(struct mds_body);
                 size[3] = data->namelen + 1;
 
+                if (it->it_op & IT_GETATTR)
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
                                       size, NULL);
                 if (!req)
@@ -270,6 +275,7 @@ int mdc_enqueue(struct obd_export *exp,
                 reply_buffers = 3;
                 req->rq_replen = lustre_msg_size(3, repsize);
         } else if (it->it_op == IT_READDIR) {
+               policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (!req)
@@ -282,10 +288,9 @@ int mdc_enqueue(struct obd_export *exp,
                 LBUG();
                 RETURN(-EINVAL);
         }
-
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
-                              lock_type, NULL, lock_mode, &flags, cb_blocking,
+                              lock_type, &policy, lock_mode, &flags,cb_blocking,
                               cb_completion, NULL, cb_data, NULL, 0, NULL,
                               lockh);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
@@ -430,16 +435,26 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
                 struct ldlm_res_id res_id ={.name = {cfid->id,
                                                      cfid->generation}};
                 struct lustre_handle lockh;
+                ldlm_policy_data_t policy;
                 int mode = LCK_PR;
 
+                /* For the GETATTR case, ll_revalidate_it issues two separate
+                   queries - for LOOKUP and for UPDATE lock because it cannot
+                   check them together - we might have those two bits to be
+                   present in two separate granted locks */
+                policy.l_inodebits.bits = 
+                                 (it->it_op == IT_GETATTR)?MDS_INODELOCK_UPDATE:
+                                                           MDS_INODELOCK_LOOKUP;
+                mode = LCK_PR;
                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                      LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                                     LDLM_IBITS, &policy, LCK_PR, &lockh);
                 if (!rc) {
                         mode = LCK_PW;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                              LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_PLAIN, NULL, LCK_PW, &lockh);
+                                             LDLM_IBITS, &policy, LCK_PW,
+                                             &lockh);
                 }
                 if (rc) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
@@ -461,7 +476,7 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
                 struct mdc_op_data op_data;
                 mdc_fid2mdc_op_data(&op_data, uctxt, pfid, cfid, name, len, 0);
 
-                rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it),
+                rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
                                  &op_data, &lockh, lmm, lmmsize,
                                  ldlm_completion_ast, cb_blocking, NULL);
                 if (rc < 0)
@@ -537,11 +552,12 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
          * intent_finish has performed the iget().) */
         lock = ldlm_handle2lock(&lockh);
         if (lock) {
+                ldlm_policy_data_t policy = lock->l_policy_data;
                 LDLM_DEBUG(lock, "matching against this");
                 LDLM_LOCK_PUT(lock);
                 memcpy(&old_lock, &lockh, sizeof(lockh));
                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
-                                    LDLM_PLAIN, NULL, LCK_NL, &old_lock)) {
+                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
                         ldlm_lock_decref_and_cancel(&lockh,
                                                     it->d.lustre.it_lock_mode);
                         memcpy(&lockh, &old_lock, sizeof(old_lock));
index 3f2aff3..b34e184 100644 (file)
@@ -158,12 +158,14 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen)
+                                     char *name, int namelen, __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
         struct ldlm_res_id res_id = { .name = {0} };
         int flags = 0, rc;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
+
         ENTRY;
 
         if (IS_ERR(de))
@@ -172,7 +174,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
-                              LDLM_PLAIN, NULL, lock_mode, &flags,
+                              LDLM_IBITS, &policy, lock_mode, &flags,
                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
                               NULL, 0, NULL, lockh);
         if (rc != ELDLM_OK) {
@@ -652,7 +654,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
 }
 
 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
-                            struct lustre_handle *child_lockh)
+                            struct lustre_handle *child_lockh, int child_part)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_reply *rep = NULL;
@@ -732,8 +734,10 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         if (resent_req == 0) {
                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
                                                  &parent_lockh, &dparent,
-                                                 LCK_PR, name, namesize,
-                                                 child_lockh, &dchild, LCK_PR);
+                                                 LCK_PR, MDS_INODELOCK_UPDATE,
+                                                 name, namesize,
+                                                 child_lockh, &dchild, LCK_PR,
+                                                 child_part);
                 if (rc)
                         GOTO(cleanup, rc);
         } else {
@@ -1136,7 +1140,7 @@ int mds_handle(struct ptlrpc_request *req)
                  * want to cancel.
                  */
                 lockh.cookie = 0;
-                rc = mds_getattr_name(0, req, &lockh);
+                rc = mds_getattr_name(0, req, &lockh, MDS_INODELOCK_UPDATE);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
                 if (rc == 0 && lockh.cookie)
@@ -1675,6 +1679,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         struct ldlm_reply *rep;
         struct lustre_handle lockh = { 0 };
         struct ldlm_lock *new_lock;
+        int getattr_part = MDS_INODELOCK_UPDATE;
         int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
                                           sizeof(struct mds_body),
                                           mds->mds_max_mdsize,
@@ -1726,10 +1731,13 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 #endif 
                         RETURN(ELDLM_LOCK_ABORTED);
                 break;
-        case IT_GETATTR:
         case IT_LOOKUP:
+                getattr_part = MDS_INODELOCK_LOOKUP;
+        case IT_GETATTR:
+                getattr_part |= MDS_INODELOCK_LOOKUP;
         case IT_READDIR:
-                rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh);
+                rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh,
+                                                         getattr_part);
                 /* FIXME: LDLM can set req->rq_status. MDS sets
                    policy_res{1,2} with disposition and status.
                    - replay: returns 0 & req->status is old status 
index ec3f063..b4a70b3 100644 (file)
@@ -19,11 +19,12 @@ static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 }
 
 /* mds/mds_reint.c */
-int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2);
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
+                          ldlm_policy_data_t *p1_policy,
                           struct ldlm_res_id *p2_res_id,
-                          struct lustre_handle *p2_lockh, int p2_lock_mode);
+                          struct lustre_handle *p2_lockh, int p2_lock_mode,
+                          ldlm_policy_data_t *p2_policy);
 void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error);
 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
@@ -33,9 +34,11 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
+                                __u64 parent_lockpart,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
-                                struct dentry **dchildp, int child_mode);
+                                struct dentry **dchildp, int child_mode,
+                                __u64 child_lockpart);
 int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
                        struct lustre_handle *child_lockh);
 
index b474283..d62f50e 100644 (file)
@@ -834,11 +834,16 @@ int mds_open(struct mds_update_record *rec, int offset,
         acc_mode = accmode(rec->ur_flags);
 
         /* Step 1: Find and lock the parent */
-        if (rec->ur_flags & O_CREAT)
+        if (rec->ur_flags & O_CREAT) {
+                /* XXX Well, in fact we only need this lock mode change if
+                   in addition to O_CREAT, the file does not exist.
+                   But we do not know if it exists or not yet */
                 parent_mode = LCK_PW;
+        }
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
                                         &parent_lockh, rec->ur_name,
-                                        rec->ur_namelen - 1);
+                                        rec->ur_namelen - 1,
+                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
index b2dd794..7d18572 100644 (file)
@@ -78,11 +78,10 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                        rc);
         } else {
                 ///* XXX 0 normally, SENDNOW for debug */);
-                ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
-                rc = llog_cancel(ctxt, lsm,
-                                         mlcd->mlcd_cookielen /
-                                         sizeof(*mlcd->mlcd_cookies),
-                                         mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+                ctxt = llog_get_context(obd,mlcd->mlcd_cookies[0].lgc_subsys+1);
+                rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen /
+                                                sizeof(*mlcd->mlcd_cookies),
+                                 mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
                 if (rc)
                         CERROR("error cancelling %d log cookies: rc %d\n",
                                (int)(mlcd->mlcd_cookielen /
@@ -394,14 +393,18 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
         } else {
+                __u64 lockpart = MDS_INODELOCK_UPDATE;
+                if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
+                        lockpart |= MDS_INODELOCK_LOOKUP;
                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                           &lockh, NULL, 0);
+                                           &lockh, NULL, 0, lockpart);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
                 locked = 1;
         }
 
         cleanup_phase = 1;
+
         inode = de->d_inode;
         LASSERT(inode);
         if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) && 
@@ -563,7 +566,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ESTALE);
 
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh,
-                                        rec->ur_name, rec->ur_namelen - 1);
+                                        rec->ur_name, rec->ur_namelen - 1,
+                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
@@ -751,7 +755,8 @@ cleanup:
         return 0;
 }
 
-int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
+static int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
+           ldlm_policy_data_t *p1, ldlm_policy_data_t *p2)
 {
         int i;
 
@@ -768,6 +773,13 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
                 if (res1->name[i] < res2->name[i])
                         return 0;
         }
+
+        if (!p1 || !p2)
+                return 0;
+
+        if (memcmp(p1, p2, sizeof(*p1)) < 0)
+                return 1;
+
         return 0;
 }
 
@@ -779,27 +791,32 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
+                          ldlm_policy_data_t *p1_policy,
                           struct ldlm_res_id *p2_res_id,
-                          struct lustre_handle *p2_lockh, int p2_lock_mode)
+                          struct lustre_handle *p2_lockh, int p2_lock_mode,
+                          ldlm_policy_data_t *p2_policy)
 {
         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
+        ldlm_policy_data_t *policies[2] = { p1_policy, p2_policy };
         int rc, flags;
         ENTRY;
 
         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
 
-        CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
-               res_id[0]->name[0], res_id[1]->name[0]);
+        CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0],
+               res_id[1]->name[0]);
 
-        if (res_gt(p1_res_id, p2_res_id)) {
+        if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) {
                 handles[1] = p1_lockh;
                 handles[0] = p2_lockh;
                 res_id[1] = p1_res_id;
                 res_id[0] = p2_res_id;
                 lock_modes[1] = p1_lock_mode;
                 lock_modes[0] = p2_lock_mode;
+                policies[1] = p1_policy;
+                policies[0] = p2_policy;
         }
 
         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
@@ -807,20 +824,21 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
 
         flags = LDLM_FL_LOCAL_ONLY;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
-                              LDLM_PLAIN, NULL, lock_modes[0], &flags,
+                              LDLM_IBITS, policies[0], lock_modes[0], &flags,
                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
                               NULL, 0, NULL, handles[0]);
         if (rc != ELDLM_OK)
                 RETURN(-EIO);
         ldlm_lock_dump_handle(D_OTHER, handles[0]);
 
-        if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
+        if (!memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) &&
+            (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) {
                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
                 ldlm_lock_addref(handles[1], lock_modes[1]);
         } else if (res_id[1]->name[0] != 0) {
                 flags = LDLM_FL_LOCAL_ONLY;
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      *res_id[1], LDLM_PLAIN, NULL,
+                                      *res_id[1], LDLM_IBITS, policies[1],
                                       lock_modes[1], &flags, mds_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
                                       NULL, handles[1]);
@@ -836,12 +854,16 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
 
 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                            struct lustre_handle *p1_lockh, int p1_lock_mode,
+                           ldlm_policy_data_t *p1_policy,
                            struct ldlm_res_id *p2_res_id,
                            struct lustre_handle *p2_lockh, int p2_lock_mode,
+                           ldlm_policy_data_t *p2_policy,
                            struct ldlm_res_id *c1_res_id,
                            struct lustre_handle *c1_lockh, int c1_lock_mode,
+                           ldlm_policy_data_t *c1_policy,
                            struct ldlm_res_id *c2_res_id,
-                           struct lustre_handle *c2_lockh, int c2_lock_mode)
+                           struct lustre_handle *c2_lockh, int c2_lock_mode,
+                           ldlm_policy_data_t *c2_policy)
 {
         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
                                           c1_res_id, c2_res_id };
@@ -849,10 +871,13 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                                                  c1_lockh, c2_lockh };
         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
                               c1_lock_mode, c2_lock_mode };
+        ldlm_policy_data_t *policies[5] = { p1_policy, p2_policy,
+                                            c1_policy, c2_policy};
         int rc, i, j, sorted, flags;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+        CDEBUG(D_DLMTRACE,
+               "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
                res_id[3]->name[0]);
 
@@ -862,13 +887,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 dlm_handles[4] = dlm_handles[i];
                 res_id[4] = res_id[i];
                 lock_modes[4] = lock_modes[i];
+                policies[4] = policies[i];
 
                 sorted = 0;
                 do {
-                        if (res_gt(res_id[j], res_id[4])) {
+                        if (res_gt(res_id[j], res_id[4], policies[j],
+                                   policies[4])) {
                                 dlm_handles[j + 1] = dlm_handles[j];
                                 res_id[j + 1] = res_id[j];
                                 lock_modes[j + 1] = lock_modes[j];
+                                policies[j + 1] = policies[j];
                                 j--;
                         } else {
                                 sorted = 1;
@@ -878,9 +906,11 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 dlm_handles[j + 1] = dlm_handles[4];
                 res_id[j + 1] = res_id[4];
                 lock_modes[j + 1] = lock_modes[4];
+                policies[j + 1] = policies[4];
         }
 
-        CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+        CDEBUG(D_DLMTRACE,
+               "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
                res_id[3]->name[0]);
 
@@ -890,13 +920,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 if (res_id[i]->name[0] == 0)
                         break;
                 if (i != 0 &&
-                    memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
+                    !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) &&
+                    (policies[i]->l_inodebits.bits &
+                     policies[i-1]->l_inodebits.bits) ) {
                         memcpy(dlm_handles[i], dlm_handles[i-1],
                                sizeof(*(dlm_handles[i])));
                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
                 } else {
                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              *res_id[i], LDLM_PLAIN, NULL,
+                                              *res_id[i], LDLM_IBITS,
+                                              policies[i],
                                               lock_modes[i], &flags,
                                               mds_blocking_ast,
                                               ldlm_completion_ast, NULL, NULL,
@@ -934,6 +967,7 @@ static int mds_verify_child(struct obd_device *obd,
                             struct ldlm_res_id *child_res_id,
                             struct lustre_handle *child_lockh,
                             struct dentry **dchildp, int child_mode,
+                            ldlm_policy_data_t *child_policy,
                             const char *name, int namelen,
                             struct ldlm_res_id *maxres)
 {
@@ -973,8 +1007,8 @@ static int mds_verify_child(struct obd_device *obd,
                 child_res_id->name[0] = dchild->d_inode->i_ino;
                 child_res_id->name[1] = dchild->d_inode->i_generation;
 
-                if (res_gt(parent_res_id, child_res_id) ||
-                    res_gt(maxres, child_res_id)) {
+                if (res_gt(parent_res_id, child_res_id, NULL, NULL) ||
+                    res_gt(maxres, child_res_id, NULL, NULL)) {
                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
                                child_res_id->name[0], parent_res_id->name[0],
                                maxres->name[0]);
@@ -982,12 +1016,13 @@ static int mds_verify_child(struct obd_device *obd,
                 }
 
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      *child_res_id, LDLM_PLAIN, NULL,
+                                      *child_res_id, LDLM_IBITS, child_policy,
                                       child_mode, &flags, mds_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
                                       NULL, child_lockh);
                 if (rc != ELDLM_OK)
                         GOTO(cleanup, rc = -EIO);
+
         } else {
                 memset(child_res_id, 0, sizeof(*child_res_id));
         }
@@ -1010,12 +1045,16 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
+                                __u64 parent_lockpart,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
-                                struct dentry **dchildp, int child_mode)
+                                struct dentry **dchildp, int child_mode,
+                                __u64 child_lockpart)
 {
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct ldlm_res_id parent_res_id = { .name = {0} };
+        ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }};
+        ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }};
         struct inode *inode;
         int rc = 0, cleanup_phase = 0;
         ENTRY;
@@ -1050,6 +1089,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 
         child_res_id.name[0] = inode->i_ino;
         child_res_id.name[1] = inode->i_generation;
+
         iput(inode);
 
 retry_locks:
@@ -1057,8 +1097,10 @@ retry_locks:
 
         /* Step 3: Lock parent and child in resource order.  If child doesn't
          *         exist, we still have to lock the parent and re-lookup. */
-        rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
-                                   &child_res_id, child_lockh, child_mode);
+        rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
+                                   &parent_policy,
+                                   &child_res_id, child_lockh, child_mode,
+                                   &child_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1069,8 +1111,9 @@ retry_locks:
 
         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
-                              parent_mode, &child_res_id, child_lockh, dchildp,
-                              child_mode, name, namelen, &parent_res_id);
+                              parent_mode, &child_res_id, child_lockh, 
+                              dchildp, child_mode, &child_policy,
+                              name, namelen, &parent_res_id);
         if (rc > 0)
                 goto retry_locks;
         if (rc < 0) {
@@ -1128,8 +1171,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
                                          &parent_lockh, &dparent, LCK_PW,
+                                         MDS_INODELOCK_UPDATE,
                                          rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &dchild, LCK_EX);
+                                         &child_lockh, &dchild, LCK_EX,
+                                         MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1316,6 +1361,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
         struct ldlm_res_id src_res_id = { .name = {0} };
         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
+        ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
+        ldlm_policy_data_t tgt_dir_policy =
+                                       {.l_inodebits = {MDS_INODELOCK_UPDATE}};
+
         int rc = 0, cleanup_phase = 0;
         ENTRY;
 
@@ -1354,7 +1403,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
 
         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
-                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
+                                   &src_policy,
+                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX,
+                                   &tgt_dir_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1510,6 +1561,14 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         struct ldlm_res_id p2_res_id = { .name = {0} };
         struct ldlm_res_id c1_res_id = { .name = {0} };
         struct ldlm_res_id c2_res_id = { .name = {0} };
+        ldlm_policy_data_t p_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}};
+        /* Only dentry should change, but the inode itself would be
+           intact otherwise */
+        ldlm_policy_data_t c1_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP}};
+        /* If something is going to be replaced, both dentry and inode locks are
+           needed */
+        ldlm_policy_data_t c2_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP|
+                                                        MDS_INODELOCK_UPDATE}};
         struct ldlm_res_id *maxres_src, *maxres_tgt;
         struct inode *inode;
         int rc = 0, cleanup_phase = 0;
@@ -1579,6 +1638,7 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
 
         c2_res_id.name[0] = inode->i_ino;
         c2_res_id.name[1] = inode->i_generation;
+
         iput(inode);
 
 retry_locks:
@@ -1587,15 +1647,19 @@ retry_locks:
         maxres_tgt = &p2_res_id;
         cleanup_phase = 4; /* target dentry */
 
-        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
+        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id, NULL,NULL))
                 maxres_src = &c1_res_id;
-        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
+        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id, NULL,NULL))
                 maxres_tgt = &c2_res_id;
 
         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
+                                    &p_policy,
                                     &p2_res_id, &dlm_handles[1], parent_mode,
+                                    &p_policy,
                                     &c1_res_id, &dlm_handles[2], child_mode,
-                                    &c2_res_id, &dlm_handles[3], child_mode);
+                                    &c1_policy,
+                                    &c2_res_id, &dlm_handles[3], child_mode,
+                                    &c2_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1603,8 +1667,9 @@ retry_locks:
 
         /* Step 6a: Re-lookup source child to verify it hasn't changed */
         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
-                              parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
-                              child_mode, old_name, old_len, maxres_tgt);
+                              parent_mode, &c1_res_id, &dlm_handles[2],
+                              de_oldp, child_mode, &c1_policy, old_name,old_len,
+                              maxres_tgt);
         if (rc) {
                 if (c2_res_id.name[0] != 0)
                         ldlm_lock_decref(&dlm_handles[3], child_mode);
@@ -1620,8 +1685,9 @@ retry_locks:
 
         /* Step 6b: Re-lookup target child to verify it hasn't changed */
         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
-                              parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
-                              child_mode, new_name, new_len, maxres_src);
+                              parent_mode, &c2_res_id, &dlm_handles[3],
+                              de_newp, child_mode, &c2_policy, new_name,
+                              new_len, maxres_src);
         if (rc) {
                 ldlm_lock_decref(&dlm_handles[2], child_mode);
                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
@@ -1770,8 +1836,7 @@ cleanup:
                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
                 } else {
                         if (lock_count == 4)
-                                ptlrpc_save_lock(req,
-                                              &(dlm_handles[3]), LCK_EX);
+                                ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX);
                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
                         ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
                         ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
index 94e4c1a..3387747 100644 (file)
@@ -8,7 +8,7 @@ ptlrpc-objs := $(LDLM)l_lock.o $(LDLM)ldlm_lock.o
 ptlrpc-objs += $(LDLM)ldlm_resource.o $(LDLM)ldlm_lib.o
 ptlrpc-objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o
 ptlrpc-objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o
-ptlrpc-objs += $(LDLM)ldlm_flock.o
+ptlrpc-objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o
 ptlrpc-objs += client.o recover.o connection.o niobuf.o pack_generic.o
 ptlrpc-objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
 ptlrpc-objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
index 4154582..77aa1cb 100644 (file)
@@ -13,5 +13,6 @@ ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \
                llog_server.o ptlrpcd.o ../ldlm/l_lock.o ../ldlm/ldlm_lock.o \
                ../ldlm/ldlm_resource.o ../ldlm/ldlm_extent.o \
                ../ldlm/ldlm_request.o ../ldlm/ldlm_lockd.o \
-               ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o
+               ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o \
+               ../ldlm/ldlm_inodebits.o
 
index eb04d16..96dcfe3 100644 (file)
@@ -7,7 +7,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/ldlm/l_lock.c $(top_srcdir)/ldlm/ldlm_lock.c \
     $(top_srcdir)/ldlm/ldlm_resource.c $(top_srcdir)/ldlm/ldlm_lib.c          \
     $(top_srcdir)/ldlm/ldlm_plain.c $(top_srcdir)/ldlm/ldlm_extent.c          \
     $(top_srcdir)/ldlm/ldlm_request.c $(top_srcdir)/ldlm/ldlm_lockd.c         \
-    $(top_srcdir)/ldlm/ldlm_internal.h
+    $(top_srcdir)/ldlm/ldlm_internal.h $(top_srcdir)/ldlm/ldlm_inodebits.c
 
 COMMON_SOURCES =  client.c recover.c connection.c niobuf.c pack_generic.c   \
     events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c   \
index a6fb3a2..e9b34ea 100755 (executable)
@@ -63,6 +63,7 @@ setup() {
     [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
     start mds $MDSLCONFARGS --reformat
     zconf_mount `hostname` $MOUNT
+    echo 0x3f0410 > /proc/sys/portals/debug
 }
 
 $SETUP