Whamcloud - gitweb
Lots of reorg in the lock manager, dealing with the updated design.
authorbraam <braam>
Mon, 24 Jun 2002 05:10:40 +0000 (05:10 +0000)
committerbraam <braam>
Mon, 24 Jun 2002 05:10:40 +0000 (05:10 +0000)
See the Lustre documentation for more details.

lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c

index e54a0a9..c526c6d 100644 (file)
@@ -18,7 +18,7 @@
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
 
-static kmem_cache_t *ldlm_lock_slab;
+kmem_cache_t *ldlm_lock_slab;
 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
 
@@ -175,21 +175,20 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
         l_lock(&ns->ns_lock);
         type = lock->l_resource->lr_type;
 
-        for (i = 0; i < lock->l_refc; i++) {
-                int rc;
-                rc = ldlm_resource_put(lock->l_resource);
-                if (rc == 1 && i != lock->l_refc - 1)
-                        LBUG();
-        }
-
         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
         if (lock->l_resource == NULL) {
                 LBUG();
                 RETURN(-ENOMEM);
         }
 
-        for (i = 1; i < lock->l_refc; i++)
+        /* move references over */ 
+        for (i = 0; i < lock->l_refc; i++) {
+                int rc;
                 ldlm_resource_addref(lock->l_resource);
+                rc = ldlm_resource_put(lock->l_resource);
+                if (rc == 1 && i != lock->l_refc - 1)
+                        LBUG();
+        }
 
         l_unlock(&ns->ns_lock);
         RETURN(0);
@@ -348,26 +347,6 @@ static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
         return lockmode_compat(a->l_req_mode, b->l_req_mode);
 }
 
-/* Args: unreferenced, locked lock
- *
- * Caller must do its own ldlm_resource_put() on lock->l_resource */
-void ldlm_lock_free(struct ldlm_lock *lock)
-{
-        if (!list_empty(&lock->l_children)) {
-                CERROR("lock %p still has children!\n", lock);
-                ldlm_lock_dump(lock);
-                LBUG();
-        }
-
-        if (lock->l_readers || lock->l_writers)
-                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
-                       "writers)\n", lock->l_readers, lock->l_writers);
-
-        if (lock->l_connection)
-                ptlrpc_put_connection(lock->l_connection);
-        kmem_cache_free(ldlm_lock_slab, lock);
-}
-
 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
 {
         ldlm_res2desc(lock->l_resource, &desc->l_resource);
@@ -389,7 +368,7 @@ static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
         }
 
         lock->l_flags |= LDLM_FL_AST_SENT;
-
+        /* FIXME: this should merely add the lock to the lr_tmp list */
         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
@@ -429,31 +408,31 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
         /* If we received a blocked AST and this was the last reference,
          * run the callback. */
         if (!lock->l_readers && !lock->l_writers &&
-            (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) {
+            (lock->l_flags & LDLM_FL_CBPENDING)) {
                 if (!lock->l_resource->lr_namespace->ns_client) {
-                        CERROR("LDLM_FL_DYING set on non-local lock!\n");
+                        CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
                         LBUG();
                 }
 
-                CDEBUG(D_INFO, "final decref done on dying lock, "
+                CDEBUG(D_INFO, "final decref done on cbpending lock, "
                        "calling callback.\n");
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 lock->l_blocking_ast(lock, NULL, lock->l_data,
                                      lock->l_data_len, NULL);
         } else
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         ldlm_lock_put(lock);
 
         EXIT;
 }
 
-static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
+static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
                              struct list_head *queue)
 {
         struct list_head *tmp, *pos;
-        int rc = 0;
+        int rc = 1;
 
         list_for_each_safe(tmp, pos, queue) {
                 struct ldlm_lock *child;
@@ -464,7 +443,7 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                         continue;
 
                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
-                if (compat(child, lock)) {
+                if (compat && compat(child, lock)) {
                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
                         continue;
                 }
@@ -473,7 +452,7 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
                         continue;
                 }
 
-                rc = 1;
+                rc = 0;
 
                 CDEBUG(D_OTHER, "compat function failed and lock modes "
                        "incompat\n");
@@ -497,20 +476,26 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
         ENTRY;
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
+        rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted);
         /* FIXME: should we be sending ASTs to converting? */
-        rc |= _ldlm_lock_compat(lock, send_cbs,
-                                &lock->l_resource->lr_converting);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        if (rc) 
+                rc = ldlm_lock_compat_list
+                        (lock, send_cbs, &lock->l_resource->lr_converting);
 
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         RETURN(rc);
 }
 
