Whamcloud - gitweb
Landing Inodebits to b_release_1_4_6.
authorgreen <green>
Fri, 28 Oct 2005 20:23:09 +0000 (20:23 +0000)
committergreen <green>
Fri, 28 Oct 2005 20:23:09 +0000 (20:23 +0000)
21 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_mds.h
lustre/ldlm/Makefile.am
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/dir.c
lustre/liblustre/namei.c
lustre/liblustre/super.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/mds/mds_xattr.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/autoMakefile.am

index 72f285c..0721f4b 100644 (file)
@@ -370,6 +370,29 @@ do {                                                                          \
                        lock->l_pid);                                          \
                 break;                                                        \
         }                                                                     \
+        if (lock->l_resource->lr_type == LDLM_IBITS) {                        \
+                CDEBUG(level, "### " format                                   \
+                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
+                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "  \
+                       "flags: %x remote: "LPX64" expref: %d "                \
+                       "pid %u\n" , ## a,                                     \
+                       lock->l_resource->lr_namespace->ns_name,               \
+                       lock, lock->l_handle.h_cookie,                         \
+                       atomic_read (&lock->l_refc),                           \
+                       lock->l_readers, lock->l_writers,                      \
+                       ldlm_lockname[lock->l_granted_mode],                   \
+                       ldlm_lockname[lock->l_req_mode],                       \
+                       lock->l_resource->lr_name.name[0],                     \
+                       lock->l_resource->lr_name.name[1],                     \
+                       lock->l_policy_data.l_inodebits.bits,                  \
+                       atomic_read(&lock->l_resource->lr_refcount),           \
+                       ldlm_typename[lock->l_resource->lr_type],              \
+                       lock->l_flags, lock->l_remote_handle.cookie,           \
+                       lock->l_export ?                                       \
+                       atomic_read(&lock->l_export->exp_refcount) : -99,      \
+                       lock->l_pid);                                          \
+                break;                                                        \
+        }                                                                     \
         {                                                                     \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
index beb7023..d0f9900 100644 (file)
@@ -527,6 +527,16 @@ typedef enum {
 #define DISP_OPEN_OPEN    0x20
 #define DISP_ENQ_COMPLETE 0x40
 
+/* INODE LOCK PARTS */
+#define MDS_INODELOCK_LOOKUP 0x000001       /* dentry, mode, owner, group */
+#define MDS_INODELOCK_UPDATE 0x000002       /* size, links, timestamps */
+#define MDS_INODELOCK_OPEN   0x000004       /* For opened files */
+
+/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits */
+#define MDS_INODELOCK_MAXSHIFT 2
+/* This FULL lock is useful to take on unlink sort of operations */
+#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
+
 struct ll_fid {
         __u64 id;
         __u32 generation;
@@ -814,7 +824,7 @@ typedef enum {
         LDLM_PLAIN     = 10,
         LDLM_EXTENT    = 11,
         LDLM_FLOCK     = 12,
-//      LDLM_IBITS     = 13,
+        LDLM_IBITS     = 13,
         LDLM_MAX_TYPE
 } ldlm_type_t;
 
@@ -826,6 +836,10 @@ struct ldlm_extent {
         __u64 gid;
 };
 
+struct ldlm_inodebits {
+        __u64 bits;
+};
+
 struct ldlm_flock {
         __u64 start;
         __u64 end;
@@ -843,6 +857,7 @@ struct ldlm_flock {
 typedef union {
         struct ldlm_extent l_extent;
         struct ldlm_flock  l_flock;
+        struct ldlm_inodebits l_inodebits;
 } ldlm_policy_data_t;
 
 extern void lustre_swab_ldlm_policy_data (ldlm_policy_data_t *d);
index 5c43429..1eb5c58 100644 (file)
@@ -135,7 +135,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen);
+                                     char *name, int namelen, __u64 lockpart);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt);
 int mds_update_server_data(struct obd_device *, int force_sync);
index 2b9856c..aeb4a06 100644 (file)
@@ -10,4 +10,4 @@
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ 
 DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \
        ldlm_lock.c ldlm_lockd.c ldlm_plain.c ldlm_request.c         \
-       ldlm_resource.c l_lock.c
+       ldlm_resource.c l_lock.c ldlm_inodebits.c
index 56c88cf..e3511dd 100644 (file)
@@ -35,7 +35,7 @@
 /* Determine if the lock is compatible with all locks on the queue. */
 static int
 ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                            struct list_head *work_list)
+                            int send_cbs)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -61,12 +61,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 if (!(lock->l_policy_data.l_inodebits.bits & req_bits))
                         continue;
 
-                if (!work_list)
+                if (!send_cbs)
                         RETURN(0);
 
                 compat = 0;
                 if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, work_list);
+                        ldlm_add_ast_work_item(lock, req, NULL, 0);
         }
 
         RETURN(compat);
@@ -82,8 +82,7 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
   *   - the caller has NOT initialized req->lr_tmp, so we must
   *   - must call this function with the ns lock held once */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
-                                int first_enq, ldlm_error_t *err,
-                                struct list_head *work_list)
+                                int first_enq, ldlm_error_t *err)
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -91,25 +90,27 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
         ENTRY;
 
         LASSERT(list_empty(&res->lr_converting));
