Whamcloud - gitweb
Branch HEAD
authorvitaly <vitaly>
Thu, 25 Oct 2007 10:55:29 +0000 (10:55 +0000)
committervitaly <vitaly>
Thu, 25 Oct 2007 10:55:29 +0000 (10:55 +0000)
b=13622
i=tappro
i=green

1) make class_handle_hash_back() to work;
2) ll_update_inode() checks UPDATE ibit lock still presents before setting LLIF_MDS_SIZE_LOCK;
3) ldlm_lock_match() returns the matched mode instead of 1; pass bitwise sets of modes into;

21 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_handles.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lock.c
lustre/liblustre/dir.c
lustre/liblustre/llite_lib.h
lustre/liblustre/namei.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/obdclass/lustre_handles.c
lustre/osc/osc_request.c

index 873a6a1..b48efef 100644 (file)
@@ -625,10 +625,10 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    const struct ldlm_res_id *,
-                    ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode,
-                    struct lustre_handle *);
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            const struct ldlm_res_id *, ldlm_type_t type,
+                            ldlm_policy_data_t *, ldlm_mode_t mode,
+                            struct lustre_handle *);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
index cf6b403..2fefbf2 100644 (file)
@@ -32,10 +32,12 @@ struct portals_handle {
 
         /* newly added fields to handle the RCU issue. -jxiong */
         spinlock_t h_lock;
-        unsigned int h_size;
         void *h_ptr;
         void (*h_free_cb)(void *, size_t);
         struct rcu_head h_rcu;
+        unsigned int h_size;
+        __u8 h_in:1;
+        __u8 h_unused[3];
 };
 #define RCU2HANDLE(rcu)    container_of(rcu, struct portals_handle, h_rcu)
 
index 88a500a..9ef5cce 100644 (file)
@@ -1302,9 +1302,10 @@ struct md_ops {
                                         struct obd_client_handle *);
         int (*m_set_lock_data)(struct obd_export *, __u64 *, void *);
 
