Whamcloud - gitweb
- more LDLM refcount locking infrastructure
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index ef99903..1b055ca 100644 (file)
@@ -14,6 +14,7 @@
 #include <linux/lustre_dlm.h>
 
 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+                     struct lustre_handle *connh, 
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -38,11 +39,14 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                                 data, data_len);
         if (lock == NULL)
                 GOTO(out, rc = -ENOMEM);
-
         LDLM_DEBUG(lock, "client-side enqueue START");
+        /* for the local lock, add the reference */
+        ldlm_lock_addref_internal(lock, mode);
+        ldlm_lock2handle(lock, lockh);
 
         if (req == NULL) {
-                req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
+                req = ptlrpc_prep_req2(cl, conn, connh, 
+                                       LDLM_ENQUEUE, 1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
                 req_passed_in = 0;
@@ -51,17 +55,14 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
 
         /* Dump all of this data into the request buffer */
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        body->lock_desc.l_resource.lr_type = type;
-        memcpy(body->lock_desc.l_resource.lr_name, res_id,
-               sizeof(body->lock_desc.l_resource.lr_name));
-
-        body->lock_desc.l_req_mode = mode;
+        ldlm_lock2desc(lock, &body->lock_desc);
+        /* Phil: make this part of ldlm_lock2desc */
         if (type == LDLM_EXTENT)
                 memcpy(&body->lock_desc.l_extent, cookie,
                        sizeof(body->lock_desc.l_extent));
         body->lock_flags = *flags;
 
-        ldlm_lock2handle(lock, &body->lock_handle1);
+        memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
         if (parent_lock_handle)
                 memcpy(&body->lock_handle2, parent_lock_handle,
                        sizeof(body->lock_handle2));
@@ -72,16 +73,21 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 req->rq_replen = lustre_msg_size(1, &size);
         }
 
-        lock->l_connection = conn;
+        lock->l_connection = ptlrpc_connection_addref(conn);
         lock->l_client = cl;
 
         rc = ptlrpc_queue_wait(req);
+        /* FIXME: status check here? */
         rc = ptlrpc_check_status(req, rc);
 
         if (rc != ELDLM_OK) {
                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 ldlm_lock_put(lock);
+                ldlm_lock_decref(lockh, mode);
+                /* FIXME: if we've already received a completion AST, this will
+                 * LBUG! */
+                ldlm_lock_destroy(lock);
                 GOTO(out, rc);
         }
 
@@ -121,34 +127,38 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
                                      callback);
 
-        LDLM_DEBUG(lock, "client-side enqueue END");
         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)) {
                 /* Go to sleep until the lock is granted. */
                 /* FIXME: or cancelled. */
-                CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
-                       "going to sleep.\n", lock);
+                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+                           " sleeping");
                 ldlm_lock_dump(lock);
                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
                                          lock->l_granted_mode);
-                CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
         }
+        LDLM_DEBUG(lock, "client-side enqueue END");
         ldlm_lock_put(lock);
         EXIT;
  out:
         return rc;
 }
 
-int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
-                      void *data, __u32 data_len, struct ptlrpc_request **reqp)
+int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
+                    void *data, __u32 data_len)
 {
+        struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
-        struct ptlrpc_client *cl =
-                &lock->l_resource->lr_namespace->ns_rpc_client;
+        struct ptlrpc_client *cl;
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
+        lock = ldlm_handle2lock(lockh);
+        if (lock == NULL)
+                LBUG();
+        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
                               &size, NULL);
         if (!req)
@@ -158,33 +168,31 @@ int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
 
-        if (new == NULL) {
+        if (desc == NULL) {
                 CDEBUG(D_NET, "Sending granted AST\n");
                 ldlm_lock2desc(lock, &body->lock_desc);
         } else {
                 CDEBUG(D_NET, "Sending blocked AST\n");
-                memcpy(&body->lock_desc, new, sizeof(*new));
+                memcpy(&body->lock_desc, desc, sizeof(*desc));
         }
 
         LDLM_DEBUG(lock, "server preparing %s AST",
-                   new == NULL ? "completion" : "blocked");
+                   desc == 0 ? "completion" : "blocked");
 
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        if (reqp == NULL) {
-                LBUG();
-                rc = ptlrpc_queue_wait(req);
-                rc = ptlrpc_check_status(req, rc);
-                ptlrpc_free_req(req);
-        } else
-                *reqp = req;
+        rc = ptlrpc_queue_wait(req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
 
         EXIT;
  out:
+        ldlm_lock_put(lock);
         return rc;
 }
 
 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
+                     struct lustre_handle *connh, 
                      int new_mode, int *flags)
 {
         struct ldlm_request *body;
@@ -202,8 +210,8 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(cl, lock->l_connection,
+                               LDLM_CONVERT, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -242,12 +250,12 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
         return rc;
 }
 
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh)
+int ldlm_cli_cancel(struct lustre_handle *lockh, 
+                    struct lustre_handle *connh)
 {
         struct ptlrpc_request *req;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
-        struct ldlm_resource *res;
         int rc, size = sizeof(*body);
         ENTRY;
 
@@ -256,8 +264,8 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh)
                 LBUG();
 
         LDLM_DEBUG(lock, "client-side cancel");
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
+                              LDLM_CANCEL, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -274,7 +282,6 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh)
                 GOTO(out, rc);
 
         ldlm_lock_cancel(lock);
-        ldlm_reprocess_all(res);
         ldlm_lock_put(lock); 
         EXIT;
  out: