Whamcloud - gitweb
- If a completion callback arrives before the reply for the enqueue, AND the
authorpschwan <pschwan>
Fri, 21 Jun 2002 18:43:43 +0000 (18:43 +0000)
committerpschwan <pschwan>
Fri, 21 Jun 2002 18:43:43 +0000 (18:43 +0000)
completion callback specifies a different resource than the original enqueue,
fixup the resource.
- When the enqueue reply does come back, recognize that we've already fixed up
the resource and avoid doing it again.
- I am ---> <--- this close to replacing all of the DLM locking, which is
almost certainly a cesspool of deadlock opportunities, with lock_kernel() until
lustre lite performance

lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c

index 9f726e5..d313fcc 100644 (file)
@@ -21,60 +21,6 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-/* _ldlm_callback and local_callback setup the variables then call this common
- * code */
-static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                           ldlm_mode_t mode, void *data, __u32 data_len,
-                           struct ptlrpc_request **reqp)
-{
-        ENTRY;
-
-        if (!lock)
-                LBUG();
-        if (!lock->l_resource)
-                LBUG();
-
-        ldlm_lock_dump(lock);
-
-        spin_lock(&lock->l_resource->lr_lock);
-        spin_lock(&lock->l_lock);
-        if (!new) {
-                CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
-                lock->l_req_mode = mode;
-
-                /* FIXME: the API is flawed if I have to do these refcount
-                 * acrobatics (along with the _put() below). */
-                lock->l_resource->lr_refcount++;
-
-                /* _del_lock is safe for half-created locks that are not yet on
-                 * a list. */
-                ldlm_resource_del_lock(lock);
-                ldlm_grant_lock(lock->l_resource, lock);
-
-                ldlm_resource_put(lock->l_resource);
-
-                wake_up(&lock->l_waitq);
-                spin_unlock(&lock->l_lock);
-                spin_unlock(&lock->l_resource->lr_lock);
-        } else {
-                CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
-                lock->l_flags |= LDLM_FL_DYING;
-                spin_unlock(&lock->l_lock);
-                spin_unlock(&lock->l_resource->lr_lock);
-                if (!lock->l_readers && !lock->l_writers) {
-                        CDEBUG(D_INFO, "Lock already unused, calling "
-                               "callback (%p).\n", lock->l_blocking_ast);
-                        if (lock->l_blocking_ast != NULL)
-                                lock->l_blocking_ast(lock, new, lock->l_data,
-                                                     lock->l_data_len, reqp);
-                } else {
-                        CDEBUG(D_INFO, "Lock still has references; lock will be"
-                               " cancelled later.\n");
-                }
-        }
-        RETURN(0);
-}
-
 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
                          struct ptlrpc_request *req)
 {
@@ -227,7 +173,7 @@ static int _ldlm_callback(struct ptlrpc_service *svc,
                           struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock1, *lock2;
+        struct ldlm_lock *lock, *new;
         int rc;
         ENTRY;
 
@@ -244,17 +190,72 @@ static int _ldlm_callback(struct ptlrpc_service *svc,
         if (rc != 0)
                 RETURN(rc);
 
-        lock1 = lustre_handle2object(&dlm_req->lock_handle1);
-        lock2 = lustre_handle2object(&dlm_req->lock_handle2);
+        lock = lustre_handle2object(&dlm_req->lock_handle1);
+        new = lustre_handle2object(&dlm_req->lock_handle2);
+
+        LDLM_DEBUG(lock, "client %s callback handler START",
+                   new == NULL ? "completion" : "blocked");
+
+        spin_lock(&lock->l_resource->lr_lock);
+        spin_lock(&lock->l_lock);
+        if (!new) {
+                CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
+                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+                /* If we receive the completion AST before the actual enqueue
+                 * returned, then we might need to switch resources. */
+                if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+                           lock->l_resource->lr_name,
+                           sizeof(__u64) * RES_NAME_SIZE) != 0) {
+                        struct ldlm_namespace *ns =
+                                lock->l_resource->lr_namespace;
+                        int type = lock->l_resource->lr_type;
+
+                        if (!ldlm_resource_put(lock->l_resource))
+                                spin_unlock(&lock->l_resource->lr_lock);
+
+                        lock->l_resource = ldlm_resource_get(ns, NULL, dlm_req->lock_desc.l_resource.lr_name, type, 1);
+                        if (lock->l_resource == NULL) {
+                                LBUG();
+                                RETURN(-ENOMEM);
+                        }
+                        spin_lock(&lock->l_resource->lr_lock);
+                        LDLM_DEBUG(lock, "completion AST, new resource");
+                }
+
+                /* FIXME: the API is flawed if I have to do these refcount
+                 * acrobatics (along with the _put() below). */
+                lock->l_resource->lr_refcount++;
+
+                /* _del_lock is safe for half-created locks that are not yet on
+                 * a list. */
+                ldlm_resource_del_lock(lock);
+                ldlm_grant_lock(lock->l_resource, lock);
 
-        LDLM_DEBUG(lock1, "client %s callback handler START",
-                   lock2 == NULL ? "completion" : "blocked");
+                ldlm_resource_put(lock->l_resource);
 
-        common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
-                        0, NULL);
+                wake_up(&lock->l_waitq);
+                spin_unlock(&lock->l_lock);
+                spin_unlock(&lock->l_resource->lr_lock);
+        } else {
+                CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
+                lock->l_flags |= LDLM_FL_DYING;
+                spin_unlock(&lock->l_lock);
+                spin_unlock(&lock->l_resource->lr_lock);
+                if (!lock->l_readers && !lock->l_writers) {
+                        CDEBUG(D_INFO, "Lock already unused, calling "
+                               "callback (%p).\n", lock->l_blocking_ast);
+                        if (lock->l_blocking_ast != NULL)
+                                lock->l_blocking_ast(lock, new, lock->l_data,
+                                                     lock->l_data_len, NULL);
+                } else {
+                        CDEBUG(D_INFO, "Lock still has references; lock will be"
+                               " cancelled later.\n");
+                }
+        }
 
         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
