From: braam Date: Tue, 25 Jun 2002 04:49:57 +0000 (+0000) Subject: - Cleanup of the lock callback infrastructure - not quite functional X-Git-Tag: 0.4.2~39 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=616e61d0c24f2316b2cc832818d3410a68a2eb24;p=fs%2Flustre-release.git - Cleanup of the lock callback infrastructure - not quite functional yet. - API improvements. - Documentation update. --- diff --git a/lustre/Makefile.am b/lustre/Makefile.am index e1e25cb..0a30460 100644 --- a/lustre/Makefile.am +++ b/lustre/Makefile.am @@ -6,7 +6,7 @@ AUTOMAKE_OPTIONS = foreign # NOTE: keep extN before mds -SUBDIRS = ptlrpc llite lib ldlm obdecho mdc osc extN mds ost +SUBDIRS = ldlm ptlrpc llite lib obdecho mdc osc extN mds ost SUBDIRS+= utils tests obdfilter obdclass obdfs demos doc scripts EXTRA_DIST = BUGS FDL Rules include patches diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 67bd6eb..3451a66 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -102,7 +102,7 @@ struct ldlm_lock; typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, - __u32 data_len, struct ptlrpc_request **req); + __u32 data_len); struct ldlm_lock { __u64 l_random; @@ -174,6 +174,15 @@ struct ldlm_resource { void *lr_tmp; }; +struct ldlm_ast_work { + struct ldlm_lock *w_lock; + int w_blocking; + struct ldlm_lock_desc w_desc; + struct list_head w_list; + void *w_data; + int w_datalen; +}; + static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res) { return (struct ldlm_extent *)(res->lr_name); @@ -213,8 +222,9 @@ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); void ldlm_lock_put(struct ldlm_lock *lock); void ldlm_lock_destroy(struct ldlm_lock *lock); void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); -void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode); -void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode); +void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); +void ldlm_lock_addref_internal(struct ldlm_lock* , __u32 mode); +void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_grant_lock(struct ldlm_lock *lock); int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, void *cookie, int cookielen, ldlm_mode_t mode, @@ -269,8 +279,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, void *data, __u32 data_len, struct lustre_handle *lockh); -int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new, - void *data, __u32 data_len, struct ptlrpc_request **reqp); +int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, + void *data, __u32 data_len); int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct lustre_handle *); diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 02f5e17..e3797b7 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -101,6 +101,7 @@ struct ptlrpc_client { struct obd_device *cli_obd; __u32 cli_request_portal; __u32 cli_reply_portal; + __u64 cli_last_rcvd; __u64 cli_last_committed; __u32 cli_target_devno; diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 8087e6a..d683992 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -403,37 +403,41 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version)); } -static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new) +static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new) { - struct lustre_handle lockh; - struct ldlm_lock_desc desc; - struct ptlrpc_request *req = NULL; + struct ldlm_ast_work *w; ENTRY; - - + + OBD_ALLOC(w, sizeof(*w)); + if (!w) { + LBUG(); + return; + } + l_lock(&lock->l_resource->lr_namespace->ns_lock); - if (lock->l_flags & LDLM_FL_AST_SENT) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - RETURN(0); + ldlm_lock_get(lock); + if (new) { + w->w_blocking = 1; + ldlm_lock2desc(new, &w->w_desc); } - + w->w_lock = lock; lock->l_flags |= LDLM_FL_AST_SENT; - /* FIXME: this should merely add the lock to the lr_tmp list */ - ldlm_lock2handle(lock, &lockh); - ldlm_lock2desc(new, &desc); - lock->l_blocking_ast(&lockh, &desc, lock->l_data, lock->l_data_len, - &req); + list_add(&w->w_list, lock->l_resource->lr_tmp); l_unlock(&lock->l_resource->lr_namespace->ns_lock); + return; +} - if (req != NULL) { - struct list_head *list = lock->l_resource->lr_tmp; - list_add(&req->rq_multi, list); - } - RETURN(1); +void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) +{ + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(lockh); + ldlm_lock_addref_internal(lock, mode); + ldlm_lock_put(lock); } -/* Args: unlocked lock */ -void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode) +/* only called for local locks */ +void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) { l_lock(&lock->l_resource->lr_namespace->ns_lock); if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) @@ -445,13 +449,15 @@ void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode) } /* Args: unlocked lock */ -void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) +void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) { + struct ldlm_lock *lock = ldlm_handle2lock(lockh); ENTRY; if (lock == NULL) LBUG(); + LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode); l_lock(&lock->l_resource->lr_namespace->ns_lock); if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) lock->l_readers--; @@ -475,7 +481,7 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) ldlm_lock2handle(lock, &lockh); lock->l_blocking_ast(&lockh, NULL, lock->l_data, - lock->l_data_len, NULL); + lock->l_data_len); } else l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -510,16 +516,10 @@ static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs, rc = 0; - CDEBUG(D_OTHER, "compat function failed and lock modes " - "incompat\n"); if (send_cbs && child->l_blocking_ast != NULL) { CDEBUG(D_OTHER, "incompatible; sending blocking " "AST.\n"); - /* It's very difficult to actually send the AST from - * here, because we'd have to drop the lock before going - * to sleep to wait for the reply. Instead we build the - * packet and send it later. */ - ldlm_send_blocking_ast(child, lock); + ldlm_add_ast_work_item(child, lock); } } @@ -548,7 +548,6 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) void ldlm_grant_lock(struct ldlm_lock *lock) { struct ldlm_resource *res = lock->l_resource; - struct ptlrpc_request *req = NULL; ENTRY; l_lock(&lock->l_resource->lr_namespace->ns_lock); @@ -559,20 +558,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock) res->lr_most_restr = lock->l_granted_mode; if (lock->l_completion_ast) { - struct lustre_handle lockh; - - /* FIXME: this should merely add lock to lr_tmp list */ - ldlm_lock2handle(lock, &lockh); - lock->l_completion_ast(&lockh, NULL, lock->l_data, - lock->l_data_len, &req); - if (req != NULL) { - struct list_head *list = res->lr_tmp; - if (list == NULL) { - LBUG(); - return; - } - list_add(&req->rq_multi, list); - } + ldlm_add_ast_work_item(lock, NULL); } l_unlock(&lock->l_resource->lr_namespace->ns_lock); EXIT; @@ -600,7 +586,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_extent.end < extent->end)) continue; - ldlm_lock_addref(lock, mode); + ldlm_lock_addref_internal(lock, mode); return lock; } @@ -642,7 +628,10 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, if (lock) wait_event_interruptible(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); - + if (rc) + LDLM_DEBUG(lock, "matched"); + else + LDLM_DEBUG(lock, "not matched"); return rc; } @@ -781,31 +770,34 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res, list_del_init(&pending->l_res_link); ldlm_grant_lock(pending); - - ldlm_lock_addref(pending, pending->l_req_mode); - ldlm_lock_decref(pending, pending->l_granted_mode); } RETURN(0); } -static void ldlm_send_delayed_asts(struct list_head *rpc_list) +static void ldlm_run_ast_work(struct list_head *rpc_list) { struct list_head *tmp, *pos; + int rc; ENTRY; list_for_each_safe(tmp, pos, rpc_list) { - int rc; - struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_multi); - - CDEBUG(D_INFO, "Sending callback.\n"); - - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - if (rc) - CERROR("Callback send failed: %d\n", rc); + struct ldlm_ast_work *w = + list_entry(tmp, struct ldlm_ast_work, w_list); + struct lustre_handle lockh; + + ldlm_lock2handle(w->w_lock, &lockh); + if (w->w_blocking) { + rc = w->w_lock->l_blocking_ast(&lockh, &w->w_desc, w->w_data, w->w_datalen); + } else { + rc = w->w_lock->l_completion_ast(&lockh, NULL, w->w_data, w->w_datalen); + } + if (rc) { + CERROR("Failed AST - should clean & disconnect client\n"); + } + ldlm_lock_put(w->w_lock); + list_del(&w->w_list); + OBD_FREE(w, sizeof(*w)); } EXIT; } @@ -832,7 +824,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) res->lr_tmp = NULL; l_unlock(&res->lr_namespace->ns_lock); - ldlm_send_delayed_asts(&rpc_list); + ldlm_run_ast_work(&rpc_list); EXIT; } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 3b4370b..00a2039 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -68,7 +68,7 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service flags = dlm_req->lock_flags; err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags, - ldlm_cli_callback, ldlm_cli_callback); + ldlm_server_ast, ldlm_server_ast); if (err != ELDLM_OK) GOTO(out, err); @@ -226,7 +226,7 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, ldlm_lock2handle(lock, &lockh); lock->l_blocking_ast(&lockh, descp, lock->l_data, - lock->l_data_len, NULL); + lock->l_data_len); } } else { LDLM_DEBUG(lock, "Lock still has references, will be" diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index c7d44e3..099bd76 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -39,7 +39,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (lock == NULL) GOTO(out, rc = -ENOMEM); /* for the local lock, add the reference */ - ldlm_lock_addref(lock, mode); + ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); LDLM_DEBUG(lock, "client-side enqueue START"); @@ -140,21 +140,20 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, return rc; } -int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, __u32 data_len, struct ptlrpc_request **reqp) +int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, + void *data, __u32 data_len) { struct ldlm_lock *lock; struct ldlm_request *body; struct ptlrpc_request *req; - struct ptlrpc_client *cl = - &lock->l_resource->lr_namespace->ns_rpc_client; + struct ptlrpc_client *cl; int rc = 0, size = sizeof(*body); ENTRY; lock = ldlm_handle2lock(lockh); if (lock == NULL) LBUG(); - + cl = &lock->l_resource->lr_namespace->ns_rpc_client; req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, &size, NULL); if (!req) @@ -177,14 +176,9 @@ int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, req->rq_replen = lustre_msg_size(0, NULL); - if (reqp == NULL) { - LBUG(); - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - } else - *reqp = req; - + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); EXIT; out: @@ -281,7 +275,6 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) GOTO(out, rc); ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); ldlm_lock_put(lock); EXIT; out: diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 02ffca2..efe314e 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -26,8 +26,7 @@ static int regression_running = 0; static int ldlm_test_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new, - void *data, __u32 data_len, - struct ptlrpc_request **reqp) + void *data, __u32 data_len) { printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new); return 0; diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 3a6d230..5d1c29d 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -21,7 +21,6 @@ extern struct address_space_operations ll_aops; void ll_intent_release(struct dentry *de) { - struct ldlm_lock *lock; struct lustre_handle *handle; ENTRY; @@ -32,10 +31,7 @@ void ll_intent_release(struct dentry *de) if (de->d_it->it_lock_mode) { handle = (struct lustre_handle *)de->d_it->it_lock_handle; - lock = lustre_handle2object(handle); - LDLM_DEBUG(lock, "calling ldlm_lock_decref(%d)", - de->d_it->it_lock_mode); - ldlm_lock_decref(lock, de->d_it->it_lock_mode); + ldlm_lock_decref(handle, de->d_it->it_lock_mode); } de->d_it = NULL; EXIT; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index e367bbc..ac813a4 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -120,11 +120,9 @@ int ll_lock(struct inode *dir, struct dentry *dentry, int ll_unlock(__u32 mode, struct lustre_handle *lockh) { - struct ldlm_lock *lock; ENTRY; - lock = lustre_handle2object(lockh); - ldlm_lock_decref(lock, mode); + ldlm_lock_decref(lockh, mode); RETURN(0); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index cfe6f6c..b1ae5c4 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -279,7 +279,6 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) struct mds_body *body; struct dentry *de = NULL, *dchild = NULL; struct inode *dir; - struct ldlm_lock *lock; struct lustre_handle lockh; char *name; int namelen, flags, lock_mode, rc = 0; @@ -327,9 +326,6 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); } - } else { - lock = lustre_handle2object(&lockh); - LDLM_DEBUG(lock, "matched"); } ldlm_lock_dump((void *)(unsigned long)lockh.addr); @@ -366,8 +362,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) out_create_dchild: l_dput(dchild); up(&dir->i_sem); - lock = lustre_handle2object(&lockh); - ldlm_lock_decref(lock, lock_mode); + ldlm_lock_decref(&lockh, lock_mode); out_create_de: l_dput(de); out_pre_de: diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 0b536c6..449d594 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -190,7 +190,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, struct dentry *dchild = NULL; struct inode *dir; void *handle; - struct ldlm_lock *lock; struct lustre_handle lockh; int rc = 0, err, flags, lock_mode, type = rec->ur_mode & S_IFMT; __u64 res_id[3] = {0,0,0}; @@ -228,9 +227,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); } - } else { - lock = ldlm_handle2lock(&lockh); - LDLM_DEBUG(lock, "matched"); } ldlm_lock_dump((void *)(unsigned long)lockh.addr); @@ -370,8 +366,7 @@ out_create_commit: out_create_dchild: l_dput(dchild); up(&dir->i_sem); - lock = lustre_handle2object(&lockh); - ldlm_lock_decref(lock, lock_mode); + ldlm_lock_decref(&lockh, lock_mode); out_create_de: l_dput(de); req->rq_status = rc; @@ -408,7 +403,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, int lock_mode, flags; __u64 res_id[3] = {0}; struct lustre_handle lockh; - struct ldlm_lock *lock; void *handle; int rc = 0; int err; @@ -438,10 +432,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, CERROR("lock enqueue: err: %d\n", rc); GOTO(out_unlink_de, rc = -EIO); } - } else { - lock = lustre_handle2object(&lockh); - LDLM_DEBUG(lock, "matched"); - } + } else + ldlm_lock_dump((void *)(unsigned long)lockh.addr); down(&dir->i_sem); @@ -515,8 +507,8 @@ out_unlink_dchild: l_dput(dchild); out_unlink_de: up(&dir->i_sem); - lock = lustre_handle2object(&lockh); - ldlm_lock_decref(lock, lock_mode); + ldlm_lock_decref(&lockh +, lock_mode); if (!rc) { /* Take an exclusive lock on the resource that we're * about to free, to force everyone to drop their @@ -537,8 +529,7 @@ out_unlink_de: l_dput(de); if (!rc) { - lock = lustre_handle2object(&lockh); - ldlm_lock_decref(lock, LCK_EX); + ldlm_lock_decref(&lockh, LCK_EX); rc = ldlm_cli_cancel(&lockh); if (rc < 0) CERROR("failed to cancel child inode lock ino " diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index a1db49a..da29a96 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -683,12 +683,11 @@ static int osc_enqueue(struct obd_conn *oconn, sizeof(extent), mode2, lockh); if (rc == 1) { int flags; - struct ldlm_lock *lock = lustre_handle2object(lockh); /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to * lock_match. I want a second opinion. */ - ldlm_lock_addref(lock, mode); - ldlm_lock_decref(lock, mode2); + ldlm_lock_addref(lockh, mode); + ldlm_lock_decref(lockh, mode2); if (mode == LCK_PR) return 0; @@ -709,11 +708,9 @@ static int osc_enqueue(struct obd_conn *oconn, static int osc_cancel(struct obd_conn *oconn, __u32 mode, struct lustre_handle *lockh) { - struct ldlm_lock *lock; ENTRY; - lock = lustre_handle2object(lockh); - ldlm_lock_decref(lock, mode); + ldlm_lock_decref(lockh, mode); RETURN(0); }