/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
* Author: Peter Braam <braam@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
*
return ((timeout / HZ) + 1) * HZ;
}
+/* XXX should this be per-ldlm? */
static struct list_head waiting_locks_list;
static spinlock_t waiting_locks_spinlock;
static struct timer_list waiting_locks_timer;
RETURN(1);
}
-static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
- void *data, __u32 data_len, int flag)
+int ldlm_server_blocking_ast(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc,
+ void *data, int flag)
{
struct ldlm_request *body;
struct ptlrpc_request *req;
LASSERT(lock);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ /* XXX This is necessary because, with the lock re-tasking, we actually
+ * _can_ get called in here twice. (bug 830) */
+ if (!list_empty(&lock->l_pending_chain)) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ RETURN(0);
+ }
+
if (lock->l_destroyed) {
/* What's the point? */
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
req->rq_level = LUSTRE_CONN_RECOVD;
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
+ ldlm_del_waiting_lock(lock);
ldlm_expired_completion_wait(lock);
} else if (rc) {
CERROR("client returned %d from blocking AST for lock %p\n",
RETURN(rc);
}
-static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
struct ldlm_request *body;
struct ptlrpc_request *req;
req->rq_level = LUSTRE_CONN_RECOVD;
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT || rc == -EINTR) {
+ ldlm_del_waiting_lock(lock);
ldlm_expired_completion_wait(lock);
} else if (rc) {
CERROR("client returned %d from completion AST for lock %p\n",
RETURN(rc);
}
-int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_handle_enqueue(struct ptlrpc_request *req,
+ ldlm_completion_callback completion_callback,
+ ldlm_blocking_callback blocking_callback)
{
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
}
}
- /* XXX notice that this lock has no callback data: of course the
- export would be exactly what we may want to use here... */
+ /* The lock's callback data might be set in the policy function */
lock = ldlm_lock_create(obddev->obd_namespace,
&dlm_req->lock_handle2,
dlm_req->lock_desc.l_resource.lr_name,
&lock->l_export->exp_ldlm_data.led_held_locks);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- err = ldlm_lock_enqueue(obddev->obd_namespace, lock, cookie, cookielen,
- &flags, ldlm_server_completion_ast,
- ldlm_server_blocking_ast);
- if (err != ELDLM_OK)
+ err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
+ &flags, completion_callback, blocking_callback);
+ if (err)
GOTO(out, err);
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&dlm_rep->lock_extent, &lock->l_extent,
sizeof(lock->l_extent));
if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
- memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
+ memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
sizeof(dlm_rep->lock_resource_name));
dlm_rep->lock_mode = lock->l_req_mode;
}
"(err=%d)", err);
req->rq_status = err;
+ /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
+ * ldlm_reprocess_all. If this moves, revisit that code. -phil */
if (lock) {
if (!err)
ldlm_reprocess_all(lock->l_resource);
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
if (!lock) {
- LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
- "%p)", (void *)(unsigned long)
- dlm_req->lock_handle1.addr);
+ CERROR("received cancel for unknown lock cookie "LPX64"\n",
+ dlm_req->lock_handle1.cookie);
+ LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
+ "(cookie "LPU64")",
+ dlm_req->lock_handle1.cookie);
req->rq_status = ESTALE;
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
"callback (%p)", lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL) {
lock->l_blocking_ast(lock, &dlm_req->lock_desc,
- lock->l_data, lock->l_data_len,
- LDLM_CB_BLOCKING);
+ lock->l_data, LDLM_CB_BLOCKING);
}
} else
LDLM_DEBUG(lock, "Lock still has references, will be"
memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
sizeof(lock->l_extent));
ldlm_resource_unlink_lock(lock);
- if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
- lock->l_resource->lr_name,
- sizeof(__u64) * RES_NAME_SIZE) != 0) {
+ if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
+ &lock->l_resource->lr_name,
+ sizeof(lock->l_resource->lr_name)) != 0) {
ldlm_lock_change_resource(ns, lock,
dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
}
lock->l_resource->lr_tmp = &ast_list;
- ldlm_grant_lock(lock);
+ ldlm_grant_lock(lock, req, sizeof(*req));
lock->l_resource->lr_tmp = NULL;
l_unlock(&ns->ns_lock);
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
{
struct obd_device *obddev = class_conn2obd(conn);
struct ptlrpc_connection *connection;
+ struct obd_uuid uuid = { "ldlm" };
int err = 0;
ENTRY;
OBD_ALLOC(obddev->u.ldlm.ldlm_client,
sizeof(*obddev->u.ldlm.ldlm_client));
- connection = ptlrpc_uuid_to_connection("ldlm");
+ connection = ptlrpc_uuid_to_connection(&uuid);
if (!connection)
CERROR("No LDLM UUID found: assuming ldlm is local.\n");
switch (cmd) {
case IOC_LDLM_TEST:
- err = ldlm_test(obddev, conn);
- CERROR("-- done err %d\n", err);
+ //err = ldlm_test(obddev, conn);
+ err = 0;
+ CERROR("-- NO TESTS WERE RUN done err %d\n", err);
GOTO(out, err);
case IOC_LDLM_DUMP:
ldlm_dump_all_namespaces();
static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ldlm_obd *ldlm = &obddev->u.ldlm;
+ struct obd_uuid uuid = {"self"};
int rc, i;
ENTRY;
ldlm->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, "self",
+ LDLM_CB_REPLY_PORTAL, &uuid,
ldlm_callback_handler, "ldlm_cbd");
if (!ldlm->ldlm_cb_service) {
ldlm->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, "self",
+ LDLM_CANCEL_REPLY_PORTAL, &uuid,
ldlm_cancel_handler, "ldlm_canceld");
if (!ldlm->ldlm_cancel_service) {
}
static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
- obd_uuid_t cluuid, struct recovd_obd *recovd,
+ struct obd_uuid *cluuid, struct recovd_obd *recovd,
ptlrpc_recovery_cb_t recover)
{
return class_connect(conn, src, cluuid);
CERROR("couldn't free ldlm lock slab\n");
}
-EXPORT_SYMBOL(ldlm_completion_ast);
-EXPORT_SYMBOL(ldlm_handle_enqueue);
-EXPORT_SYMBOL(ldlm_handle_cancel);
-EXPORT_SYMBOL(ldlm_handle_convert);
+/* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
-EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock2handle);
+EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
+EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
+EXPORT_SYMBOL(ldlm_it2str);
+EXPORT_SYMBOL(ldlm_lock_dump);
+EXPORT_SYMBOL(ldlm_lock_dump_handle);
+EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
+EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+
+/* ldlm_request.c */
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_match_or_enqueue);
-EXPORT_SYMBOL(ldlm_it2str);
+EXPORT_SYMBOL(ldlm_replay_locks);
+EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach_res);
+
+/* ldlm_lockd.c */
+EXPORT_SYMBOL(ldlm_server_blocking_ast);
+EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_del_waiting_lock);
+
+#if 0
+/* ldlm_test.c */
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
-EXPORT_SYMBOL(ldlm_lock_dump);
-EXPORT_SYMBOL(ldlm_lock_dump_handle);
+#endif
+
+/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
-EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
-EXPORT_SYMBOL(ldlm_replay_locks);
-EXPORT_SYMBOL(ldlm_resource_foreach);
-EXPORT_SYMBOL(ldlm_reprocess_all_ns);
-EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach_res);
+
+/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);