-                   lock2 == NULL ? "completion" : "blocked", lock1);
+                   new == NULL ? "completion" : "blocked", lock);
 
         RETURN(0);
 }
index 2942d3f..dff9907 100644 (file)
@@ -74,8 +74,13 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 req->rq_replen = lustre_msg_size(1, &size);
         }
 
+        lock->l_connection = conn;
+        lock->l_client = cl;
+
         rc = ptlrpc_queue_wait(req);
         rc = ptlrpc_check_status(req, rc);
+
+        spin_lock(&lock->l_lock);
         if (rc != ELDLM_OK) {
                 spin_lock(&lock->l_resource->lr_lock);
                 ldlm_resource_put(lock->l_resource);
@@ -86,8 +91,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 GOTO(out, rc);
         }
 
-        lock->l_connection = conn;
-        lock->l_client = cl;
         reply = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
@@ -101,11 +104,17 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                (unsigned long long)reply->lock_extent.start,
                (unsigned long long)reply->lock_extent.end);
 
-        if (*flags & LDLM_FL_LOCK_CHANGED) {
+        /* If enqueue returned a blocked lock but the completion handler has
+         * already run, then it fixed up the resource and we don't need to do it
+         * again. */
+        if (*flags & LDLM_FL_LOCK_CHANGED &&
+            lock->l_req_mode != lock->l_granted_mode) {
                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
                        "%ld\n", (long)reply->lock_resource_name[0],
                        (long)lock->l_resource->lr_name[0]);
-                ldlm_resource_put(lock->l_resource);
+                spin_lock(&lock->l_resource->lr_lock);
+                if (!ldlm_resource_put(lock->l_resource))
+                        spin_unlock(&lock->l_resource->lr_lock);
 
                 lock->l_resource =
                         ldlm_resource_get(ns, NULL, reply->lock_resource_name,
@@ -120,6 +129,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (!req_passed_in)
                 ptlrpc_free_req(req);
 
+        spin_unlock(&lock->l_lock);
         rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
                                      callback);
 
index ebda971..39f2720 100644 (file)
@@ -116,8 +116,10 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
                                 rc = cleanup_resource(res, &res->lr_waiting);
 
                         if (rc == 0) {
-                                CERROR("Resource refcount nonzero after lock "
-                                       "cleanup; forcing cleanup.\n");
+                                CERROR("Resource refcount nonzero (%d) after "
+                                       "lock cleanup; forcing cleanup.\n",
+                                       res->lr_refcount);
+                                ldlm_resource_dump(res);
                                 res->lr_refcount = 1;
                                 rc = ldlm_resource_put(res);
                         }
@@ -352,7 +354,8 @@ void ldlm_resource_dump(struct ldlm_resource *res)
                  (unsigned long long)res->lr_name[1],
                  (unsigned long long)res->lr_name[2]);
 
-        CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name);
+        CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name,
+               res->lr_refcount);
         CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
                res->lr_namespace->ns_name);
         CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);