-void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
+/* NOTE: called by 
+   - ldlm_handle_enqueuque - resource 
+*/
+void ldlm_grant_lock(struct ldlm_lock *lock)
 {
+        struct ldlm_resource *res = lock->l_resource;
         struct ptlrpc_request *req = NULL;
         ENTRY;
 
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
         lock->l_granted_mode = lock->l_req_mode;
 
@@ -518,10 +503,11 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
                 res->lr_most_restr = lock->l_granted_mode;
 
         if (lock->l_completion_ast) {
+                /* FIXME: this should merely add lock to lr_tmp list */
                 lock->l_completion_ast(lock, NULL, lock->l_data,
                                        lock->l_data_len, &req);
                 if (req != NULL) {
-                        struct list_head *list = res->lr_tmp;
+                        struct list_head *list = &res->lr_tmp;
                         if (list == NULL) {
                                 LBUG();
                                 return;
@@ -529,6 +515,7 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
                         list_add(&req->rq_multi, list);
                 }
         }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         EXIT;
 }
 
@@ -636,11 +623,11 @@ ldlm_local_lock_create(struct ldlm_namespace *ns,
 }
 
 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
-                                     void *cookie, int cookie_len,
-                                     int *flags,
-                                     ldlm_lock_callback completion,
-                                     ldlm_lock_callback blocking)
+ldlm_error_t ldlm_lock_enqueue(struct lustre_lock *lock,
+                               void *cookie, int cookie_len,
+                               int *flags,
+                               ldlm_lock_callback completion,
+                               ldlm_lock_callback blocking)
 {
         struct ldlm_resource *res;
         struct ldlm_lock *lock;
@@ -715,8 +702,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
                 *flags |= LDLM_FL_BLOCK_WAIT;
                 GOTO(out, ELDLM_OK);
         }
-        incompat = ldlm_lock_compat(lock, 0);
-        if (incompat) {
+        if (!ldlm_lock_compat(lock,0)) {
                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 GOTO(out, ELDLM_OK);
@@ -731,20 +717,20 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
         return ELDLM_OK;
 }
 
-/* Must be called with resource->lr_lock taken. */
+/* Must be called with namespace taken: queue is waiting or converting. */
 static int ldlm_reprocess_queue(struct ldlm_resource *res,
-                                struct list_head *converting)
+                                struct list_head *queue)
 {
         struct list_head *tmp, *pos;
         ENTRY;
 
-        list_for_each_safe(tmp, pos, converting) {
+        list_for_each_safe(tmp, pos, queue) {
                 struct ldlm_lock *pending;
                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
-                if (ldlm_lock_compat(pending, 1))
+                if (!ldlm_lock_compat(pending, 1))
                         RETURN(1);
 
                 list_del_init(&pending->l_res_link);
@@ -805,7 +791,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
 }
 
 /* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
+void ldlm_lock_cancel(struct ldlm_lock *lock)
 {
         struct ldlm_resource *res;
         struct ldlm_namespace *ns;
@@ -819,17 +805,13 @@ struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                        "writers)\n", lock->l_readers, lock->l_writers);
 
-        ldlm_resource_del_lock(lock);
+        ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy(lock);
-
         l_unlock(&ns->ns_lock);
-
-        RETURN(res);
 }
 
 /* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
-                                              int new_mode, int *flags)
+struct ldlm_resource *ldlm_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 {
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
@@ -843,17 +825,18 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
         l_lock(&ns->ns_lock);
 
         lock->l_req_mode = new_mode;
-        ldlm_resource_del_lock(lock);
+        ldlm_resource_unlink_lock(lock);
 
         /* If this is a local resource, put it on the appropriate list. */
         if (res->lr_namespace->ns_client) {
-                if (*flags & LDLM_FL_BLOCK_CONV)
+                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
                                                lock);
-                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
-                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
-                else
-                        ldlm_grant_lock(res, lock);
+                else { 
+                        ldlm_grant_lock(lock);
+                        /* FIXME: completion handling not with ns_lock held ! */
+                        wake_up(&lock->l_waitq);
+                }
         } else {
                 list_add(&lock->l_res_link, res->lr_converting.prev);
         }
index f10e876..5bb4a37 100644 (file)
@@ -64,13 +64,12 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service
         if (!lock)
                 GOTO(out, -ENOMEM);
 
-        ldlm_lock2handle(&lockh);
         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
                sizeof(lock->l_remote_handle));
         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
         flags = dlm_req->lock_flags;
