From 217923708d5947a2145cfbee7568d32da7e362e6 Mon Sep 17 00:00:00 2001 From: pschwan Date: Tue, 24 Sep 2002 20:04:00 +0000 Subject: [PATCH] The old patch was failing because sometimes the inodes were gone by the time the callback was called with CB_DYING. I also realized that the client would believe that it had valid data for too long. Instead of an extra callback when the lock is freed, I moved that extra callback to just before we cancel the lock. Because there's two paths for that now (cancelling with server notification and cancelling without), I had to introduce _another_ l_flag, which indicates whether we've done this call. Oh well. It works much better now. --- lustre/include/linux/lustre_dlm.h | 9 +++++++-- lustre/include/linux/lustre_lite.h | 2 +- lustre/ldlm/ldlm_lock.c | 17 +++++++++++++++-- lustre/ldlm/ldlm_lockd.c | 10 ++++++++-- lustre/ldlm/ldlm_request.c | 1 + lustre/ldlm/ldlm_test.c | 26 ++++++++++++++++++-------- lustre/llite/file.c | 28 ++++++++++++++++++---------- lustre/mdc/mdc_request.c | 35 +++++++++++++++++++++-------------- lustre/mds/handler.c | 7 ++++++- 9 files changed, 95 insertions(+), 40 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 1efcdca..9f6c747 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -38,6 +38,10 @@ typedef enum { #define LDLM_FL_AST_SENT (1 << 5) #define LDLM_FL_DESTROYED (1 << 6) #define LDLM_FL_WAIT_NOREPROC (1 << 7) +#define LDLM_FL_CANCEL (1 << 8) + +#define LDLM_CB_BLOCKING 1 +#define LDLM_CB_CANCELING 2 #define L2B(c) (1 << c) @@ -112,7 +116,7 @@ struct ldlm_lock; typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, - __u32 data_len); + __u32 data_len, int flag); typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); @@ -259,6 +263,7 @@ void ldlm_unregister_intent(void); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle); void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); +void ldlm_cancel_callback(struct ldlm_lock *lock); #define LDLM_LOCK_PUT(lock) \ do { \ @@ -370,7 +375,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *, __u64 *, int local_only); /* mds/handler.c */ /* This has to be here because recurisve inclusion sucks. */ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len); + void *data, __u32 data_len, int flag); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index c853fee..23912d5 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -148,7 +148,7 @@ extern struct file_operations ll_file_operations; extern struct inode_operations ll_file_inode_operations; struct ldlm_lock; int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, - __u32 data_len); + __u32 data_len, int flag); int ll_size_lock(struct inode *, struct lov_stripe_md *, __u64 start, int mode, struct lustre_handle **); int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 0b14101..d3e2f47 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -425,7 +425,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) /* FIXME: need a real 'desc' here */ lock->l_blocking_ast(lock, NULL, lock->l_data, - lock->l_data_len); + lock->l_data_len, LDLM_CB_BLOCKING); } else l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -741,7 +741,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list) if (w->w_blocking) rc = w->w_lock->l_blocking_ast (w->w_lock, &w->w_desc, w->w_data, - w->w_datalen); + w->w_datalen, LDLM_CB_BLOCKING); else rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags); if (rc) @@ -780,6 +780,17 @@ void ldlm_reprocess_all(struct ldlm_resource *res) EXIT; } +void ldlm_cancel_callback(struct ldlm_lock *lock) +{ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + if (!(lock->l_flags & LDLM_FL_CANCEL)) { + lock->l_flags |= LDLM_FL_CANCEL; + lock->l_blocking_ast(lock, NULL, lock->l_data, + lock->l_data_len, LDLM_CB_CANCELING); + } + l_unlock(&lock->l_resource->lr_namespace->ns_lock); +} + void ldlm_lock_cancel(struct ldlm_lock *lock) { struct ldlm_resource *res; @@ -794,6 +805,8 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) CDEBUG(D_INFO, "lock still has references (%d readers, %d " "writers)\n", lock->l_readers, lock->l_writers); + ldlm_cancel_callback(lock); + ldlm_resource_unlink_lock(lock); ldlm_lock_destroy(lock); l_unlock(&ns->ns_lock); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index a6ede2f..a370571 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -115,13 +115,18 @@ static int ldlm_del_waiting_lock(struct ldlm_lock *lock) static int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { struct ldlm_request *body; struct ptlrpc_request *req; int rc = 0, size = sizeof(*body); ENTRY; + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); + } + req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import, LDLM_BL_CALLBACK, 1, &size, NULL); if (!req) @@ -391,7 +396,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req) "callback (%p)", lock->l_blocking_ast); if (lock->l_blocking_ast != NULL) { lock->l_blocking_ast(lock, &dlm_req->lock_desc, - lock->l_data, lock->l_data_len); + lock->l_data, lock->l_data_len, + LDLM_CB_BLOCKING); } } else LDLM_DEBUG(lock, "Lock still has references, will be" diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index bdde5c4..ee4594a 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -363,6 +363,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) /* Set this flag to prevent others from getting new references*/ l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; + ldlm_cancel_callback(lock); l_unlock(&lock->l_resource->lr_namespace->ns_lock); req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index d445cad..d28558f 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -75,17 +75,26 @@ static int ldlm_do_convert(void); */ static int ldlm_test_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int rc; struct lustre_handle lockh; ENTRY; - LDLM_DEBUG(lock, "We're blocking. Cancelling lock"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) { - CERROR("ldlm_cli_cancel: %d\n", rc); + switch (flag) { + case LDLM_CB_BLOCKING: + LDLM_DEBUG(lock, "We're blocking. Cancelling lock"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CERROR("ldlm_cli_cancel: %d\n", rc); + LBUG(); + } + break; + case LDLM_CB_CANCELING: + LDLM_DEBUG(lock, "this lock is being cancelled"); + break; + default: LBUG(); } @@ -95,10 +104,11 @@ static int ldlm_test_blocking_ast(struct ldlm_lock *lock, /* blocking ast for basic tests. noop */ static int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { ENTRY; - CERROR("ldlm_blocking_ast: lock=%p, new=%p\n", lock, new); + CERROR("ldlm_blocking_ast: lock=%p, new=%p, flag=%d\n", lock, new, + flag); RETURN(0); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index fc19186..6548237 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -340,7 +340,7 @@ static void ll_update_atime(struct inode *inode) } int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { struct inode *inode = data; struct lustre_handle lockh; @@ -353,16 +353,24 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, if (inode == NULL) LBUG(); - down(&inode->i_sem); - CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino); - /* FIXME: do something better than throwing away everything */ - invalidate_inode_pages(inode); - up(&inode->i_sem); + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: + down(&inode->i_sem); + CDEBUG(D_INODE, "invalidating obdo/inode %ld\n", inode->i_ino); + /* FIXME: do something better than throwing away everything */ + invalidate_inode_pages(inode); + up(&inode->i_sem); + break; + default: + LBUG(); + } - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel failed: %d\n", rc); RETURN(0); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 4b2ab91..da291bc 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -153,7 +153,7 @@ int mdc_getattr(struct lustre_handle *conn, } static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int rc; struct inode *inode = data; @@ -167,22 +167,29 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(-EINVAL); } - if (inode == NULL) + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CERROR("ldlm_cli_cancel: %d\n", rc); + LBUG(); + } + break; + case LDLM_CB_CANCELING: + /* FIXME: do something better than throwing away everything */ + if (inode == NULL) + LBUG(); + if (S_ISDIR(inode->i_mode)) { + CDEBUG(D_INODE, "invalidating inode %ld\n", + inode->i_ino); + invalidate_inode_pages(inode); + } + break; + default: LBUG(); - - /* FIXME: do something better than throwing away everything */ - if (S_ISDIR(inode->i_mode)) { - CDEBUG(D_INODE, "invalidating inode %ld\n", - inode->i_ino); - invalidate_inode_pages(inode); } - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) { - CERROR("ldlm_cli_cancel: %d\n", rc); - LBUG(); - } RETURN(0); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 539e50b..83a544c 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -414,11 +414,16 @@ static int mds_getlovinfo(struct ptlrpc_request *req) } int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int do_ast; ENTRY; + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); + } + l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); -- 1.8.3.1