#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_LDLM
+#ifdef __KERNEL__
#include <linux/module.h>
#include <linux/slab.h>
-#include <linux/lustre_dlm.h>
#include <linux/init.h>
+#else
+#include <liblustre.h>
+#endif
+
+#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
+
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
RETURN(1);
}
+static inline void ldlm_failed_ast(struct ldlm_lock *lock)
+{
+ /* XXX diagnostic */
+ recovd_conn_fail(lock->l_export->exp_connection);
+}
+
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
struct ldlm_lock_desc *desc,
void *data, int flag)
sizeof(body->lock_handle1));
memcpy(&body->lock_desc, desc, sizeof(*desc));
- LDLM_DEBUG0(lock, "server preparing blocking AST");
+ LDLM_DEBUG(lock, "server preparing blocking AST");
req->rq_replen = lustre_msg_size(0, NULL);
ldlm_add_waiting_lock(lock);
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
- ldlm_expired_completion_wait(lock);
+ ldlm_failed_ast(lock);
} else if (rc) {
CERROR("client returned %d from blocking AST for lock %p\n",
req->rq_status, lock);
body->lock_flags = flags;
ldlm_lock2desc(lock, &body->lock_desc);
- LDLM_DEBUG0(lock, "server preparing completion AST");
+ LDLM_DEBUG(lock, "server preparing completion AST");
req->rq_replen = lustre_msg_size(0, NULL);
req->rq_level = LUSTRE_CONN_RECOVD;
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
- ldlm_expired_completion_wait(lock);
+ ldlm_failed_ast(lock);
} else if (rc) {
CERROR("client returned %d from completion AST for lock %p\n",
req->rq_status, lock);
memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
sizeof(lock->l_remote_handle));
- LDLM_DEBUG0(lock, "server-side enqueue handler, new lock created");
+ LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
LASSERT(req->rq_export);
lock->l_export = req->rq_export;
if (!lock) {
req->rq_status = EINVAL;
} else {
- LDLM_DEBUG0(lock, "server-side convert handler START");
+ LDLM_DEBUG(lock, "server-side convert handler START");
ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
if (ldlm_del_waiting_lock(lock))
if (lock) {
ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG0(lock, "server-side convert handler END");
+ LDLM_DEBUG(lock, "server-side convert handler END");
LDLM_LOCK_PUT(lock);
} else
LDLM_DEBUG_NOLOCK("server-side convert handler END");
dlm_req->lock_handle1.cookie);
req->rq_status = ESTALE;
} else {
- LDLM_DEBUG0(lock, "server-side cancel handler START");
+ LDLM_DEBUG(lock, "server-side cancel handler START");
ldlm_lock_cancel(lock);
if (ldlm_del_waiting_lock(lock))
CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
if (lock) {
ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG0(lock, "server-side cancel handler END");
+ LDLM_DEBUG(lock, "server-side cancel handler END");
LDLM_LOCK_PUT(lock);
}
lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
if (!lock) {
- CERROR("blocking callback on lock "LPX64" - lock disappeared\n",
- dlm_req->lock_handle1.cookie);
+ CDEBUG(D_INFO, "blocking callback on lock "LPX64
+ " - lock disappeared\n", dlm_req->lock_handle1.cookie);
RETURN(-EINVAL);
}
- LDLM_DEBUG0(lock, "client blocking AST callback handler START");
+ LDLM_DEBUG(lock, "client blocking AST callback handler START");
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ns->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ns->ns_lock);
if (do_ast) {
LDLM_DEBUG(lock, "already unused, calling "
"callback (%p)", lock->l_blocking_ast);
- if (lock->l_blocking_ast != NULL) {
+ if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, &dlm_req->lock_desc,
lock->l_data, LDLM_CB_BLOCKING);
- }
- } else
- LDLM_DEBUG0(lock, "Lock still has references, will be"
- " cancelled later");
+ } else {
+ LDLM_DEBUG(lock, "Lock still has references, will be"
+ " cancelled later");
+ }
- LDLM_DEBUG0(lock, "client blocking callback handler END");
+ LDLM_DEBUG(lock, "client blocking callback handler END");
LDLM_LOCK_PUT(lock);
RETURN(0);
}
RETURN(-EINVAL);
}
- LDLM_DEBUG0(lock, "client completion callback handler START");
+ LDLM_DEBUG(lock, "client completion callback handler START");
l_lock(&ns->ns_lock);
* then we might need to switch lock modes, resources, or extents. */
if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
- LDLM_DEBUG0(lock, "completion AST, new lock mode");
+ LDLM_DEBUG(lock, "completion AST, new lock mode");
}
if (lock->l_resource->lr_type == LDLM_EXTENT)
memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
sizeof(lock->l_resource->lr_name)) != 0) {
ldlm_lock_change_resource(ns, lock,
dlm_req->lock_desc.l_resource.lr_name);
- LDLM_DEBUG0(lock, "completion AST, new resource");
+ LDLM_DEBUG(lock, "completion AST, new resource");
}
lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock, req, sizeof(*req));
lock->l_resource->lr_tmp = NULL;
l_unlock(&ns->ns_lock);
- LDLM_DEBUG0(lock, "callback handler finished, about to run_ast_work");
+ LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
LDLM_LOCK_PUT(lock);
ldlm_run_ast_work(&ast_list);
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
- RETURN(-ENOTCONN);
+ rc = -ENOTCONN;
+ goto out;
}
LASSERT(req->rq_export != NULL);
CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
RETURN(-EINVAL);
}
-
+ out:
req->rq_status = rc;
- if (rc) {
- ptlrpc_error(req->rq_svc, req);
- } else {
- rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
- &req->rq_repmsg);
- if (rc)
- RETURN(rc);
- ptlrpc_reply(req->rq_svc, req);
- }
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+ ptlrpc_reply(req->rq_svc, req);
RETURN(0);
}
static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
- struct obd_uuid uuid = {"self"};
int rc, i;
ENTRY;
if (rc != 0)
RETURN(rc);
+#ifdef __KERNEL__
ldlm->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, &uuid,
+ LDLM_CB_REPLY_PORTAL,
ldlm_callback_handler, "ldlm_cbd");
if (!ldlm->ldlm_cb_service) {
ldlm->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, &uuid,
+ LDLM_CANCEL_REPLY_PORTAL,
ldlm_cancel_handler, "ldlm_canceld");
if (!ldlm->ldlm_cancel_service) {
}
}
+#endif
INIT_LIST_HEAD(&waiting_locks_list);
spin_lock_init(&waiting_locks_spinlock);
waiting_locks_timer.function = waiting_locks_callback;
RETURN(0);
out_thread:
+#ifdef __KERNEL__
ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
ptlrpc_unregister_service(ldlm->ldlm_cb_service);
-
+#endif
out_proc:
ldlm_proc_cleanup(obddev);
RETURN(-EBUSY);
}
+#ifdef __KERNEL__
ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
ptlrpc_unregister_service(ldlm->ldlm_cb_service);
ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
ldlm_proc_cleanup(obddev);
-
+#endif
ldlm_already_setup = 0;
RETURN(0);
}
o_disconnect: class_disconnect
};
-static int __init ldlm_init(void)
+int __init ldlm_init(void)
{
int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
if (rc != 0)
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);
+#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");
module_init(ldlm_init);
module_exit(ldlm_exit);
+#endif