-        err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
                                       ldlm_cli_callback, ldlm_cli_callback);
         if (err != ELDLM_OK)
                 GOTO(out, err);
@@ -78,7 +77,7 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_rep->lock_flags = flags;
 
-        memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
+        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                        sizeof(lock->l_extent));
@@ -126,14 +125,15 @@ static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        if (!lock)
-                RETURN(0);
-
-        LDLM_DEBUG(lock, "server-side convert handler START");
-        ldlm_local_lock_convert(&dlm_req->lock_handle1,
-                                dlm_req->lock_desc.l_req_mode,
-                                &dlm_rep->lock_flags);
-        req->rq_status = 0;
+        if (!lock) { 
+                req->rq_status = EINVAL;
+        } else {         
+                LDLM_DEBUG(lock, "server-side convert handler START");
+                ldlm_lock_convert(&dlm_req->lock_handle1,
+                                  dlm_req->lock_desc.l_req_mode,
+                                  &dlm_rep->lock_flags);
+                req->rq_status = 0;
+        }
         if (ptlrpc_reply(svc, req) != 0)
                 LBUG();
 
@@ -160,12 +160,13 @@ static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        if (!lock)
-                RETURN(0); 
-
-        LDLM_DEBUG(lock, "server-side cancel handler START");
-        ldlm_local_lock_cancel(lock);
-        req->rq_status = 0;
+        if (!lock) { 
+                req->rq_status = ESTALE;
+        } else { 
+                LDLM_DEBUG(lock, "server-side cancel handler START");
+                ldlm_local_lock_cancel(lock);
+                req->rq_status = 0;
+        }
 
         if (ptlrpc_reply(svc, req) != 0)
                 LBUG();
@@ -181,7 +182,10 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc,
                           struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock *lock, *new;
+        struct ldlm_lock_desc desc; 
+        struct ldlm_lock_desc *descp; 
+        struct ldlm_lock *lock;
+        __u64 is_blocking_ast;
         int rc;
         ENTRY;
 
