Whamcloud - gitweb
- Added some temporary LDLM_LOCK_PUT/LDLM_LOCK_GET macros to aid in debugging
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 8ecb2c9..d69128c 100644 (file)
@@ -21,75 +21,19 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-/* _ldlm_callback and local_callback setup the variables then call this common
- * code */
-static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                           ldlm_mode_t mode, void *data, __u32 data_len,
-                           struct ptlrpc_request **reqp)
-{
-        ENTRY;
-
-        if (!lock)
-                LBUG();
-        if (!lock->l_resource)
-                LBUG();
-
-        ldlm_lock_dump(lock);
-
-        spin_lock(&lock->l_resource->lr_lock);
-        spin_lock(&lock->l_lock);
-        if (!new) {
-                CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
-                lock->l_req_mode = mode;
-
-                /* FIXME: the API is flawed if I have to do these refcount
-                 * acrobatics (along with the _put() below). */
-                lock->l_resource->lr_refcount++;
-
-                /* _del_lock is safe for half-created locks that are not yet on
-                 * a list. */
-                ldlm_resource_del_lock(lock);
-                ldlm_grant_lock(lock->l_resource, lock);
-
-                ldlm_resource_put(lock->l_resource);
-
-                wake_up(&lock->l_waitq);
-                spin_unlock(&lock->l_lock);
-                spin_unlock(&lock->l_resource->lr_lock);
-        } else {
-                CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
-                lock->l_flags |= LDLM_FL_DYING;
-                spin_unlock(&lock->l_lock);
-                spin_unlock(&lock->l_resource->lr_lock);
-                if (!lock->l_readers && !lock->l_writers) {
-                        CDEBUG(D_INFO, "Lock already unused, calling "
-                               "callback (%p).\n", lock->l_blocking_ast);
-                        if (lock->l_blocking_ast != NULL)
-                                lock->l_blocking_ast(lock, new, lock->l_data,
-                                                     lock->l_data_len, reqp);
-                } else {
-                        CDEBUG(D_INFO, "Lock still has references; lock will be"
-                               " cancelled later.\n");
-                }
-        }
-        RETURN(0);
-}
-
-static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
-                         struct ptlrpc_request *req)
+static int ldlm_handle_enqueue(struct ptlrpc_request *req)
 {
+        struct obd_device *obddev = req->rq_export->export_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size = sizeof(*dlm_rep), cookielen;
+        int rc, size = sizeof(*dlm_rep), cookielen = 0;
         __u32 flags;
         ldlm_error_t err;
         struct ldlm_lock *lock = NULL;
-        ldlm_lock_callback callback;
-        struct lustre_handle lockh;
-        void *cookie;
+        void *cookie = NULL;
         ENTRY;
 
-        callback = ldlm_cli_callback;
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
@@ -110,28 +54,28 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
                 }
         }
 
-        err = ldlm_local_lock_create(obddev->obd_namespace,
-                                     &dlm_req->lock_handle2,
-                                     dlm_req->lock_desc.l_resource.lr_name,
-                                     dlm_req->lock_desc.l_resource.lr_type,
-                                     dlm_req->lock_desc.l_req_mode,
-                                     NULL, 0, &lockh);
-        if (err != ELDLM_OK)
-                GOTO(out, err);
+        lock = ldlm_lock_create(obddev->obd_namespace,
+                                &dlm_req->lock_handle2,
+                                dlm_req->lock_desc.l_resource.lr_name,
+                                dlm_req->lock_desc.l_resource.lr_type,
+                                dlm_req->lock_desc.l_req_mode, NULL, 0);
+        if (!lock)
+                GOTO(out, err = -ENOMEM);
 
-        lock = lustre_handle2object(&lockh);
-        LDLM_DEBUG(lock, "server-side enqueue handler");
+        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+               sizeof(lock->l_remote_handle));
+        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
         flags = dlm_req->lock_flags;
-        err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
-                                      callback, callback);
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
+                                ldlm_server_ast, ldlm_server_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_rep->lock_flags = flags;
 
-        memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
+        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                        sizeof(lock->l_extent));
@@ -139,28 +83,31 @@ static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
                        sizeof(dlm_rep->lock_resource_name));
 
-        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
-               sizeof(lock->l_remote_handle));
         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
         EXIT;
  out:
+        if (lock)
+                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+                           "(err=%d)", err);
         req->rq_status = err;
-        CDEBUG(D_INFO, "err = %d\n", err);
 
-        if (ptlrpc_reply(svc, req))
+        if (ptlrpc_reply(req->rq_svc, req))
                 LBUG();
 
-        if (!err)
-                ldlm_reprocess_all(lock->l_resource);
+        if (lock) {
+                if (!err)
+                        ldlm_reprocess_all(lock->l_resource);
+                LDLM_LOCK_PUT(lock);
+        }
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
 
         return 0;
 }
 