-        check_res_locked(res);
 
         if (!first_enq) {
-                LASSERT(work_list != NULL);
-                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
+                LASSERT(res->lr_tmp != NULL);
+                rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
-                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
+                rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0);
                 if (!rc)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, work_list);
+                ldlm_grant_lock(lock, NULL, 0, 1);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
  restart:
-        rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
-        rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
+        LASSERT(res->lr_tmp == NULL);
+        res->lr_tmp = &rpc_list;
+        rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1);
+        rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1);
+        res->lr_tmp = NULL;
 
         if (rc != 2) {
                 /* If either of the compat_queue()s returned 0, then we
@@ -120,15 +121,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                unlock_res(res);
-                rc = ldlm_run_bl_ast_work(&rpc_list);
-                lock_res(res);
+                l_unlock(&res->lr_namespace->ns_lock);
+                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
+                l_lock(&res->lr_namespace->ns_lock);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         } else {
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL);
+                ldlm_grant_lock(lock, NULL, 0, 0);
         }
         RETURN(0);
 }
index 2967ab8..7f1e91c 100644 (file)
@@ -50,6 +50,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err);
 
+
+/* ldlm_inodebits.c */
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
+                                int first_enq, ldlm_error_t *err);
+
 /* l_lock.c */
 void l_check_ns_lock(struct ldlm_namespace *ns);
 void l_check_no_ns_lock(struct ldlm_namespace *ns);
index a15a5ea..28a4a2f 100644 (file)
@@ -55,6 +55,7 @@ char *ldlm_typename[] = {
         [LDLM_PLAIN] "PLN",
         [LDLM_EXTENT] "EXT",
         [LDLM_FLOCK] "FLK",
+        [LDLM_IBITS] "IBT",
 };
 
 char *ldlm_it2str(int it)
@@ -91,6 +92,7 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = {
 #ifdef __KERNEL__
         [LDLM_FLOCK] ldlm_process_flock_lock,
 #endif
+        [LDLM_IBITS] ldlm_process_inodebits_lock,
 };
 
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
@@ -601,6 +603,14 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
 
+                /* We match if we have existing lock with same or wider set
+                   of bits. */
+                if (lock->l_resource->lr_type == LDLM_IBITS &&
+                     ((lock->l_policy_data.l_inodebits.bits &
+                      policy->l_inodebits.bits) !=
+                      policy->l_inodebits.bits))
+                        continue;
+
                 if (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))
                         continue;
 
@@ -1217,6 +1227,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
                        lock->l_policy_data.l_flock.pid,
                        lock->l_policy_data.l_flock.start,
                        lock->l_policy_data.l_flock.end);
+       else if (lock->l_resource->lr_type == LDLM_IBITS)
+                CDEBUG(level, "  Bits: "LPX64"\n",
+                       lock->l_policy_data.l_inodebits.bits);
 }
 
 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
index 0263e33..591b434 100644 (file)
@@ -78,14 +78,15 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp);
         struct ldlm_res_id res_id =
                 { .name = {st->st_ino, (__u64)lli->lli_st_generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
         ENTRY;
 
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                             &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh);
         if (!rc) {
                 llu_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
 
-                rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
+                rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &it, LCK_PR,
                                  &data, &lockh, NULL, 0,
                                  ldlm_completion_ast, llu_mdc_blocking_ast,
                                  inode, LDLM_FL_CANCEL_ON_BLOCK);
index 49930fc..07f3934 100644 (file)
@@ -149,6 +149,7 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock,
                 struct inode *inode = llu_inode_from_lock(lock);
                 struct llu_inode_info *lli;
                 struct intnl_stat *st;
+                __u64 bits = lock->l_policy_data.l_inodebits.bits;
 
                 /* Invalidate all dentries associated with this inode */
                 if (inode == NULL)
@@ -157,14 +158,16 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock,
                 lli =  llu_i2info(inode);
                 st = llu_i2stat(inode);
 
-                clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
+                if (bits & MDS_INODELOCK_UPDATE)
+                        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
 
                 if (lock->l_resource->lr_name.name[0] != st->st_ino ||
                     lock->l_resource->lr_name.name[1] !=lli->lli_st_generation){
                         LDLM_ERROR(lock, "data mismatch with ino %llu/%lu",
                                   (long long)st->st_ino,lli->lli_st_generation);
                 }
-                if (S_ISDIR(st->st_mode)) {
+                if (S_ISDIR(st->st_mode) &&
+                    (bits & MDS_INODELOCK_UPDATE)) {
                         CDEBUG(D_INODE, "invalidating inode %llu\n",
                                (long long)st->st_ino);
 
index 422b658..2ba2f36 100644 (file)
@@ -380,13 +380,14 @@ static struct inode* llu_new_inode(struct filesys *fs,
         return inode;
 }
 
-static int llu_have_md_lock(struct inode *inode)
+static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
 {
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         struct llu_inode_info *lli = llu_i2info(inode);
         struct lustre_handle lockh;
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obddev;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
         int flags;
         ENTRY;
 
@@ -400,14 +401,14 @@ static int llu_have_md_lock(struct inode *inode)
 
         /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PR, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PR, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PR);
                 RETURN(1);
         }
 
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PW, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PW, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PW);
                 RETURN(1);
         }