-        int (*m_lock_match)(struct obd_export *, int, const struct lu_fid *,
-                            ldlm_type_t, ldlm_policy_data_t *, ldlm_mode_t,
-                            struct lustre_handle *);
+        ldlm_mode_t (*m_lock_match)(struct obd_export *, int,
+                                    const struct lu_fid *, ldlm_type_t,
+                                    ldlm_policy_data_t *, ldlm_mode_t,
+                                    struct lustre_handle *);
 
         int (*m_cancel_unused)(struct obd_export *, const struct lu_fid *,
                                ldlm_policy_data_t *, ldlm_mode_t, int flags,
index 64df844..7f29480 100644 (file)
@@ -1945,10 +1945,12 @@ static inline int md_cancel_unused(struct obd_export *exp,
         RETURN(rc);
 }
 
-static inline int md_lock_match(struct obd_export *exp, int flags,
-                                const struct lu_fid *fid, ldlm_type_t type,
-                                ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                                struct lustre_handle *lockh)
+static inline ldlm_mode_t md_lock_match(struct obd_export *exp, int flags,
+                                        const struct lu_fid *fid,
+                                        ldlm_type_t type,
+                                        ldlm_policy_data_t *policy,
+                                        ldlm_mode_t mode,
+                                        struct lustre_handle *lockh)
 {
         ENTRY;
         EXP_CHECK_MD_OP(exp, lock_match);
index c9e3c84..e422d98 100644 (file)
@@ -892,7 +892,8 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 
 /* returns a referenced lock or NULL.  See the flag descriptions below, in the
  * comment above ldlm_lock_match */
-static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
+static struct ldlm_lock *search_queue(struct list_head *queue,
+                                      ldlm_mode_t *mode,
                                       ldlm_policy_data_t *policy,
                                       struct ldlm_lock *old_lock, int flags)
 {
@@ -900,6 +901,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
         struct list_head *tmp;
 
         list_for_each(tmp, queue) {
+                ldlm_mode_t match;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (lock == old_lock)
@@ -918,8 +921,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
-                if (!(lock->l_req_mode & mode))
+                if (!(lock->l_req_mode & *mode))
                         continue;
+                match = lock->l_req_mode;
 
                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
                     (lock->l_policy_data.l_extent.start >
@@ -927,7 +931,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
                         continue;
 
-                if (unlikely(mode == LCK_GROUP) &&
+                if (unlikely(match == LCK_GROUP) &&
                     lock->l_resource->lr_type == LDLM_EXTENT &&
                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                         continue;
@@ -951,8 +955,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                         LDLM_LOCK_GET(lock);
                         ldlm_lock_touch_in_lru(lock);
                 } else {
-                        ldlm_lock_addref_internal_nolock(lock, mode);
+                        ldlm_lock_addref_internal_nolock(lock, match);
                 }
+                *mode = match;
                 return lock;
         }
 
@@ -991,10 +996,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * caller code unchanged), the context failure will be discovered by caller
  * sometime later.
  */
-int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
-                    const struct ldlm_res_id *res_id, ldlm_type_t type,
-                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                    struct lustre_handle *lockh)
+ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
+                            const struct ldlm_res_id *res_id, ldlm_type_t type,
+                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                            struct lustre_handle *lockh)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock, *old_lock = NULL;
@@ -1019,15 +1024,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         lock_res(res);
 
-        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
         if (flags & LDLM_FL_BLOCK_GRANTED)
                 GOTO(out, rc = 0);
-        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
-        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
+        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags);
         if (lock != NULL)
                 GOTO(out, rc = 1);
 
@@ -1095,7 +1100,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         if (old_lock)
                 LDLM_LOCK_PUT(old_lock);
 
-        return rc;
+        return rc ? mode : 0;
 }
 
 /* Returns a referenced lock */
index 0953c12..0f2bc90 100644 (file)
@@ -74,16 +74,11 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         struct mdt_body *body;
         struct lookup_intent it = { .it_op = IT_READDIR };
         struct md_op_data op_data;
-        struct obd_device *obddev = class_exp2obd(sbi->ll_md_exp);
-        struct ldlm_res_id res_id =
-                { .name = {fid_seq(&lli->lli_fid), 
-                           fid_oid(&lli->lli_fid), 
-                           fid_ver(&lli->lli_fid)} };
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
         ENTRY;
 
-        rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
+        rc = md_lock_match(sbi->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
+                           &lli->lli_fid, LDLM_IBITS, &policy, LCK_CR, &lockh);
         if (!rc) {
                 struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR,
                         llu_md_blocking_ast, ldlm_completion_ast, NULL, inode};
index 3243fc2..d262250 100644 (file)
@@ -104,7 +104,7 @@ static inline struct obd_export *llu_i2obdexp(struct inode *inode)
         return llu_i2info(inode)->lli_sbi->ll_dt_exp;
 }
 
-static inline struct obd_export *llu_i2mdcexp(struct inode *inode)
+static inline struct obd_export *llu_i2mdexp(struct inode *inode)
 {
         return llu_i2info(inode)->lli_sbi->ll_md_exp;
 }
index 3d5a7ee..1fcb482 100644 (file)
@@ -241,7 +241,7 @@ static int llu_pb_revalidate(struct pnode *pnode, int flags,
                 }
         }
 
-        exp = llu_i2mdcexp(pb->pb_ino);
+        exp = llu_i2mdexp(pb->pb_ino);
         icbd.icbd_parent = pnode->p_parent->p_base->pb_ino;
         icbd.icbd_child = pnode;
 
@@ -432,7 +432,7 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode,
                             pnode->p_base->pb_name.name,
                             pnode->p_base->pb_name.len, flags, opc);
 
