From c7d8967deea50e506787499b5c879d0a3330d441 Mon Sep 17 00:00:00 2001 From: green Date: Fri, 28 Oct 2005 20:23:09 +0000 Subject: [PATCH] Landing Inodebits to b_release_1_4_6. --- lustre/include/linux/lustre_dlm.h | 23 +++++++ lustre/include/linux/lustre_idl.h | 17 ++++- lustre/include/linux/lustre_mds.h | 2 +- lustre/ldlm/Makefile.am | 2 +- lustre/ldlm/ldlm_inodebits.c | 33 ++++----- lustre/ldlm/ldlm_internal.h | 5 ++ lustre/ldlm/ldlm_lock.c | 13 ++++ lustre/liblustre/dir.c | 5 +- lustre/liblustre/namei.c | 7 +- lustre/liblustre/super.c | 13 ++-- lustre/llite/dir.c | 9 +-- lustre/llite/file.c | 16 ++--- lustre/llite/namei.c | 18 +++-- lustre/mdc/mdc_locks.c | 34 +++++++--- lustre/mds/handler.c | 49 +++++++++----- lustre/mds/mds_internal.h | 9 ++- lustre/mds/mds_open.c | 7 +- lustre/mds/mds_reint.c | 139 ++++++++++++++++++++++++++------------ lustre/mds/mds_xattr.c | 13 +++- lustre/ptlrpc/Makefile.in | 2 +- lustre/ptlrpc/autoMakefile.am | 1 + 21 files changed, 287 insertions(+), 130 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 72f285c..0721f4b 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -370,6 +370,29 @@ do { \ lock->l_pid); \ break; \ } \ + if (lock->l_resource->lr_type == LDLM_IBITS) { \ + CDEBUG(level, "### " format \ + " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ + "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " \ + "flags: %x remote: "LPX64" expref: %d " \ + "pid %u\n" , ## a, \ + lock->l_resource->lr_namespace->ns_name, \ + lock, lock->l_handle.h_cookie, \ + atomic_read (&lock->l_refc), \ + lock->l_readers, lock->l_writers, \ + ldlm_lockname[lock->l_granted_mode], \ + ldlm_lockname[lock->l_req_mode], \ + lock->l_resource->lr_name.name[0], \ + lock->l_resource->lr_name.name[1], \ + lock->l_policy_data.l_inodebits.bits, \ + atomic_read(&lock->l_resource->lr_refcount), \ + ldlm_typename[lock->l_resource->lr_type], \ + lock->l_flags, lock->l_remote_handle.cookie, \ + lock->l_export ? \ + atomic_read(&lock->l_export->exp_refcount) : -99, \ + lock->l_pid); \ + break; \ + } \ { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index beb7023..d0f9900 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -527,6 +527,16 @@ typedef enum { #define DISP_OPEN_OPEN 0x20 #define DISP_ENQ_COMPLETE 0x40 +/* INODE LOCK PARTS */ +#define MDS_INODELOCK_LOOKUP 0x000001 /* dentry, mode, owner, group */ +#define MDS_INODELOCK_UPDATE 0x000002 /* size, links, timestamps */ +#define MDS_INODELOCK_OPEN 0x000004 /* For opened files */ + +/* Do not forget to increase MDS_INODELOCK_MAXSHIFT when adding new bits */ +#define MDS_INODELOCK_MAXSHIFT 2 +/* This FULL lock is useful to take on unlink sort of operations */ +#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) + struct ll_fid { __u64 id; __u32 generation; @@ -814,7 +824,7 @@ typedef enum { LDLM_PLAIN = 10, LDLM_EXTENT = 11, LDLM_FLOCK = 12, -// LDLM_IBITS = 13, + LDLM_IBITS = 13, LDLM_MAX_TYPE } ldlm_type_t; @@ -826,6 +836,10 @@ struct ldlm_extent { __u64 gid; }; +struct ldlm_inodebits { + __u64 bits; +}; + struct ldlm_flock { __u64 start; __u64 end; @@ -843,6 +857,7 @@ struct ldlm_flock { typedef union { struct ldlm_extent l_extent; struct ldlm_flock l_flock; + struct ldlm_inodebits l_inodebits; } ldlm_policy_data_t; extern void lustre_swab_ldlm_policy_data (ldlm_policy_data_t *d); diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 5c43429..1eb5c58 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -135,7 +135,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset, struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, - char *name, int namelen); + char *name, int namelen, __u64 lockpart); struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); int mds_update_server_data(struct obd_device *, int force_sync); diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index 2b9856c..aeb4a06 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -10,4 +10,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \ ldlm_lock.c ldlm_lockd.c ldlm_plain.c ldlm_request.c \ - ldlm_resource.c l_lock.c + ldlm_resource.c l_lock.c ldlm_inodebits.c diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 56c88cf..e3511dd 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -35,7 +35,7 @@ /* Determine if the lock is compatible with all locks on the queue. */ static int ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, - struct list_head *work_list) + int send_cbs) { struct list_head *tmp; struct ldlm_lock *lock; @@ -61,12 +61,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (!(lock->l_policy_data.l_inodebits.bits & req_bits)) continue; - if (!work_list) + if (!send_cbs) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, work_list); + ldlm_add_ast_work_item(lock, req, NULL, 0); } RETURN(compat); @@ -82,8 +82,7 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err, - struct list_head *work_list) + int first_enq, ldlm_error_t *err) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); @@ -91,25 +90,27 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, ENTRY; LASSERT(list_empty(&res->lr_converting)); - check_res_locked(res); if (!first_enq) { - LASSERT(work_list != NULL); - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL); + LASSERT(res->lr_tmp != NULL); + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0); if (!rc) RETURN(LDLM_ITER_STOP); - rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL); + rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0); if (!rc) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, work_list); + ldlm_grant_lock(lock, NULL, 0, 1); RETURN(LDLM_ITER_CONTINUE); } restart: - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list); - rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list); + LASSERT(res->lr_tmp == NULL); + res->lr_tmp = &rpc_list; + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1); + rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1); + res->lr_tmp = NULL; if (rc != 2) { /* If either of the compat_queue()s returned 0, then we @@ -120,15 +121,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, * re-ordered! Causes deadlock, because ASTs aren't sent! */ if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); - unlock_res(res); - rc = ldlm_run_bl_ast_work(&rpc_list); - lock_res(res); + l_unlock(&res->lr_namespace->ns_lock); + rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); + l_lock(&res->lr_namespace->ns_lock); if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } else { ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL); + ldlm_grant_lock(lock, NULL, 0, 0); } RETURN(0); } diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 2967ab8..7f1e91c 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -50,6 +50,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err); + +/* ldlm_inodebits.c */ +int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, + int first_enq, ldlm_error_t *err); + /* l_lock.c */ void l_check_ns_lock(struct ldlm_namespace *ns); void l_check_no_ns_lock(struct ldlm_namespace *ns); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index a15a5ea..28a4a2f 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -55,6 +55,7 @@ char *ldlm_typename[] = { [LDLM_PLAIN] "PLN", [LDLM_EXTENT] "EXT", [LDLM_FLOCK] "FLK", + [LDLM_IBITS] "IBT", }; char *ldlm_it2str(int it) @@ -91,6 +92,7 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = { #ifdef __KERNEL__ [LDLM_FLOCK] ldlm_process_flock_lock, #endif + [LDLM_IBITS] ldlm_process_inodebits_lock, }; ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res) @@ -601,6 +603,14 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_policy_data.l_extent.gid != policy->l_extent.gid) continue; + /* We match if we have existing lock with same or wider set + of bits. */ + if (lock->l_resource->lr_type == LDLM_IBITS && + ((lock->l_policy_data.l_inodebits.bits & + policy->l_inodebits.bits) != + policy->l_inodebits.bits)) + continue; + if (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)) continue; @@ -1217,6 +1227,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) lock->l_policy_data.l_flock.pid, lock->l_policy_data.l_flock.start, lock->l_policy_data.l_flock.end); + else if (lock->l_resource->lr_type == LDLM_IBITS) + CDEBUG(level, " Bits: "LPX64"\n", + lock->l_policy_data.l_inodebits.bits); } void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh) diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 0263e33..591b434 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -78,14 +78,15 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp); struct ldlm_res_id res_id = { .name = {st->st_ino, (__u64)lli->lli_st_generation} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; ENTRY; rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); + &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh); if (!rc) { llu_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR, + rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &it, LCK_PR, &data, &lockh, NULL, 0, ldlm_completion_ast, llu_mdc_blocking_ast, inode, LDLM_FL_CANCEL_ON_BLOCK); diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index 49930fc..07f3934 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -149,6 +149,7 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock, struct inode *inode = llu_inode_from_lock(lock); struct llu_inode_info *lli; struct intnl_stat *st; + __u64 bits = lock->l_policy_data.l_inodebits.bits; /* Invalidate all dentries associated with this inode */ if (inode == NULL) @@ -157,14 +158,16 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock, lli = llu_i2info(inode); st = llu_i2stat(inode); - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + if (bits & MDS_INODELOCK_UPDATE) + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); if (lock->l_resource->lr_name.name[0] != st->st_ino || lock->l_resource->lr_name.name[1] !=lli->lli_st_generation){ LDLM_ERROR(lock, "data mismatch with ino %llu/%lu", (long long)st->st_ino,lli->lli_st_generation); } - if (S_ISDIR(st->st_mode)) { + if (S_ISDIR(st->st_mode) && + (bits & MDS_INODELOCK_UPDATE)) { CDEBUG(D_INODE, "invalidating inode %llu\n", (long long)st->st_ino); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 422b658..2ba2f36 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -380,13 +380,14 @@ static struct inode* llu_new_inode(struct filesys *fs, return inode; } -static int llu_have_md_lock(struct inode *inode) +static int llu_have_md_lock(struct inode *inode, __u64 lockpart) { struct llu_sb_info *sbi = llu_i2sbi(inode); struct llu_inode_info *lli = llu_i2info(inode); struct lustre_handle lockh; struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obddev; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; int flags; ENTRY; @@ -400,14 +401,14 @@ static int llu_have_md_lock(struct inode *inode) /* FIXME use LDLM_FL_TEST_LOCK instead */ flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PR, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PR, &lockh)) { ldlm_lock_decref(&lockh, LCK_PR); RETURN(1); } - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PW, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PW, &lockh)) { ldlm_lock_decref(&lockh, LCK_PW); RETURN(1); } @@ -424,7 +425,7 @@ static int llu_inode_revalidate(struct inode *inode) RETURN(0); } - if (!llu_have_md_lock(inode)) { + if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) { struct lustre_md md; struct ptlrpc_request *req = NULL; struct llu_sb_info *sbi = llu_i2sbi(inode); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index cade821..dd5c42d 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -211,10 +211,11 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp); struct address_space *mapping = dir->i_mapping; struct page *page; + ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} }; int rc; rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); + &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); if (!rc) { struct lookup_intent it = { .it_op = IT_READDIR }; struct ptlrpc_request *request; @@ -222,8 +223,8 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0); - rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it, - LCK_PR, &data, &lockh, NULL, 0, + rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_IBITS, &it, + LCK_CR, &data, &lockh, NULL, 0, ldlm_completion_ast, ll_mdc_blocking_ast, dir, 0); @@ -251,7 +252,7 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) } out_unlock: - ldlm_lock_decref(&lockh, LCK_PR); + ldlm_lock_decref(&lockh, LCK_CR); return page; fail: diff --git a/lustre/llite/file.c b/lustre/llite/file.c index fc64688..2a2827b 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -152,7 +152,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data, + rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, itp, LCK_PW, &data, &lockh, lmm, lmmsize, ldlm_completion_ast, ll_mdc_blocking_ast, NULL, 0); if (rc < 0) @@ -1481,6 +1481,7 @@ static int ll_have_md_lock(struct dentry *de) struct lustre_handle lockh; struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obddev; + ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}}; int flags; ENTRY; @@ -1493,19 +1494,12 @@ static int ll_have_md_lock(struct dentry *de) CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]); - /* FIXME use LDLM_FL_TEST_LOCK instead */ - flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PR, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PR); + flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK; + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) { RETURN(1); } - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PW, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PW); - RETURN(1); - } RETURN(0); } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 4511658..2ee2f85 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -150,28 +150,33 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, break; case LDLM_CB_CANCELING: { struct inode *inode = ll_inode_from_lock(lock); + __u64 bits = lock->l_policy_data.l_inodebits.bits; /* Invalidate all dentries associated with this inode */ if (inode == NULL) break; - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, - &(ll_i2info(inode)->lli_flags)); - if (lock->l_resource->lr_name.name[0] != inode->i_ino || lock->l_resource->lr_name.name[1] != inode->i_generation) { LDLM_ERROR(lock, "data mismatch with ino %lu/%u (%p)", inode->i_ino, inode->i_generation, inode); } - if (S_ISDIR(inode->i_mode)) { + + if (bits & MDS_INODELOCK_UPDATE) + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, + &(ll_i2info(inode)->lli_flags)); + + + if (S_ISDIR(inode->i_mode) && + (bits & MDS_INODELOCK_UPDATE)) { CDEBUG(D_INODE, "invalidating inode %lu\n", inode->i_ino); - truncate_inode_pages(inode->i_mapping, 0); } if (inode->i_sb->s_root && - inode != inode->i_sb->s_root->d_inode) + inode != inode->i_sb->s_root->d_inode && + (bits & MDS_INODELOCK_LOOKUP)) ll_unhash_aliases(inode); iput(inode); break; @@ -375,7 +380,6 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, RETURN(0); } - static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, struct lookup_intent *it, int lookup_flags) { diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 202afbf..4363263 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -58,9 +58,9 @@ static int it_to_lock_mode(struct lookup_intent *it) { /* CREAT needs to be tested before open (both could be set) */ if (it->it_op & IT_CREAT) - return LCK_PW; + return LCK_CW; else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) - return LCK_PR; + return LCK_CR; LBUG(); RETURN(-EINVAL); @@ -241,6 +241,7 @@ int mdc_enqueue(struct obd_export *exp, struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = { .name = {data->fid1.id, data->fid1.generation} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)}; int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repsize[4] = {sizeof(struct ldlm_reply), @@ -296,6 +297,7 @@ int mdc_enqueue(struct obd_export *exp, } else if (it->it_op & IT_UNLINK) { size[2] = sizeof(struct mds_rec_unlink); size[3] = data->namelen + 1; + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4, size, NULL); if (!req) @@ -315,6 +317,9 @@ int mdc_enqueue(struct obd_export *exp, size[2] = sizeof(struct mds_body); size[3] = data->namelen + 1; + if (it->it_op & IT_GETATTR) + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4, size, NULL); if (!req) @@ -330,6 +335,7 @@ int mdc_enqueue(struct obd_export *exp, reply_buffers = 3; req->rq_replen = lustre_msg_size(3, repsize); } else if (it->it_op == IT_READDIR) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1, size, NULL); if (!req) @@ -345,7 +351,7 @@ int mdc_enqueue(struct obd_export *exp, mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id, - lock_type, NULL, lock_mode, &flags, cb_blocking, + lock_type,&policy,lock_mode, &flags,cb_blocking, cb_completion, NULL, cb_data, NULL, 0, NULL, lockh); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); @@ -495,16 +501,25 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, struct ldlm_res_id res_id = {.name ={op_data->fid2.id, op_data->fid2.generation}}; struct lustre_handle lockh; - int mode = LCK_PR; + ldlm_policy_data_t policy; + int mode = LCK_CR; + policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? + MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_PLAIN, NULL, LCK_PR, &lockh); + LDLM_IBITS, &policy, LCK_CR, &lockh); + if (!rc) { + mode = LCK_CW; + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy, LCK_CW, &lockh); + } if (!rc) { - mode = LCK_PW; + mode = LCK_PR; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_PLAIN, NULL, LCK_PW, &lockh); + LDLM_IBITS, &policy, LCK_PR, &lockh); } if (rc) { memcpy(&it->d.lustre.it_lock_handle, &lockh, @@ -524,7 +539,7 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * never dropped its reference, so the refcounts are all OK */ if (!it_disposition(it, DISP_ENQ_COMPLETE)) { - rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it), + rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), op_data, &lockh, lmm, lmmsize, ldlm_completion_ast, cb_blocking, NULL, extra_lock_flags); @@ -599,11 +614,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * intent_finish has performed the iget().) */ lock = ldlm_handle2lock(&lockh); if (lock) { + ldlm_policy_data_t policy = lock->l_policy_data; LDLM_DEBUG(lock, "matching against this"); LDLM_LOCK_PUT(lock); memcpy(&old_lock, &lockh, sizeof(lockh)); if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_PLAIN, NULL, LCK_NL, &old_lock)) { + LDLM_IBITS, &policy, LCK_NL, &old_lock)) { ldlm_lock_decref_and_cancel(&lockh, it->d.lustre.it_lock_mode); memcpy(&lockh, &old_lock, sizeof(old_lock)); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index b809ca7..659c381 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -163,12 +163,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, - char *name, int namelen) + char *name, int namelen, __u64 lockpart) { struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; int flags = 0, rc; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; ENTRY; if (IS_ERR(de)) @@ -177,7 +178,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, - LDLM_PLAIN, NULL, lock_mode, &flags, + LDLM_IBITS, &policy, lock_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { @@ -628,8 +629,8 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, return(rc); } -static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags, - struct lustre_handle *child_lockh) +static int mds_getattr_name(int offset, struct ptlrpc_request *req, + int child_part, struct lustre_handle *child_lockh) { struct obd_device *obd = req->rq_export->exp_obd; struct mds_obd *mds = &obd->u.mds; @@ -707,13 +708,26 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags, } if (resent_req == 0) { + if (name) { rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, &parent_lockh, &dparent, - LCK_PR, name, namesize, - child_lockh, &dchild, LCK_PR, - flags); - if (rc) - GOTO(cleanup, rc); + LCK_CR, + MDS_INODELOCK_UPDATE, + name, namesize, + child_lockh, &dchild, LCK_CR, + child_part); + } else { + /* For revalidate by fid we always take UPDATE lock */ + dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, + LCK_CR, child_lockh, + NULL, 0, + MDS_INODELOCK_UPDATE); + LASSERT(dchild); + if (IS_ERR(dchild)) + rc = PTR_ERR(dchild); + } + if (rc) + GOTO(cleanup, rc); } else { struct ldlm_lock *granted_lock; struct ll_fid child_fid; @@ -760,8 +774,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags, case 2: if (resent_req == 0) { if (rc && dchild->d_inode) - ldlm_lock_decref(child_lockh, LCK_PR); - ldlm_lock_decref(&parent_lockh, LCK_PR); + ldlm_lock_decref(child_lockh, LCK_CR); + ldlm_lock_decref(&parent_lockh, LCK_CR); l_dput(dparent); } l_dput(dchild); @@ -1206,11 +1220,11 @@ int mds_handle(struct ptlrpc_request *req) * want to cancel. */ lockh.cookie = 0; - rc = mds_getattr_name(0, req, 0, &lockh); + rc = mds_getattr_name(0, req, MDS_INODELOCK_UPDATE, &lockh); /* this non-intent call (from an ioctl) is special */ req->rq_status = rc; if (rc == 0 && lockh.cookie) - ldlm_lock_decref(&lockh, LCK_PR); + ldlm_lock_decref(&lockh, LCK_CR); break; } case MDS_STATFS: @@ -1882,6 +1896,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, struct ldlm_reply *rep; struct lustre_handle lockh = { 0 }; struct ldlm_lock *new_lock = NULL; + int getattr_part = MDS_INODELOCK_UPDATE; int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), mds->mds_max_mdsize, @@ -1933,13 +1948,15 @@ static int mds_intent_policy(struct ldlm_namespace *ns, #endif RETURN(ELDLM_LOCK_ABORTED); break; - case IT_GETATTR: case IT_LOOKUP: + getattr_part = MDS_INODELOCK_LOOKUP; + case IT_GETATTR: + getattr_part |= MDS_INODELOCK_LOOKUP; case IT_READDIR: fixup_handle_for_resent_req(req, lock, &new_lock, &lockh); rep->lock_policy_res2 = mds_getattr_name(offset, req, - flags & LDLM_INHERIT_FLAGS, - &lockh); + getattr_part, &lockh); + /* FIXME: LDLM can set req->rq_status. MDS sets policy_res{1,2} with disposition and status. - replay: returns 0 & req->status is old status diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 78ce7cb..606cf1b 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -99,12 +99,14 @@ static inline void mds_inode_unset_orphan(struct inode *inode) } /* mds/mds_reint.c */ -int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2); +int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2, + ldlm_policy_data_t *p1, ldlm_policy_data_t *p2); int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, - int p2_lock_flags); + ldlm_policy_data_t *p2_policy); void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error); int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle, struct ptlrpc_request *req, int rc, __u32 op_data); @@ -114,10 +116,11 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, + __u64 parent_lockpart, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, - int child_lock_flags); + __u64 child_lockpart); int mds_lock_new_child(struct obd_device *obd, struct inode *inode, struct lustre_handle *child_lockh); int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode, diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index f4c25bc..9964a16 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -818,7 +818,7 @@ int mds_open(struct mds_update_record *rec, int offset, struct mds_export_data *med; struct lustre_handle parent_lockh; int rc = 0, cleanup_phase = 0, acc_mode, created = 0; - int parent_mode = LCK_PR; + int parent_mode = LCK_CR; void *handle = NULL; struct dentry_params dp; uid_t parent_uid = 0; @@ -878,10 +878,11 @@ int mds_open(struct mds_update_record *rec, int offset, /* Step 1: Find and lock the parent */ if (rec->ur_flags & MDS_OPEN_CREAT) - parent_mode = LCK_PW; + parent_mode = LCK_EX; dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode, &parent_lockh, rec->ur_name, - rec->ur_namelen - 1); + rec->ur_namelen - 1, + MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { rc = PTR_ERR(dparent); if (rc != -ENOENT) { diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 49cd470..b8a9b9e 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -465,8 +465,12 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) GOTO(cleanup, rc = -EROFS); } else { - de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, - &lockh, NULL, 0); + __u64 lockpart = MDS_INODELOCK_UPDATE; + if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) ) + lockpart |= MDS_INODELOCK_LOOKUP; + + de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, + &lockh, NULL, 0, lockpart); if (IS_ERR(de)) GOTO(cleanup, rc = PTR_ERR(de)); locked = 1; @@ -627,9 +631,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, l_dput(de); if (locked) { if (rc) { - ldlm_lock_decref(&lockh, LCK_PW); + ldlm_lock_decref(&lockh, LCK_EX); } else { - ptlrpc_save_lock (req, &lockh, LCK_PW); + ptlrpc_save_lock (req, &lockh, LCK_EX); } } case 0: @@ -705,8 +709,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) GOTO(cleanup, rc = -ESTALE); - dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh, - rec->ur_name, rec->ur_namelen - 1); + dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh, + rec->ur_name, rec->ur_namelen - 1, + MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { rc = PTR_ERR(dparent); if (rc != -ENOENT) @@ -897,9 +902,9 @@ cleanup: l_dput(dchild); case 1: /* locked parent dentry */ if (rc) { - ldlm_lock_decref(&lockh, LCK_PW); + ldlm_lock_decref(&lockh, LCK_EX); } else { - ptlrpc_save_lock (req, &lockh, LCK_PW); + ptlrpc_save_lock (req, &lockh, LCK_EX); } l_dput(dparent); case 0: @@ -916,7 +921,8 @@ cleanup: return 0; } -int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) +int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2, + ldlm_policy_data_t *p1, ldlm_policy_data_t *p2) { int i; @@ -933,6 +939,10 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) if (res1->name[i] < res2->name[i]) return 0; } + if (!p1 || !p2) + return 0; + if (memcmp(p1, p2, sizeof(*p1)) < 0) + return 1; return 0; } @@ -944,15 +954,16 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) * no lock is taken for that res_id. Must be at least one non-zero res_id. */ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, - int p2_lock_flags) + ldlm_policy_data_t *p2_policy) { struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id }; struct lustre_handle *handles[2] = { p1_lockh, p2_lockh }; int lock_modes[2] = { p1_lock_mode, p2_lock_mode }; - int flags[2] = { LDLM_FL_LOCAL_ONLY, LDLM_FL_LOCAL_ONLY | p2_lock_flags }; - int rc; + ldlm_policy_data_t *policies[2] = {p1_policy, p2_policy}; + int rc, flags; ENTRY; LASSERT(p1_res_id != NULL && p2_res_id != NULL); @@ -960,35 +971,38 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); - if (res_gt(p1_res_id, p2_res_id)) { + if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) { handles[1] = p1_lockh; handles[0] = p2_lockh; res_id[1] = p1_res_id; res_id[0] = p2_res_id; lock_modes[1] = p1_lock_mode; lock_modes[0] = p2_lock_mode; - flags[1] = LDLM_FL_LOCAL_ONLY; - flags[0] = p2_lock_flags | LDLM_FL_LOCAL_ONLY; + policies[1] = p1_policy; + policies[0] = p2_policy; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); + flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], - LDLM_PLAIN, NULL, lock_modes[0], &flags[0], + LDLM_IBITS, policies[0], lock_modes[0], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) RETURN(-EIO); ldlm_lock_dump_handle(D_OTHER, handles[0]); - if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) { + if (!memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) && + (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) { memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { + flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *res_id[1], LDLM_PLAIN, NULL, - lock_modes[1], &flags[1], + *res_id[1], LDLM_IBITS, policies[1], + lock_modes[1], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[1]); if (rc != ELDLM_OK) { @@ -1003,12 +1017,16 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, + ldlm_policy_data_t *p2_policy, struct ldlm_res_id *c1_res_id, struct lustre_handle *c1_lockh, int c1_lock_mode, + ldlm_policy_data_t *c1_policy, struct ldlm_res_id *c2_res_id, - struct lustre_handle *c2_lockh, int c2_lock_mode) + struct lustre_handle *c2_lockh, int c2_lock_mode, + ldlm_policy_data_t *c2_policy) { struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id, c1_res_id, c2_res_id }; @@ -1016,6 +1034,8 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, c1_lockh, c2_lockh }; int lock_modes[5] = { p1_lock_mode, p2_lock_mode, c1_lock_mode, c2_lock_mode }; + ldlm_policy_data_t *policies[5] = {p1_policy, p2_policy, + c1_policy, c2_policy}; int rc, i, j, sorted, flags; ENTRY; @@ -1029,13 +1049,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, dlm_handles[4] = dlm_handles[i]; res_id[4] = res_id[i]; lock_modes[4] = lock_modes[i]; + policies[4] = policies[i]; sorted = 0; do { - if (res_gt(res_id[j], res_id[4])) { + if (res_gt(res_id[j], res_id[4], policies[j], + policies[4])) { dlm_handles[j + 1] = dlm_handles[j]; res_id[j + 1] = res_id[j]; lock_modes[j + 1] = lock_modes[j]; + policies[j + 1] = policies[j]; j--; } else { sorted = 1; @@ -1045,6 +1068,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, dlm_handles[j + 1] = dlm_handles[4]; res_id[j + 1] = res_id[4]; lock_modes[j + 1] = lock_modes[4]; + policies[j + 1] = policies[4]; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", @@ -1057,13 +1081,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, if (res_id[i]->name[0] == 0) break; if (i != 0 && - memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) { + !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) && + (policies[i]->l_inodebits.bits & + policies[i-1]->l_inodebits.bits)) { memcpy(dlm_handles[i], dlm_handles[i-1], sizeof(*(dlm_handles[i]))); ldlm_lock_addref(dlm_handles[i], lock_modes[i]); } else { rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *res_id[i], LDLM_PLAIN, NULL, + *res_id[i], LDLM_IBITS, + policies[i], lock_modes[i], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, @@ -1101,6 +1128,7 @@ static int mds_verify_child(struct obd_device *obd, struct ldlm_res_id *child_res_id, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, + ldlm_policy_data_t *child_policy, const char *name, int namelen, struct ldlm_res_id *maxres) { @@ -1140,8 +1168,8 @@ static int mds_verify_child(struct obd_device *obd, child_res_id->name[0] = dchild->d_inode->i_ino; child_res_id->name[1] = dchild->d_inode->i_generation; - if (res_gt(parent_res_id, child_res_id) || - res_gt(maxres, child_res_id)) { + if (res_gt(parent_res_id, child_res_id, NULL, NULL) || + res_gt(maxres, child_res_id, NULL, NULL)) { CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n", child_res_id->name[0], parent_res_id->name[0], maxres->name[0]); @@ -1149,7 +1177,7 @@ static int mds_verify_child(struct obd_device *obd, } rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *child_res_id, LDLM_PLAIN, NULL, + *child_res_id, LDLM_IBITS, child_policy, child_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, child_lockh); @@ -1177,13 +1205,16 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, + __u64 parent_lockpart, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, - int child_lock_flags) + __u64 child_lockpart) { struct ldlm_res_id child_res_id = { .name = {0} }; struct ldlm_res_id parent_res_id = { .name = {0} }; + ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }}; + ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }}; struct inode *inode; int rc = 0, cleanup_phase = 0; ENTRY; @@ -1235,8 +1266,9 @@ retry_locks: /* Step 3: Lock parent and child in resource order. If child doesn't * exist, we still have to lock the parent and re-lookup. */ rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode, + &parent_policy, &child_res_id, child_lockh, child_mode, - child_lock_flags); + &child_policy); if (rc) GOTO(cleanup, rc); @@ -1248,7 +1280,7 @@ retry_locks: /* Step 4: Re-lookup child to verify it hasn't changed since locking */ rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp, parent_mode, &child_res_id, child_lockh, dchildp, - child_mode, name, namelen, &parent_res_id); + child_mode,&child_policy, name, namelen, &parent_res_id); if (rc > 0) goto retry_locks; if (rc < 0) { @@ -1387,9 +1419,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, GOTO(cleanup, rc = -ENOENT); rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, - &parent_lockh, &dparent, LCK_PW, + &parent_lockh, &dparent, LCK_EX, + MDS_INODELOCK_UPDATE, rec->ur_name, rec->ur_namelen, - &child_lockh, &dchild, LCK_EX, 0); + &child_lockh, &dchild, LCK_EX, + MDS_INODELOCK_FULL); if (rc) GOTO(cleanup, rc); @@ -1566,9 +1600,9 @@ cleanup: ldlm_lock_decref(&child_lockh, LCK_EX); case 1: /* child and parent dentry, parent lock */ if (rc) - ldlm_lock_decref(&parent_lockh, LCK_PW); + ldlm_lock_decref(&parent_lockh, LCK_EX); else - ptlrpc_save_lock(req, &parent_lockh, LCK_PW); + ptlrpc_save_lock(req, &parent_lockh, LCK_EX); l_dput(dchild); l_dput(dchild); l_dput(dparent); @@ -1597,6 +1631,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh; struct ldlm_res_id src_res_id = { .name = {0} }; struct ldlm_res_id tgt_dir_res_id = { .name = {0} }; + ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}}; + ldlm_policy_data_t tgt_dir_policy = + {.l_inodebits = {MDS_INODELOCK_UPDATE}}; int rc = 0, cleanup_phase = 0; ENTRY; @@ -1638,7 +1675,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation; rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX, - &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, 0); + &src_policy, + &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, + &tgt_dir_policy); if (rc) GOTO(cleanup, rc); @@ -1754,6 +1793,12 @@ static int mds_get_parents_children_locked(struct obd_device *obd, struct ldlm_res_id p2_res_id = { .name = {0} }; struct ldlm_res_id c1_res_id = { .name = {0} }; struct ldlm_res_id c2_res_id = { .name = {0} }; + ldlm_policy_data_t p_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}}; + /* Only dentry should disappear, but the inode itself would be + intact otherwise. */ + ldlm_policy_data_t c1_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP}}; + /* If something is going to be replaced, both dentry and inode locks are needed */ + ldlm_policy_data_t c2_policy = {.l_inodebits = {MDS_INODELOCK_FULL}}; struct ldlm_res_id *maxres_src, *maxres_tgt; struct inode *inode; int rc = 0, cleanup_phase = 0; @@ -1836,15 +1881,19 @@ retry_locks: maxres_tgt = &p2_res_id; cleanup_phase = 4; /* target dentry */ - if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id)) + if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id,NULL,NULL)) maxres_src = &c1_res_id; - if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id)) + if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id,NULL,NULL)) maxres_tgt = &c2_res_id; rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode, + &p_policy, &p2_res_id, &dlm_handles[1], parent_mode, + &p_policy, &c1_res_id, &dlm_handles[2], child_mode, - &c2_res_id, &dlm_handles[3], child_mode); + &c1_policy, + &c2_res_id, &dlm_handles[3], child_mode, + &c2_policy); if (rc) GOTO(cleanup, rc); @@ -1853,7 +1902,8 @@ retry_locks: /* Step 6a: Re-lookup source child to verify it hasn't changed */ rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp, parent_mode, &c1_res_id, &dlm_handles[2], de_oldp, - child_mode, old_name, old_len, maxres_tgt); + child_mode, &c1_policy, old_name, old_len, + maxres_tgt); if (rc) { if (c2_res_id.name[0] != 0) ldlm_lock_decref(&dlm_handles[3], child_mode); @@ -1870,7 +1920,8 @@ retry_locks: /* Step 6b: Re-lookup target child to verify it hasn't changed */ rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp, parent_mode, &c2_res_id, &dlm_handles[3], de_newp, - child_mode, new_name, new_len, maxres_src); + child_mode, &c2_policy, new_name, new_len, + maxres_src); if (rc) { ldlm_lock_decref(&dlm_handles[2], child_mode); ldlm_lock_decref(&dlm_handles[0], parent_mode); @@ -1933,7 +1984,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir, - rec->ur_fid2, &de_tgtdir, LCK_PW, + rec->ur_fid2, &de_tgtdir, LCK_EX, rec->ur_name, rec->ur_namelen, &de_old, rec->ur_tgt, rec->ur_tgtlen, &de_new, @@ -2058,14 +2109,14 @@ cleanup: if (lock_count == 4) ldlm_lock_decref(&(dlm_handles[3]), LCK_EX); ldlm_lock_decref(&(dlm_handles[2]), LCK_EX); - ldlm_lock_decref(&(dlm_handles[1]), LCK_PW); - ldlm_lock_decref(&(dlm_handles[0]), LCK_PW); + ldlm_lock_decref(&(dlm_handles[1]), LCK_EX); + ldlm_lock_decref(&(dlm_handles[0]), LCK_EX); } else { if (lock_count == 4) ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX); ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX); - ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW); - ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW); + ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_EX); + ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_EX); } l_dput(de_new); l_dput(de_old); diff --git a/lustre/mds/mds_xattr.c b/lustre/mds/mds_xattr.c index a96d397..69a7af5 100644 --- a/lustre/mds/mds_xattr.c +++ b/lustre/mds/mds_xattr.c @@ -208,6 +208,7 @@ int mds_setxattr_internal(struct ptlrpc_request *req, struct mds_body *body) char *xattr = NULL; int xattrlen; int rc = -EOPNOTSUPP, err = 0; + __u64 lockpart; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); @@ -218,8 +219,14 @@ int mds_setxattr_internal(struct ptlrpc_request *req, struct mds_body *body) MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); - de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PW, + lockpart = MDS_INODELOCK_UPDATE; + +/* + de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, &lockh, NULL, 0); +*/ + de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, + &lockh, NULL, 0, lockpart); if (IS_ERR(de)) GOTO(out, rc = PTR_ERR(de)); @@ -287,9 +294,9 @@ out_trans: out_dput: l_dput(de); if (rc) - ldlm_lock_decref(&lockh, LCK_PW); + ldlm_lock_decref(&lockh, LCK_EX); else - ptlrpc_save_lock (req, &lockh, LCK_PW); + ptlrpc_save_lock (req, &lockh, LCK_EX); if (err && !rc) rc = err; diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 002e5ef..ac1a021 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -8,7 +8,7 @@ ldlm_objs := $(LDLM)l_lock.o $(LDLM)ldlm_lock.o ldlm_objs += $(LDLM)ldlm_resource.o $(LDLM)ldlm_lib.o ldlm_objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o ldlm_objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o -ldlm_objs += $(LDLM)ldlm_flock.o +ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index c77dcfb..ab76f7a 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -12,6 +12,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \ $(top_srcdir)/lustre/ldlm/ldlm_request.c \ $(top_srcdir)/lustre/ldlm/ldlm_lockd.c \ $(top_srcdir)/lustre/ldlm/ldlm_internal.h \ + $(top_srcdir)/lustre/ldlm/ldlm_inodebits.c \ $(top_srcdir)/lustre/ldlm/ldlm_flock.c COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ -- 1.8.3.1