@@ -424,7 +425,7 @@ static int llu_inode_revalidate(struct inode *inode)
                 RETURN(0);
         }
 
-        if (!llu_have_md_lock(inode)) {
+        if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) {
                 struct lustre_md md;
                 struct ptlrpc_request *req = NULL;
                 struct llu_sb_info *sbi = llu_i2sbi(inode);
index cade821..dd5c42d 100644 (file)
@@ -211,10 +211,11 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
         struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
         struct address_space *mapping = dir->i_mapping;
         struct page *page;
+        ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
         int rc;
 
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
         if (!rc) {
                 struct lookup_intent it = { .it_op = IT_READDIR };
                 struct ptlrpc_request *request;
@@ -222,8 +223,8 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 
                 ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0);
 
-                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
-                                 LCK_PR, &data, &lockh, NULL, 0,
+                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_IBITS, &it,
+                                 LCK_CR, &data, &lockh, NULL, 0,
                                  ldlm_completion_ast, ll_mdc_blocking_ast, dir,
                                  0);
 
@@ -251,7 +252,7 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
         }
 
 out_unlock:
-        ldlm_lock_decref(&lockh, LCK_PR);
+        ldlm_lock_decref(&lockh, LCK_CR);
         return page;
 
 fail:
index fc64688..2a2827b 100644 (file)
@@ -152,7 +152,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
 
         ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR);
 
-        rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
+        rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, itp, LCK_PW, &data,
                          &lockh, lmm, lmmsize, ldlm_completion_ast,
                          ll_mdc_blocking_ast, NULL, 0);
         if (rc < 0)
@@ -1481,6 +1481,7 @@ static int ll_have_md_lock(struct dentry *de)
         struct lustre_handle lockh;
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obddev;
+        ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
         int flags;
         ENTRY;
 
@@ -1493,19 +1494,12 @@ static int ll_have_md_lock(struct dentry *de)
 
         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
 
-        /* FIXME use LDLM_FL_TEST_LOCK instead */
-        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PR, &lockh)) {
-                ldlm_lock_decref(&lockh, LCK_PR);
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
                 RETURN(1);
         }
 
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PW, &lockh)) {
-                ldlm_lock_decref(&lockh, LCK_PW);
-                RETURN(1);
-        }
         RETURN(0);
 }
 
index 4511658..2ee2f85 100644 (file)
@@ -150,28 +150,33 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 break;
         case LDLM_CB_CANCELING: {
                 struct inode *inode = ll_inode_from_lock(lock);
+                __u64 bits = lock->l_policy_data.l_inodebits.bits;
 
                 /* Invalidate all dentries associated with this inode */
                 if (inode == NULL)
                         break;
 
-                clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
-                          &(ll_i2info(inode)->lli_flags));
-
                 if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
                     lock->l_resource->lr_name.name[1] != inode->i_generation) {
                         LDLM_ERROR(lock, "data mismatch with ino %lu/%u (%p)",
                                    inode->i_ino, inode->i_generation, inode);
                 }
-                if (S_ISDIR(inode->i_mode)) {
+
+                if (bits & MDS_INODELOCK_UPDATE)
+                        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
+                                  &(ll_i2info(inode)->lli_flags));
+
+                
+                if (S_ISDIR(inode->i_mode) &&
+                     (bits & MDS_INODELOCK_UPDATE))  {
                         CDEBUG(D_INODE, "invalidating inode %lu\n",
                                inode->i_ino);
-
                         truncate_inode_pages(inode->i_mapping, 0);
                 }
 
                 if (inode->i_sb->s_root &&
-                    inode != inode->i_sb->s_root->d_inode)
+                    inode != inode->i_sb->s_root->d_inode &&
+                    (bits & MDS_INODELOCK_LOOKUP))
                         ll_unhash_aliases(inode);
                 iput(inode);
                 break;
@@ -375,7 +380,6 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
         RETURN(0);
 }
 