@@ -199,7 +203,13 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc,
                 RETURN(rc);
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        new = ldlm_handle2lock(&dlm_req->lock_handle2);
+        is_blocking_ast = dlm_req->lock_handle2.addr;
+        if (is_blocking_ast) { 
+                /* FIXME: copy lock information into desc here */
+                descp = &desc;
+        }  else 
+                descp = NULL; 
+
         if (!lock) { 
                 CERROR("callback on lock %Lx - lock disappeared\n", 
                        dlm_req->lock_handle.addr);
@@ -209,25 +219,7 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc,
         LDLM_DEBUG(lock, "client %s callback handler START",
                    new == NULL ? "completion" : "blocked");
 
-        spin_lock(&lock->l_resource->lr_lock);
-        spin_lock(&lock->l_lock);
-        if (!new) {
-                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
-
-                /* If we receive the completion AST before the actual enqueue
-                 * returned, then we might need to switch resources. */
-                if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
-                           lock->l_resource->lr_name,
-                           sizeof(__u64) * RES_NAME_SIZE) != 0) {
-
-                        ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
-                        LDLM_DEBUG(lock, "completion AST, new resource");
-                }
-                ldlm_grant_lock(lock);
-                /* XXX Fixme: we want any completion function, not just wake_up */
-                wake_up(&lock->l_waitq);
-                ldlm_lock_put(lock);
-        } else {
+        if (is_blocking_ast) {
                 int do_ast; 
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
@@ -238,18 +230,32 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc,
                         CDEBUG(D_INFO, "Lock already unused, calling "
                                "callback (%p).\n", lock->l_blocking_ast);
                         if (lock->l_blocking_ast != NULL)
-                                lock->l_blocking_ast(lock, new, lock->l_data,
+                                lock->l_blocking_ast(lock, descp, lock->l_data,
                                                      lock->l_data_len, NULL);
                 } else {
                         LDLM_DEBUG(lock, "Lock still has references, will be"
                                " cancelled later");
                 }
                 ldlm_lock_put(lock); 
+        } else {
+                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+                /* If we receive the completion AST before the actual enqueue
+                 * returned, then we might need to switch resources. */
+                if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+                           lock->l_resource->lr_name,
+                           sizeof(__u64) * RES_NAME_SIZE) != 0) {
+                        ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
+                        LDLM_DEBUG(lock, "completion AST, new resource");
+                }
+                ldlm_grant_lock(lock);
+                /*  FIXME: we want any completion function, not just wake_up */
+                wake_up(&lock->l_waitq);
+                ldlm_lock_put(lock);
         }
 
         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
                    new == NULL ? "completion" : "blocked", lock);
-
         RETURN(0);
 }
 
index a0781c9..0917dd1 100644 (file)
@@ -39,7 +39,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (lock == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        ldlm_lock2handle(lock, &lockh);
         LDLM_DEBUG(lock, "client-side enqueue START");
 
         if (req == NULL) {
@@ -62,8 +61,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                        sizeof(body->lock_desc.l_extent));
         body->lock_flags = *flags;
 
-        memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
-
+        ldlm_lock2handle(lock, &body->lock_handle1);
         if (parent_lock_handle)
                 memcpy(&body->lock_handle2, parent_lock_handle,
                        sizeof(body->lock_handle2));
@@ -120,7 +118,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (!req_passed_in)
                 ptlrpc_free_req(req);
 
-        rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
+        rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
                                      callback);
 
         LDLM_DEBUG(lock, "client-side enqueue END");
@@ -198,7 +196,9 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        lock = lustre_handle2object(lockh);
+        lock = ldlm_handle2lock(lockh);
+        if (!lock)
+                LBUG(); 
         *flags = 0;
 
         LDLM_DEBUG(lock, "client-side convert");
@@ -236,20 +236,26 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
                                          lock->l_granted_mode);
                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
         }
+        ldlm_put_lock(lock);
         EXIT;
  out:
         ptlrpc_free_req(req);
         return rc;
 }
 
-int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
+int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh)
 {
         struct ptlrpc_request *req;
+        struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_resource *res;
         int rc, size = sizeof(*body);
         ENTRY;
 
+        lock = ldlm_handle2lock(lockh); 
+        if (!lock)
+                LBUG();
+
         LDLM_DEBUG(lock, "client-side cancel");
         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
                               NULL);
@@ -268,12 +274,10 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        res = ldlm_local_lock_cancel(lock);
-        if (res != NULL)
-                ldlm_reprocess_all(res);
-        else
-                rc = ELDLM_RESOURCE_FREED;
+        ldlm_lock_cancel(lock);
+        ldlm_reprocess_all(res);
+        ldlm_lock_put(lock); 
         EXIT;
  out:
-        return rc;
+        return 0;
 }