-        rc = md_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it,
+        rc = md_intent_lock(llu_i2mdexp(parent), &op_data, NULL, 0, it,
                             flags, &req, llu_md_blocking_ast,
                             LDLM_FL_CANCEL_ON_BLOCK);
         if (rc < 0)
index bb7d124..6e03a07 100644 (file)
@@ -250,7 +250,7 @@ int llu_local_size(struct inode *inode)
                 RETURN(0);
         
         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
-                       &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
+                       &policy, LCK_PR, &flags, inode, &lockh);
         if (rc < 0)
                 RETURN(rc);
         else if (rc == 0)
index 1224725..ad759d7 100644 (file)
@@ -109,6 +109,24 @@ static void llu_fsop_gone(struct filesys *fs)
 
 static struct inode_ops llu_inode_ops;
 
+static ldlm_mode_t llu_take_md_lock(struct inode *inode, __u64 bits,
+                                    struct lustre_handle *lockh)
+{
+        ldlm_policy_data_t policy = { .l_inodebits = {bits}};
+        struct lu_fid *fid;
+        ldlm_mode_t rc;
+        int flags;
+        ENTRY;
+
+        fid = &llu_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
+
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        rc = md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
+        RETURN(rc);
+}
+
 void llu_update_inode(struct inode *inode, struct mdt_body *body,
                       struct lov_stripe_md *lsm)
 {
@@ -164,12 +182,32 @@ void llu_update_inode(struct inode *inode, struct mdt_body *body,
                 st->st_nlink = body->nlink;
         if (body->valid & OBD_MD_FLRDEV)
                 st->st_rdev = body->rdev;
-        if (body->valid & OBD_MD_FLSIZE)
-                st->st_size = body->size;
-        if (body->valid & OBD_MD_FLBLOCKS)
-                st->st_blocks = body->blocks;
         if (body->valid & OBD_MD_FLFLAGS)
                 lli->lli_st_flags = body->flags;
+        if (body->valid & OBD_MD_FLSIZE) {
+                if ((llu_i2sbi(inode)->ll_lco.lco_flags & OBD_CONNECT_SOM) && 
+                    S_ISREG(st->st_mode) && lli->lli_smd) {
+                        struct lustre_handle lockh;
+                        ldlm_mode_t mode;
+                        
+                        /* As it is possible a blocking ast has been processed
+                         * by this time, we need to check there is an UPDATE 
+                         * lock on the client and set LLIF_MDS_SIZE_LOCK holding
+                         * it. */
+                        mode = llu_take_md_lock(inode, MDS_INODELOCK_UPDATE,
+                                                &lockh);
+                        if (mode) {
+                                st->st_size = body->size;
+                                lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
+                                ldlm_lock_decref(&lockh, mode);
+                        }
+                } else {
+                    st->st_size = body->size;
+                }
+                
+                if (body->valid & OBD_MD_FLBLOCKS)
+                        st->st_blocks = body->blocks;
+        }
 }
 
 void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid)
@@ -378,27 +416,20 @@ static struct inode* llu_new_inode(struct filesys *fs,
 
 static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
 {
-        struct llu_sb_info *sbi = llu_i2sbi(inode);
-        struct llu_inode_info *lli = llu_i2info(inode);
         struct lustre_handle lockh;
-        struct ldlm_res_id res_id = { .name = {0} };
-        struct obd_device *obddev;
         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
+        struct lu_fid *fid;
         int flags;
         ENTRY;
 
         LASSERT(inode);
 
-        obddev = sbi->ll_md_exp->exp_obd;
-        res_id.name[0] = fid_seq(&lli->lli_fid);
-        res_id.name[1] = fid_oid(&lli->lli_fid);
-        res_id.name[2] = fid_ver(&lli->lli_fid);
-
-        CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
+        fid = &llu_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_PW | LCK_PR, &lockh)) {
+        if (md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                          LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
         RETURN(0);
@@ -455,9 +486,6 @@ static int llu_inode_revalidate(struct inode *inode)
                 llu_update_inode(inode, md.body, md.lsm);
                 if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm)
                         obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
-                if (md.body->valid & OBD_MD_FLSIZE &&
-                    sbi->ll_lco.lco_flags & OBD_CONNECT_SOM)
-                        llu_i2info(inode)->lli_flags |= LLIF_MDS_SIZE_LOCK;
                 ptlrpc_req_finished(req);
         }
 
@@ -1411,7 +1439,7 @@ static int llu_file_flock(struct inode *ino,
                flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start,
                flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, &res_id, 
+        rc = ldlm_cli_enqueue(llu_i2mdexp(ino), NULL, &einfo, &res_id, 
                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
         RETURN(rc);
 }
index bca80bc..65100a7 100644 (file)
@@ -358,7 +358,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                         RETURN(0);
 #endif
 
-                rc = ll_have_md_lock(de->d_parent->d_inode, 
+                rc = ll_have_md_lock(de->d_parent->d_inode,
                                      MDS_INODELOCK_UPDATE);
         
                 RETURN(rc);
index ad10402..f381fb8 100644 (file)
@@ -1096,14 +1096,14 @@ int ll_local_size(struct inode *inode)
                 RETURN(0);
 
         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
-                       &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
+                       &policy, LCK_PR, &flags, inode, &lockh);
         if (rc < 0)
                 RETURN(rc);
         else if (rc == 0)
                 RETURN(-ENODATA);
 
         ll_merge_lvb(inode);
-        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
+        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
         RETURN(0);
 }
 
@@ -2553,13 +2553,30 @@ int ll_have_md_lock(struct inode *inode, __u64 bits)
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
-                          LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+                          LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
-
         RETURN(0);
 }
 
+ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
+                            struct lustre_handle *lockh)
+{
+        ldlm_policy_data_t policy = { .l_inodebits = {bits}};
+        struct lu_fid *fid;
+        ldlm_mode_t rc;
+        int flags;
+        ENTRY;
+
+        fid = &ll_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
+
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
+        RETURN(rc);
+}
+
 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
                               * and return success */
@@ -2642,8 +2659,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 }
 
                 ll_lookup_finish_locks(&oit, dentry);
-        } else if (!ll_have_md_lock(dentry->d_inode,
-                                    MDS_INODELOCK_UPDATE)) {
+        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) {
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                 obd_valid valid = OBD_MD_FLGETATTR;
                 struct obd_capa *oc;
index e0191ec..17da0f1 100644 (file)
@@ -491,6 +491,8 @@ extern struct file_operations ll_file_operations_noflock;
 extern struct inode_operations ll_file_inode_operations;
 extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
 extern int ll_have_md_lock(struct inode *inode, __u64 bits);
+extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
+                                   struct lustre_handle *lockh);
 int ll_extent_lock(struct ll_file_data *, struct inode *,
                    struct lov_stripe_md *, int mode, ldlm_policy_data_t *,
                    struct lustre_handle *, int ast_flags);
index 8817868..77f615f 100644 (file)
@@ -1660,25 +1660,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                 inode->i_nlink = body->nlink;
         if (body->valid & OBD_MD_FLRDEV)
                 inode->i_rdev = old_decode_dev(body->rdev);