-
 static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                                    struct lookup_intent *it, int lookup_flags)
 {
index 202afbf..4363263 100644 (file)
@@ -58,9 +58,9 @@ static int it_to_lock_mode(struct lookup_intent *it)
 {
         /* CREAT needs to be tested before open (both could be set) */
         if (it->it_op & IT_CREAT)
-                return LCK_PW;
+                return LCK_CW;
         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
-                return LCK_PR;
+                return LCK_CR;
 
         LBUG();
         RETURN(-EINVAL);
@@ -241,6 +241,7 @@ int mdc_enqueue(struct obd_export *exp,
         struct obd_device *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id =
                 { .name = {data->fid1.id, data->fid1.generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
         int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repsize[4] = {sizeof(struct ldlm_reply),
@@ -296,6 +297,7 @@ int mdc_enqueue(struct obd_export *exp,
         } else if (it->it_op & IT_UNLINK) {
                 size[2] = sizeof(struct mds_rec_unlink);
                 size[3] = data->namelen + 1;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
                                       size, NULL);
                 if (!req)
@@ -315,6 +317,9 @@ int mdc_enqueue(struct obd_export *exp,
                 size[2] = sizeof(struct mds_body);
                 size[3] = data->namelen + 1;
 
+                if (it->it_op & IT_GETATTR)
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
                                       size, NULL);
                 if (!req)
@@ -330,6 +335,7 @@ int mdc_enqueue(struct obd_export *exp,
                 reply_buffers = 3;
                 req->rq_replen = lustre_msg_size(3, repsize);
         } else if (it->it_op == IT_READDIR) {
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (!req)
@@ -345,7 +351,7 @@ int mdc_enqueue(struct obd_export *exp,
 
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
-                              lock_type, NULL, lock_mode, &flags, cb_blocking,
+                              lock_type,&policy,lock_mode, &flags,cb_blocking,
                               cb_completion, NULL, cb_data, NULL, 0, NULL,
                               lockh);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
@@ -495,16 +501,25 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
                                                      op_data->fid2.generation}};
                 struct lustre_handle lockh;
-                int mode = LCK_PR;
+                ldlm_policy_data_t policy;
+                int mode = LCK_CR;
 
+                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+                                MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                      LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_PLAIN, NULL, LCK_PR, &lockh);
+                                     LDLM_IBITS, &policy, LCK_CR, &lockh);
+                if (!rc) {
+                        mode = LCK_CW;
+                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                             LDLM_FL_BLOCK_GRANTED, &res_id,
+                                             LDLM_IBITS, &policy, LCK_CW, &lockh);
+                }
                 if (!rc) {
-                        mode = LCK_PW;
+                        mode = LCK_PR;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                              LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_PLAIN, NULL, LCK_PW, &lockh);
+                                             LDLM_IBITS, &policy, LCK_PR, &lockh);
                 }
                 if (rc) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
@@ -524,7 +539,7 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
          * never dropped its reference, so the refcounts are all OK */
         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
 
-                rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it),
+                rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
                                  op_data, &lockh, lmm, lmmsize,
                                  ldlm_completion_ast, cb_blocking, NULL,
                                  extra_lock_flags);
@@ -599,11 +614,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
          * intent_finish has performed the iget().) */
         lock = ldlm_handle2lock(&lockh);
         if (lock) {
+                ldlm_policy_data_t policy = lock->l_policy_data;
                 LDLM_DEBUG(lock, "matching against this");
                 LDLM_LOCK_PUT(lock);
                 memcpy(&old_lock, &lockh, sizeof(lockh));
                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
-                                    LDLM_PLAIN, NULL, LCK_NL, &old_lock)) {
+                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
                         ldlm_lock_decref_and_cancel(&lockh,
                                                     it->d.lustre.it_lock_mode);
                         memcpy(&lockh, &old_lock, sizeof(old_lock));
index b809ca7..659c381 100644 (file)
@@ -163,12 +163,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen)
+                                     char *name, int namelen, __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
         struct ldlm_res_id res_id = { .name = {0} };
         int flags = 0, rc;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; 
         ENTRY;
 
         if (IS_ERR(de))
@@ -177,7 +178,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
-                              LDLM_PLAIN, NULL, lock_mode, &flags,
+                              LDLM_IBITS, &policy, lock_mode, &flags,
                               ldlm_blocking_ast, ldlm_completion_ast,
                               NULL, NULL, NULL, 0, NULL, lockh);
         if (rc != ELDLM_OK) {
@@ -628,8 +629,8 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         return(rc);
 }
 
-static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags,
-                            struct lustre_handle *child_lockh)
+static int mds_getattr_name(int offset, struct ptlrpc_request *req,
+                            int child_part, struct lustre_handle *child_lockh)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
@@ -707,13 +708,26 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags,
         }
 
         if (resent_req == 0) {
+            if (name) {
                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
                                                  &parent_lockh, &dparent,
-                                                 LCK_PR, name, namesize,
-                                                 child_lockh, &dchild, LCK_PR,
-                                                 flags);
-                if (rc)
-                        GOTO(cleanup, rc);
+                                                 LCK_CR,
+                                                 MDS_INODELOCK_UPDATE,
+                                                 name, namesize,
+                                                 child_lockh, &dchild, LCK_CR,
+                                                 child_part);
+            } else {
+                        /* For revalidate by fid we always take UPDATE lock */
+                        dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
+                                                       LCK_CR, child_lockh,
+                                                       NULL, 0,
+                                                       MDS_INODELOCK_UPDATE);
+                        LASSERT(dchild);
+                        if (IS_ERR(dchild))
+                                rc = PTR_ERR(dchild);
+            }
+            if (rc)
+                    GOTO(cleanup, rc);
         } else {
                 struct ldlm_lock *granted_lock;
                 struct ll_fid child_fid;
@@ -760,8 +774,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags,
         case 2:
                 if (resent_req == 0) {
                         if (rc && dchild->d_inode)
-                                ldlm_lock_decref(child_lockh, LCK_PR);
-                        ldlm_lock_decref(&parent_lockh, LCK_PR);
+                                ldlm_lock_decref(child_lockh, LCK_CR);
+                        ldlm_lock_decref(&parent_lockh, LCK_CR);
                         l_dput(dparent);
                 }
                 l_dput(dchild);
@@ -1206,11 +1220,11 @@ int mds_handle(struct ptlrpc_request *req)
                  * want to cancel.
                  */
                 lockh.cookie = 0;
-                rc = mds_getattr_name(0, req, 0, &lockh);
+                rc = mds_getattr_name(0, req, MDS_INODELOCK_UPDATE, &lockh);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
                 if (rc == 0 && lockh.cookie)
-                        ldlm_lock_decref(&lockh, LCK_PR);
+                        ldlm_lock_decref(&lockh, LCK_CR);
                 break;
         }
         case MDS_STATFS:
