X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=d69128cd3df18191be41e069c6ed97222df3c139;hb=dbc11cfaa7ff9dc05fa316805b37b7eeef6c3e25;hp=bf70eb0644304fc8f92d37dbb734ff29a05896aa;hpb=cc8d77b7540d8b300e32dac614bb99bf9055daa8;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index bf70eb0..d69128c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,10 +21,9 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -static int ldlm_handle_enqueue(struct obd_device *obddev, - struct ptlrpc_service *svc, - struct ptlrpc_request *req) +static int ldlm_handle_enqueue(struct ptlrpc_request *req) { + struct obd_device *obddev = req->rq_export->export_obd; struct ldlm_reply *dlm_rep; struct ldlm_request *dlm_req; int rc, size = sizeof(*dlm_rep), cookielen = 0; @@ -88,25 +87,24 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, EXIT; out: if (lock) - ldlm_lock_put(lock); + LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" + "(err=%d)", err); req->rq_status = err; - CDEBUG(D_INFO, "err = %d\n", err); - if (ptlrpc_reply(svc, req)) + if (ptlrpc_reply(req->rq_svc, req)) LBUG(); - if (err) - LDLM_DEBUG_NOLOCK("server-side enqueue handler END"); - else { - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "server-side enqueue handler END"); + if (lock) { + if (!err) + ldlm_reprocess_all(lock->l_resource); + LDLM_LOCK_PUT(lock); } + LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock); return 0; } -static int ldlm_handle_convert(struct ptlrpc_service *svc, - struct ptlrpc_request *req) +static int ldlm_handle_convert(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_reply *dlm_rep; @@ -124,26 +122,28 @@ static int ldlm_handle_convert(struct ptlrpc_service *svc, dlm_rep->lock_flags = dlm_req->lock_flags; lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) { + if (!lock) { req->rq_status = EINVAL; - } else { + } else { LDLM_DEBUG(lock, "server-side convert handler START"); ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); req->rq_status = 0; } - if (ptlrpc_reply(svc, req) != 0) + if (ptlrpc_reply(req->rq_svc, req) != 0) LBUG(); - ldlm_reprocess_all(lock->l_resource); - ldlm_lock_put(lock); - LDLM_DEBUG(lock, "server-side convert handler END"); + if (lock) { + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "server-side convert handler END"); + LDLM_LOCK_PUT(lock); + } else + LDLM_DEBUG_NOLOCK("server-side convert handler END"); RETURN(0); } -static int ldlm_handle_cancel(struct ptlrpc_service *svc, - struct ptlrpc_request *req) +static int ldlm_handle_cancel(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; struct ldlm_lock *lock; @@ -158,29 +158,32 @@ static int ldlm_handle_cancel(struct ptlrpc_service *svc, dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) { + if (!lock) { req->rq_status = ESTALE; - } else { + } else { LDLM_DEBUG(lock, "server-side cancel handler START"); ldlm_lock_cancel(lock); req->rq_status = 0; } - if (ptlrpc_reply(svc, req) != 0) + if (ptlrpc_reply(req->rq_svc, req) != 0) LBUG(); - ldlm_reprocess_all(lock->l_resource); - ldlm_lock_put(lock); - LDLM_DEBUG_NOLOCK("server-side cancel handler END"); + if (lock) { + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "server-side cancel handler END"); + LDLM_LOCK_PUT(lock); + } else + LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)", + lock); RETURN(0); } -static int ldlm_handle_callback(struct ptlrpc_service *svc, - struct ptlrpc_request *req) +static int ldlm_handle_callback(struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock_desc *descp = NULL; + struct ldlm_lock_desc *descp = NULL; struct ldlm_lock *lock; __u64 is_blocking_ast = 0; int rc; @@ -195,18 +198,18 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, /* We must send the reply first, so that the thread is free to handle * any requests made in common_callback() */ - rc = ptlrpc_reply(svc, req); + rc = ptlrpc_reply(req->rq_svc, req); if (rc != 0) RETURN(rc); - + lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) { - CERROR("callback on lock %Lx - lock disappeared\n", + if (!lock) { + CERROR("callback on lock %Lx - lock disappeared\n", dlm_req->lock_handle1.addr); RETURN(0); } - /* check if this is a blocking AST */ + /* check if this is a blocking AST */ if (dlm_req->lock_desc.l_req_mode != dlm_req->lock_desc.l_granted_mode) { descp = &dlm_req->lock_desc; @@ -217,15 +220,15 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, is_blocking_ast ? "blocked" : "completion"); if (descp) { - int do_ast; + int do_ast; l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = (!lock->l_readers && !lock->l_writers); + do_ast = (!lock->l_readers && !lock->l_writers); l_unlock(&lock->l_resource->lr_namespace->ns_lock); - + if (do_ast) { - CDEBUG(D_INFO, "Lock already unused, calling " - "callback (%p).\n", lock->l_blocking_ast); + LDLM_DEBUG(lock, "already unused, calling " + "callback (%p)", lock->l_blocking_ast); if (lock->l_blocking_ast != NULL) { struct lustre_handle lockh; ldlm_lock2handle(lock, &lockh); @@ -235,9 +238,9 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, } } else { LDLM_DEBUG(lock, "Lock still has references, will be" - " cancelled later"); + " cancelled later"); } - ldlm_lock_put(lock); + LDLM_LOCK_PUT(lock); } else { struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); @@ -257,23 +260,21 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, ldlm_grant_lock(lock); /* FIXME: we want any completion function, not just wake_up */ wake_up(&lock->l_waitq); - ldlm_lock_put(lock); lock->l_resource->lr_tmp = NULL; l_unlock(&lock->l_resource->lr_namespace->ns_lock); + LDLM_LOCK_PUT(lock); ldlm_run_ast_work(&rpc_list); } - LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)", + LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)", is_blocking_ast ? "blocked" : "completion", lock); RETURN(0); } -static int lustre_handle(struct ptlrpc_request *req) +static int ldlm_handle(struct ptlrpc_request *req) { - struct ptlrpc_service *svc = req->rq_svc; - struct obd_device *req_dev; - int id, rc; + int rc; ENTRY; rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); @@ -288,52 +289,52 @@ static int lustre_handle(struct ptlrpc_request *req) GOTO(out, rc = -EINVAL); } - id = req->rq_reqmsg->target_id; - if (id < 0 || id > MAX_OBD_DEVICES) - GOTO(out, rc = -ENODEV); - req_dev = req->rq_obd = &obd_dev[id]; + if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) { + CERROR("No export handle for enqueue request.\n"); + GOTO(out, rc = -ENOTCONN); + } switch (req->rq_reqmsg->opc) { case LDLM_ENQUEUE: CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); - rc = ldlm_handle_enqueue(req_dev, svc, req); + rc = ldlm_handle_enqueue(req); break; case LDLM_CONVERT: CDEBUG(D_INODE, "convert\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); - rc = ldlm_handle_convert(svc, req); + rc = ldlm_handle_convert(req); break; case LDLM_CANCEL: CDEBUG(D_INODE, "cancel\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); - rc = ldlm_handle_cancel(svc, req); + rc = ldlm_handle_cancel(req); break; case LDLM_CALLBACK: CDEBUG(D_INODE, "callback\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); - rc = ldlm_handle_callback(svc, req); + rc = ldlm_handle_callback(req); break; default: - rc = ptlrpc_error(svc, req); + rc = ptlrpc_error(req->rq_svc, req); RETURN(rc); } EXIT; out: if (rc) - RETURN(ptlrpc_error(svc, req)); + RETURN(ptlrpc_error(req->rq_svc, req)); return 0; } -static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg, - void *uarg) +static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, + void *karg, void *uarg) { - struct obd_device *obddev = gen_conn2obd(conn); + struct obd_device *obddev = class_conn2obd(conn); struct ptlrpc_connection *connection; int err; ENTRY; @@ -384,7 +385,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) MOD_INC_USE_COUNT; ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, - LDLM_REPLY_PORTAL, "self", lustre_handle); + LDLM_REPLY_PORTAL, "self", ldlm_handle); if (!ldlm->ldlm_service) { LBUG(); GOTO(out_dec, rc = -ENOMEM); @@ -393,9 +394,6 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) for (i = 0; i < LDLM_NUM_THREADS; i++) { rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm"); - /* XXX We could just continue if we had started at least - * a few threads here. - */ if (rc) { CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc); LBUG(); @@ -434,14 +432,14 @@ struct obd_ops ldlm_obd_ops = { o_iocontrol: ldlm_iocontrol, o_setup: ldlm_setup, o_cleanup: ldlm_cleanup, - o_connect: gen_connect, - o_disconnect: gen_disconnect + o_connect: class_connect, + o_disconnect: class_disconnect }; static int __init ldlm_init(void) { - int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME); + int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME); if (rc != 0) return rc; @@ -464,7 +462,7 @@ static int __init ldlm_init(void) static void __exit ldlm_exit(void) { - obd_unregister_type(OBD_LDLM_DEVICENAME); + class_unregister_type(OBD_LDLM_DEVICENAME); if (kmem_cache_destroy(ldlm_resource_slab) != 0) CERROR("couldn't free ldlm resource slab\n"); if (kmem_cache_destroy(ldlm_lock_slab) != 0) @@ -485,7 +483,7 @@ EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_namespace_new); EXPORT_SYMBOL(ldlm_namespace_free); -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Lock Management Module v0.1"); MODULE_LICENSE("GPL");