-        if (body->valid & OBD_MD_FLSIZE) {
-                if (ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) {
-                        if (lli->lli_flags & (LLIF_DONE_WRITING |
-                                              LLIF_EPOCH_PENDING |
-                                              LLIF_SOM_DIRTY))
-                          CWARN("ino %lu flags %lu still has size authority!"
-                                "do not trust the size got from MDS\n", 
-                                inode->i_ino, lli->lli_flags);
-                        else {
-                                i_size_write(inode, body->size);
-                                lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
-                        }
-                } else {
-                        i_size_write(inode, body->size);
-                }
-
-                if (body->valid & OBD_MD_FLBLOCKS)
-                        inode->i_blocks = body->blocks;
-        }
 
         if (body->valid & OBD_MD_FLID) {
                 /* FID shouldn't be changed! */
@@ -1694,6 +1675,40 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
 
         LASSERT(fid_seq(&lli->lli_fid) != 0);
 
+        if (body->valid & OBD_MD_FLSIZE) {
+                if ((ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) &&
+                    S_ISREG(inode->i_mode) && lli->lli_smd) {
+                        struct lustre_handle lockh;
+                        ldlm_mode_t mode;
+                        
+                        /* As it is possible a blocking ast has been processed
+                         * by this time, we need to check there is an UPDATE 
+                         * lock on the client and set LLIF_MDS_SIZE_LOCK holding
+                         * it. */
+                        mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE,
+                                               &lockh);
+                        if (mode) {
+                                if (lli->lli_flags & (LLIF_DONE_WRITING |
+                                                      LLIF_EPOCH_PENDING |
+                                                      LLIF_SOM_DIRTY)) {
+                                        CERROR("ino %lu flags %lu still has "
+                                               "size authority! do not trust "
+                                               "the size got from MDS\n",
+                                               inode->i_ino, lli->lli_flags);
+                                } else {
+                                        i_size_write(inode, body->size);
+                                        lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
+                                }
+                                ldlm_lock_decref(&lockh, mode);
+                        }
+                } else {
+                        i_size_write(inode, body->size);
+                }
+
+                if (body->valid & OBD_MD_FLBLOCKS)
+                        inode->i_blocks = body->blocks;
+        }
+
         if (body->valid & OBD_MD_FLMDSCAPA) {
                 LASSERT(md->mds_capa);
                 ll_add_capa(inode, md->mds_capa);
index 2fe0c7a..2ac04ca 100644 (file)
@@ -2710,14 +2710,15 @@ int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
 }
 
-int lmv_lock_match(struct obd_export *exp, int flags,
-                   const struct lu_fid *fid, ldlm_type_t type,
-                   ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                   struct lustre_handle *lockh)
+ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
+                           const struct lu_fid *fid, ldlm_type_t type,
+                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                           struct lustre_handle *lockh)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        int i, rc = 0;
+        ldlm_mode_t rc;
+        int i;
         ENTRY;
 
         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
@@ -2730,10 +2731,10 @@ int lmv_lock_match(struct obd_export *exp, int flags,
                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
                                    type, policy, mode, lockh);
                 if (rc)
-                        RETURN(1);
+                        RETURN(rc);
         }
 
-        RETURN(rc);
+        RETURN(0);
 }
 
 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
index c31295e..f293cfd 100644 (file)
@@ -1846,6 +1846,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                        struct ldlm_enqueue_info *einfo,
                        struct ptlrpc_request_set *rqset)
 {
+        ldlm_mode_t mode = einfo->ei_mode;
         struct lov_request_set *set;
         struct lov_request *req;
         struct list_head *pos;
@@ -1855,6 +1856,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
         LASSERT(oinfo);
         ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(mode == (mode & -mode));
 
         /* we should never be asked to replay a lock this way. */
         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
@@ -1884,7 +1886,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 out:
-        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
+        rc = lov_fini_enqueue_set(set, mode, rc, rqset);
         RETURN(rc);
 }
 
@@ -1902,6 +1904,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         ENTRY;
 
         ASSERT_LSM_MAGIC(lsm);
+        LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode));
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1924,7 +1927,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                                req->rq_oi.oi_md, type, &sub_policy,
                                mode, &lov_flags, data, lov_lockhp);
                 rc = lov_update_match_set(set, req, rc);
-                if (rc != 1)
+                if (rc <= 0)
                         break;
         }
         lov_fini_match_set(set, mode, *flags);