@@ -1882,6 +1896,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         struct ldlm_reply *rep;
         struct lustre_handle lockh = { 0 };
         struct ldlm_lock *new_lock = NULL;
+        int getattr_part = MDS_INODELOCK_UPDATE;
         int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
                                           sizeof(struct mds_body),
                                           mds->mds_max_mdsize,
@@ -1933,13 +1948,15 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 #endif 
                         RETURN(ELDLM_LOCK_ABORTED);
                 break;
-        case IT_GETATTR:
         case IT_LOOKUP:
+                        getattr_part = MDS_INODELOCK_LOOKUP;
+        case IT_GETATTR:
+                        getattr_part |= MDS_INODELOCK_LOOKUP;
         case IT_READDIR:
                 fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
                 rep->lock_policy_res2 = mds_getattr_name(offset, req,
-                                                     flags & LDLM_INHERIT_FLAGS,
-                                                     &lockh);
+                                                         getattr_part, &lockh);
+
                 /* FIXME: LDLM can set req->rq_status. MDS sets
                    policy_res{1,2} with disposition and status.
                    - replay: returns 0 & req->status is old status
index 78ce7cb..606cf1b 100644 (file)
@@ -99,12 +99,14 @@ static inline void mds_inode_unset_orphan(struct inode *inode)
 }
 
 /* mds/mds_reint.c */
-int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2);
+int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
+           ldlm_policy_data_t *p1, ldlm_policy_data_t *p2);
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
+                          ldlm_policy_data_t *p1_policy,
                           struct ldlm_res_id *p2_res_id,
                           struct lustre_handle *p2_lockh, int p2_lock_mode,
-                          int p2_lock_flags);
+                          ldlm_policy_data_t *p2_policy);
 void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error);
 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
@@ -114,10 +116,11 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
+                                __u64 parent_lockpart,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
-                                int child_lock_flags);
+                                __u64 child_lockpart);
 int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
                        struct lustre_handle *child_lockh);
 int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
index f4c25bc..9964a16 100644 (file)
@@ -818,7 +818,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         struct mds_export_data *med;
         struct lustre_handle parent_lockh;
         int rc = 0, cleanup_phase = 0, acc_mode, created = 0;
-        int parent_mode = LCK_PR;
+        int parent_mode = LCK_CR;
         void *handle = NULL;
         struct dentry_params dp;
         uid_t parent_uid = 0;
@@ -878,10 +878,11 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         /* Step 1: Find and lock the parent */
         if (rec->ur_flags & MDS_OPEN_CREAT)
-                parent_mode = LCK_PW;
+                parent_mode = LCK_EX;
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
                                         &parent_lockh, rec->ur_name,
-                                        rec->ur_namelen - 1);
+                                        rec->ur_namelen - 1,
+                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 if (rc != -ENOENT) {
index 49cd470..b8a9b9e 100644 (file)
@@ -465,8 +465,12 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
                         GOTO(cleanup, rc = -EROFS);
         } else {
-                de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                           &lockh, NULL, 0);
+              __u64 lockpart = MDS_INODELOCK_UPDATE;
+              if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
+                        lockpart |= MDS_INODELOCK_LOOKUP;
+
+                de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
+                                           &lockh, NULL, 0, lockpart);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
                 locked = 1;
@@ -627,9 +631,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 l_dput(de);
                 if (locked) {
                         if (rc) {
-                                ldlm_lock_decref(&lockh, LCK_PW);
+                                ldlm_lock_decref(&lockh, LCK_EX);
                         } else {
-                                ptlrpc_save_lock (req, &lockh, LCK_PW);
+                                ptlrpc_save_lock (req, &lockh, LCK_EX);
                         }
                 }
         case 0:
@@ -705,8 +709,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
                 GOTO(cleanup, rc = -ESTALE);
 
-        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh,
-                                        rec->ur_name, rec->ur_namelen - 1);
+        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh,
+                                        rec->ur_name, rec->ur_namelen - 1,
+                                        MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 if (rc != -ENOENT)
@@ -897,9 +902,9 @@ cleanup:
                 l_dput(dchild);
         case 1: /* locked parent dentry */
                 if (rc) {
-                        ldlm_lock_decref(&lockh, LCK_PW);
+                        ldlm_lock_decref(&lockh, LCK_EX);
                 } else {
-                        ptlrpc_save_lock (req, &lockh, LCK_PW);
+                        ptlrpc_save_lock (req, &lockh, LCK_EX);
                 }
                 l_dput(dparent);
         case 0:
