# include <linux/module.h>
# include <linux/slab.h>
# include <linux/init.h>
+# include <linux/wait.h>
#else
# include <liblustre.h>
#endif
#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
-
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
+static int ldlm_already_setup = 0;
+
+#ifdef __KERNEL__
+
inline unsigned long round_timeout(unsigned long timeout)
{
return ((timeout / HZ) + 1) * HZ;
static struct list_head waiting_locks_list;
static spinlock_t waiting_locks_spinlock;
static struct timer_list waiting_locks_timer;
-static int ldlm_already_setup = 0;
+
+/* Shared state for the "expired lock" thread: locks whose callback
+ * timers fire are queued on elt_expired_locks (under elt_lock) and the
+ * thread is woken via elt_waitq to evict the offending clients. */
+static struct expired_lock_thread {
+ wait_queue_head_t elt_waitq;
+ int elt_state;
+ struct list_head elt_expired_locks;
+ spinlock_t elt_lock;
+} expired_lock_thread;
+
+/* expired_lock_thread.elt_state values */
+#define ELT_STOPPED 0
+#define ELT_READY 1
+#define ELT_TERMINATE 2
+
+/* Returns nonzero when the expired-locks list is non-empty, i.e. the
+ * expired-lock thread has eviction work pending.  elt_lock is taken so
+ * the emptiness check is coherent with concurrent list updates. */
+static inline int have_expired_locks(void)
+{
+ int need_to_run;
+
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+ RETURN(need_to_run);
+}
+
+/* Kernel thread that evicts clients whose locks ended up on the
+ * expired list (filled by waiting_locks_callback()).  Eviction via
+ * ptlrpc_fail_export() may block, so it is done here in process
+ * context rather than in the timer (BH) handler. */
+static int expired_lock_main(void *arg)
+{
+ struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+ struct l_wait_info lwi = { 0 };
+ unsigned long flags;
+
+ ENTRY;
+ lock_kernel();
+ kportal_daemonize("ldlm_elt");
+
+ /* Block all signals: this thread is only stopped by setting
+ * elt_state to ELT_TERMINATE, never by a signal. */
+ SIGNAL_MASK_LOCK(current, flags);
+ sigfillset(&current->blocked);
+ RECALC_SIGPENDING;
+ SIGNAL_MASK_UNLOCK(current, flags);
+
+ unlock_kernel();
+
+ /* Signal the starter that we are up and ready for work. */
+ expired_lock_thread.elt_state = ELT_READY;
+ wake_up(&expired_lock_thread.elt_waitq);
+
+ while (1) {
+ l_wait_event(expired_lock_thread.elt_waitq,
+ have_expired_locks() ||
+ expired_lock_thread.elt_state == ELT_TERMINATE,
+ &lwi);
+
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ while (!list_empty(expired)) {
+ struct ldlm_lock *lock = list_entry(expired->next,
+ struct ldlm_lock,
+ l_pending_chain);
+ /* Drop the spinlock: ptlrpc_fail_export() may sleep.
+ * NOTE(review): assumes failing the export removes the
+ * lock from this list; otherwise the loop would spin
+ * on the same entry -- confirm. */
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+ ptlrpc_fail_export(lock->l_export);
+
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ }
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+ if (expired_lock_thread.elt_state == ELT_TERMINATE)
+ break;
+ }
+
+ /* Acknowledge termination so ldlm_cleanup() can proceed. */
+ expired_lock_thread.elt_state = ELT_STOPPED;
+ wake_up(&expired_lock_thread.elt_waitq);
+ RETURN(0);
+}
+/* Timer callback: scan the waiting-locks list and move every lock whose
+ * callback timeout has passed onto the expired list, then wake the
+ * expired-lock thread to evict the clients.  Runs in timer (BH)
+ * context, so nothing here may block. */
static void waiting_locks_callback(unsigned long unused)
{
- struct list_head *liter, *n;
+ struct ldlm_lock *lock;
spin_lock_bh(&waiting_locks_spinlock);
- list_for_each_safe(liter, n, &waiting_locks_list) {
- struct ldlm_lock *l = list_entry(liter, struct ldlm_lock,
- l_pending_chain);
- if (l->l_callback_timeout > jiffies)
+ while (!list_empty(&waiting_locks_list)) {
+ lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
+ l_pending_chain);
+
+ /* List is FIFO (see ldlm_add_waiting_lock), so the first
+ * lock that has not yet timed out ends the scan. */
+ if (lock->l_callback_timeout > jiffies)
break;
- CERROR("lock timer expired, lock %p\n", l);
- LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
- l->l_export, l->l_export->exp_connection);
- recovd_conn_fail(l->l_export->exp_connection);
+
+ LDLM_ERROR(lock, "lock callback timer expired: evicting client "
+ "%s@%s nid "LPU64,
+ lock->l_export->exp_client_uuid.uuid,
+ lock->l_export->exp_connection->c_remote_uuid.uuid,
+ lock->l_export->exp_connection->c_peer.peer_nid);
+
+ /* Hand the lock to the expired-lock thread, which does the
+ * (potentially blocking) eviction in process context. */
+ spin_lock_bh(&expired_lock_thread.elt_lock);
+ list_del(&lock->l_pending_chain);
+ list_add(&lock->l_pending_chain,
+ &expired_lock_thread.elt_expired_locks);
+ spin_unlock_bh(&expired_lock_thread.elt_lock);
+ wake_up(&expired_lock_thread.elt_waitq);
}
+
spin_unlock_bh(&waiting_locks_spinlock);
}
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
unsigned long timeout_rounded;
- ENTRY;
+ LDLM_DEBUG(lock, "adding to wait list");
LASSERT(list_empty(&lock->l_pending_chain));
spin_lock_bh(&waiting_locks_spinlock);
}
list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
spin_unlock_bh(&waiting_locks_spinlock);
- RETURN(1);
+ /* We drop this ref when we get removed from the list. */
+ class_export_get(lock->l_export);
+ return 1;
}
/*
{
struct list_head *list_next;
- ENTRY;
+ if (lock->l_export == NULL) {
+ /* We don't have a "waiting locks list" on clients. */
+ LDLM_DEBUG(lock, "client lock: no-op");
+ return 0;
+ }
spin_lock_bh(&waiting_locks_spinlock);
if (list_empty(&lock->l_pending_chain)) {
spin_unlock_bh(&waiting_locks_spinlock);
- RETURN(0);
+ LDLM_DEBUG(lock, "wasn't waiting");
+ return 0;
}
list_next = lock->l_pending_chain.next;
}
list_del_init(&lock->l_pending_chain);
spin_unlock_bh(&waiting_locks_spinlock);
+ /* We got this ref when we were added to the list. */
+ class_export_put(lock->l_export);
+ LDLM_DEBUG(lock, "removed");
+ return 1;
+}
+
+#else /* !__KERNEL__ */
+
+/* Userspace (liblustre) stubs: there is no waiting-locks list on
+ * clients, so adding is a successful no-op and deleting finds nothing. */
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
RETURN(1);
}
-static inline void ldlm_failed_ast(struct ldlm_lock *lock)
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
- /* XXX diagnostic */
- recovd_conn_fail(lock->l_export->exp_connection);
+ RETURN(0);
+}
+
+#endif /* __KERNEL__ */
+
+/* An AST of type ast_type sent to this lock's client failed with rc:
+ * log the resource/mode/client details and evict the client by failing
+ * its export. */
+static inline void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
+ char *ast_type)
+{
+ CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
+ ", mode %s: evicting client %s@%s NID "LPU64"\n",
+ ast_type, rc,
+ lock->l_resource->lr_name.name[0],
+ lock->l_resource->lr_name.name[1],
+ ldlm_lockname[lock->l_granted_mode],
+ lock->l_export->exp_client_uuid.uuid,
+ lock->l_export->exp_connection->c_remote_uuid.uuid,
+ lock->l_export->exp_connection->c_peer.peer_nid);
+ ptlrpc_fail_export(lock->l_export);
}
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
RETURN(0);
}
- req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
+#if 0
+ if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
+ ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
+ RETURN(-ETIMEDOUT);
+ }
+#endif
+
+ req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
LDLM_BL_CALLBACK, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
memcpy(&body->lock_desc, desc, sizeof(*desc));
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
req->rq_level = LUSTRE_CONN_RECOVD;
- req->rq_timeout = 2;
+ req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
- ldlm_failed_ast(lock);
+ ldlm_failed_ast(lock, rc, "blocking");
} else if (rc) {
- CERROR("client returned %d from blocking AST for lock %p\n",
- req->rq_status, lock);
+ if (rc == -EINVAL)
+ CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p--normal race\n",
+ req->rq_connection->c_peer.peer_nid,
+ req->rq_repmsg->status, lock);
+ else if (rc == -ENOTCONN)
+ CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p--this client was "
+ "probably rebooted while it held a lock, nothing"
+ " serious\n",req->rq_connection->c_peer.peer_nid,
+ req->rq_repmsg->status, lock);
+ else
+ CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
+ "from blocking AST for lock %p\n",
+ req->rq_connection->c_peer.peer_nid,
+ req->rq_repmsg->status, lock);
LDLM_DEBUG(lock, "client returned error %d from blocking AST",
req->rq_status);
ldlm_lock_cancel(lock);
RETURN(-EINVAL);
}
- req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
+ req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
LDLM_CP_CALLBACK, 1, &size, NULL);
if (!req)
RETURN(-ENOMEM);
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
memcpy(&body->lock_handle1, &lock->l_remote_handle,
sizeof(body->lock_handle1));
body->lock_flags = flags;
req->rq_replen = lustre_msg_size(0, NULL);
req->rq_level = LUSTRE_CONN_RECOVD;
- req->rq_timeout = 2;
+ req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
ldlm_del_waiting_lock(lock);
- ldlm_failed_ast(lock);
+ ldlm_failed_ast(lock, rc, "completion");
} else if (rc) {
CERROR("client returned %d from completion AST for lock %p\n",
req->rq_status, lock);
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req == NULL) {
+ CERROR ("Can't unpack dlm_req\n");
+ RETURN (-EFAULT);
+ }
+
flags = dlm_req->lock_flags;
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
(flags & LDLM_FL_HAS_INTENT)) {
&dlm_req->lock_handle2,
dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
- dlm_req->lock_desc.l_req_mode, NULL, 0);
+ dlm_req->lock_desc.l_req_mode,
+ blocking_callback, NULL);
if (!lock)
GOTO(out, err = -ENOMEM);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
- &flags, completion_callback, blocking_callback);
+ &flags, completion_callback);
if (err)
GOTO(out, err);
- dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
dlm_rep->lock_flags = flags;
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
int rc, size = sizeof(*dlm_rep);
ENTRY;
+ dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req == NULL) {
+ CERROR ("Can't unpack dlm_req\n");
+ RETURN (-EFAULT);
+ }
+
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
RETURN(-ENOMEM);
}
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
dlm_rep->lock_flags = dlm_req->lock_flags;
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
int rc;
ENTRY;
+ dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req == NULL) {
+ CERROR("bad request buffer for cancel\n");
+ RETURN(-EFAULT);
+ }
+
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("out of memory\n");
RETURN(-ENOMEM);
}
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- if (!dlm_req) {
- CERROR("bad request buffer for cancel\n");
- RETURN(-EINVAL);
- }
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
if (!lock) {
- CERROR("received cancel for unknown lock cookie "LPX64"\n",
- dlm_req->lock_handle1.cookie);
+ CERROR("received cancel for unknown lock cookie "LPX64
+ " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
+ req->rq_connection->c_peer.peer_nid);
LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
"(cookie "LPU64")",
dlm_req->lock_handle1.cookie);
req->rq_status = 0;
}
- if (ptlrpc_reply(req->rq_svc, req) != 0)
+ if (ptlrpc_reply(req) != 0)
LBUG();
if (lock) {
int do_ast;
ENTRY;
- /* Try to narrow down this damn iozone bug */
- if (lock->l_resource == NULL)
- CERROR("lock %p resource NULL\n", lock);
- if (lock->l_resource->lr_type != LDLM_EXTENT)
- if (lock->l_resource->lr_namespace != ns)
- CERROR("lock %p namespace %p != passed ns %p\n", lock,
- lock->l_resource->lr_namespace, ns);
+ l_lock(&ns->ns_lock);
LDLM_DEBUG(lock, "client blocking AST callback handler START");
- l_lock(&ns->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&ns->ns_lock);
if (do_ast) {
LDLM_DEBUG(lock, "already unused, calling "
"callback (%p)", lock->l_blocking_ast);
- if (lock->l_blocking_ast != NULL)
+ if (lock->l_blocking_ast != NULL) {
+ l_unlock(&ns->ns_lock);
lock->l_blocking_ast(lock, &dlm_req->lock_desc,
lock->l_data, LDLM_CB_BLOCKING);
+ l_lock(&ns->ns_lock);
+ }
} else {
LDLM_DEBUG(lock, "Lock still has references, will be"
" cancelled later");
}
LDLM_DEBUG(lock, "client blocking callback handler END");
+ l_unlock(&ns->ns_lock);
LDLM_LOCK_PUT(lock);
EXIT;
}
LIST_HEAD(ast_list);
ENTRY;
- LDLM_DEBUG(lock, "client completion callback handler START");
-
l_lock(&ns->ns_lock);
+ LDLM_DEBUG(lock, "client completion callback handler START");
/* If we receive the completion AST before the actual enqueue returned,
* then we might need to switch lock modes, resources, or extents. */
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
LDLM_DEBUG(lock, "completion AST, new lock mode");
}
- if (lock->l_resource->lr_type == LDLM_EXTENT)
+ if (lock->l_resource->lr_type == LDLM_EXTENT) {
memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
sizeof(lock->l_extent));
+
+ if ((lock->l_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
+ /* XXX Old versions of BA OST code have a fencepost bug
+ * which will cause them to grant a lock that's one
+ * byte too large. This can be safely removed after BA
+ * ships their next release -phik (02 Apr 2003) */
+ lock->l_extent.end--;
+ } else if ((lock->l_extent.start & ~PAGE_MASK) ==
+ ~PAGE_MASK) {
+ lock->l_extent.start++;
+ }
+ }
+
ldlm_resource_unlink_lock(lock);
if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
&lock->l_resource->lr_name,
lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock, req, sizeof(*req));
lock->l_resource->lr_tmp = NULL;
- l_unlock(&ns->ns_lock);
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
+ l_unlock(&ns->ns_lock);
LDLM_LOCK_PUT(lock);
ldlm_run_ast_work(&ast_list);
&req->rq_repmsg);
if (rc)
return rc;
- return ptlrpc_reply(req->rq_svc, req);
+ return ptlrpc_reply(req);
}
static int ldlm_callback_handler(struct ptlrpc_request *req)
struct ldlm_namespace *ns;
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
- int rc;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc) {
- CERROR("Invalid request: %d\n", rc);
- RETURN(rc);
- }
+ /* Requests arrive in sender's byte order. The ptlrpc service
+ * handler has already checked and, if necessary, byte-swapped the
+ * incoming request message body, but I am responsible for the
+ * message buffers. */
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
- CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
- req->rq_reqmsg->opc, req->rq_request_portal,
- req->rq_reply_portal);
- CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
- req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
- dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
+ CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
+ "export cookie "LPX64" (ptl req %d/rep %d); this is "
+ "normal if this node rebooted with a lock held\n",
+ req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
+ req->rq_reqmsg->handle.cookie,
+ req->rq_request_portal, req->rq_reply_portal);
+
+ dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req != NULL)
+ CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
+ dlm_req->lock_handle1.cookie);
+
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
} else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
} else {
- ldlm_callback_reply(req, -EIO);
+ ldlm_callback_reply(req, -EPROTO);
RETURN(0);
}
ns = req->rq_export->exp_obd->obd_namespace;
LASSERT(ns != NULL);
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req == NULL) {
+ CERROR ("can't unpack dlm_req\n");
+ ldlm_callback_reply (req, -EPROTO);
+ RETURN (0);
+ }
+
lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
if (!lock) {
CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
CDEBUG(D_INODE, "completion ast\n");
ldlm_handle_cp_callback(req, ns, dlm_req, lock);
break;
+ default:
+ LBUG(); /* checked above */
}
RETURN(0);
int rc;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc) {
- CERROR("lustre_ldlm: Invalid request: %d\n", rc);
- RETURN(rc);
- }
+ /* Requests arrive in sender's byte order. The ptlrpc service
+ * handler has already checked and, if necessary, byte-swapped the
+ * incoming request message body, but I am responsible for the
+ * message buffers. */
if (req->rq_export == NULL) {
struct ldlm_request *dlm_req;
CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
req->rq_reqmsg->opc, req->rq_request_portal,
req->rq_reply_portal);
- CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
- req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
- dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
+ CERROR("--> export cookie: "LPX64"\n",
+ req->rq_reqmsg->handle.cookie);
+ dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+ lustre_swab_ldlm_request);
+ if (dlm_req != NULL)
+ ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
RETURN(-ENOTCONN);
}
switch (req->rq_reqmsg->opc) {
- /* XXX FIXME move this back to mds/handler.c, bug 625069 */
+ /* XXX FIXME move this back to mds/handler.c, bug 249 */
case LDLM_CANCEL:
CDEBUG(D_INODE, "cancel\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
RETURN(rc);
#ifdef __KERNEL__
+ inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
+ ldlm_cli_cancel_unused);
+ inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
+ ldlm_namespace_cleanup);
+ inter_module_register("ldlm_replay_locks", THIS_MODULE,
+ ldlm_replay_locks);
+
ldlm->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
LDLM_CB_REPLY_PORTAL,
- ldlm_callback_handler, "ldlm_cbd");
+ ldlm_callback_handler, "ldlm_cbd", obddev);
if (!ldlm->ldlm_cb_service) {
CERROR("failed to start service\n");
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
LDLM_CANCEL_REPLY_PORTAL,
- ldlm_cancel_handler, "ldlm_canceld");
+ ldlm_cancel_handler, "ldlm_canceld", obddev);
if (!ldlm->ldlm_cancel_service) {
CERROR("failed to start service\n");
}
}
-#endif
+ INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
+ spin_lock_init(&expired_lock_thread.elt_lock);
+ expired_lock_thread.elt_state = ELT_STOPPED;
+ init_waitqueue_head(&expired_lock_thread.elt_waitq);
+
+ rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
+ if (rc < 0) {
+ CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
+ GOTO(out_thread, rc);
+ }
+
+ wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_READY);
+
INIT_LIST_HEAD(&waiting_locks_list);
spin_lock_init(&waiting_locks_spinlock);
waiting_locks_timer.function = waiting_locks_callback;
waiting_locks_timer.data = 0;
init_timer(&waiting_locks_timer);
+#endif
ldlm_already_setup = 1;
return rc;
}
-static int ldlm_cleanup(struct obd_device *obddev)
+/* Tear down the LDLM obd: refuses while namespaces remain; with
+ * "force", lingering ptlrpc LDLM hook references are dropped instead of
+ * failing.  Also stops the callback/cancel services and the
+ * expired-lock thread, and unregisters the inter-module symbols. */
+static int ldlm_cleanup(struct obd_device *obddev, int force, int failover)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
ENTRY;
if (!list_empty(&ldlm_namespace_list)) {
CERROR("ldlm still has namespaces; clean these up first.\n");
+ ldlm_dump_all_namespaces();
RETURN(-EBUSY);
}
#ifdef __KERNEL__
+ if (force) {
+ ptlrpc_put_ldlm_hooks();
+ } else if (ptlrpc_ldlm_hooks_referenced()) {
+ CERROR("Some connections weren't cleaned up; run lconf with "
+ "--force to forcibly unload.\n");
+ ptlrpc_dump_connections();
+ RETURN(-EBUSY);
+ }
+
ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
ptlrpc_unregister_service(ldlm->ldlm_cb_service);
ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
ldlm_proc_cleanup(obddev);
+
+ /* Ask the expired-lock thread to exit and wait for its ack. */
+ expired_lock_thread.elt_state = ELT_TERMINATE;
+ wake_up(&expired_lock_thread.elt_waitq);
+ wait_event(expired_lock_thread.elt_waitq,
+ expired_lock_thread.elt_state == ELT_STOPPED);
+
+ inter_module_unregister("ldlm_namespace_cleanup");
+ inter_module_unregister("ldlm_cli_cancel_unused");
+ inter_module_unregister("ldlm_replay_locks");
#endif
+
ldlm_already_setup = 0;
RETURN(0);
}
+/* Connect handler for the LDLM pseudo-obd: delegates straight to
+ * class_connect(); the old recovery-daemon arguments are dropped from
+ * the interface. */
static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
- struct obd_uuid *cluuid, struct recovd_obd *recovd,
- ptlrpc_recovery_cb_t recover)
+ struct obd_uuid *cluuid)
{
return class_connect(conn, src, cluuid);
}
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);
+/* ldlm_lib.c */
+EXPORT_SYMBOL(client_import_connect);
+EXPORT_SYMBOL(client_import_disconnect);
+EXPORT_SYMBOL(target_abort_recovery);
+EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_cancel_recovery_timer);
+EXPORT_SYMBOL(target_send_reply);
+EXPORT_SYMBOL(target_queue_recovery_request);
+EXPORT_SYMBOL(target_handle_ping);
+EXPORT_SYMBOL(target_handle_disconnect);
+EXPORT_SYMBOL(target_queue_final_reply);
+
#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");