index 290c057..f800a18 100644 (file)
@@ -360,7 +360,7 @@ int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
         int ret = rc;
         ENTRY;
 
-        if (rc == 1)
+        if (rc > 0)
                 ret = 0;
         else if (rc == 0)
                 ret = 1;
index 723a37e..f982152 100644 (file)
@@ -151,8 +151,8 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
                       ldlm_policy_data_t *policy, ldlm_mode_t mode,
                       int flags, void *opaque);
-int mdc_lock_match(struct obd_export *exp, int flags,
-                   const struct lu_fid *fid, ldlm_type_t type,
-                   ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                   struct lustre_handle *lockh);
+ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
+                           const struct lu_fid *fid, ldlm_type_t type,
+                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                           struct lustre_handle *lockh);
 #endif
index 82ad397..c728797 100644 (file)
@@ -146,22 +146,20 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
         RETURN(0);
 }
 
-int mdc_lock_match(struct obd_export *exp, int flags,
-                   const struct lu_fid *fid, ldlm_type_t type,
-                   ldlm_policy_data_t *policy, ldlm_mode_t mode,
-                   struct lustre_handle *lockh)
+ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
+                           const struct lu_fid *fid, ldlm_type_t type,
+                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
+                           struct lustre_handle *lockh)
 {
         struct ldlm_res_id res_id =
                 { .name = {fid_seq(fid),
                            fid_oid(fid),
                            fid_ver(fid)} };
-        struct obd_device *obd = class_exp2obd(exp);
-        int rc;
+        ldlm_mode_t rc;
         ENTRY;
 
-        rc = ldlm_lock_match(obd->obd_namespace, flags,
+        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                              &res_id, type, policy, mode, lockh);
-
         RETURN(rc);
 }
 
@@ -669,11 +667,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
-                struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2),
-                                                        fid_oid(&op_data->op_fid2),
-                                                        fid_ver(&op_data->op_fid2) } };
                 ldlm_policy_data_t policy;
-                ldlm_mode_t mode = LCK_CR;
+                ldlm_mode_t mode;
 
                 /* As not all attributes are kept under update lock, e.g.
                    owner/group/acls are under lookup lock, we need both
@@ -686,30 +681,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
 
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_IBITS, &policy, mode, &lockh);
-                if (!rc) {
-                        mode = LCK_CW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-                if (!rc) {
-                        mode = LCK_PR;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-
-                if (!rc) {
-                        mode = LCK_PW;
-                        rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy, mode, &lockh);
-                }
-
-                if (rc) {
+                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
+                                      &op_data->op_fid2, LDLM_IBITS, &policy,
+                                      LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+                if (mode) {
                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
                                sizeof(lockh));
                         it->d.lustre.it_lock_mode = mode;
@@ -717,8 +692,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
 
                 /* Only return failure if it was not GETATTR by cfid
                    (from inode_revalidate) */
-                if (rc || op_data->op_namelen != 0)
-                        RETURN(rc);
+                if (mode || op_data->op_namelen != 0)
+                        RETURN(!!mode);
         }
 
         /* lookup_it may be called only after revalidate_it has run, because
index 0a08a3d..dfdfec4 100644 (file)
@@ -93,6 +93,7 @@ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb)
         bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK];
         spin_lock(&bucket->lock);
         list_add_rcu(&h->h_link, &bucket->head);
+        h->h_in = 1;
         spin_unlock(&bucket->lock);
 
         CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n",
@@ -112,11 +113,11 @@ static void class_handle_unhash_nolock(struct portals_handle *h)
                h, h->h_cookie);
 
         spin_lock(&h->h_lock);
-        if (h->h_cookie == 0) {
+        if (h->h_in == 0) {
                 spin_unlock(&h->h_lock);
                 return;
         }
-        h->h_cookie = 0;
+        h->h_in = 0;
         spin_unlock(&h->h_lock);
         list_del_rcu(&h->h_link);
 }
@@ -143,6 +144,7 @@ void class_handle_hash_back(struct portals_handle *h)
         atomic_inc(&handle_count);
         spin_lock(&bucket->lock);
         list_add_rcu(&h->h_link, &bucket->head);
+        h->h_in = 1;
         spin_unlock(&bucket->lock);
 
         EXIT;
index 34c877b..795ca4a 100644 (file)
@@ -2766,8 +2766,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                 return;
         }
         lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
         /* Liang XXX: Darwin and Winnt checking should be added */
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2782,7 +2781,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                          new_inode, new_inode->i_ino, new_inode->i_generation);
         }
 #endif