@@ -916,7 +921,8 @@ cleanup:
         return 0;
 }
 
-int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
+int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2,
+           ldlm_policy_data_t *p1, ldlm_policy_data_t *p2)
 {
         int i;
 
@@ -933,6 +939,10 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
                 if (res1->name[i] < res2->name[i])
                         return 0;
         }
+        if (!p1 || !p2)
+                return 0;
+        if (memcmp(p1, p2, sizeof(*p1)) < 0)
+                return 1;
         return 0;
 }
 
@@ -944,15 +954,16 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
  * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
+                          ldlm_policy_data_t *p1_policy,
                           struct ldlm_res_id *p2_res_id,
                           struct lustre_handle *p2_lockh, int p2_lock_mode,
-                          int p2_lock_flags)
+                          ldlm_policy_data_t *p2_policy)
 {
         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
-        int flags[2] = { LDLM_FL_LOCAL_ONLY, LDLM_FL_LOCAL_ONLY | p2_lock_flags };
-        int rc;
+        ldlm_policy_data_t *policies[2] = {p1_policy, p2_policy};
+        int rc, flags;
         ENTRY;
 
         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
@@ -960,35 +971,38 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0]);
 
-        if (res_gt(p1_res_id, p2_res_id)) {
+        if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) {
                 handles[1] = p1_lockh;
                 handles[0] = p2_lockh;
                 res_id[1] = p1_res_id;
                 res_id[0] = p2_res_id;
                 lock_modes[1] = p1_lock_mode;
                 lock_modes[0] = p2_lock_mode;
-                flags[1] = LDLM_FL_LOCAL_ONLY;
-                flags[0] = p2_lock_flags | LDLM_FL_LOCAL_ONLY;
+                policies[1] = p1_policy;
+                policies[0] = p2_policy;
         }
 
         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0]);
 
+        flags = LDLM_FL_LOCAL_ONLY;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
-                              LDLM_PLAIN, NULL, lock_modes[0], &flags[0],
+                              LDLM_IBITS, policies[0], lock_modes[0], &flags,
                               ldlm_blocking_ast, ldlm_completion_ast,
                               NULL, NULL, NULL, 0, NULL, handles[0]);
         if (rc != ELDLM_OK)
                 RETURN(-EIO);
         ldlm_lock_dump_handle(D_OTHER, handles[0]);
 
-        if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
+        if (!memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) && 
+            (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) {
                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
                 ldlm_lock_addref(handles[1], lock_modes[1]);
         } else if (res_id[1]->name[0] != 0) {
+                flags = LDLM_FL_LOCAL_ONLY;
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      *res_id[1], LDLM_PLAIN, NULL,
-                                      lock_modes[1], &flags[1],
+                                      *res_id[1], LDLM_IBITS, policies[1],
+                                      lock_modes[1], &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
                                       NULL, NULL, NULL, 0, NULL, handles[1]);
                 if (rc != ELDLM_OK) {
@@ -1003,12 +1017,16 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
 
 int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                            struct lustre_handle *p1_lockh, int p1_lock_mode,
+                           ldlm_policy_data_t *p1_policy, 
                            struct ldlm_res_id *p2_res_id,
                            struct lustre_handle *p2_lockh, int p2_lock_mode,
+                           ldlm_policy_data_t *p2_policy, 
                            struct ldlm_res_id *c1_res_id,
                            struct lustre_handle *c1_lockh, int c1_lock_mode,
+                           ldlm_policy_data_t *c1_policy, 
                            struct ldlm_res_id *c2_res_id,
-                           struct lustre_handle *c2_lockh, int c2_lock_mode)
+                           struct lustre_handle *c2_lockh, int c2_lock_mode,
+                           ldlm_policy_data_t *c2_policy)
 {
         struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
                                           c1_res_id, c2_res_id };
@@ -1016,6 +1034,8 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                                                  c1_lockh, c2_lockh };
         int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
                               c1_lock_mode, c2_lock_mode };
+        ldlm_policy_data_t *policies[5] = {p1_policy, p2_policy,
+                                           c1_policy, c2_policy};
         int rc, i, j, sorted, flags;
         ENTRY;
 
@@ -1029,13 +1049,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 dlm_handles[4] = dlm_handles[i];
                 res_id[4] = res_id[i];
                 lock_modes[4] = lock_modes[i];
