Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index d826db1..803e59d 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
  *   Author: Peter Braam <braam@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
  *
@@ -42,6 +42,7 @@ inline unsigned long round_timeout(unsigned long timeout)
         return ((timeout / HZ) + 1) * HZ;
 }
 
+/* XXX should this be per-ldlm? */
 static struct list_head waiting_locks_list;
 static spinlock_t waiting_locks_spinlock;
 static struct timer_list waiting_locks_timer;
@@ -129,9 +130,9 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         RETURN(1);
 }
 
-static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
-                                    struct ldlm_lock_desc *desc,
-                                    void *data, __u32 data_len, int flag)
+int ldlm_server_blocking_ast(struct ldlm_lock *lock,
+                             struct ldlm_lock_desc *desc,
+                             void *data, int flag)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
@@ -146,6 +147,13 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LASSERT(lock);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        /* XXX This is necessary because, with the lock re-tasking, we actually
+         * _can_ get called in here twice.  (bug 830) */
+        if (!list_empty(&lock->l_pending_chain)) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                RETURN(0);
+        }
+
         if (lock->l_destroyed) {
                 /* What's the point? */
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
@@ -171,6 +179,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_level = LUSTRE_CONN_RECOVD;
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_del_waiting_lock(lock);
                 ldlm_expired_completion_wait(lock);
         } else if (rc) {
                 CERROR("client returned %d from blocking AST for lock %p\n",
@@ -188,7 +197,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         RETURN(rc);
 }
 
-static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
@@ -217,6 +226,7 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         req->rq_level = LUSTRE_CONN_RECOVD;
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_del_waiting_lock(lock);
                 ldlm_expired_completion_wait(lock);
         } else if (rc) {
                 CERROR("client returned %d from completion AST for lock %p\n",
@@ -233,7 +243,9 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         RETURN(rc);
 }
 
-int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_handle_enqueue(struct ptlrpc_request *req,
+                        ldlm_completion_callback completion_callback,
+                        ldlm_blocking_callback blocking_callback)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
@@ -268,8 +280,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                 }
         }
 
-        /* XXX notice that this lock has no callback data: of course the
-           export would be exactly what we may want to use here... */
+        /* The lock's callback data might be set in the policy function */
         lock = ldlm_lock_create(obddev->obd_namespace,
                                 &dlm_req->lock_handle2,
                                 dlm_req->lock_desc.l_resource.lr_name,
@@ -289,10 +300,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(obddev->obd_namespace, lock, cookie, cookielen,
-                                &flags, ldlm_server_completion_ast,
-                                ldlm_server_blocking_ast);
-        if (err != ELDLM_OK)
+        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
+                                &flags, completion_callback, blocking_callback);
+        if (err)
                 GOTO(out, err);
 
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
@@ -303,7 +313,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                        sizeof(lock->l_extent));
         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
-                memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
+                memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
                        sizeof(dlm_rep->lock_resource_name));
                 dlm_rep->lock_mode = lock->l_req_mode;
         }
@@ -315,6 +325,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                            "(err=%d)", err);
         req->rq_status = err;
 
+        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
+         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
                 if (!err)
                         ldlm_reprocess_all(lock->l_resource);
@@ -384,9 +396,11 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
-                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
-                                  "%p)", (void *)(unsigned long)
-                                  dlm_req->lock_handle1.addr);
+                CERROR("received cancel for unknown lock cookie "LPX64"\n",
+                       dlm_req->lock_handle1.cookie);
+                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
+                                  "(cookie "LPU64")",
+                                  dlm_req->lock_handle1.cookie);
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
@@ -442,8 +456,7 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
                            "callback (%p)", lock->l_blocking_ast);
                 if (lock->l_blocking_ast != NULL) {
                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
-                                             lock->l_data, lock->l_data_len,
-                                             LDLM_CB_BLOCKING);
+                                             lock->l_data, LDLM_CB_BLOCKING);
                 }
         } else
                 LDLM_DEBUG(lock, "Lock still has references, will be"
@@ -487,15 +500,15 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
                        sizeof(lock->l_extent));
         ldlm_resource_unlink_lock(lock);
