From e5498ceb60c53d3406caa7f5cda97f686fd5879a Mon Sep 17 00:00:00 2001 From: pschwan Date: Sun, 22 Sep 2002 02:47:48 +0000 Subject: [PATCH] b=605627 Fixes two bugs related to 605627 that have been present since we started correctly reusing inodes. - We were giving up locks at file close time, but not throwing away file data pages. Fixed, by adding a flag to the blocking callback. It's now called under two different circumstances: when a lock needs to be given up (LDLM_CB_BLOCKING) and when a lock is about to be freed (LDLM_CB_DYING) - We were not refreshing inode attributes (notably size) correctly. I brute force this by always calling ll_file_size() in ll_inode_revalidate, but this needs some obvious immediate refinement. As an aside, I noticed that the DLM API documentation gives almost no mention to the arguments or calling conventions of the callback functions. --- lustre/include/linux/lustre_dlm.h | 7 ++- lustre/include/linux/lustre_lite.h | 2 +- lustre/ldlm/ldlm_lock.c | 7 ++- lustre/ldlm/ldlm_lockd.c | 10 +++- lustre/ldlm/ldlm_test.c | 97 ++++++++++++++++++++------------------ lustre/llite/file.c | 47 +++++++++++++----- lustre/mdc/mdc_request.c | 33 ++++++++----- lustre/mds/handler.c | 7 ++- lustre/mds/mds_reint.c | 2 + 9 files changed, 133 insertions(+), 79 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 7404dda..ad6a93c 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -39,6 +39,9 @@ typedef enum { #define LDLM_FL_DESTROYED (1 << 6) #define LDLM_FL_WAIT_NOREPROC (1 << 7) +#define LDLM_CB_BLOCKING 1 +#define LDLM_CB_DYING 2 + #define L2B(c) (1 << c) /* compatibility matrix */ @@ -112,7 +115,7 @@ struct ldlm_lock; typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, - __u32 data_len); + __u32 data_len, int flag); typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); @@ -370,7 +373,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id); /* mds/handler.c */ /* This has to be here because recurisve inclusion sucks. */ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len); + void *data, __u32 data_len, int flag); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 1daa659..56a05f9 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -148,7 +148,7 @@ extern struct file_operations ll_file_operations; extern struct inode_operations ll_file_inode_operations; struct ldlm_lock; int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, - __u32 data_len); + __u32 data_len, int flag); int ll_size_lock(struct inode *, struct lov_stripe_md *, __u64 start, int mode, struct lustre_handle **); int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 0b14101..ce54c32 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -152,6 +152,9 @@ void ldlm_lock_put(struct ldlm_lock *lock) LDLM_LOCK_PUT(lock->l_parent); if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) { + lock->l_blocking_ast(lock, NULL, lock->l_data, + lock->l_data_len, LDLM_CB_DYING); + spin_lock(&ns->ns_counter_lock); ns->ns_locks--; spin_unlock(&ns->ns_counter_lock); @@ -425,7 +428,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) /* FIXME: need a real 'desc' here */ lock->l_blocking_ast(lock, NULL, lock->l_data, - lock->l_data_len); + lock->l_data_len, LDLM_CB_BLOCKING); } else l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -741,7 +744,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list) if (w->w_blocking) rc = w->w_lock->l_blocking_ast (w->w_lock, &w->w_desc, w->w_data, - w->w_datalen); + w->w_datalen, LDLM_CB_BLOCKING); else rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags); if (rc) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index a6ede2f..8d96b50 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -115,13 +115,18 @@ static int ldlm_del_waiting_lock(struct ldlm_lock *lock) static int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { struct ldlm_request *body; struct ptlrpc_request *req; int rc = 0, size = sizeof(*body); ENTRY; + if (flag == LDLM_CB_DYING) { + /* Don't need to do anything when the lock is freed. */ + RETURN(0); + } + req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import, LDLM_BL_CALLBACK, 1, &size, NULL); if (!req) @@ -391,7 +396,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req) "callback (%p)", lock->l_blocking_ast); if (lock->l_blocking_ast != NULL) { lock->l_blocking_ast(lock, &dlm_req->lock_desc, - lock->l_data, lock->l_data_len); + lock->l_data, lock->l_data_len, + LDLM_CB_BLOCKING); } } else LDLM_DEBUG(lock, "Lock still has references, will be" diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 2676301..b5f5e748 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -75,17 +75,26 @@ static int ldlm_do_convert(void); */ static int ldlm_test_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int rc; struct lustre_handle lockh; ENTRY; - LDLM_DEBUG(lock, "We're blocking. Cancelling lock"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) { - CERROR("ldlm_cli_cancel: %d\n", rc); + switch (flag) { + case LDLM_CB_BLOCKING: + LDLM_DEBUG(lock, "We're blocking. Cancelling lock"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CERROR("ldlm_cli_cancel: %d\n", rc); + LBUG(); + } + break; + case LDLM_CB_DYING: + LDLM_DEBUG(lock, "this lock is being freed"); + break; + default: LBUG(); } @@ -95,10 +104,11 @@ static int ldlm_test_blocking_ast(struct ldlm_lock *lock, /* blocking ast for basic tests. noop */ static int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { ENTRY; - CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new); + CERROR("ldlm_blocking_ast: lock=%p, new=%p, flag=%d\n", lock, new, + flag); RETURN(0); } @@ -111,13 +121,12 @@ static int ldlm_test_completion_ast(struct ldlm_lock *lock, int flags) ENTRY; if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV)) { - + LDLM_FL_BLOCK_CONV)) { LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock"); RETURN(0); - } + } - if (lock->l_granted_mode != lock->l_req_mode) + if (lock->l_granted_mode != lock->l_req_mode) CERROR("completion ast called with non-granted lock\n"); /* add to list of granted locks */ @@ -302,21 +311,21 @@ static int ldlm_do_decrement(void) struct ldlm_lock *lock; int rc = 0; ENTRY; - + spin_lock(&ctl_lock); if(list_empty(&lock_list)) { CERROR("lock_list is empty\n"); spin_unlock(&ctl_lock); - RETURN(0); - } - + RETURN(0); + } + /* delete from list */ - lock_info = list_entry(lock_list.next, + lock_info = list_entry(lock_list.next, struct ldlm_test_lock, l_link); list_del(lock_list.next); num_locks--; spin_unlock(&ctl_lock); - + /* decrement and free the info */ lock = ldlm_handle2lock(&lock_info->l_lockh); ldlm_lock_decref(&lock_info->l_lockh, lock->l_granted_mode); @@ -328,7 +337,7 @@ static int ldlm_do_decrement(void) } static int ldlm_do_enqueue(struct ldlm_test_thread *thread) -{ +{ struct lustre_handle lockh; __u64 res_id[3] = {0}; __u32 lock_mode; @@ -349,23 +358,19 @@ static int ldlm_do_enqueue(struct ldlm_test_thread *thread) get_random_bytes(&random, sizeof(random)); ext.start = random % num_extents; get_random_bytes(&random, sizeof(random)); - ext.end = random % + ext.end = random % (num_extents - (int)ext.start) + ext.start; - LDLM_DEBUG_NOLOCK("about to enqueue with resource %d, mode %d," - " extent %d -> %d", - (int)res_id[0], - lock_mode, - (int)ext.start, - (int)ext.end); - - rc = ldlm_match_or_enqueue(®ress_connh, - NULL, - thread->obddev->obd_namespace, - NULL, res_id, LDLM_EXTENT, &ext, - sizeof(ext), lock_mode, &flags, - ldlm_test_completion_ast, - ldlm_test_blocking_ast, + LDLM_DEBUG_NOLOCK("about to enqueue with resource "LPX64", mode %d," + " extent "LPX64" -> "LPX64, res_id[0], lock_mode, + ext.start, ext.end); + + rc = ldlm_match_or_enqueue(®ress_connh, NULL, + thread->obddev->obd_namespace, + NULL, res_id, LDLM_EXTENT, &ext, + sizeof(ext), lock_mode, &flags, + ldlm_test_completion_ast, + ldlm_test_blocking_ast, NULL, 0, &lockh); atomic_inc(&locks_requested); @@ -379,7 +384,7 @@ static int ldlm_do_enqueue(struct ldlm_test_thread *thread) } static int ldlm_do_convert(void) -{ +{ __u32 lock_mode; unsigned char random; int flags = 0, rc = 0; @@ -408,7 +413,7 @@ static int ldlm_do_convert(void) } /* - * Adjust reference counts. + * Adjust reference counts. * FIXME: This is technically a bit... wrong, * since we don't know when/if the convert succeeded */ @@ -454,7 +459,7 @@ static int ldlm_test_main(void *data) */ dec_chance = chance_left * num_locks / max_locks; chance_left -= dec_chance; - + /* FIXME: conversions temporarily disabled * until they are working correctly. */ @@ -483,11 +488,11 @@ static int ldlm_test_main(void *data) atomic_read(&locks_matched)); spin_lock(&ctl_lock); - LDLM_DEBUG_NOLOCK("lock references currently held: %d, ", + LDLM_DEBUG_NOLOCK("lock references currently held: %d, ", num_locks); spin_unlock(&ctl_lock); - /* + /* * We don't sleep after a lock being blocked, so let's * make sure other things can run. */ @@ -531,10 +536,10 @@ static int ldlm_start_thread(struct obd_device *obddev, RETURN(0); } -int ldlm_regression_start(struct obd_device *obddev, - struct lustre_handle *connh, - unsigned int threads, unsigned int max_locks_in, - unsigned int num_resources_in, +int ldlm_regression_start(struct obd_device *obddev, + struct lustre_handle *connh, + unsigned int threads, unsigned int max_locks_in, + unsigned int num_resources_in, unsigned int num_extents_in) { int i, rc = 0; @@ -549,7 +554,7 @@ int ldlm_regression_start(struct obd_device *obddev, regression_running = 1; spin_unlock(&ctl_lock); - regress_connh = *connh; + regress_connh = *connh; max_locks = max_locks_in; num_resources = num_resources_in; num_extents = num_extents_in; @@ -601,8 +606,8 @@ int ldlm_regression_stop(void) /* decrement all held locks */ while (!list_empty(&lock_list)) { struct ldlm_lock *lock; - struct ldlm_test_lock *lock_info = - list_entry(lock_list.next, struct ldlm_test_lock, + struct ldlm_test_lock *lock_info = + list_entry(lock_list.next, struct ldlm_test_lock, l_link); list_del(lock_list.next); num_locks--; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index b5188eb..9183410 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -339,7 +339,7 @@ static void ll_update_atime(struct inode *inode) } int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { struct inode *inode = data; struct lustre_handle lockh; @@ -351,16 +351,25 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, if (inode == NULL) LBUG(); - down(&inode->i_sem); - CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino); - /* FIXME: do something better than throwing away everything */ - invalidate_inode_pages(inode); - up(&inode->i_sem); - - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel failed: %d\n", rc); + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_DYING: + down(&inode->i_sem); + CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino); + /* FIXME: do something better than throwing away everything */ + invalidate_inode_pages(inode); + up(&inode->i_sem); + break; + default: + LBUG(); + } + RETURN(0); } @@ -584,6 +593,17 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) return 0; } +static int ll_inode_revalidate(struct dentry *dentry) +{ + struct inode *inode = dentry->d_inode; + ENTRY; + + if (!inode) + RETURN(0); + + RETURN(ll_file_size(inode, ll_i2info(inode)->lli_smd)); +} + struct file_operations ll_file_operations = { read: ll_file_read, write: ll_file_write, @@ -596,6 +616,7 @@ struct file_operations ll_file_operations = { }; struct inode_operations ll_file_inode_operations = { - truncate: ll_truncate, - setattr: ll_setattr + truncate: ll_truncate, + setattr: ll_setattr, + revalidate: ll_inode_revalidate }; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index da991bd..1ee50ab 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -152,7 +152,7 @@ int mdc_getattr(struct lustre_handle *conn, } static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int rc; struct inode *inode = data; @@ -166,20 +166,29 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(-EINVAL); } - /* FIXME: do something better than throwing away everything */ - if (inode == NULL) + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CERROR("ldlm_cli_cancel: %d\n", rc); + LBUG(); + } + break; + case LDLM_CB_DYING: + /* FIXME: do something better than throwing away everything */ + if (inode == NULL) + LBUG(); + if (S_ISDIR(inode->i_mode)) { + CDEBUG(D_INODE, "invalidating inode %ld\n", + inode->i_ino); + invalidate_inode_pages(inode); + } + break; + default: LBUG(); - if (S_ISDIR(inode->i_mode)) { - CDEBUG(D_INODE, "invalidating inode %ld\n", inode->i_ino); - invalidate_inode_pages(inode); } - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) { - CERROR("ldlm_cli_cancel: %d\n", rc); - LBUG(); - } RETURN(0); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 539e50b..0edd444 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -414,11 +414,16 @@ static int mds_getlovinfo(struct ptlrpc_request *req) } int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int do_ast; ENTRY; + if (flag == LDLM_CB_DYING) { + /* Don't need to do anything when the lock is freed. */ + RETURN(0); + } + l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 69c1af4..33462f0 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -433,6 +433,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, name = lustre_msg_buf(req->rq_reqmsg, offset + 1); namelen = req->rq_reqmsg->buflens[offset + 1] - 1; +#warning FIXME: if mds_name2locked_dentry decrefs this lock, we must not + memcpy(&child_lockh, &lockh, sizeof(child_lockh)); dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen, LCK_EX, &child_lockh, lock_mode); -- 1.8.3.1