+                policies[4] = policies[i];
 
                 sorted = 0;
                 do {
-                        if (res_gt(res_id[j], res_id[4])) {
+                        if (res_gt(res_id[j], res_id[4], policies[j],
+                                   policies[4])) {
                                 dlm_handles[j + 1] = dlm_handles[j];
                                 res_id[j + 1] = res_id[j];
                                 lock_modes[j + 1] = lock_modes[j];
+                                policies[j + 1] = policies[j];
                                 j--;
                         } else {
                                 sorted = 1;
@@ -1045,6 +1068,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 dlm_handles[j + 1] = dlm_handles[4];
                 res_id[j + 1] = res_id[4];
                 lock_modes[j + 1] = lock_modes[4];
+                policies[j + 1] = policies[4];
         }
 
         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
@@ -1057,13 +1081,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                 if (res_id[i]->name[0] == 0)
                         break;
                 if (i != 0 &&
-                    memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
+                    !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) &&
+                    (policies[i]->l_inodebits.bits &
+                     policies[i-1]->l_inodebits.bits)) {
                         memcpy(dlm_handles[i], dlm_handles[i-1],
                                sizeof(*(dlm_handles[i])));
                         ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
                 } else {
                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              *res_id[i], LDLM_PLAIN, NULL,
+                                              *res_id[i], LDLM_IBITS,
+                                              policies[i],
                                               lock_modes[i], &flags,
                                               ldlm_blocking_ast,
                                               ldlm_completion_ast, NULL, NULL,
@@ -1101,6 +1128,7 @@ static int mds_verify_child(struct obd_device *obd,
                             struct ldlm_res_id *child_res_id,
                             struct lustre_handle *child_lockh,
                             struct dentry **dchildp, int child_mode,
+                            ldlm_policy_data_t *child_policy,
                             const char *name, int namelen,
                             struct ldlm_res_id *maxres)
 {
@@ -1140,8 +1168,8 @@ static int mds_verify_child(struct obd_device *obd,
                 child_res_id->name[0] = dchild->d_inode->i_ino;
                 child_res_id->name[1] = dchild->d_inode->i_generation;
 
-                if (res_gt(parent_res_id, child_res_id) ||
-                    res_gt(maxres, child_res_id)) {
+                if (res_gt(parent_res_id, child_res_id, NULL, NULL) ||
+                    res_gt(maxres, child_res_id, NULL, NULL)) {
                         CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
                                child_res_id->name[0], parent_res_id->name[0],
                                maxres->name[0]);
@@ -1149,7 +1177,7 @@ static int mds_verify_child(struct obd_device *obd,
                 }
 
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      *child_res_id, LDLM_PLAIN, NULL,
+                                      *child_res_id, LDLM_IBITS, child_policy,
                                       child_mode, &flags, ldlm_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
                                       NULL, child_lockh);
@@ -1177,13 +1205,16 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
+                                __u64 parent_lockpart,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
-                                int child_lock_flags)
+                                __u64 child_lockpart)
 {
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct ldlm_res_id parent_res_id = { .name = {0} };
+        ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }};
+        ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }};
         struct inode *inode;
         int rc = 0, cleanup_phase = 0;
         ENTRY;
@@ -1235,8 +1266,9 @@ retry_locks:
         /* Step 3: Lock parent and child in resource order.  If child doesn't
          *         exist, we still have to lock the parent and re-lookup. */
         rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
+                                   &parent_policy,
                                    &child_res_id, child_lockh, child_mode,
-                                   child_lock_flags);
+                                   &child_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1248,7 +1280,7 @@ retry_locks:
         /* Step 4: Re-lookup child to verify it hasn't changed since locking */
         rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
                               parent_mode, &child_res_id, child_lockh, dchildp,
-                              child_mode, name, namelen, &parent_res_id);
+                              child_mode,&child_policy, name, namelen, &parent_res_id);
         if (rc > 0)
                 goto retry_locks;
         if (rc < 0) {
@@ -1387,9 +1419,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ENOENT);
 
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
-                                         &parent_lockh, &dparent, LCK_PW,
+                                         &parent_lockh, &dparent, LCK_EX,
+                                         MDS_INODELOCK_UPDATE, 
                                          rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &dchild, LCK_EX, 0);
+                                         &child_lockh, &dchild, LCK_EX, 
+                                         MDS_INODELOCK_FULL);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1566,9 +1600,9 @@ cleanup:
                 ldlm_lock_decref(&child_lockh, LCK_EX);
         case 1: /* child and parent dentry, parent lock */
                 if (rc)
-                        ldlm_lock_decref(&parent_lockh, LCK_PW);
+                        ldlm_lock_decref(&parent_lockh, LCK_EX);
                 else
-                        ptlrpc_save_lock(req, &parent_lockh, LCK_PW);
+                        ptlrpc_save_lock(req, &parent_lockh, LCK_EX);
                 l_dput(dchild);
                 l_dput(dchild);
                 l_dput(dparent);
@@ -1597,6 +1631,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
         struct ldlm_res_id src_res_id = { .name = {0} };
         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
+        ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
+        ldlm_policy_data_t tgt_dir_policy =
+                                       {.l_inodebits = {MDS_INODELOCK_UPDATE}};
         int rc = 0, cleanup_phase = 0;
         ENTRY;
 
@@ -1638,7 +1675,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
 
         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
-                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, 0);
+                                   &src_policy,
+                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX,
+                                   &tgt_dir_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1754,6 +1793,12 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         struct ldlm_res_id p2_res_id = { .name = {0} };
         struct ldlm_res_id c1_res_id = { .name = {0} };
         struct ldlm_res_id c2_res_id = { .name = {0} };
+        ldlm_policy_data_t p_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}};
+        /* Only dentry should disappear, but the inode itself would be
+           intact otherwise. */
+        ldlm_policy_data_t c1_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP}};
+        /* If something is going to be replaced, both dentry and inode locks are           needed */
+        ldlm_policy_data_t c2_policy = {.l_inodebits = {MDS_INODELOCK_FULL}};
         struct ldlm_res_id *maxres_src, *maxres_tgt;
         struct inode *inode;
         int rc = 0, cleanup_phase = 0;
@@ -1836,15 +1881,19 @@ retry_locks:
         maxres_tgt = &p2_res_id;
         cleanup_phase = 4; /* target dentry */
 
-        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
+        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id,NULL,NULL))
                 maxres_src = &c1_res_id;
-        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
+        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id,NULL,NULL))
                 maxres_tgt = &c2_res_id;
 
         rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
+                                    &p_policy,
                                     &p2_res_id, &dlm_handles[1], parent_mode,
+                                    &p_policy,
                                     &c1_res_id, &dlm_handles[2], child_mode,
-                                    &c2_res_id, &dlm_handles[3], child_mode);
+                                    &c1_policy,
+                                    &c2_res_id, &dlm_handles[3], child_mode,
+                                    &c2_policy);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1853,7 +1902,8 @@ retry_locks:
         /* Step 6a: Re-lookup source child to verify it hasn't changed */
         rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
                               parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
-                              child_mode, old_name, old_len, maxres_tgt);
+                              child_mode, &c1_policy, old_name, old_len,
+                              maxres_tgt);
         if (rc) {
                 if (c2_res_id.name[0] != 0)
                         ldlm_lock_decref(&dlm_handles[3], child_mode);
@@ -1870,7 +1920,8 @@ retry_locks:
         /* Step 6b: Re-lookup target child to verify it hasn't changed */
         rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
                               parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
-                              child_mode, new_name, new_len, maxres_src);
+                              child_mode, &c2_policy, new_name, new_len,
+                              maxres_src);
         if (rc) {
                 ldlm_lock_decref(&dlm_handles[2], child_mode);
                 ldlm_lock_decref(&dlm_handles[0], parent_mode);
@@ -1933,7 +1984,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
-                                             rec->ur_fid2, &de_tgtdir, LCK_PW,
+                                             rec->ur_fid2, &de_tgtdir, LCK_EX,
                                              rec->ur_name, rec->ur_namelen,
                                              &de_old, rec->ur_tgt,
                                              rec->ur_tgtlen, &de_new,
@@ -2058,14 +2109,14 @@ cleanup:
                         if (lock_count == 4)
                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
                         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
-                        ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
-                        ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
+                        ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
+                        ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
                 } else {
                         if (lock_count == 4)
                                 ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX);
                         ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
-                        ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
-                        ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
+                        ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_EX);
+                        ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_EX);
                 }
                 l_dput(de_new);
                 l_dput(de_old);
index a96d397..69a7af5 100644 (file)
@@ -208,6 +208,7 @@ int mds_setxattr_internal(struct ptlrpc_request *req, struct mds_body *body)
         char *xattr = NULL;
         int xattrlen;
         int rc = -EOPNOTSUPP, err = 0;
+        __u64 lockpart;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -218,8 +219,14 @@ int mds_setxattr_internal(struct ptlrpc_request *req, struct mds_body *body)
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
-        de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PW,
+        lockpart = MDS_INODELOCK_UPDATE;
+
+/*
+        de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX,
                                    &lockh, NULL, 0);
+*/
+        de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX,
+                                   &lockh, NULL, 0, lockpart);
         if (IS_ERR(de))
                 GOTO(out, rc = PTR_ERR(de));
 
@@ -287,9 +294,9 @@ out_trans:
 out_dput:
         l_dput(de);
         if (rc)
-                ldlm_lock_decref(&lockh, LCK_PW);
+                ldlm_lock_decref(&lockh, LCK_EX);
         else
-                ptlrpc_save_lock (req, &lockh, LCK_PW);
+                ptlrpc_save_lock (req, &lockh, LCK_EX);
 
         if (err && !rc)
                 rc = err;
index 002e5ef..ac1a021 100644 (file)
@@ -8,7 +8,7 @@ ldlm_objs := $(LDLM)l_lock.o $(LDLM)ldlm_lock.o
 ldlm_objs += $(LDLM)ldlm_resource.o $(LDLM)ldlm_lib.o
 ldlm_objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o
 ldlm_objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o
-ldlm_objs += $(LDLM)ldlm_flock.o
+ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o
 ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
 ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
 ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
index c77dcfb..ab76f7a 100644 (file)
@@ -12,6 +12,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \
        $(top_srcdir)/lustre/ldlm/ldlm_request.c        \
        $(top_srcdir)/lustre/ldlm/ldlm_lockd.c          \
        $(top_srcdir)/lustre/ldlm/ldlm_internal.h       \
+       $(top_srcdir)/lustre/ldlm/ldlm_inodebits.c      \
        $(top_srcdir)/lustre/ldlm/ldlm_flock.c
 
 COMMON_SOURCES =  client.c recover.c connection.c niobuf.c pack_generic.c   \