-        if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
-                   lock->l_resource->lr_name,
-                   sizeof(__u64) * RES_NAME_SIZE) != 0) {
+        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
+                   &lock->l_resource->lr_name,
+                   sizeof(lock->l_resource->lr_name)) != 0) {
                 ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
         lock->l_resource->lr_tmp = &ast_list;
-        ldlm_grant_lock(lock);
+        ldlm_grant_lock(lock, req, sizeof(*req));
         lock->l_resource->lr_tmp = NULL;
         l_unlock(&ns->ns_lock);
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
@@ -618,6 +631,7 @@ static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 {
         struct obd_device *obddev = class_conn2obd(conn);
         struct ptlrpc_connection *connection;
+        struct obd_uuid uuid = { "ldlm" };
         int err = 0;
         ENTRY;
 
@@ -630,14 +644,15 @@ static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 
         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
                   sizeof(*obddev->u.ldlm.ldlm_client));
-        connection = ptlrpc_uuid_to_connection("ldlm");
+        connection = ptlrpc_uuid_to_connection(&uuid);
         if (!connection)
                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
 
         switch (cmd) {
         case IOC_LDLM_TEST:
-                err = ldlm_test(obddev, conn);
-                CERROR("-- done err %d\n", err);
+                //err = ldlm_test(obddev, conn);
+                err = 0;
+                CERROR("-- NO TESTS WERE RUN done err %d\n", err);
                 GOTO(out, err);
         case IOC_LDLM_DUMP:
                 ldlm_dump_all_namespaces();
@@ -657,6 +672,7 @@ static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
+        struct obd_uuid uuid = {"self"};
         int rc, i;
         ENTRY;
 
@@ -670,7 +686,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         ldlm->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
-                                LDLM_CB_REPLY_PORTAL, "self",
+                                LDLM_CB_REPLY_PORTAL, &uuid,
                                 ldlm_callback_handler, "ldlm_cbd");
 
         if (!ldlm->ldlm_cb_service) {
@@ -681,7 +697,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         ldlm->ldlm_cancel_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
-                                LDLM_CANCEL_REPLY_PORTAL, "self",
+                                LDLM_CANCEL_REPLY_PORTAL, &uuid,
                                 ldlm_cancel_handler, "ldlm_canceld");
 
         if (!ldlm->ldlm_cancel_service) {
@@ -755,7 +771,7 @@ static int ldlm_cleanup(struct obd_device *obddev)
 }
 
 static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
-                        obd_uuid_t cluuid, struct recovd_obd *recovd,
+                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
                         ptlrpc_recovery_cb_t recover)
 {
         return class_connect(conn, src, cluuid);
@@ -804,43 +820,63 @@ static void __exit ldlm_exit(void)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
-EXPORT_SYMBOL(ldlm_completion_ast);
-EXPORT_SYMBOL(ldlm_handle_enqueue);
-EXPORT_SYMBOL(ldlm_handle_cancel);
-EXPORT_SYMBOL(ldlm_handle_convert);
+/* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_lock2desc);
 EXPORT_SYMBOL(ldlm_register_intent);
 EXPORT_SYMBOL(ldlm_unregister_intent);
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
-EXPORT_SYMBOL(__ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock2handle);
+EXPORT_SYMBOL(__ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_match);
+EXPORT_SYMBOL(ldlm_lock_cancel);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
 EXPORT_SYMBOL(ldlm_lock_change_resource);
 EXPORT_SYMBOL(ldlm_lock_set_data);
+EXPORT_SYMBOL(ldlm_it2str);
+EXPORT_SYMBOL(ldlm_lock_dump);
+EXPORT_SYMBOL(ldlm_lock_dump_handle);
+EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
+EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+
+/* ldlm_request.c */
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_expired_completion_wait);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_cancel);
 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
 EXPORT_SYMBOL(ldlm_match_or_enqueue);
-EXPORT_SYMBOL(ldlm_it2str);
+EXPORT_SYMBOL(ldlm_replay_locks);
+EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach_res);
+
+/* ldlm_lockd.c */
+EXPORT_SYMBOL(ldlm_server_blocking_ast);
+EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_del_waiting_lock);
+
+#if 0
+/* ldlm_test.c */
 EXPORT_SYMBOL(ldlm_test);
 EXPORT_SYMBOL(ldlm_regression_start);
 EXPORT_SYMBOL(ldlm_regression_stop);
-EXPORT_SYMBOL(ldlm_lock_dump);
-EXPORT_SYMBOL(ldlm_lock_dump_handle);
+#endif
+
+/* ldlm_resource.c */
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
 EXPORT_SYMBOL(ldlm_namespace_dump);
-EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
-EXPORT_SYMBOL(ldlm_replay_locks);
-EXPORT_SYMBOL(ldlm_resource_foreach);
-EXPORT_SYMBOL(ldlm_reprocess_all_ns);
-EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach_res);
+
+/* l_lock.c */
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);