-#endif
         lock->l_ast_data = data;
         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
         unlock_res_and_lock(lock);
@@ -2883,6 +2881,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        ldlm_mode_t mode;
         int rc;
         ENTRY;
 
@@ -2899,11 +2898,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 goto no_match;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace,
-                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
-                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
-                             oinfo->oi_lockh);
-        if (rc == 1) {
+        /* If we're trying to read, we also search for an existing PW lock.  The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+        mode = einfo->ei_mode;
+        if (einfo->ei_mode == LCK_PR)
+                mode |= LCK_PW;
+        mode = ldlm_lock_match(obd->obd_namespace,
+                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+                               einfo->ei_type, &oinfo->oi_policy, mode,
+                               oinfo->oi_lockh);
+        if (mode) {
+                /* addref the lock only if not async requests and PW lock is
+                 * matched whereas we asked for PR. */
+                if (!rqset && einfo->ei_mode != mode)
+                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                         oinfo->oi_flags);
                 if (intent) {
@@ -2916,45 +2933,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
 
                 /* For async requests, decref the lock. */
-                if (rqset)
+                if (einfo->ei_mode != mode)
+                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+                else if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
 
                 RETURN(ELDLM_OK);
         }
 
-        /* If we're trying to read, we also search for an existing PW lock.  The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock.
-         *
-         * There are problems with conversion deadlocks, so instead of
-         * converting a read lock to a write lock, we'll just enqueue a new
-         * one.
-         *
-         * At some point we should cancel the read lock instead of making them
-         * send us a blocking callback, but there are problems with canceling
-         * locks out from other users right now, too. */
-
-        if (einfo->ei_mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
-                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
-                                     LCK_PW, oinfo->oi_lockh);
-                if (rc == 1) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        /* addref the lock only if not async requests. */
-                        if (!rqset)
-                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                        osc_set_data_with_check(oinfo->oi_lockh,
-                                                einfo->ei_cbdata,
-                                                oinfo->oi_flags);
-                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                        RETURN(ELDLM_OK);
-                }
-        }
-
  no_match:
         if (intent) {
                 int size[3] = {
@@ -3011,8 +2997,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
 {
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
-        int rc;
         int lflags = *flags;
+        ldlm_mode_t rc;
         ENTRY;
 
         res_id.name[0] = lsm->lsm_object_id;
@@ -3026,28 +3012,21 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.end |= ~CFS_PAGE_MASK;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
-                             &res_id, type, policy, mode, lockh);
-        if (rc) {
-                //if (!(*flags & LDLM_FL_TEST_LOCK))
-                        osc_set_data_with_check(lockh, data, lflags);
-                RETURN(rc);
-        }
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
          * writers can share a single PW lock. */
-        if (mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     lflags | LDLM_FL_LVB_READY, &res_id,
-                                     type, policy, LCK_PW, lockh);
-                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        osc_set_data_with_check(lockh, data, lflags);
+        rc = mode;
+        if (mode == LCK_PR)
+                rc |= LCK_PW;
+        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+                             &res_id, type, policy, rc, lockh);
+        if (rc) {
+                osc_set_data_with_check(lockh, data, lflags);
+                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
                 }
+                RETURN(rc);
         }
         RETURN(rc);
 }