extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-/* _ldlm_callback and local_callback setup the variables then call this common
- * code */
-static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- ldlm_mode_t mode, void *data, __u32 data_len,
- struct ptlrpc_request **reqp)
-{
- ENTRY;
-
- if (!lock)
- LBUG();
- if (!lock->l_resource)
- LBUG();
-
- ldlm_lock_dump(lock);
-
- spin_lock(&lock->l_resource->lr_lock);
- spin_lock(&lock->l_lock);
- if (!new) {
- CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
- lock->l_req_mode = mode;
-
- /* FIXME: the API is flawed if I have to do these refcount
- * acrobatics (along with the _put() below). */
- lock->l_resource->lr_refcount++;
-
- /* _del_lock is safe for half-created locks that are not yet on
- * a list. */
- ldlm_resource_del_lock(lock);
- ldlm_grant_lock(lock->l_resource, lock);
-
- ldlm_resource_put(lock->l_resource);
-
- wake_up(&lock->l_waitq);
- spin_unlock(&lock->l_lock);
- spin_unlock(&lock->l_resource->lr_lock);
- } else {
- CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
- lock->l_flags |= LDLM_FL_DYING;
- spin_unlock(&lock->l_lock);
- spin_unlock(&lock->l_resource->lr_lock);
- if (!lock->l_readers && !lock->l_writers) {
- CDEBUG(D_INFO, "Lock already unused, calling "
- "callback (%p).\n", lock->l_blocking_ast);
- if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, new, lock->l_data,
- lock->l_data_len, reqp);
- } else {
- CDEBUG(D_INFO, "Lock still has references; lock will be"
- " cancelled later.\n");
- }
- }
- RETURN(0);
-}
-
-static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+static int ldlm_handle_enqueue(struct ptlrpc_request *req)
{
+ struct obd_device *obddev = req->rq_export->export_obd;
struct ldlm_reply *dlm_rep;
struct ldlm_request *dlm_req;
int rc, size = sizeof(*dlm_rep), cookielen = 0;
__u32 flags;
ldlm_error_t err;
struct ldlm_lock *lock = NULL;
- ldlm_lock_callback callback;
- struct lustre_handle lockh;
void *cookie = NULL;
ENTRY;
- callback = ldlm_cli_callback;
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
}
}
- err = ldlm_local_lock_create(obddev->obd_namespace,
- &dlm_req->lock_handle2,
- dlm_req->lock_desc.l_resource.lr_name,
- dlm_req->lock_desc.l_resource.lr_type,
- dlm_req->lock_desc.l_req_mode,
- NULL, 0, &lockh);
- if (err != ELDLM_OK)
- GOTO(out, err);
+ lock = ldlm_lock_create(obddev->obd_namespace,
+ &dlm_req->lock_handle2,
+ dlm_req->lock_desc.l_resource.lr_name,
+ dlm_req->lock_desc.l_resource.lr_type,
+ dlm_req->lock_desc.l_req_mode, NULL, 0);
+ if (!lock)
+ GOTO(out, err = -ENOMEM);
- lock = lustre_handle2object(&lockh);
memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
sizeof(lock->l_remote_handle));
- LDLM_DEBUG(lock, "server-side enqueue handler START");
+ LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
flags = dlm_req->lock_flags;
- err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
- callback, callback);
+ err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
+ ldlm_server_ast, ldlm_server_ast);
if (err != ELDLM_OK)
GOTO(out, err);
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_rep->lock_flags = flags;
- memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
+ ldlm_lock2handle(lock, &dlm_rep->lock_handle);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
memcpy(&dlm_rep->lock_extent, &lock->l_extent,
sizeof(lock->l_extent));
lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
EXIT;
out:
+ if (lock) {
+ LDLM_DEBUG(lock, "server-side enqueue handler, sending reply");
+ ldlm_lock_put(lock);
+ }
req->rq_status = err;
CDEBUG(D_INFO, "err = %d\n", err);
- if (ptlrpc_reply(svc, req))
+ if (ptlrpc_reply(req->rq_svc, req))
LBUG();
if (!err)
ldlm_reprocess_all(lock->l_resource);
- if (err)
- LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
- else
- LDLM_DEBUG(lock, "server-side enqueue handler END");
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
return 0;
}
-static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_convert(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_reply *dlm_rep;
- struct ldlm_resource *res;
struct ldlm_lock *lock;
int rc, size = sizeof(*dlm_rep);
ENTRY;
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_rep->lock_flags = dlm_req->lock_flags;
- lock = lustre_handle2object(&dlm_req->lock_handle1);
- LDLM_DEBUG(lock, "server-side convert handler START");
-
- res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
- dlm_req->lock_desc.l_req_mode,
- &dlm_rep->lock_flags);
- req->rq_status = 0;
- if (ptlrpc_reply(svc, req) != 0)
+ lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ if (!lock) {
+ req->rq_status = EINVAL;
+ } else {
+ LDLM_DEBUG(lock, "server-side convert handler START");
+ ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
+ &dlm_rep->lock_flags);
+ req->rq_status = 0;
+ }
+ if (ptlrpc_reply(req->rq_svc, req) != 0)
LBUG();
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(lock->l_resource);
+ ldlm_lock_put(lock);
LDLM_DEBUG(lock, "server-side convert handler END");
RETURN(0);
}
-static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
- struct ldlm_resource *res;
int rc;
ENTRY;
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- lock = lustre_handle2object(&dlm_req->lock_handle1);
- LDLM_DEBUG(lock, "server-side cancel handler START");
- res = ldlm_local_lock_cancel(lock);
- req->rq_status = 0;
- if (ptlrpc_reply(svc, req) != 0)
+ lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ if (!lock) {
+ req->rq_status = ESTALE;
+ } else {
+ LDLM_DEBUG(lock, "server-side cancel handler START");
+ ldlm_lock_cancel(lock);
+ req->rq_status = 0;
+ }
+
+ if (ptlrpc_reply(req->rq_svc, req) != 0)
LBUG();
- if (res != NULL)
- ldlm_reprocess_all(res);
+ ldlm_reprocess_all(lock->l_resource);
+ ldlm_lock_put(lock);
LDLM_DEBUG_NOLOCK("server-side cancel handler END");
RETURN(0);
}
-static int _ldlm_callback(struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+static int ldlm_handle_callback(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock1, *lock2;
+ struct ldlm_lock_desc *descp = NULL;
+ struct ldlm_lock *lock;
+ __u64 is_blocking_ast = 0;
int rc;
ENTRY;
/* We must send the reply first, so that the thread is free to handle
* any requests made in common_callback() */
- rc = ptlrpc_reply(svc, req);
+ rc = ptlrpc_reply(req->rq_svc, req);
if (rc != 0)
RETURN(rc);
+
+ lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ if (!lock) {
+ CERROR("callback on lock %Lx - lock disappeared\n",
+ dlm_req->lock_handle1.addr);
+ RETURN(0);
+ }
- lock1 = lustre_handle2object(&dlm_req->lock_handle1);
- lock2 = lustre_handle2object(&dlm_req->lock_handle2);
+ /* check if this is a blocking AST */
+ if (dlm_req->lock_desc.l_req_mode !=
+ dlm_req->lock_desc.l_granted_mode) {
+ descp = &dlm_req->lock_desc;
+ is_blocking_ast = 1;
+ }
- LDLM_DEBUG(lock1, "client %s callback handler START",
- lock2 == NULL ? "completion" : "blocked");
+ LDLM_DEBUG(lock, "client %s callback handler START",
+ is_blocking_ast ? "blocked" : "completion");
+
+ if (descp) {
+ int do_ast;
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ do_ast = (!lock->l_readers && !lock->l_writers);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+ if (do_ast) {
+ LDLM_DEBUG(lock, "already unused, calling "
+ "callback (%p)", lock->l_blocking_ast);
+ if (lock->l_blocking_ast != NULL) {
+ struct lustre_handle lockh;
+ ldlm_lock2handle(lock, &lockh);
+ lock->l_blocking_ast(&lockh, descp,
+ lock->l_data,
+ lock->l_data_len);
+ }
+ } else {
+ LDLM_DEBUG(lock, "Lock still has references, will be"
+ " cancelled later");
+ }
+ ldlm_lock_put(lock);
+ } else {
+ struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+ /* If we receive the completion AST before the actual enqueue
+ * returned, then we might need to switch resources. */
+ if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+ lock->l_resource->lr_name,
+ sizeof(__u64) * RES_NAME_SIZE) != 0) {
+ ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
+ LDLM_DEBUG(lock, "completion AST, new resource");
+ }
+ lock->l_resource->lr_tmp = &rpc_list;
+ ldlm_resource_unlink_lock(lock);
+ ldlm_grant_lock(lock);
+ /* FIXME: we want any completion function, not just wake_up */
+ wake_up(&lock->l_waitq);
+ ldlm_lock_put(lock);
+ lock->l_resource->lr_tmp = NULL;
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
- 0, NULL);
+ ldlm_run_ast_work(&rpc_list);
+ }
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
- lock2 == NULL ? "completion" : "blocked", lock1);
-
+ is_blocking_ast ? "blocked" : "completion", lock);
RETURN(0);
}
-static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+static int ldlm_handle(struct ptlrpc_request *req)
{
- struct obd_device *req_dev;
- int id, rc;
+ int rc;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
GOTO(out, rc = -EINVAL);
}
- id = req->rq_reqmsg->target_id;
- if (id < 0 || id > MAX_OBD_DEVICES)
- GOTO(out, rc = -ENODEV);
- req_dev = req->rq_obd = &obd_dev[id];
-
+ if (!req->rq_export &&
+ req->rq_reqmsg->opc == LDLM_ENQUEUE) {
+ CERROR("No export handle for enqueue request.\n");
+ GOTO(out, rc = -ENOTCONN);
+ }
switch (req->rq_reqmsg->opc) {
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = _ldlm_enqueue(req_dev, svc, req);
+ rc = ldlm_handle_enqueue(req);
break;
case LDLM_CONVERT:
CDEBUG(D_INODE, "convert\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = _ldlm_convert(svc, req);
+ rc = ldlm_handle_convert(req);
break;
case LDLM_CANCEL:
CDEBUG(D_INODE, "cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = _ldlm_cancel(svc, req);
+ rc = ldlm_handle_cancel(req);
break;
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
- rc = _ldlm_callback(svc, req);
+ rc = ldlm_handle_callback(req);
break;
default:
- rc = ptlrpc_error(svc, req);
+ rc = ptlrpc_error(req->rq_svc, req);
RETURN(rc);
}
EXIT;
out:
if (rc)
- RETURN(ptlrpc_error(svc, req));
+ RETURN(ptlrpc_error(req->rq_svc, req));
return 0;
}
-static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
+static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg,
void *uarg)
{
- struct obd_device *obddev = conn->oc_dev;
+ struct obd_device *obddev = class_conn2obd(conn);
struct ptlrpc_connection *connection;
int err;
ENTRY;
MOD_INC_USE_COUNT;
ldlm->ldlm_service =
ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
- LDLM_REPLY_PORTAL, "self", lustre_handle);
+ LDLM_REPLY_PORTAL, "self", ldlm_handle);
if (!ldlm->ldlm_service) {
LBUG();
GOTO(out_dec, rc = -ENOMEM);
for (i = 0; i < LDLM_NUM_THREADS; i++) {
rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
"lustre_dlm");
- /* XXX We could just continue if we had started at least
- * a few threads here.
- */
if (rc) {
CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
LBUG();
out_thread:
ptlrpc_stop_all_threads(ldlm->ldlm_service);
- rpc_unregister_service(ldlm->ldlm_service);
- OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+ ptlrpc_unregister_service(ldlm->ldlm_service);
out_dec:
MOD_DEC_USE_COUNT;
return rc;
ENTRY;
ptlrpc_stop_all_threads(ldlm->ldlm_service);
- rpc_unregister_service(ldlm->ldlm_service);
-
- if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
- // XXX reply with errors and clean up
- CERROR("Request list not empty!\n");
- }
-
- OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+ ptlrpc_unregister_service(ldlm->ldlm_service);
if (mds_reint_p != NULL)
inter_module_put("mds_reint");
o_iocontrol: ldlm_iocontrol,
o_setup: ldlm_setup,
o_cleanup: ldlm_cleanup,
- o_connect: gen_connect,
- o_disconnect: gen_disconnect
+ o_connect: class_connect,
+ o_disconnect: class_disconnect
};
static int __init ldlm_init(void)
{
- int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
+ int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
if (rc != 0)
return rc;
static void __exit ldlm_exit(void)
{
- obd_unregister_type(OBD_LDLM_DEVICENAME);
+ class_unregister_type(OBD_LDLM_DEVICENAME);
if (kmem_cache_destroy(ldlm_resource_slab) != 0)
CERROR("couldn't free ldlm resource slab\n");
if (kmem_cache_destroy(ldlm_lock_slab) != 0)
CERROR("couldn't free ldlm lock slab\n");
}
-EXPORT_SYMBOL(ldlm_local_lock_match);
+EXPORT_SYMBOL(ldlm_lockname);
+EXPORT_SYMBOL(ldlm_typename);
+EXPORT_SYMBOL(ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
-EXPORT_SYMBOL(lustre_handle2object);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_namespace_new);