From ee36e8dc873f4515c926790d397b90ae8aa0f6f4 Mon Sep 17 00:00:00 2001 From: pschwan Date: Tue, 25 Jun 2002 22:09:20 +0000 Subject: [PATCH] - ldlm fixes to get it to pass intent-test.sh, although it doesn't quite clean up correctly afterwards yet. --- lustre/archdep.m4 | 2 +- lustre/include/linux/lustre_dlm.h | 3 +- lustre/ldlm/ldlm_lock.c | 96 ++++++++++++++++++------------- lustre/ldlm/ldlm_lockd.c | 44 +++++++++----- lustre/ldlm/ldlm_request.c | 6 +- lustre/ldlm/ldlm_test.c | 117 ++++++++++++++++++++++++++++---------- lustre/mds/handler.c | 9 ++- 7 files changed, 182 insertions(+), 95 deletions(-) diff --git a/lustre/archdep.m4 b/lustre/archdep.m4 index 9054ebb..7c7a1c8 100644 --- a/lustre/archdep.m4 +++ b/lustre/archdep.m4 @@ -15,7 +15,7 @@ AC_MSG_CHECKING(setting make flags system architecture: ) case ${host_cpu} in um ) AC_MSG_RESULT($host_cpu) - KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common ' + KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common ' KCPPFLAGS='-D__KERNEL__ -U__i386__ -Ui386 -DUM_FASTCALL -D__arch_um__ -DSUBARCH="i386" -DNESTING=0 -D_LARGEFILE64_SOURCE -Derrno=kernel_errno -DPATCHLEVEL=4 -DMODULE -I$(LINUX)/arch/um/include ' MOD_LINK=elf_i386 ;; diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 3451a66..6ace006 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -238,9 +238,10 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie, int cookie_len, int *flags, ldlm_lock_callback completion, ldlm_lock_callback blocking); -struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode, +struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int *flags); void ldlm_lock_cancel(struct ldlm_lock *lock); +void ldlm_run_ast_work(struct list_head *rpc_list); void ldlm_reprocess_all(struct ldlm_resource *res); void ldlm_lock_dump(struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index d683992..b031559 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -22,7 +22,7 @@ /* lock types */ char *ldlm_lockname[] = { [0] "--", - [LCK_EX] "EX", + [LCK_EX] "EX", [LCK_PW] "PW", [LCK_PR] "PR", [LCK_CW] "CW", @@ -33,7 +33,7 @@ char *ldlm_typename[] = { [LDLM_PLAIN] "PLN", [LDLM_EXTENT] "EXT", [LDLM_MDSINTENT] "INT" -}; +}; extern kmem_cache_t *ldlm_lock_slab; int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL; @@ -58,11 +58,11 @@ ldlm_res_policy ldlm_res_policy_table [] = { /* * REFCOUNTED LOCK OBJECTS - */ + */ -/* - * Lock refcounts, during creation: +/* + * Lock refcounts, during creation: * - one special one for allocation, dec'd only once in destroy * - one for being a lock that's in-use * - one for the addref associated with a new lock @@ -168,7 +168,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, l_unlock(&parent->l_resource->lr_namespace->ns_lock); } /* this is the extra refcount, to prevent the lock - evaporating */ + evaporating */ ldlm_lock_get(lock); RETURN(lock); } @@ -189,7 +189,7 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) RETURN(-ENOMEM); } - /* move references over */ + /* move references over */ for (i = 0; i < lock->l_refc; i++) { int rc; ldlm_resource_getref(lock->l_resource); @@ -204,9 +204,9 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) RETURN(0); } -/* +/* * HANDLES - */ + */ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh) { @@ -403,26 +403,30 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version)); } -static void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new) +static void ldlm_add_ast_work_item(struct ldlm_lock *lock, + struct ldlm_lock *new) { struct ldlm_ast_work *w; ENTRY; - + OBD_ALLOC(w, sizeof(*w)); - if (!w) { + if (!w) { LBUG(); return; } - + l_lock(&lock->l_resource->lr_namespace->ns_lock); - ldlm_lock_get(lock); - if (new) { + if (new) { + if (lock->l_flags & LDLM_FL_AST_SENT) + GOTO(out, 0); + + lock->l_flags |= LDLM_FL_AST_SENT; w->w_blocking = 1; ldlm_lock2desc(new, &w->w_desc); } - w->w_lock = lock; - lock->l_flags |= LDLM_FL_AST_SENT; + w->w_lock = ldlm_lock_get(lock); list_add(&w->w_list, lock->l_resource->lr_tmp); + out: l_unlock(&lock->l_resource->lr_namespace->ns_lock); return; } @@ -532,9 +536,10 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) ENTRY; l_lock(&lock->l_resource->lr_namespace->ns_lock); - rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted); + rc = ldlm_lock_compat_list(lock, send_cbs, + &lock->l_resource->lr_granted); /* FIXME: should we be sending ASTs to converting? */ - if (rc) + if (rc) rc = ldlm_lock_compat_list (lock, send_cbs, &lock->l_resource->lr_converting); @@ -542,8 +547,8 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) RETURN(rc); } -/* NOTE: called by - - ldlm_handle_enqueuque - resource +/* NOTE: called by + - ldlm_handle_enqueuque - resource */ void ldlm_grant_lock(struct ldlm_lock *lock) { @@ -625,13 +630,15 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, ldlm_resource_put(res); l_unlock(&ns->ns_lock); - if (lock) + if (lock) { + ldlm_lock2handle(lock, lockh); wait_event_interruptible(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode); - if (rc) + } + if (rc) LDLM_DEBUG(lock, "matched"); else - LDLM_DEBUG(lock, "not matched"); + LDLM_DEBUG_NOLOCK("not matched"); return rc; } @@ -697,7 +704,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, *flags |= LDLM_FL_LOCK_CHANGED; } else if (rc == ELDLM_LOCK_ABORTED) { ldlm_lock_destroy(lock); - ldlm_lock_put(lock); RETURN(rc); } } @@ -775,26 +781,27 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res, RETURN(0); } -static void ldlm_run_ast_work(struct list_head *rpc_list) +void ldlm_run_ast_work(struct list_head *rpc_list) { struct list_head *tmp, *pos; int rc; ENTRY; list_for_each_safe(tmp, pos, rpc_list) { - struct ldlm_ast_work *w = + struct ldlm_ast_work *w = list_entry(tmp, struct ldlm_ast_work, w_list); struct lustre_handle lockh; - - ldlm_lock2handle(w->w_lock, &lockh); - if (w->w_blocking) { - rc = w->w_lock->l_blocking_ast(&lockh, &w->w_desc, w->w_data, w->w_datalen); - } else { - rc = w->w_lock->l_completion_ast(&lockh, NULL, w->w_data, w->w_datalen); - } - if (rc) { - CERROR("Failed AST - should clean & disconnect client\n"); - } + + ldlm_lock2handle(w->w_lock, &lockh); + if (w->w_blocking) + rc = w->w_lock->l_blocking_ast + (&lockh, &w->w_desc, w->w_data, w->w_datalen); + else + rc = w->w_lock->l_completion_ast + (&lockh, NULL, w->w_data, w->w_datalen); + if (rc) + CERROR("Failed AST - should clean & disconnect " + "client\n"); ldlm_lock_put(w->w_lock); list_del(&w->w_list); OBD_FREE(w, sizeof(*w)); @@ -849,10 +856,13 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) } /* Must be called with lock and lock->l_resource unlocked */ -struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode, int *flags) +struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, + int *flags) { + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); struct ldlm_resource *res; struct ldlm_namespace *ns; + int granted = 0; ENTRY; res = lock->l_resource; @@ -868,17 +878,21 @@ struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode, int *fl if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) ldlm_resource_add_lock(res, res->lr_converting.prev, lock); - else { + else { + res->lr_tmp = &rpc_list; ldlm_grant_lock(lock); + res->lr_tmp = NULL; + granted = 1; /* FIXME: completion handling not with ns_lock held ! */ wake_up(&lock->l_waitq); } - } else { + } else list_add(&lock->l_res_link, res->lr_converting.prev); - } l_unlock(&ns->ns_lock); + if (granted) + ldlm_run_ast_work(&rpc_list); RETURN(res); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 00a2039..2102e45 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,8 +21,9 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, - struct ptlrpc_request *req) +static int ldlm_handle_enqueue(struct obd_device *obddev, + struct ptlrpc_service *svc, + struct ptlrpc_request *req) { struct ldlm_reply *dlm_rep; struct ldlm_request *dlm_req; @@ -60,7 +61,7 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, NULL, 0); if (!lock) - GOTO(out, -ENOMEM); + GOTO(out, err = -ENOMEM); memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1, sizeof(lock->l_remote_handle)); @@ -104,7 +105,8 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service return 0; } -static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req) +static int ldlm_handle_convert(struct ptlrpc_service *svc, + struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_reply *dlm_rep; @@ -126,7 +128,7 @@ static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request req->rq_status = EINVAL; } else { LDLM_DEBUG(lock, "server-side convert handler START"); - ldlm_convert(lock, dlm_req->lock_desc.l_req_mode, + ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); req->rq_status = 0; } @@ -140,7 +142,8 @@ static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request RETURN(0); } -static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req) +static int ldlm_handle_cancel(struct ptlrpc_service *svc, + struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_lock *lock; @@ -174,12 +177,12 @@ static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request } static int ldlm_handle_callback(struct ptlrpc_service *svc, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock_desc *descp; + struct ldlm_lock_desc *descp = NULL; struct ldlm_lock *lock; - __u64 is_blocking_ast; + __u64 is_blocking_ast = 0; int rc; ENTRY; @@ -189,7 +192,6 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, RETURN(-ENOMEM); } dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); - descp = &dlm_req->lock_desc; /* We must send the reply first, so that the thread is free to handle * any requests made in common_callback() */ @@ -198,16 +200,19 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, RETURN(rc); lock = ldlm_handle2lock(&dlm_req->lock_handle1); - /* check if this is a blocking AST */ - if (!descp->l_req_mode) - descp = NULL; - if (!lock) { CERROR("callback on lock %Lx - lock disappeared\n", dlm_req->lock_handle1.addr); RETURN(0); } + /* check if this is a blocking AST */ + if (dlm_req->lock_desc.l_req_mode != + dlm_req->lock_desc.l_granted_mode) { + descp = &dlm_req->lock_desc; + is_blocking_ast = 1; + } + LDLM_DEBUG(lock, "client %s callback handler START", is_blocking_ast ? "blocked" : "completion"); @@ -234,6 +239,9 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, } ldlm_lock_put(lock); } else { + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + + l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; /* If we receive the completion AST before the actual enqueue @@ -244,10 +252,16 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); LDLM_DEBUG(lock, "completion AST, new resource"); } + lock->l_resource->lr_tmp = &rpc_list; + ldlm_resource_unlink_lock(lock); ldlm_grant_lock(lock); /* FIXME: we want any completion function, not just wake_up */ wake_up(&lock->l_waitq); ldlm_lock_put(lock); + lock->l_resource->lr_tmp = NULL; + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + + ldlm_run_ast_work(&rpc_list); } LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)", @@ -256,7 +270,7 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, } static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc, - struct ptlrpc_request *req) + struct ptlrpc_request *req) { struct obd_device *req_dev; int id, rc; diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 099bd76..2c463c5 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -141,7 +141,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, } int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len) { struct ldlm_lock *lock; struct ldlm_request *body; @@ -172,7 +172,7 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, } LDLM_DEBUG(lock, "server preparing %s AST", - desc->l_req_mode == 0 ? "completion" : "blocked"); + desc == 0 ? "completion" : "blocked"); req->rq_replen = lustre_msg_size(0, NULL); @@ -225,7 +225,7 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, GOTO(out, rc); reply = lustre_msg_buf(req->rq_repmsg, 0); - res = ldlm_convert(lock, new_mode, &reply->lock_flags); + res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) ldlm_reprocess_all(res); if (lock->l_req_mode != lock->l_granted_mode) { diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index efe314e..ed11446 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -11,6 +11,9 @@ #define DEBUG_SUBSYSTEM S_LDLM +#include +#include + #include struct ldlm_test_thread { @@ -23,11 +26,22 @@ struct ldlm_test_thread { static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED; static struct list_head ctl_threads; static int regression_running = 0; +static struct ptlrpc_client ctl_client; +static struct ptlrpc_connection *ctl_conn; static int ldlm_test_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len) { + struct ldlm_lock *lock; + ENTRY; + + lock = ldlm_handle2lock(lockh); + if (lock == NULL) { + CERROR("invalid handle in callback\n"); + RETURN(0); + } + printk("ldlm_test_callback: lock=%Lu, new=%p\n", lockh->addr, new); return 0; } @@ -68,7 +82,7 @@ int ldlm_test_basics(struct obd_device *obddev) LBUG(); ldlm_resource_dump(res); - res = ldlm_convert(lock1, LCK_NL, &flags); + res = ldlm_lock_convert(lock1, LCK_NL, &flags); if (res != NULL) ldlm_reprocess_all(res); @@ -127,7 +141,7 @@ int ldlm_test_extents(struct obd_device *obddev) /* Convert/cancel blocking locks */ flags = 0; - res = ldlm_convert(lock1, LCK_NL, &flags); + res = ldlm_lock_convert(lock1, LCK_NL, &flags); if (res != NULL) ldlm_reprocess_all(res); @@ -167,7 +181,59 @@ static int ldlm_test_network(struct obd_device *obddev, static int ldlm_test_main(void *data) { - return 0; + struct ldlm_test_thread *thread = data; + struct ldlm_namespace *ns; + ENTRY; + + lock_kernel(); + daemonize(); + spin_lock_irq(¤t->sigmask_lock); + sigfillset(¤t->blocked); + recalc_sigpending(current); + spin_unlock_irq(¤t->sigmask_lock); + + sprintf(current->comm, "ldlm_test"); + + ns = ldlm_namespace_new("ldlm_test", LDLM_NAMESPACE_CLIENT); + if (ns == NULL) { + LBUG(); + GOTO(out, -ENOMEM); + } + + /* Record that the thread is running */ + thread->t_flags |= SVC_RUNNING; + wake_up(&thread->t_ctl_waitq); + + while (1) { + struct lustre_handle lockh; + __u64 res_id[3] = {0}; + __u32 lock_mode; + char random; + int flags = 0, rc; + + /* Pick a random resource from 1 to 10 */ + get_random_bytes(&random, sizeof(random)); + res_id[0] = random % 10 + 1; + + /* Pick a random lock mode */ + get_random_bytes(&random, sizeof(random)); + lock_mode = random % LCK_NL + 1; + + rc = ldlm_cli_enqueue(&ctl_client, ctl_conn, NULL, ns, NULL, + res_id, LDLM_PLAIN, NULL, 0, lock_mode, + &flags, ldlm_test_callback, NULL, 0, + &lockh); + if (rc < 0) { + CERROR("ldlm_cli_enqueue: %d\n", rc); + LBUG(); + } + } + + out: + thread->t_flags |= SVC_STOPPED; + wake_up(&thread->t_ctl_waitq); + + RETURN(0); } static int ldlm_start_thread(void) @@ -198,34 +264,10 @@ static int ldlm_start_thread(void) RETURN(0); } -static int ldlm_stop_all_threads(void) -{ - spin_lock(&ctl_lock); - while (!list_empty(&ctl_threads)) { - struct ldlm_test_thread *thread; - thread = list_entry(ctl_threads.next, struct ldlm_test_thread, - t_link); - spin_unlock(&ctl_lock); - - thread->t_flags = SVC_STOPPING; - - wake_up(&thread->t_ctl_waitq); - wait_event_interruptible(thread->t_ctl_waitq, - (thread->t_flags & SVC_STOPPED)); - - spin_lock(&ctl_lock); - list_del(&thread->t_link); - OBD_FREE(thread, sizeof(*thread)); - } - spin_unlock(&ctl_lock); - - return 0; -} - int ldlm_regression_start(struct obd_device *obddev, struct ptlrpc_connection *conn, int count) { - int i, rc; + int i, rc = 0; ENTRY; spin_lock(&ctl_lock); @@ -257,11 +299,24 @@ int ldlm_regression_stop(void) spin_unlock(&ctl_lock); RETURN(-EINVAL); } - spin_unlock(&ctl_lock); - /* Do stuff */ + while (!list_empty(&ctl_threads)) { + struct ldlm_test_thread *thread; + thread = list_entry(ctl_threads.next, struct ldlm_test_thread, + t_link); + + thread->t_flags |= SVC_STOPPING; + spin_unlock(&ctl_lock); + + wake_up(&thread->t_ctl_waitq); + wait_event_interruptible(thread->t_ctl_waitq, + thread->t_flags & SVC_STOPPED); + + spin_lock(&ctl_lock); + list_del(&thread->t_link); + OBD_FREE(thread, sizeof(*thread)); + } - spin_lock(&ctl_lock); regression_running = 0; spin_unlock(&ctl_lock); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 1380948..a28c995 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -24,7 +24,7 @@ #include #include extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle, - struct ptlrpc_request *req); + struct ptlrpc_request *req); static int mds_cleanup(struct obd_device * obddev); /* Assumes caller has already pushed into the kernel filesystem context */ @@ -554,9 +554,12 @@ static int mds_open(struct ptlrpc_request *req) /* XXX error handling */ rc = mds_fs_set_obdo(mds, inode, handle, obdo); // rc = mds_fs_setattr(mds, de, handle, &iattr); - if (!rc) + if (!rc) { + struct obd_run_ctxt saved; + push_ctxt(&saved, &mds->mds_ctxt); rc = mds_update_last_rcvd(mds, handle, req); - else { + pop_ctxt(&saved); + } else { req->rq_status = rc; RETURN(0); } -- 1.8.3.1