From: vitaly Date: Thu, 25 Oct 2007 10:55:29 +0000 (+0000) Subject: Branch HEAD X-Git-Tag: v1_7_0_51~570 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a25adb47c7b7eeb68a922e2647d74eeff3401c6a Branch HEAD b=13622 i=tappro i=green 1) make class_handle_hash_back() to work; 2) ll_update_inode() checks UPDATE ibit lock still presents before setting LLIF_MDS_SIZE_LOCK; 3) ldlm_lock_match() returns the matched mode instead of 1; pass bitwise sets of modes into; --- diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 873a6a1..b48efef 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -625,10 +625,10 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_allow_match(struct ldlm_lock *lock); -int ldlm_lock_match(struct ldlm_namespace *ns, int flags, - const struct ldlm_res_id *, - ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode, - struct lustre_handle *); +ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, + const struct ldlm_res_id *, ldlm_type_t type, + ldlm_policy_data_t *, ldlm_mode_t mode, + struct lustre_handle *); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); void ldlm_lock_cancel(struct ldlm_lock *lock); diff --git a/lustre/include/lustre_handles.h b/lustre/include/lustre_handles.h index cf6b403..2fefbf2 100644 --- a/lustre/include/lustre_handles.h +++ b/lustre/include/lustre_handles.h @@ -32,10 +32,12 @@ struct portals_handle { /* newly added fields to handle the RCU issue. -jxiong */ spinlock_t h_lock; - unsigned int h_size; void *h_ptr; void (*h_free_cb)(void *, size_t); struct rcu_head h_rcu; + unsigned int h_size; + __u8 h_in:1; + __u8 h_unused[3]; }; #define RCU2HANDLE(rcu) container_of(rcu, struct portals_handle, h_rcu) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 88a500a..9ef5cce 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1302,9 +1302,10 @@ struct md_ops { struct obd_client_handle *); int (*m_set_lock_data)(struct obd_export *, __u64 *, void *); - int (*m_lock_match)(struct obd_export *, int, const struct lu_fid *, - ldlm_type_t, ldlm_policy_data_t *, ldlm_mode_t, - struct lustre_handle *); + ldlm_mode_t (*m_lock_match)(struct obd_export *, int, + const struct lu_fid *, ldlm_type_t, + ldlm_policy_data_t *, ldlm_mode_t, + struct lustre_handle *); int (*m_cancel_unused)(struct obd_export *, const struct lu_fid *, ldlm_policy_data_t *, ldlm_mode_t, int flags, diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 64df844..7f29480 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1945,10 +1945,12 @@ static inline int md_cancel_unused(struct obd_export *exp, RETURN(rc); } -static inline int md_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +static inline ldlm_mode_t md_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, + ldlm_type_t type, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, + struct lustre_handle *lockh) { ENTRY; EXP_CHECK_MD_OP(exp, lock_match); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index c9e3c84..e422d98 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -892,7 +892,8 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list) /* returns a referenced lock or NULL. See the flag descriptions below, in the * comment above ldlm_lock_match */ -static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, +static struct ldlm_lock *search_queue(struct list_head *queue, + ldlm_mode_t *mode, ldlm_policy_data_t *policy, struct ldlm_lock *old_lock, int flags) { @@ -900,6 +901,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, struct list_head *tmp; list_for_each(tmp, queue) { + ldlm_mode_t match; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); if (lock == old_lock) @@ -918,8 +921,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_readers == 0 && lock->l_writers == 0) continue; - if (!(lock->l_req_mode & mode)) + if (!(lock->l_req_mode & *mode)) continue; + match = lock->l_req_mode; if (lock->l_resource->lr_type == LDLM_EXTENT && (lock->l_policy_data.l_extent.start > @@ -927,7 +931,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_policy_data.l_extent.end < policy->l_extent.end)) continue; - if (unlikely(mode == LCK_GROUP) && + if (unlikely(match == LCK_GROUP) && lock->l_resource->lr_type == LDLM_EXTENT && lock->l_policy_data.l_extent.gid != policy->l_extent.gid) continue; @@ -951,8 +955,9 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, LDLM_LOCK_GET(lock); ldlm_lock_touch_in_lru(lock); } else { - ldlm_lock_addref_internal_nolock(lock, mode); + ldlm_lock_addref_internal_nolock(lock, match); } + *mode = match; return lock; } @@ -991,10 +996,10 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock) * caller code unchanged), the context failure will be discovered by caller * sometime later. */ -int ldlm_lock_match(struct ldlm_namespace *ns, int flags, - const struct ldlm_res_id *res_id, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, + const struct ldlm_res_id *res_id, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) { struct ldlm_resource *res; struct ldlm_lock *lock, *old_lock = NULL; @@ -1019,15 +1024,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, lock_res(res); - lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags); + lock = search_queue(&res->lr_granted, &mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); if (flags & LDLM_FL_BLOCK_GRANTED) GOTO(out, rc = 0); - lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags); + lock = search_queue(&res->lr_converting, &mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); - lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags); + lock = search_queue(&res->lr_waiting, &mode, policy, old_lock, flags); if (lock != NULL) GOTO(out, rc = 1); @@ -1095,7 +1100,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, if (old_lock) LDLM_LOCK_PUT(old_lock); - return rc; + return rc ? mode : 0; } /* Returns a referenced lock */ diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 0953c12..0f2bc90 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -74,16 +74,11 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) struct mdt_body *body; struct lookup_intent it = { .it_op = IT_READDIR }; struct md_op_data op_data; - struct obd_device *obddev = class_exp2obd(sbi->ll_md_exp); - struct ldlm_res_id res_id = - { .name = {fid_seq(&lli->lli_fid), - fid_oid(&lli->lli_fid), - fid_ver(&lli->lli_fid)} }; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; ENTRY; - rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); + rc = md_lock_match(sbi->ll_md_exp, LDLM_FL_BLOCK_GRANTED, + &lli->lli_fid, LDLM_IBITS, &policy, LCK_CR, &lockh); if (!rc) { struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR, llu_md_blocking_ast, ldlm_completion_ast, NULL, inode}; diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 3243fc2..d262250 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -104,7 +104,7 @@ static inline struct obd_export *llu_i2obdexp(struct inode *inode) return llu_i2info(inode)->lli_sbi->ll_dt_exp; } -static inline struct obd_export *llu_i2mdcexp(struct inode *inode) +static inline struct obd_export *llu_i2mdexp(struct inode *inode) { return llu_i2info(inode)->lli_sbi->ll_md_exp; } diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index 3d5a7ee..1fcb482 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -241,7 +241,7 @@ static int llu_pb_revalidate(struct pnode *pnode, int flags, } } - exp = llu_i2mdcexp(pb->pb_ino); + exp = llu_i2mdexp(pb->pb_ino); icbd.icbd_parent = pnode->p_parent->p_base->pb_ino; icbd.icbd_child = pnode; @@ -432,7 +432,7 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode, pnode->p_base->pb_name.name, pnode->p_base->pb_name.len, flags, opc); - rc = md_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it, + rc = md_intent_lock(llu_i2mdexp(parent), &op_data, NULL, 0, it, flags, &req, llu_md_blocking_ast, LDLM_FL_CANCEL_ON_BLOCK); if (rc < 0) diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index bb7d124..6e03a07 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -250,7 +250,7 @@ int llu_local_size(struct inode *inode) RETURN(0); rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, - &policy, LCK_PR | LCK_PW, &flags, inode, &lockh); + &policy, LCK_PR, &flags, inode, &lockh); if (rc < 0) RETURN(rc); else if (rc == 0) diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 1224725..ad759d7 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -109,6 +109,24 @@ static void llu_fsop_gone(struct filesys *fs) static struct inode_ops llu_inode_ops; +static ldlm_mode_t llu_take_md_lock(struct inode *inode, __u64 bits, + struct lustre_handle *lockh) +{ + ldlm_policy_data_t policy = { .l_inodebits = {bits}}; + struct lu_fid *fid; + ldlm_mode_t rc; + int flags; + ENTRY; + + fid = &llu_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); + + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; + rc = md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh); + RETURN(rc); +} + void llu_update_inode(struct inode *inode, struct mdt_body *body, struct lov_stripe_md *lsm) { @@ -164,12 +182,32 @@ void llu_update_inode(struct inode *inode, struct mdt_body *body, st->st_nlink = body->nlink; if (body->valid & OBD_MD_FLRDEV) st->st_rdev = body->rdev; - if (body->valid & OBD_MD_FLSIZE) - st->st_size = body->size; - if (body->valid & OBD_MD_FLBLOCKS) - st->st_blocks = body->blocks; if (body->valid & OBD_MD_FLFLAGS) lli->lli_st_flags = body->flags; + if (body->valid & OBD_MD_FLSIZE) { + if ((llu_i2sbi(inode)->ll_lco.lco_flags & OBD_CONNECT_SOM) && + S_ISREG(st->st_mode) && lli->lli_smd) { + struct lustre_handle lockh; + ldlm_mode_t mode; + + /* As it is possible a blocking ast has been processed + * by this time, we need to check there is an UPDATE + * lock on the client and set LLIF_MDS_SIZE_LOCK holding + * it. */ + mode = llu_take_md_lock(inode, MDS_INODELOCK_UPDATE, + &lockh); + if (mode) { + st->st_size = body->size; + lli->lli_flags |= LLIF_MDS_SIZE_LOCK; + ldlm_lock_decref(&lockh, mode); + } + } else { + st->st_size = body->size; + } + + if (body->valid & OBD_MD_FLBLOCKS) + st->st_blocks = body->blocks; + } } void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) @@ -378,27 +416,20 @@ static struct inode* llu_new_inode(struct filesys *fs, static int llu_have_md_lock(struct inode *inode, __u64 lockpart) { - struct llu_sb_info *sbi = llu_i2sbi(inode); - struct llu_inode_info *lli = llu_i2info(inode); struct lustre_handle lockh; - struct ldlm_res_id res_id = { .name = {0} }; - struct obd_device *obddev; ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; + struct lu_fid *fid; int flags; ENTRY; LASSERT(inode); - obddev = sbi->ll_md_exp->exp_obd; - res_id.name[0] = fid_seq(&lli->lli_fid); - res_id.name[1] = fid_oid(&lli->lli_fid); - res_id.name[2] = fid_ver(&lli->lli_fid); - - CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]); + fid = &llu_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, - &policy, LCK_PW | LCK_PR, &lockh)) { + if (md_lock_match(llu_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) { RETURN(1); } RETURN(0); @@ -455,9 +486,6 @@ static int llu_inode_revalidate(struct inode *inode) llu_update_inode(inode, md.body, md.lsm); if (md.lsm != NULL && llu_i2info(inode)->lli_smd != md.lsm) obd_free_memmd(sbi->ll_dt_exp, &md.lsm); - if (md.body->valid & OBD_MD_FLSIZE && - sbi->ll_lco.lco_flags & OBD_CONNECT_SOM) - llu_i2info(inode)->lli_flags |= LLIF_MDS_SIZE_LOCK; ptlrpc_req_finished(req); } @@ -1411,7 +1439,7 @@ static int llu_file_flock(struct inode *ino, flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, &res_id, + rc = ldlm_cli_enqueue(llu_i2mdexp(ino), NULL, &einfo, &res_id, &flock, &flags, NULL, 0, NULL, &lockh, 0); RETURN(rc); } diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index bca80bc..65100a7 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -358,7 +358,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, RETURN(0); #endif - rc = ll_have_md_lock(de->d_parent->d_inode, + rc = ll_have_md_lock(de->d_parent->d_inode, MDS_INODELOCK_UPDATE); RETURN(rc); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index ad10402..f381fb8 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1096,14 +1096,14 @@ int ll_local_size(struct inode *inode) RETURN(0); rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT, - &policy, LCK_PR | LCK_PW, &flags, inode, &lockh); + &policy, LCK_PR, &flags, inode, &lockh); if (rc < 0) RETURN(rc); else if (rc == 0) RETURN(-ENODATA); ll_merge_lvb(inode); - obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh); + obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh); RETURN(0); } @@ -2553,13 +2553,30 @@ int ll_have_md_lock(struct inode *inode, __u64 bits) flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, - LCK_CR|LCK_CW|LCK_PR, &lockh)) { + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) { RETURN(1); } - RETURN(0); } +ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, + struct lustre_handle *lockh) +{ + ldlm_policy_data_t policy = { .l_inodebits = {bits}}; + struct lu_fid *fid; + ldlm_mode_t rc; + int flags; + ENTRY; + + fid = &ll_i2info(inode)->lli_fid; + CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); + + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; + rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh); + RETURN(rc); +} + static int ll_inode_revalidate_fini(struct inode *inode, int rc) { if (rc == -ENOENT) { /* Already unlinked. Just update nlink * and return success */ @@ -2642,8 +2659,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) } ll_lookup_finish_locks(&oit, dentry); - } else if (!ll_have_md_lock(dentry->d_inode, - MDS_INODELOCK_UPDATE)) { + } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE)) { struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode); obd_valid valid = OBD_MD_FLGETATTR; struct obd_capa *oc; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index e0191ec..17da0f1 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -491,6 +491,8 @@ extern struct file_operations ll_file_operations_noflock; extern struct inode_operations ll_file_inode_operations; extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *); extern int ll_have_md_lock(struct inode *inode, __u64 bits); +extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, + struct lustre_handle *lockh); int ll_extent_lock(struct ll_file_data *, struct inode *, struct lov_stripe_md *, int mode, ldlm_policy_data_t *, struct lustre_handle *, int ast_flags); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 8817868..77f615f 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1660,25 +1660,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) inode->i_nlink = body->nlink; if (body->valid & OBD_MD_FLRDEV) inode->i_rdev = old_decode_dev(body->rdev); - if (body->valid & OBD_MD_FLSIZE) { - if (ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) { - if (lli->lli_flags & (LLIF_DONE_WRITING | - LLIF_EPOCH_PENDING | - LLIF_SOM_DIRTY)) - CWARN("ino %lu flags %lu still has size authority!" - "do not trust the size got from MDS\n", - inode->i_ino, lli->lli_flags); - else { - i_size_write(inode, body->size); - lli->lli_flags |= LLIF_MDS_SIZE_LOCK; - } - } else { - i_size_write(inode, body->size); - } - - if (body->valid & OBD_MD_FLBLOCKS) - inode->i_blocks = body->blocks; - } if (body->valid & OBD_MD_FLID) { /* FID shouldn't be changed! */ @@ -1694,6 +1675,40 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) LASSERT(fid_seq(&lli->lli_fid) != 0); + if (body->valid & OBD_MD_FLSIZE) { + if ((ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) && + S_ISREG(inode->i_mode) && lli->lli_smd) { + struct lustre_handle lockh; + ldlm_mode_t mode; + + /* As it is possible a blocking ast has been processed + * by this time, we need to check there is an UPDATE + * lock on the client and set LLIF_MDS_SIZE_LOCK holding + * it. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE, + &lockh); + if (mode) { + if (lli->lli_flags & (LLIF_DONE_WRITING | + LLIF_EPOCH_PENDING | + LLIF_SOM_DIRTY)) { + CERROR("ino %lu flags %lu still has " + "size authority! do not trust " + "the size got from MDS\n", + inode->i_ino, lli->lli_flags); + } else { + i_size_write(inode, body->size); + lli->lli_flags |= LLIF_MDS_SIZE_LOCK; + } + ldlm_lock_decref(&lockh, mode); + } + } else { + i_size_write(inode, body->size); + } + + if (body->valid & OBD_MD_FLBLOCKS) + inode->i_blocks = body->blocks; + } + if (body->valid & OBD_MD_FLMDSCAPA) { LASSERT(md->mds_capa); ll_add_capa(inode, md->mds_capa); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 2fe0c7a..2ac04ca 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -2710,14 +2710,15 @@ int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data)); } -int lmv_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - int i, rc = 0; + ldlm_mode_t rc; + int i; ENTRY; CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid)); @@ -2730,10 +2731,10 @@ int lmv_lock_match(struct obd_export *exp, int flags, rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid, type, policy, mode, lockh); if (rc) - RETURN(1); + RETURN(rc); } - RETURN(rc); + RETURN(0); } int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index c31295e..f293cfd 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1846,6 +1846,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, struct ptlrpc_request_set *rqset) { + ldlm_mode_t mode = einfo->ei_mode; struct lov_request_set *set; struct lov_request *req; struct list_head *pos; @@ -1855,6 +1856,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, LASSERT(oinfo); ASSERT_LSM_MAGIC(oinfo->oi_md); + LASSERT(mode == (mode & -mode)); /* we should never be asked to replay a lock this way. */ LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0); @@ -1884,7 +1886,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } out: - rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset); + rc = lov_fini_enqueue_set(set, mode, rc, rqset); RETURN(rc); } @@ -1902,6 +1904,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, ENTRY; ASSERT_LSM_MAGIC(lsm); + LASSERT((*flags & LDLM_FL_TEST_LOCK) || mode == (mode & -mode)); if (!exp || !exp->exp_obd) RETURN(-ENODEV); @@ -1924,7 +1927,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, req->rq_oi.oi_md, type, &sub_policy, mode, &lov_flags, data, lov_lockhp); rc = lov_update_match_set(set, req, rc); - if (rc != 1) + if (rc <= 0) break; } lov_fini_match_set(set, mode, *flags); diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 290c057..f800a18 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -360,7 +360,7 @@ int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, int ret = rc; ENTRY; - if (rc == 1) + if (rc > 0) ret = 0; else if (rc == 0) ret = 1; diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index 723a37e..f982152 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -151,8 +151,8 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, ldlm_policy_data_t *policy, ldlm_mode_t mode, int flags, void *opaque); -int mdc_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh); +ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh); #endif diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 82ad397..c728797 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -146,22 +146,20 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data) RETURN(0); } -int mdc_lock_match(struct obd_export *exp, int flags, - const struct lu_fid *fid, ldlm_type_t type, - ldlm_policy_data_t *policy, ldlm_mode_t mode, - struct lustre_handle *lockh) +ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, + const struct lu_fid *fid, ldlm_type_t type, + ldlm_policy_data_t *policy, ldlm_mode_t mode, + struct lustre_handle *lockh) { struct ldlm_res_id res_id = { .name = {fid_seq(fid), fid_oid(fid), fid_ver(fid)} }; - struct obd_device *obd = class_exp2obd(exp); - int rc; + ldlm_mode_t rc; ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, flags, + rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags, &res_id, type, policy, mode, lockh); - RETURN(rc); } @@ -669,11 +667,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* We could just return 1 immediately, but since we should only * be called in revalidate_it if we already have a lock, let's * verify that. */ - struct ldlm_res_id res_id = { .name = { fid_seq(&op_data->op_fid2), - fid_oid(&op_data->op_fid2), - fid_ver(&op_data->op_fid2) } }; ldlm_policy_data_t policy; - ldlm_mode_t mode = LCK_CR; + ldlm_mode_t mode; /* As not all attributes are kept under update lock, e.g. owner/group/acls are under lookup lock, we need both @@ -686,30 +681,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - if (!rc) { - mode = LCK_CW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - if (!rc) { - mode = LCK_PR; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (!rc) { - mode = LCK_PW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_IBITS, &policy, mode, &lockh); - } - - if (rc) { + mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, + &op_data->op_fid2, LDLM_IBITS, &policy, + LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh); + if (mode) { memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); it->d.lustre.it_lock_mode = mode; @@ -717,8 +692,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ - if (rc || op_data->op_namelen != 0) - RETURN(rc); + if (mode || op_data->op_namelen != 0) + RETURN(!!mode); } /* lookup_it may be called only after revalidate_it has run, because diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c index 0a08a3d..dfdfec4 100644 --- a/lustre/obdclass/lustre_handles.c +++ b/lustre/obdclass/lustre_handles.c @@ -93,6 +93,7 @@ void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb) bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK]; spin_lock(&bucket->lock); list_add_rcu(&h->h_link, &bucket->head); + h->h_in = 1; spin_unlock(&bucket->lock); CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n", @@ -112,11 +113,11 @@ static void class_handle_unhash_nolock(struct portals_handle *h) h, h->h_cookie); spin_lock(&h->h_lock); - if (h->h_cookie == 0) { + if (h->h_in == 0) { spin_unlock(&h->h_lock); return; } - h->h_cookie = 0; + h->h_in = 0; spin_unlock(&h->h_lock); list_del_rcu(&h->h_link); } @@ -143,6 +144,7 @@ void class_handle_hash_back(struct portals_handle *h) atomic_inc(&handle_count); spin_lock(&bucket->lock); list_add_rcu(&h->h_link, &bucket->head); + h->h_in = 1; spin_unlock(&bucket->lock); EXIT; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 34c877b..795ca4a 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2766,8 +2766,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, return; } lock_res_and_lock(lock); -#ifdef __KERNEL__ -#ifdef __LINUX__ +#if defined (__KERNEL__) && defined (__LINUX__) /* Liang XXX: Darwin and Winnt checking should be added */ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; @@ -2782,7 +2781,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, new_inode, new_inode->i_ino, new_inode->i_generation); } #endif -#endif lock->l_ast_data = data; lock->l_flags |= (flags & LDLM_FL_NO_LRU); unlock_res_and_lock(lock); @@ -2883,6 +2881,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_reply *rep; struct ptlrpc_request *req = NULL; int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT; + ldlm_mode_t mode; int rc; ENTRY; @@ -2899,11 +2898,29 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, goto no_match; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, - oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, - einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode, - oinfo->oi_lockh); - if (rc == 1) { + /* If we're trying to read, we also search for an existing PW lock. The + * VFS and page cache already protect us locally, so lots of readers/ + * writers can share a single PW lock. + * + * There are problems with conversion deadlocks, so instead of + * converting a read lock to a write lock, we'll just enqueue a new + * one. + * + * At some point we should cancel the read lock instead of making them + * send us a blocking callback, but there are problems with canceling + * locks out from other users right now, too. */ + mode = einfo->ei_mode; + if (einfo->ei_mode == LCK_PR) + mode |= LCK_PW; + mode = ldlm_lock_match(obd->obd_namespace, + oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id, + einfo->ei_type, &oinfo->oi_policy, mode, + oinfo->oi_lockh); + if (mode) { + /* addref the lock only if not async requests and PW lock is + * matched whereas we asked for PR. */ + if (!rqset && einfo->ei_mode != mode) + ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata, oinfo->oi_flags); if (intent) { @@ -2916,45 +2933,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up(oinfo, ELDLM_OK); /* For async requests, decref the lock. */ - if (rqset) + if (einfo->ei_mode != mode) + ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); + else if (rqset) ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode); RETURN(ELDLM_OK); } - /* If we're trying to read, we also search for an existing PW lock. The - * VFS and page cache already protect us locally, so lots of readers/ - * writers can share a single PW lock. - * - * There are problems with conversion deadlocks, so instead of - * converting a read lock to a write lock, we'll just enqueue a new - * one. - * - * At some point we should cancel the read lock instead of making them - * send us a blocking callback, but there are problems with canceling - * locks out from other users right now, too. */ - - if (einfo->ei_mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, - oinfo->oi_flags | LDLM_FL_LVB_READY, - &res_id, einfo->ei_type, &oinfo->oi_policy, - LCK_PW, oinfo->oi_lockh); - if (rc == 1) { - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. I want a second opinion. */ - /* addref the lock only if not async requests. */ - if (!rqset) - ldlm_lock_addref(oinfo->oi_lockh, LCK_PR); - osc_set_data_with_check(oinfo->oi_lockh, - einfo->ei_cbdata, - oinfo->oi_flags); - oinfo->oi_cb_up(oinfo, ELDLM_OK); - ldlm_lock_decref(oinfo->oi_lockh, LCK_PW); - RETURN(ELDLM_OK); - } - } - no_match: if (intent) { int size[3] = { @@ -3011,8 +2997,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, { struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = exp->exp_obd; - int rc; int lflags = *flags; + ldlm_mode_t rc; ENTRY; res_id.name[0] = lsm->lsm_object_id; @@ -3026,28 +3012,21 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, policy->l_extent.end |= ~CFS_PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, - &res_id, type, policy, mode, lockh); - if (rc) { - //if (!(*flags & LDLM_FL_TEST_LOCK)) - osc_set_data_with_check(lockh, data, lflags); - RETURN(rc); - } /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. */ - if (mode == LCK_PR) { - rc = ldlm_lock_match(obd->obd_namespace, - lflags | LDLM_FL_LVB_READY, &res_id, - type, policy, LCK_PW, lockh); - if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) { - /* FIXME: This is not incredibly elegant, but it might - * be more elegant than adding another parameter to - * lock_match. I want a second opinion. */ - osc_set_data_with_check(lockh, data, lflags); + rc = mode; + if (mode == LCK_PR) + rc |= LCK_PW; + rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, + &res_id, type, policy, rc, lockh); + if (rc) { + osc_set_data_with_check(lockh, data, lflags); + if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { ldlm_lock_addref(lockh, LCK_PR); ldlm_lock_decref(lockh, LCK_PW); } + RETURN(rc); } RETURN(rc); }