-static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_convert(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_reply *dlm_rep;
-        struct ldlm_resource *res;
         struct ldlm_lock *lock;
         int rc, size = sizeof(*dlm_rep);
         ENTRY;
@@ -174,26 +121,32 @@ static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
-        lock = lustre_handle2object(&dlm_req->lock_handle1);
-        LDLM_DEBUG(lock, "server-side convert handler");
-
-        res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
-                                      dlm_req->lock_desc.l_req_mode,
-                                      &dlm_rep->lock_flags);
-        req->rq_status = 0;
-        if (ptlrpc_reply(svc, req) != 0)
+        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        if (!lock) {
+                req->rq_status = EINVAL;
+        } else {
+                LDLM_DEBUG(lock, "server-side convert handler START");
+                ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
+                                  &dlm_rep->lock_flags);
+                req->rq_status = 0;
+        }
+        if (ptlrpc_reply(req->rq_svc, req) != 0)
                 LBUG();
 
-        ldlm_reprocess_all(res);
+        if (lock) {
+                ldlm_reprocess_all(lock->l_resource);
+                LDLM_DEBUG(lock, "server-side convert handler END");
+                LDLM_LOCK_PUT(lock);
+        } else
+                LDLM_DEBUG_NOLOCK("server-side convert handler END");
 
         RETURN(0);
 }
 
-static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
+static int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
-        struct ldlm_resource *res;
         int rc;
         ENTRY;
 
@@ -204,24 +157,35 @@ static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
         }
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = lustre_handle2object(&dlm_req->lock_handle1);
-        LDLM_DEBUG(lock, "server-side cancel handler");
-        res = ldlm_local_lock_cancel(lock);
-        req->rq_status = 0;
-        if (ptlrpc_reply(svc, req) != 0)
+        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        if (!lock) {
+                req->rq_status = ESTALE;
+        } else {
+                LDLM_DEBUG(lock, "server-side cancel handler START");
+                ldlm_lock_cancel(lock);
+                req->rq_status = 0;
+        }
+
+        if (ptlrpc_reply(req->rq_svc, req) != 0)
                 LBUG();
 
-        if (res != NULL)
-                ldlm_reprocess_all(res);
+        if (lock) {
+                ldlm_reprocess_all(lock->l_resource);
+                LDLM_DEBUG(lock, "server-side cancel handler END");
+                LDLM_LOCK_PUT(lock);
+        } else
+                LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
+                                  lock);
 
         RETURN(0);
 }
 
-static int _ldlm_callback(struct ptlrpc_service *svc,
-                          struct ptlrpc_request *req)
+static int ldlm_handle_callback(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock1, *lock2;
+        struct ldlm_lock_desc *descp = NULL;
+        struct ldlm_lock *lock;
+        __u64 is_blocking_ast = 0;
         int rc;
         ENTRY;
 
@@ -234,30 +198,83 @@ static int _ldlm_callback(struct ptlrpc_service *svc,
 
         /* We must send the reply first, so that the thread is free to handle
          * any requests made in common_callback() */
-        rc = ptlrpc_reply(svc, req);
+        rc = ptlrpc_reply(req->rq_svc, req);
         if (rc != 0)
                 RETURN(rc);
 
-        lock1 = lustre_handle2object(&dlm_req->lock_handle1);
-        lock2 = lustre_handle2object(&dlm_req->lock_handle2);
+        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        if (!lock) {
+                CERROR("callback on lock %Lx - lock disappeared\n",
+                       dlm_req->lock_handle1.addr);
+                RETURN(0);
+        }
 
-        LDLM_DEBUG(lock1, "client %s callback handler START",
-                   lock2 == NULL ? "completion" : "blocked");
+        /* check if this is a blocking AST */
+        if (dlm_req->lock_desc.l_req_mode !=
+            dlm_req->lock_desc.l_granted_mode) {
+                descp = &dlm_req->lock_desc;
+                is_blocking_ast = 1;
+        }
 
-        common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
-                        0, NULL);
+        LDLM_DEBUG(lock, "client %s callback handler START",
+                   is_blocking_ast ? "blocked" : "completion");
+
+        if (descp) {
+                int do_ast;
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                do_ast = (!lock->l_readers && !lock->l_writers);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+                if (do_ast) {
+                        LDLM_DEBUG(lock, "already unused, calling "
+                                   "callback (%p)", lock->l_blocking_ast);
+                        if (lock->l_blocking_ast != NULL) {
+                                struct lustre_handle lockh;
+                                ldlm_lock2handle(lock, &lockh);
+                                lock->l_blocking_ast(&lockh, descp,
+                                                     lock->l_data,
+                                                     lock->l_data_len);
+                        }
+                } else {
+                        LDLM_DEBUG(lock, "Lock still has references, will be"
+                                   " cancelled later");
+                }
+                LDLM_LOCK_PUT(lock);
+        } else {
+                struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+                /* If we receive the completion AST before the actual enqueue
+                 * returned, then we might need to switch resources. */
+                if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+                           lock->l_resource->lr_name,
+                           sizeof(__u64) * RES_NAME_SIZE) != 0) {
+                        ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
+                        LDLM_DEBUG(lock, "completion AST, new resource");
+                }
+                lock->l_resource->lr_tmp = &rpc_list;
+                ldlm_resource_unlink_lock(lock);
+                ldlm_grant_lock(lock);
+                /*  FIXME: we want any completion function, not just wake_up */
+                wake_up(&lock->l_waitq);
+                lock->l_resource->lr_tmp = NULL;
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                LDLM_LOCK_PUT(lock);
 
