From d79fc607ddc94e1b5bfcee6c0ebe4ad6045f2823 Mon Sep 17 00:00:00 2001 From: nic Date: Mon, 12 Apr 2004 21:45:14 +0000 Subject: [PATCH] land b_inodebits --- lustre/include/linux/lustre_dlm.h | 28 ++++- lustre/include/linux/lustre_idl.h | 10 ++ lustre/include/linux/lustre_mds.h | 2 +- lustre/kernel_patches/patches/bproc-patch-2.4.20 | 6 +- lustre/ldlm/Makefile.am | 2 +- lustre/ldlm/Makefile.mk | 3 +- lustre/ldlm/ldlm_inodebits.c | 135 +++++++++++++++++++++ lustre/ldlm/ldlm_internal.h | 4 + lustre/ldlm/ldlm_lock.c | 13 ++ lustre/liblustre/dir.c | 5 +- lustre/liblustre/namei.c | 7 +- lustre/liblustre/super.c | 13 +- lustre/llite/dcache.c | 23 ++++ lustre/llite/dir.c | 5 +- lustre/llite/file.c | 15 +-- lustre/llite/namei.c | 28 ++++- lustre/mdc/mdc_locks.c | 28 ++++- lustre/mds/handler.c | 24 ++-- lustre/mds/mds_internal.h | 9 +- lustre/mds/mds_open.c | 9 +- lustre/mds/mds_reint.c | 145 ++++++++++++++++------- lustre/ptlrpc/Makefile.in | 2 +- lustre/ptlrpc/Makefile.mk | 3 +- lustre/ptlrpc/autoMakefile.am | 2 +- lustre/tests/replay-single.sh | 1 + 25 files changed, 426 insertions(+), 96 deletions(-) create mode 100644 lustre/ldlm/ldlm_inodebits.c diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index cbdf3c2..3bd524d 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -255,9 +255,10 @@ struct ldlm_lock { #define LDLM_PLAIN 10 #define LDLM_EXTENT 11 #define LDLM_FLOCK 12 +#define LDLM_IBITS 13 #define LDLM_MIN_TYPE 10 -#define LDLM_MAX_TYPE 12 +#define LDLM_MAX_TYPE 13 struct ldlm_resource { struct ldlm_namespace *lr_namespace; @@ -364,11 +365,32 @@ do { \ atomic_read(&lock->l_export->exp_refcount) : -99); \ break; \ } \ + if (lock->l_resource->lr_type == LDLM_IBITS) { \ + CDEBUG(level, "### " format \ + " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ + "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " \ + "flags: %x remote: "LPX64" expref: %d\n" , ## a, \ + 
lock->l_resource->lr_namespace->ns_name, \ + lock, lock->l_handle.h_cookie, \ + atomic_read (&lock->l_refc), \ + lock->l_readers, lock->l_writers, \ + ldlm_lockname[lock->l_granted_mode], \ + ldlm_lockname[lock->l_req_mode], \ + lock->l_resource->lr_name.name[0], \ + lock->l_resource->lr_name.name[1], \ + lock->l_policy_data.l_inodebits.bits, \ + atomic_read(&lock->l_resource->lr_refcount), \ + ldlm_typename[lock->l_resource->lr_type], \ + lock->l_flags, lock->l_remote_handle.cookie, \ + lock->l_export ? \ + atomic_read(&lock->l_export->exp_refcount) : -99); \ + break; \ + } \ { \ CDEBUG(level, "### " format \ " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " \ - "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " \ - "remote: "LPX64" expref: %d\n" , ## a, \ + "res: "LPU64"/"LPU64" rrc: %d type: %s " \ + "flags: %x remote: "LPX64" expref: %d\n" , ## a, \ lock->l_resource->lr_namespace->ns_name, \ lock, lock->l_handle.h_cookie, \ atomic_read (&lock->l_refc), \ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 0d98e42..202198c 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -511,6 +511,12 @@ typedef enum { #define DISP_OPEN_OPEN (1 << 5) #define DISP_ENQ_COMPLETE (1<<6) +/* INODE LOCK PARTS */ +#define MDS_INODELOCK_LOOKUP 0x000001 /* dentry, mode, owner, group */ +#define MDS_INODELOCK_UPDATE 0x000002 /* size, links, timestamps */ +//#define MDS_INODELOCK_MAXSHIFT 1 +//#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) + struct ll_fid { __u64 id; __u32 generation; @@ -733,6 +739,9 @@ struct ldlm_extent { __u64 end; __u64 gid; }; +struct ldlm_inodebits { + __u64 bits; +}; struct ldlm_flock { __u64 start; @@ -751,6 +760,7 @@ struct ldlm_flock { typedef union { struct ldlm_extent l_extent; struct ldlm_flock l_flock; + struct ldlm_inodebits l_inodebits; } ldlm_policy_data_t; extern void lustre_swab_ldlm_policy_data (ldlm_policy_data_t *d); diff --git 
a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index a1f333a..c602bb6 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -179,7 +179,7 @@ int mds_reint_rec(struct mds_update_record *r, int offset, struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, - char *name, int namelen); + char *name, int namelen, __u64 lockpart); struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); int mds_update_server_data(struct obd_device *, int force_sync); diff --git a/lustre/kernel_patches/patches/bproc-patch-2.4.20 b/lustre/kernel_patches/patches/bproc-patch-2.4.20 index 3587bd8..f081eb6 100644 --- a/lustre/kernel_patches/patches/bproc-patch-2.4.20 +++ b/lustre/kernel_patches/patches/bproc-patch-2.4.20 @@ -1,4 +1,4 @@ -$Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $ +$Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $ Index: linux/fs/exec.c =================================================================== @@ -764,7 +764,7 @@ Index: linux/kernel/bproc_hook.c + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * -+ * $Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $ ++ * $Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $ + *-----------------------------------------------------------------------*/ +#include +#include @@ -832,7 +832,7 @@ Index: linux/include/linux/bproc.h + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * -+ * $Id: bproc-patch-2.4.20,v 1.7 2004/03/19 06:33:08 phil Exp $ ++ * $Id: bproc-patch-2.4.20,v 1.8 2004/04/12 21:44:45 nic Exp $ + *-----------------------------------------------------------------------*/ +#ifndef _LINUX_BPROC_H +#define 
_LINUX_BPROC_H diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index 779639a..7643aef 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -10,4 +10,4 @@ MOSTLYCLEANFILES = *.o *.ko *.mod.c DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \ ldlm_lock.c ldlm_lockd.c ldlm_plain.c ldlm_request.c \ - ldlm_resource.c ldlm_test.c l_lock.c + ldlm_resource.c ldlm_test.c l_lock.c ldlm_inodebits.c diff --git a/lustre/ldlm/Makefile.mk b/lustre/ldlm/Makefile.mk index 650331e..b20c77a 100644 --- a/lustre/ldlm/Makefile.mk +++ b/lustre/ldlm/Makefile.mk @@ -7,4 +7,5 @@ include $(src)/../portals/Kernelenv obj-y += ldlm.o ldlm-objs := l_lock.o ldlm_lock.o ldlm_resource.o ldlm_extent.o ldlm_request.o \ - ldlm_lockd.o ldlm_lib.o ldlm_flock.o ldlm_plain.o + ldlm_lockd.o ldlm_lib.o ldlm_flock.o ldlm_plain.o \ + ldlm_inodebits.o diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c new file mode 100644 index 0000000..e3511dd --- /dev/null +++ b/lustre/ldlm/ldlm_inodebits.c @@ -0,0 +1,135 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (c) 2002, 2003, 2004 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Phil Schwan + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */ + +#define DEBUG_SUBSYSTEM S_LDLM +#ifndef __KERNEL__ +# include +#endif + +#include +#include +#include + +#include "ldlm_internal.h" + +/* Determine if the lock is compatible with all locks on the queue. */ +static int +ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, + int send_cbs) +{ + struct list_head *tmp; + struct ldlm_lock *lock; + ldlm_mode_t req_mode = req->l_req_mode; + __u64 req_bits = req->l_policy_data.l_inodebits.bits; + int compat = 1; + ENTRY; + + LASSERT(req_bits); /* There is no sense in lock with no bits set, + I think. Also such a lock would be compatible + with any other bit lock */ + list_for_each(tmp, queue) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (req == lock) + RETURN(compat); + + /* locks are compatible, bits don't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) + continue; + + /* if bits don't overlap skip it */ + if (!(lock->l_policy_data.l_inodebits.bits & req_bits)) + continue; + + if (!send_cbs) + RETURN(0); + + compat = 0; + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, NULL, 0); + } + + RETURN(compat); +} + +/* If first_enq is 0 (ie, called from ldlm_reprocess_queue): + * - blocking ASTs have already been sent + * - the caller has already initialized req->lr_tmp + * - must call this function with the ns lock held + * + * If first_enq is 1 (ie, called from ldlm_lock_enqueue): + * - blocking ASTs have not been sent + * - the caller has NOT initialized req->lr_tmp, so we must + * - must call this function with the ns lock held once */ +int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, + int first_enq, ldlm_error_t *err) +{ + struct ldlm_resource *res = lock->l_resource; + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + int rc; + ENTRY; + + LASSERT(list_empty(&res->lr_converting)); + + if (!first_enq) { + LASSERT(res->lr_tmp != NULL); + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0); + if (!rc) + 
RETURN(LDLM_ITER_STOP); + rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0); + if (!rc) + RETURN(LDLM_ITER_STOP); + + ldlm_resource_unlink_lock(lock); + ldlm_grant_lock(lock, NULL, 0, 1); + RETURN(LDLM_ITER_CONTINUE); + } + + restart: + LASSERT(res->lr_tmp == NULL); + res->lr_tmp = &rpc_list; + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1); + rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1); + res->lr_tmp = NULL; + + if (rc != 2) { + /* If either of the compat_queue()s returned 0, then we + * have ASTs to send and must go onto the waiting list. + * + * bug 2322: we used to unlink and re-add here, which was a + * terrible folly -- if we goto restart, we could get + * re-ordered! Causes deadlock, because ASTs aren't sent! */ + if (list_empty(&lock->l_res_link)) + ldlm_resource_add_lock(res, &res->lr_waiting, lock); + l_unlock(&res->lr_namespace->ns_lock); + rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); + l_lock(&res->lr_namespace->ns_lock); + if (rc == -ERESTART) + GOTO(restart, -ERESTART); + *flags |= LDLM_FL_BLOCK_GRANTED; + } else { + ldlm_resource_unlink_lock(lock); + ldlm_grant_lock(lock, NULL, 0, 0); + } + RETURN(0); +} diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 742bcba..3a79a52 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -69,6 +69,10 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_error_t *err); +/* ldlm_inodebits.c */ +int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, + int first_enq, ldlm_error_t *err); + /* l_lock.c */ void l_check_no_ns_lock(struct ldlm_namespace *ns); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 756dd1c..e10ac64 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -52,6 +52,7 @@ char *ldlm_typename[] = { [LDLM_PLAIN] "PLN", [LDLM_EXTENT] "EXT", 
[LDLM_FLOCK] "FLK", + [LDLM_IBITS] "IBT", }; char *ldlm_it2str(int it) @@ -88,6 +89,7 @@ static ldlm_processing_policy ldlm_processing_policy_table[] = { #ifdef __KERNEL__ [LDLM_FLOCK] ldlm_process_flock_lock, #endif + [LDLM_IBITS] ldlm_process_inodebits_lock, }; ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res) @@ -597,6 +599,14 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_policy_data.l_extent.gid != policy->l_extent.gid) continue; + /* We match if we have existing lock with same or wider set + of bits. */ + if (lock->l_resource->lr_type == LDLM_IBITS && + ((lock->l_policy_data.l_inodebits.bits & + policy->l_inodebits.bits) != + policy->l_inodebits.bits)) + continue; + if (lock->l_destroyed) continue; @@ -1191,6 +1201,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) lock->l_policy_data.l_flock.pid, lock->l_policy_data.l_flock.start, lock->l_policy_data.l_flock.end); + else if (lock->l_resource->lr_type == LDLM_IBITS) + CDEBUG(level, " Bits: "LPX64"\n", + lock->l_policy_data.l_inodebits.bits); } void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh) diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index c236d73..a2966a2 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -62,6 +62,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp); struct ldlm_res_id res_id = { .name = {lli->lli_st_ino, (__u64)lli->lli_st_generation} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; ENTRY; if ((lli->lli_st_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index) { @@ -75,11 +76,11 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) } rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); + &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh); if (!rc) { 
llu_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR, + rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, &it, LCK_PR, &data, &lockh, NULL, 0, ldlm_completion_ast, llu_mdc_blocking_ast, inode); diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index af4a0dc..3339595 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -140,6 +140,7 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock, case LDLM_CB_CANCELING: { struct inode *inode = llu_inode_from_lock(lock); struct llu_inode_info *lli; + __u64 bits = lock->l_policy_data.l_inodebits.bits; /* Invalidate all dentries associated with this inode */ if (inode == NULL) @@ -147,14 +148,16 @@ int llu_mdc_blocking_ast(struct ldlm_lock *lock, lli = llu_i2info(inode); - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); + if (bits & MDS_INODELOCK_UPDATE) + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); if (lock->l_resource->lr_name.name[0] != lli->lli_st_ino || lock->l_resource->lr_name.name[1] != lli->lli_st_generation) { LDLM_ERROR(lock, "data mismatch with ino %lu/%lu", lli->lli_st_ino, lli->lli_st_generation); } - if (S_ISDIR(lli->lli_st_mode)) { + if (S_ISDIR(lli->lli_st_mode) && + (bits & MDS_INODELOCK_UPDATE)) { CDEBUG(D_INODE, "invalidating inode %lu\n", lli->lli_st_ino); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 6971d87..6608922 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -337,13 +337,14 @@ static struct inode* llu_new_inode(struct filesys *fs, return inode; } -static int llu_have_md_lock(struct inode *inode) +static int llu_have_md_lock(struct inode *inode, __u64 lockpart) { struct llu_sb_info *sbi = llu_i2sbi(inode); struct llu_inode_info *lli = llu_i2info(inode); struct lustre_handle lockh; struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obddev; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; int flags; ENTRY; @@ -357,14 +358,14 
@@ static int llu_have_md_lock(struct inode *inode) /* FIXME use LDLM_FL_TEST_LOCK instead */ flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PR, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PR, &lockh)) { ldlm_lock_decref(&lockh, LCK_PR); RETURN(1); } - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PW, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PW, &lockh)) { ldlm_lock_decref(&lockh, LCK_PW); RETURN(1); } @@ -382,7 +383,7 @@ static int llu_inode_revalidate(struct inode *inode) RETURN(0); } - if (!llu_have_md_lock(inode)) { + if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) { struct lustre_md md; struct ptlrpc_request *req = NULL; struct llu_sb_info *sbi = llu_i2sbi(inode); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 01a9c8a..a719ca1 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -228,6 +228,29 @@ int ll_revalidate_it(struct dentry *de, int flags, struct lookup_intent *it) ll_i2uctxt(&ctxt, de->d_parent->d_inode, de->d_inode); + if (it->it_op == IT_GETATTR) { /* We need to check for LOOKUP lock + as well */ + rc = mdc_intent_lock(exp, &ctxt, &pfid, de->d_name.name, + de->d_name.len, NULL, 0, &cfid, &lookup_it, + flags, &req, ll_mdc_blocking_ast); + /* If there was no lookup lock, no point in even checking for + UPDATE lock */ + if (!rc) { + it = &lookup_it; + GOTO(out, rc); + } + if (it_disposition(&lookup_it, DISP_LOOKUP_NEG)) { + ll_intent_release(&lookup_it); + it = &lookup_it; + GOTO(out, rc = 0); + } + + if (req) + ptlrpc_req_finished(req); + req = NULL; + ll_lookup_finish_locks(&lookup_it, de); + } + rc = mdc_intent_lock(exp, &ctxt, &pfid, de->d_name.name, de->d_name.len, NULL, 0, &cfid, it, flags, &req, ll_mdc_blocking_ast); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 
f92652f..bca0445 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -212,10 +212,11 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp); struct address_space *mapping = dir->i_mapping; struct page *page; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; int rc; rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh); + &res_id, LDLM_IBITS, &policy, LCK_PR, &lockh); if (!rc) { struct lookup_intent it = { .it_op = IT_READDIR }; struct ptlrpc_request *request; @@ -223,7 +224,7 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0); - rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it, + rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_IBITS, &it, LCK_PR, &data, &lockh, NULL, 0, ldlm_completion_ast, ll_mdc_blocking_ast, dir); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index a6d4f74..3ce798b 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -131,7 +131,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, ll_prepare_mdc_op_data(&data, parent->d_inode, NULL, name, len, O_RDWR); - rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data, + rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_IBITS, itp, LCK_PW, &data, &lockh, lmm, lmmsize, ldlm_completion_ast, ll_mdc_blocking_ast, parent->d_inode); if (rc < 0) @@ -1297,13 +1297,14 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) RETURN(rc); } -static int ll_have_md_lock(struct dentry *de) +static int ll_have_md_lock(struct dentry *de, __u64 lockpart) { struct ll_sb_info *sbi = ll_s2sbi(de->d_sb); struct lustre_handle lockh; struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obddev; int flags; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; ENTRY; if (!de->d_inode) @@ -1317,14 
+1318,14 @@ static int ll_have_md_lock(struct dentry *de) /* FIXME use LDLM_FL_TEST_LOCK instead */ flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PR, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PR, &lockh)) { ldlm_lock_decref(&lockh, LCK_PR); RETURN(1); } - if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN, - NULL, LCK_PW, &lockh)) { + if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS, + &policy, LCK_PW, &lockh)) { ldlm_lock_decref(&lockh, LCK_PW); RETURN(1); } @@ -1350,7 +1351,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE); #endif - if (!ll_have_md_lock(dentry)) { + if (!ll_have_md_lock(dentry, MDS_INODELOCK_UPDATE)) { struct ptlrpc_request *req = NULL; struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode); struct ll_fid fid; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 23b193c..3508777 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -154,20 +154,33 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, break; case LDLM_CB_CANCELING: { struct inode *inode = ll_inode_from_lock(lock); + __u64 bits = lock->l_policy_data.l_inodebits.bits; - /* Invalidate all dentries associated with this inode */ + /* For lookup locks: Invalidate all dentries associated with + this inode, for UPDATE locks - invalidate directory pages */ if (inode == NULL) break; - clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, - &(ll_i2info(inode)->lli_flags)); + if (bits & MDS_INODELOCK_UPDATE) + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, + &(ll_i2info(inode)->lli_flags)); + if (lock->l_resource->lr_name.name[0] != inode->i_ino || lock->l_resource->lr_name.name[1] != inode->i_generation) { LDLM_ERROR(lock, "data mismatch with ino %lu/%u", inode->i_ino, inode->i_generation); } - if 
(S_ISDIR(inode->i_mode)) { + + /* If lookup lock is cancelled, we just drop the dentry and + this will cause us to reget data from MDS when we'd want to + access this dentry/inode again. If this is lock on + other parts of inode that is cancelled, we do not need to do + much (but need to discard data from readdir, if any), since + absence of lock will cause ll_revalidate_it (called from + stat() and similar functions) to renew the data anyway */ + if (S_ISDIR(inode->i_mode) && + (bits & MDS_INODELOCK_UPDATE)) { CDEBUG(D_INODE, "invalidating inode %lu\n", inode->i_ino); @@ -175,7 +188,8 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, } if (inode->i_sb->s_root && - inode != inode->i_sb->s_root->d_inode) + inode != inode->i_sb->s_root->d_inode && + (bits & MDS_INODELOCK_LOOKUP)) ll_unhash_aliases(inode); iput(inode); break; @@ -194,6 +208,7 @@ int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode, { .name = {inode->i_ino, inode->i_generation} }; struct obd_device *obddev = class_conn2obd(conn); ENTRY; + RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags, opaque)); } @@ -313,8 +328,9 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset, dentry->d_op = &ll_d_ops; ll_set_dd(dentry); - if (dentry == saved) + if (dentry == saved) { d_add(dentry, inode); + } RETURN(0); } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 7b1aa8b..8f7b3b6 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -166,6 +166,7 @@ int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, ldlm_change_cbdata(class_exp2obd(exp)->obd_namespace, &res_id, it, data); + EXIT; return 0; } @@ -190,6 +191,7 @@ int mdc_enqueue(struct obd_export *exp, struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = { .name = {data->fid1.id, data->fid1.generation} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; int size[6] = {sizeof(struct 
ldlm_request), sizeof(struct ldlm_intent)}; int rc, flags = LDLM_FL_HAS_INTENT; int repsize[4] = {sizeof(struct ldlm_reply), @@ -255,6 +257,9 @@ int mdc_enqueue(struct obd_export *exp, size[2] = sizeof(struct mds_body); size[3] = data->namelen + 1; + if (it->it_op & IT_GETATTR) + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4, size, NULL); if (!req) @@ -270,6 +275,7 @@ int mdc_enqueue(struct obd_export *exp, reply_buffers = 3; req->rq_replen = lustre_msg_size(3, repsize); } else if (it->it_op == IT_READDIR) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1, size, NULL); if (!req) @@ -282,10 +288,9 @@ int mdc_enqueue(struct obd_export *exp, LBUG(); RETURN(-EINVAL); } - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id, - lock_type, NULL, lock_mode, &flags, cb_blocking, + lock_type, &policy, lock_mode, &flags,cb_blocking, cb_completion, NULL, cb_data, NULL, 0, NULL, lockh); mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); @@ -430,16 +435,26 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, struct ldlm_res_id res_id ={.name = {cfid->id, cfid->generation}}; struct lustre_handle lockh; + ldlm_policy_data_t policy; int mode = LCK_PR; + /* For the GETATTR case, ll_revalidate_it issues two separate + queries - for LOOKUP and for UPDATE lock because it cannot + check them together - we might have those two bits to be + present in two separate granted locks */ + policy.l_inodebits.bits = + (it->it_op == IT_GETATTR)?MDS_INODELOCK_UPDATE: + MDS_INODELOCK_LOOKUP; + mode = LCK_PR; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_PLAIN, NULL, LCK_PR, &lockh); + LDLM_IBITS, &policy, LCK_PR, &lockh); if (!rc) { mode = LCK_PW; rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, - LDLM_PLAIN, NULL, LCK_PW, 
&lockh); + LDLM_IBITS, &policy, LCK_PW, + &lockh); } if (rc) { memcpy(&it->d.lustre.it_lock_handle, &lockh, @@ -461,7 +476,7 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, struct mdc_op_data op_data; mdc_fid2mdc_op_data(&op_data, uctxt, pfid, cfid, name, len, 0); - rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it), + rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it), &op_data, &lockh, lmm, lmmsize, ldlm_completion_ast, cb_blocking, NULL); if (rc < 0) @@ -537,11 +552,12 @@ int mdc_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt, * intent_finish has performed the iget().) */ lock = ldlm_handle2lock(&lockh); if (lock) { + ldlm_policy_data_t policy = lock->l_policy_data; LDLM_DEBUG(lock, "matching against this"); LDLM_LOCK_PUT(lock); memcpy(&old_lock, &lockh, sizeof(lockh)); if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_PLAIN, NULL, LCK_NL, &old_lock)) { + LDLM_IBITS, &policy, LCK_NL, &old_lock)) { ldlm_lock_decref_and_cancel(&lockh, it->d.lustre.it_lock_mode); memcpy(&lockh, &old_lock, sizeof(old_lock)); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 3f2aff3..b34e184 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -158,12 +158,14 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, - char *name, int namelen) + char *name, int namelen, __u64 lockpart) { struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; int flags = 0, rc; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; + ENTRY; if (IS_ERR(de)) @@ -172,7 +174,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; rc = ldlm_cli_enqueue(NULL, 
NULL, obd->obd_namespace, res_id, - LDLM_PLAIN, NULL, lock_mode, &flags, + LDLM_IBITS, &policy, lock_mode, &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { @@ -652,7 +654,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, } static int mds_getattr_name(int offset, struct ptlrpc_request *req, - struct lustre_handle *child_lockh) + struct lustre_handle *child_lockh, int child_part) { struct obd_device *obd = req->rq_export->exp_obd; struct ldlm_reply *rep = NULL; @@ -732,8 +734,10 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, if (resent_req == 0) { rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, &parent_lockh, &dparent, - LCK_PR, name, namesize, - child_lockh, &dchild, LCK_PR); + LCK_PR, MDS_INODELOCK_UPDATE, + name, namesize, + child_lockh, &dchild, LCK_PR, + child_part); if (rc) GOTO(cleanup, rc); } else { @@ -1136,7 +1140,7 @@ int mds_handle(struct ptlrpc_request *req) * want to cancel. 
*/ lockh.cookie = 0; - rc = mds_getattr_name(0, req, &lockh); + rc = mds_getattr_name(0, req, &lockh, MDS_INODELOCK_UPDATE); /* this non-intent call (from an ioctl) is special */ req->rq_status = rc; if (rc == 0 && lockh.cookie) @@ -1675,6 +1679,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, struct ldlm_reply *rep; struct lustre_handle lockh = { 0 }; struct ldlm_lock *new_lock; + int getattr_part = MDS_INODELOCK_UPDATE; int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), mds->mds_max_mdsize, @@ -1726,10 +1731,13 @@ static int mds_intent_policy(struct ldlm_namespace *ns, #endif RETURN(ELDLM_LOCK_ABORTED); break; - case IT_GETATTR: case IT_LOOKUP: + getattr_part = MDS_INODELOCK_LOOKUP; + case IT_GETATTR: + getattr_part |= MDS_INODELOCK_LOOKUP; case IT_READDIR: - rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh); + rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh, + getattr_part); /* FIXME: LDLM can set req->rq_status. MDS sets policy_res{1,2} with disposition and status. 
- replay: returns 0 & req->status is old status diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index ec3f063..b4a70b3 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -19,11 +19,12 @@ static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) } /* mds/mds_reint.c */ -int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2); int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, - struct lustre_handle *p2_lockh, int p2_lock_mode); + struct lustre_handle *p2_lockh, int p2_lock_mode, + ldlm_policy_data_t *p2_policy); void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error); int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle, struct ptlrpc_request *req, int rc, __u32 op_data); @@ -33,9 +34,11 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, + __u64 parent_lockpart, char *name, int namelen, struct lustre_handle *child_lockh, - struct dentry **dchildp, int child_mode); + struct dentry **dchildp, int child_mode, + __u64 child_lockpart); int mds_lock_new_child(struct obd_device *obd, struct inode *inode, struct lustre_handle *child_lockh); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index b474283..d62f50e 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -834,11 +834,16 @@ int mds_open(struct mds_update_record *rec, int offset, acc_mode = accmode(rec->ur_flags); /* Step 1: Find and lock the parent */ - if (rec->ur_flags & O_CREAT) + if (rec->ur_flags & O_CREAT) { + /* XXX Well, in fact we only need this lock mode change if + in addition to O_CREAT, the file does not exist. 
+ But we do not know if it exists or not yet */ parent_mode = LCK_PW; + } dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode, &parent_lockh, rec->ur_name, - rec->ur_namelen - 1); + rec->ur_namelen - 1, + MDS_INODELOCK_UPDATE); if (IS_ERR(dparent)) { rc = PTR_ERR(dparent); CERROR("parent lookup error %d\n", rc); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index b2dd794..7d18572 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -78,11 +78,10 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno, rc); } else { ///* XXX 0 normally, SENDNOW for debug */); - ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1); - rc = llog_cancel(ctxt, lsm, - mlcd->mlcd_cookielen / - sizeof(*mlcd->mlcd_cookies), - mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW); + ctxt = llog_get_context(obd,mlcd->mlcd_cookies[0].lgc_subsys+1); + rc = llog_cancel(ctxt, lsm, mlcd->mlcd_cookielen / + sizeof(*mlcd->mlcd_cookies), + mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW); if (rc) CERROR("error cancelling %d log cookies: rc %d\n", (int)(mlcd->mlcd_cookielen / @@ -394,14 +393,18 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, if (IS_ERR(de)) GOTO(cleanup, rc = PTR_ERR(de)); } else { + __u64 lockpart = MDS_INODELOCK_UPDATE; + if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) ) + lockpart |= MDS_INODELOCK_LOOKUP; de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, - &lockh, NULL, 0); + &lockh, NULL, 0, lockpart); if (IS_ERR(de)) GOTO(cleanup, rc = PTR_ERR(de)); locked = 1; } cleanup_phase = 1; + inode = de->d_inode; LASSERT(inode); if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) && @@ -563,7 +566,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, GOTO(cleanup, rc = -ESTALE); dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh, - rec->ur_name, rec->ur_namelen - 1); + rec->ur_name, rec->ur_namelen - 1, + MDS_INODELOCK_UPDATE); if 
(IS_ERR(dparent)) { rc = PTR_ERR(dparent); CERROR("parent lookup error %d\n", rc); @@ -751,7 +755,8 @@ cleanup: return 0; } -int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) +static int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2, + ldlm_policy_data_t *p1, ldlm_policy_data_t *p2) { int i; @@ -768,6 +773,13 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) if (res1->name[i] < res2->name[i]) return 0; } + + if (!p1 || !p2) + return 0; + + if (memcmp(p1, p2, sizeof(*p1)) < 0) + return 1; + return 0; } @@ -779,27 +791,32 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) * no lock is taken for that res_id. Must be at least one non-zero res_id. */ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, - struct lustre_handle *p2_lockh, int p2_lock_mode) + struct lustre_handle *p2_lockh, int p2_lock_mode, + ldlm_policy_data_t *p2_policy) { struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id }; struct lustre_handle *handles[2] = { p1_lockh, p2_lockh }; int lock_modes[2] = { p1_lock_mode, p2_lock_mode }; + ldlm_policy_data_t *policies[2] = { p1_policy, p2_policy }; int rc, flags; ENTRY; LASSERT(p1_res_id != NULL && p2_res_id != NULL); - CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", - res_id[0]->name[0], res_id[1]->name[0]); + CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0], + res_id[1]->name[0]); - if (res_gt(p1_res_id, p2_res_id)) { + if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) { handles[1] = p1_lockh; handles[0] = p2_lockh; res_id[1] = p1_res_id; res_id[0] = p2_res_id; lock_modes[1] = p1_lock_mode; lock_modes[0] = p2_lock_mode; + policies[1] = p1_policy; + policies[0] = p2_policy; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", @@ -807,20 +824,21 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, 
flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], - LDLM_PLAIN, NULL, lock_modes[0], &flags, + LDLM_IBITS, policies[0], lock_modes[0], &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) RETURN(-EIO); ldlm_lock_dump_handle(D_OTHER, handles[0]); - if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) { + if (!memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) && + (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) { memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *res_id[1], LDLM_PLAIN, NULL, + *res_id[1], LDLM_IBITS, policies[1], lock_modes[1], &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[1]); @@ -836,12 +854,16 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, + ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, + ldlm_policy_data_t *p2_policy, struct ldlm_res_id *c1_res_id, struct lustre_handle *c1_lockh, int c1_lock_mode, + ldlm_policy_data_t *c1_policy, struct ldlm_res_id *c2_res_id, - struct lustre_handle *c2_lockh, int c2_lock_mode) + struct lustre_handle *c2_lockh, int c2_lock_mode, + ldlm_policy_data_t *c2_policy) { struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id, c1_res_id, c2_res_id }; @@ -849,10 +871,13 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, c1_lockh, c2_lockh }; int lock_modes[5] = { p1_lock_mode, p2_lock_mode, c1_lock_mode, c2_lock_mode }; + ldlm_policy_data_t *policies[5] = { p1_policy, p2_policy, + c1_policy, c2_policy}; int rc, i, j, sorted, flags; ENTRY; 
- CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", + CDEBUG(D_DLMTRACE, + "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0], res_id[3]->name[0]); @@ -862,13 +887,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, dlm_handles[4] = dlm_handles[i]; res_id[4] = res_id[i]; lock_modes[4] = lock_modes[i]; + policies[4] = policies[i]; sorted = 0; do { - if (res_gt(res_id[j], res_id[4])) { + if (res_gt(res_id[j], res_id[4], policies[j], + policies[4])) { dlm_handles[j + 1] = dlm_handles[j]; res_id[j + 1] = res_id[j]; lock_modes[j + 1] = lock_modes[j]; + policies[j + 1] = policies[j]; j--; } else { sorted = 1; @@ -878,9 +906,11 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, dlm_handles[j + 1] = dlm_handles[4]; res_id[j + 1] = res_id[4]; lock_modes[j + 1] = lock_modes[4]; + policies[j + 1] = policies[4]; } - CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", + CDEBUG(D_DLMTRACE, + "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0], res_id[3]->name[0]); @@ -890,13 +920,16 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, if (res_id[i]->name[0] == 0) break; if (i != 0 && - memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) { + !memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) && + (policies[i]->l_inodebits.bits & + policies[i-1]->l_inodebits.bits) ) { memcpy(dlm_handles[i], dlm_handles[i-1], sizeof(*(dlm_handles[i]))); ldlm_lock_addref(dlm_handles[i], lock_modes[i]); } else { rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *res_id[i], LDLM_PLAIN, NULL, + *res_id[i], LDLM_IBITS, + policies[i], lock_modes[i], &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, @@ -934,6 +967,7 @@ static int mds_verify_child(struct obd_device *obd, struct ldlm_res_id *child_res_id, struct lustre_handle *child_lockh, 
struct dentry **dchildp, int child_mode, + ldlm_policy_data_t *child_policy, const char *name, int namelen, struct ldlm_res_id *maxres) { @@ -973,8 +1007,8 @@ static int mds_verify_child(struct obd_device *obd, child_res_id->name[0] = dchild->d_inode->i_ino; child_res_id->name[1] = dchild->d_inode->i_generation; - if (res_gt(parent_res_id, child_res_id) || - res_gt(maxres, child_res_id)) { + if (res_gt(parent_res_id, child_res_id, NULL, NULL) || + res_gt(maxres, child_res_id, NULL, NULL)) { CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n", child_res_id->name[0], parent_res_id->name[0], maxres->name[0]); @@ -982,12 +1016,13 @@ static int mds_verify_child(struct obd_device *obd, } rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - *child_res_id, LDLM_PLAIN, NULL, + *child_res_id, LDLM_IBITS, child_policy, child_mode, &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) GOTO(cleanup, rc = -EIO); + } else { memset(child_res_id, 0, sizeof(*child_res_id)); } @@ -1010,12 +1045,16 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, + __u64 parent_lockpart, char *name, int namelen, struct lustre_handle *child_lockh, - struct dentry **dchildp, int child_mode) + struct dentry **dchildp, int child_mode, + __u64 child_lockpart) { struct ldlm_res_id child_res_id = { .name = {0} }; struct ldlm_res_id parent_res_id = { .name = {0} }; + ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }}; + ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }}; struct inode *inode; int rc = 0, cleanup_phase = 0; ENTRY; @@ -1050,6 +1089,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, child_res_id.name[0] = inode->i_ino; child_res_id.name[1] = inode->i_generation; + iput(inode); retry_locks: @@ -1057,8 +1097,10 @@ retry_locks: /* Step 3: 
Lock parent and child in resource order. If child doesn't * exist, we still have to lock the parent and re-lookup. */ - rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode, - &child_res_id, child_lockh, child_mode); + rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode, + &parent_policy, + &child_res_id, child_lockh, child_mode, + &child_policy); if (rc) GOTO(cleanup, rc); @@ -1069,8 +1111,9 @@ retry_locks: /* Step 4: Re-lookup child to verify it hasn't changed since locking */ rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp, - parent_mode, &child_res_id, child_lockh, dchildp, - child_mode, name, namelen, &parent_res_id); + parent_mode, &child_res_id, child_lockh, + dchildp, child_mode, &child_policy, + name, namelen, &parent_res_id); if (rc > 0) goto retry_locks; if (rc < 0) { @@ -1128,8 +1171,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, &parent_lockh, &dparent, LCK_PW, + MDS_INODELOCK_UPDATE, rec->ur_name, rec->ur_namelen, - &child_lockh, &dchild, LCK_EX); + &child_lockh, &dchild, LCK_EX, + MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE); if (rc) GOTO(cleanup, rc); @@ -1316,6 +1361,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh; struct ldlm_res_id src_res_id = { .name = {0} }; struct ldlm_res_id tgt_dir_res_id = { .name = {0} }; + ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}}; + ldlm_policy_data_t tgt_dir_policy = + {.l_inodebits = {MDS_INODELOCK_UPDATE}}; + int rc = 0, cleanup_phase = 0; ENTRY; @@ -1354,7 +1403,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation; rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX, - &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX); + &src_policy, + &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, + 
&tgt_dir_policy); if (rc) GOTO(cleanup, rc); @@ -1510,6 +1561,14 @@ static int mds_get_parents_children_locked(struct obd_device *obd, struct ldlm_res_id p2_res_id = { .name = {0} }; struct ldlm_res_id c1_res_id = { .name = {0} }; struct ldlm_res_id c2_res_id = { .name = {0} }; + ldlm_policy_data_t p_policy = {.l_inodebits = {MDS_INODELOCK_UPDATE}}; + /* Only dentry should change, but the inode itself would be + intact otherwise */ + ldlm_policy_data_t c1_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP}}; + /* If something is going to be replaced, both dentry and inode locks are + needed */ + ldlm_policy_data_t c2_policy = {.l_inodebits = {MDS_INODELOCK_LOOKUP| + MDS_INODELOCK_UPDATE}}; struct ldlm_res_id *maxres_src, *maxres_tgt; struct inode *inode; int rc = 0, cleanup_phase = 0; @@ -1579,6 +1638,7 @@ static int mds_get_parents_children_locked(struct obd_device *obd, c2_res_id.name[0] = inode->i_ino; c2_res_id.name[1] = inode->i_generation; + iput(inode); retry_locks: @@ -1587,15 +1647,19 @@ retry_locks: maxres_tgt = &p2_res_id; cleanup_phase = 4; /* target dentry */ - if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id)) + if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id, NULL,NULL)) maxres_src = &c1_res_id; - if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id)) + if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id, NULL,NULL)) maxres_tgt = &c2_res_id; rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode, + &p_policy, &p2_res_id, &dlm_handles[1], parent_mode, + &p_policy, &c1_res_id, &dlm_handles[2], child_mode, - &c2_res_id, &dlm_handles[3], child_mode); + &c1_policy, + &c2_res_id, &dlm_handles[3], child_mode, + &c2_policy); if (rc) GOTO(cleanup, rc); @@ -1603,8 +1667,9 @@ retry_locks: /* Step 6a: Re-lookup source child to verify it hasn't changed */ rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp, - parent_mode, &c1_res_id, &dlm_handles[2], de_oldp, - child_mode, old_name, old_len, 
maxres_tgt); + parent_mode, &c1_res_id, &dlm_handles[2], + de_oldp, child_mode, &c1_policy, old_name,old_len, + maxres_tgt); if (rc) { if (c2_res_id.name[0] != 0) ldlm_lock_decref(&dlm_handles[3], child_mode); @@ -1620,8 +1685,9 @@ retry_locks: /* Step 6b: Re-lookup target child to verify it hasn't changed */ rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp, - parent_mode, &c2_res_id, &dlm_handles[3], de_newp, - child_mode, new_name, new_len, maxres_src); + parent_mode, &c2_res_id, &dlm_handles[3], + de_newp, child_mode, &c2_policy, new_name, + new_len, maxres_src); if (rc) { ldlm_lock_decref(&dlm_handles[2], child_mode); ldlm_lock_decref(&dlm_handles[0], parent_mode); @@ -1770,8 +1836,7 @@ cleanup: ldlm_lock_decref(&(dlm_handles[0]), LCK_PW); } else { if (lock_count == 4) - ptlrpc_save_lock(req, - &(dlm_handles[3]), LCK_EX); + ptlrpc_save_lock(req,&(dlm_handles[3]), LCK_EX); ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX); ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW); ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW); diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 94e4c1a..3387747 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -8,7 +8,7 @@ ptlrpc-objs := $(LDLM)l_lock.o $(LDLM)ldlm_lock.o ptlrpc-objs += $(LDLM)ldlm_resource.o $(LDLM)ldlm_lib.o ptlrpc-objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o ptlrpc-objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o -ptlrpc-objs += $(LDLM)ldlm_flock.o +ptlrpc-objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o ptlrpc-objs += client.o recover.o connection.o niobuf.o pack_generic.o ptlrpc-objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o ptlrpc-objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o diff --git a/lustre/ptlrpc/Makefile.mk b/lustre/ptlrpc/Makefile.mk index 4154582..77aa1cb 100644 --- a/lustre/ptlrpc/Makefile.mk +++ b/lustre/ptlrpc/Makefile.mk @@ -13,5 +13,6 @@ ptlrpc-objs := recover.o connection.o 
ptlrpc_module.o events.o service.o \ llog_server.o ptlrpcd.o ../ldlm/l_lock.o ../ldlm/ldlm_lock.o \ ../ldlm/ldlm_resource.o ../ldlm/ldlm_extent.o \ ../ldlm/ldlm_request.o ../ldlm/ldlm_lockd.o \ - ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o + ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o \ + ../ldlm/ldlm_inodebits.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index eb04d16..96dcfe3 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -7,7 +7,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/ldlm/l_lock.c $(top_srcdir)/ldlm/ldlm_lock.c \ $(top_srcdir)/ldlm/ldlm_resource.c $(top_srcdir)/ldlm/ldlm_lib.c \ $(top_srcdir)/ldlm/ldlm_plain.c $(top_srcdir)/ldlm/ldlm_extent.c \ $(top_srcdir)/ldlm/ldlm_request.c $(top_srcdir)/ldlm/ldlm_lockd.c \ - $(top_srcdir)/ldlm/ldlm_internal.h + $(top_srcdir)/ldlm/ldlm_internal.h $(top_srcdir)/ldlm/ldlm_inodebits.c COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \ diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index a6fb3a2..e9b34ea 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -63,6 +63,7 @@ setup() { [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE start mds $MDSLCONFARGS --reformat zconf_mount `hostname` $MOUNT + echo 0x3f0410 > /proc/sys/portals/debug } $SETUP -- 1.8.3.1