-        LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
-                   lock2 == NULL ? "completion" : "blocked", lock1);
+                ldlm_run_ast_work(&rpc_list);
+        }
 
+        LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
+                          is_blocking_ast ? "blocked" : "completion", lock);
         RETURN(0);
 }
 
-static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
-                       struct ptlrpc_request *req)
+static int ldlm_handle(struct ptlrpc_request *req)
 {
-        struct obd_device *req_dev;
-        int id, rc;
+        int rc;
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
@@ -272,52 +289,52 @@ static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 GOTO(out, rc = -EINVAL);
         }
 
-        id = req->rq_reqmsg->target_id;
-        if (id < 0 || id > MAX_OBD_DEVICES)
-                GOTO(out, rc = -ENODEV);
-        req_dev = req->rq_obd = &obd_dev[id];
+        if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
+                CERROR("No export handle for enqueue request.\n");
+                GOTO(out, rc = -ENOTCONN);
+        }
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
-                rc = _ldlm_enqueue(req_dev, svc, req);
+                rc = ldlm_handle_enqueue(req);
                 break;
 
         case LDLM_CONVERT:
                 CDEBUG(D_INODE, "convert\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
-                rc = _ldlm_convert(svc, req);
+                rc = ldlm_handle_convert(req);
                 break;
 
         case LDLM_CANCEL:
                 CDEBUG(D_INODE, "cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
-                rc = _ldlm_cancel(svc, req);
+                rc = ldlm_handle_cancel(req);
                 break;
 
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
-                rc = _ldlm_callback(svc, req);
+                rc = ldlm_handle_callback(req);
                 break;
 
         default:
-                rc = ptlrpc_error(svc, req);
+                rc = ptlrpc_error(req->rq_svc, req);
                 RETURN(rc);
         }
 
         EXIT;
 out:
         if (rc)
-                RETURN(ptlrpc_error(svc, req));
+                RETURN(ptlrpc_error(req->rq_svc, req));
         return 0;
 }
 
-static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
-                          void *uarg)
+static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
+                          void *karg, void *uarg)
 {
-        struct obd_device *obddev = conn->oc_dev;
+        struct obd_device *obddev = class_conn2obd(conn);
         struct ptlrpc_connection *connection;
         int err;
         ENTRY;
@@ -368,7 +385,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         MOD_INC_USE_COUNT;
         ldlm->ldlm_service =
                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
-                                LDLM_REPLY_PORTAL, "self", lustre_handle);
+                                LDLM_REPLY_PORTAL, "self", ldlm_handle);
         if (!ldlm->ldlm_service) {
                 LBUG();
                 GOTO(out_dec, rc = -ENOMEM);
@@ -377,9 +394,6 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
                                          "lustre_dlm");
-                /* XXX We could just continue if we had started at least
-                 *     a few threads here.
-                 */
                 if (rc) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                         LBUG();
@@ -391,8 +405,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
 
 out_thread:
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
-        rpc_unregister_service(ldlm->ldlm_service);
-        OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+        ptlrpc_unregister_service(ldlm->ldlm_service);
 out_dec:
         MOD_DEC_USE_COUNT;
         return rc;
@@ -404,14 +417,7 @@ static int ldlm_cleanup(struct obd_device *obddev)
         ENTRY;
 
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
-        rpc_unregister_service(ldlm->ldlm_service);
-
-        if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
-                // XXX reply with errors and clean up
-                CERROR("Request list not empty!\n");
-        }
-
-        OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
+        ptlrpc_unregister_service(ldlm->ldlm_service);
 
         if (mds_reint_p != NULL)
                 inter_module_put("mds_reint");
@@ -426,14 +432,14 @@ struct obd_ops ldlm_obd_ops = {
         o_iocontrol:   ldlm_iocontrol,
         o_setup:       ldlm_setup,
         o_cleanup:     ldlm_cleanup,
-        o_connect:     gen_connect,
-        o_disconnect:  gen_disconnect
+        o_connect:     class_connect,
+        o_disconnect:  class_disconnect
 };
 
 
 static int __init ldlm_init(void)
 {
-        int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
+        int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
         if (rc != 0)
                 return rc;
 
@@ -456,26 +462,28 @@ static int __init ldlm_init(void)
 
 static void __exit ldlm_exit(void)
 {
-        obd_unregister_type(OBD_LDLM_DEVICENAME);
+        class_unregister_type(OBD_LDLM_DEVICENAME);
         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
                 CERROR("couldn't free ldlm resource slab\n");
         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
-EXPORT_SYMBOL(ldlm_local_lock_match);
+EXPORT_SYMBOL(ldlm_lockname);
+EXPORT_SYMBOL(ldlm_typename);
+EXPORT_SYMBOL(ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_cancel);
-EXPORT_SYMBOL(lustre_handle2object);
 EXPORT_SYMBOL(ldlm_test);
 EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_free);
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
 MODULE_LICENSE("GPL");