Whamcloud - gitweb
- The server side of the DLM wasn't always handling the invalid handle case
author: pschwan <pschwan>
Thu, 11 Jul 2002 15:37:29 +0000 (15:37 +0000)
committer: pschwan <pschwan>
Thu, 11 Jul 2002 15:37:29 +0000 (15:37 +0000)
very gracefully.  Since races can make invalid handles fairly common, that's
fixed now.
- Even on the client side, we could get cancelled between ldlm_lock_decref() and
ldlm_lock_cancel(), thus passing an invalid handle into cancel().  This is
handled now.
- Fixed another leaked request.
- Fixed an IT_SETATTR bug, even though we don't currently exercise that code
path.
- Fixed common.sh for the multiple mount cleanup case
- llcleanup.sh was unloading things in the wrong order (MDS is pinned by LDLM
and needs to be unloaded later in the process).  Fixed.

lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/tests/common.sh
lustre/tests/create.pl
lustre/tests/llcleanup.sh

index 9ad2c51..3c40a49 100644 (file)
@@ -83,7 +83,7 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 
         l_lock(nslock);
         lock->l_refc--;
-        LDLM_DEBUG(lock, "after refc--");
+        //LDLM_DEBUG(lock, "after refc--");
         if (lock->l_refc < 0)
                 LBUG();
 
index f1e2d07..9617ab0 100644 (file)
@@ -98,7 +98,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
 
         if (!err)
                 ldlm_reprocess_all(lock->l_resource);
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
 
         return 0;
 }
@@ -121,9 +121,9 @@ static int ldlm_handle_convert(struct ptlrpc_request *req)
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        if (!lock) { 
+        if (!lock) {
                 req->rq_status = EINVAL;
-        } else {         
+        } else {
                 LDLM_DEBUG(lock, "server-side convert handler START");
                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                   &dlm_rep->lock_flags);
@@ -132,9 +132,12 @@ static int ldlm_handle_convert(struct ptlrpc_request *req)
         if (ptlrpc_reply(req->rq_svc, req) != 0)
                 LBUG();
 
-        ldlm_reprocess_all(lock->l_resource);
-        ldlm_lock_put(lock); 
-        LDLM_DEBUG(lock, "server-side convert handler END");
+        if (lock) {
+                ldlm_reprocess_all(lock->l_resource);
+                ldlm_lock_put(lock);
+                LDLM_DEBUG(lock, "server-side convert handler END");
+        } else
+                LDLM_DEBUG_NOLOCK("server-side convert handler END");
 
         RETURN(0);
 }
@@ -144,6 +147,7 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
         int rc;
+        char *ns_name;
         ENTRY;
 
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
@@ -154,10 +158,11 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        if (!lock) { 
+        if (!lock) {
                 req->rq_status = ESTALE;
-        } else { 
+        } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
+                ns_name = lock->l_resource->lr_namespace->ns_name;
                 ldlm_lock_cancel(lock);
                 req->rq_status = 0;
         }
@@ -165,9 +170,14 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (ptlrpc_reply(req->rq_svc, req) != 0)
                 LBUG();
 
-        ldlm_reprocess_all(lock->l_resource);
-        ldlm_lock_put(lock);
-        LDLM_DEBUG_NOLOCK("server-side cancel handler END");
+        if (lock) {
+                ldlm_reprocess_all(lock->l_resource);
+                ldlm_lock_put(lock);
+                LDLM_DEBUG_NOLOCK("server-side cancel handler END (%s: lock "
+                                  "%p)", ns_name, lock);
+        } else
+                LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
+                                  lock);
 
         RETURN(0);
 }
@@ -175,7 +185,7 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
 static int ldlm_handle_callback(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
-        struct ldlm_lock_desc *descp = NULL; 
+        struct ldlm_lock_desc *descp = NULL;
         struct ldlm_lock *lock;
         __u64 is_blocking_ast = 0;
         int rc;
@@ -193,15 +203,15 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
         rc = ptlrpc_reply(req->rq_svc, req);
         if (rc != 0)
                 RETURN(rc);
-        
+
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
-        if (!lock) { 
-                CERROR("callback on lock %Lx - lock disappeared\n", 
+        if (!lock) {
+                CERROR("callback on lock %Lx - lock disappeared\n",
                        dlm_req->lock_handle1.addr);
                 RETURN(0);
         }
 
-        /* check if this is a blocking AST */ 
+        /* check if this is a blocking AST */
         if (dlm_req->lock_desc.l_req_mode !=
             dlm_req->lock_desc.l_granted_mode) {
                 descp = &dlm_req->lock_desc;
@@ -212,12 +222,12 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                    is_blocking_ast ? "blocked" : "completion");
 
         if (descp) {
-                int do_ast; 
+                int do_ast;
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                do_ast = (!lock->l_readers && !lock->l_writers); 
+                do_ast = (!lock->l_readers && !lock->l_writers);
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                
+
                 if (do_ast) {
                         LDLM_DEBUG(lock, "already unused, calling "
                                    "callback (%p)", lock->l_blocking_ast);
@@ -232,7 +242,7 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                         LDLM_DEBUG(lock, "Lock still has references, will be"
                                " cancelled later");
                 }
-                ldlm_lock_put(lock); 
+                ldlm_lock_put(lock);
         } else {
                 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
 
@@ -259,7 +269,7 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                 ldlm_run_ast_work(&rpc_list);
         }
 
-        LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
+        LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
                           is_blocking_ast ? "blocked" : "completion", lock);
         RETURN(0);
 }
@@ -281,11 +291,11 @@ static int ldlm_handle(struct ptlrpc_request *req)
                 GOTO(out, rc = -EINVAL);
         }
 
-        if (!req->rq_export && 
-            req->rq_reqmsg->opc == LDLM_ENQUEUE) { 
+        if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
                 CERROR("No export handle for enqueue request.\n");
                 GOTO(out, rc = -ENOTCONN);
         }
+
         switch (req->rq_reqmsg->opc) {
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
@@ -323,8 +333,8 @@ out:
         return 0;
 }
 
-static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg,
-                          void *uarg)
+static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
+                          void *karg, void *uarg)
 {
         struct obd_device *obddev = class_conn2obd(conn);
         struct ptlrpc_connection *connection;
index e42d804..f71b122 100644 (file)
@@ -107,7 +107,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         /* If enqueue returned a blocked lock but the completion handler has
          * already run, then it fixed up the resource and we don't need to do it
          * again. */
-        if (*flags & LDLM_FL_LOCK_CHANGED &&
+        if ((*flags) & LDLM_FL_LOCK_CHANGED &&
             lock->l_req_mode != lock->l_granted_mode) {
                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
                        "%ld\n", (long)reply->lock_resource_name[0],
@@ -260,8 +260,15 @@ int ldlm_cli_cancel(struct lustre_handle *lockh,
         ENTRY;
 
         lock = ldlm_handle2lock(lockh); 
-        if (!lock)
-                LBUG();
+        if (!lock) {
+                /* It's possible that the decref that we did just before this
+                 * cancel was the last reader/writer, and caused a cancel before
+                 * we could call this function.  If we want to make this
+                 * impossible (by adding a dec_and_cancel() or similar), then
+                 * we can put the LBUG back. */
+                //LBUG();
+                RETURN(-EINVAL);
+        }
 
         LDLM_DEBUG(lock, "client-side cancel");
         req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
index c7906af..ab9108f 100644 (file)
@@ -160,12 +160,23 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
         memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
 
         if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) &&
-            it->it_disposition && !it->it_status)
+            it->it_disposition && !it->it_status) {
+#if 0
+                if (it->it_data)
+                        CERROR("leaking request %p\n", it->it_data);
+#endif
                 GOTO(negative, NULL);
+        }
 
         if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR)) &&
-            it->it_disposition && it->it_status)
+            it->it_disposition && it->it_status) {
+#if 0
+                if (it->it_data)
+                        CERROR("request: %p, status: %d\n", it->it_data,
+                               it->it_status);
+#endif
                 GOTO(negative, NULL);
+        }
 
         request = (struct ptlrpc_request *)it->it_data;
         if (!it->it_disposition) {
index 15a6724..b0596e3 100644 (file)
@@ -167,6 +167,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
         sb->s_blocksize_bits = log2(sfs.f_bsize);
         sb->s_magic = LL_SUPER_MAGIC;
         sb->s_maxbytes = (1ULL << (32 + 9)) - sfs.f_bsize;
+        ptlrpc_req_finished(request);
 
         sb->s_op = &ll_super_operations;
 
index dea2dce..6083e29 100644 (file)
@@ -302,8 +302,9 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         } else if (it->it_op == IT_SETATTR) {
                 size[2] = sizeof(struct mds_rec_setattr);
                 size[3] = de->d_name.len + 1;
+
                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
-                                       &mdc->mdc_connh, LDLM_ENQUEUE, 5, size,
+                                       &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
                                        NULL);
                 if (!req)
                         RETURN(-ENOMEM);
@@ -314,7 +315,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 if (!it->it_iattr) 
                         LBUG();
 
-                mds_setattr_pack(req, 2, dir, it->it_iattr, 
+                mds_setattr_pack(req, 2, dir, it->it_iattr,
                                 de->d_name.name, de->d_name.len);
                 size[0] = sizeof(struct ldlm_reply);
                 size[1] = sizeof(struct mds_body);
index 4efecf0..8c4c269 100644 (file)
@@ -597,10 +597,15 @@ cleanup_server() {
 }
 
 cleanup_mount() {
-       [ "$SETUP" -a -z "$SETUP_MOUNT" ] && return 0
+       set -vx
+       [ "$SETUP_MOUNT" != "y" ] && return 0
+        [ "$MDC_NAMES" ] || MDC_NAMES=MDCDEV
+        [ "$OSC_NAMES" ] || OSC_NAMES=OSCDEV
+       [ -z "$MOUNT_LIST" -a "$OSCMT" ] && MOUNT_LIST="MT" && MT="$OSCMT OSCDEV MDCDEV"
+
+       [ "$MOUNT_LIST" ] || fail "error: $0: MOUNT_LIST unset"
 
-       [ "$OSCMT" ] || OSCMT=/mnt/lustre
-       for THEMOUNT in $OSCMT; do
+       for THEMOUNT in $MOUNT_LIST; do
            if [ "`mount | grep $THEMOUNT`" ]; then
                umount $THEMOUNT || fail "unable to unmount $THEMOUNT"
            fi
index 9a2c7f0..12e28c2 100644 (file)
@@ -1,21 +1,36 @@
 #!/usr/bin/perl
 
-my $mtpt = shift || die;
-my $mount_count = shift || die;
-my $i = shift || die;
-my $size = 2;
+sub usage () {
+    print "Usage: $0 <mount point prefix> <mount count> <iterations>\n";
+    print "example: $0 /mnt/lustre 2 50\n";
+    print "         will test in /mnt/lustre1 and /mnt/lustre2\n";
+    exit;
+}
+
+my $mtpt = shift || usage();
+my $mount_count = shift || usage();
+my $i = shift || usage();
+my $files = 5;
 
 while ($i--) {
     $which = int(rand() * $mount_count) + 1;
-    $path = "$mtpt$which/";
-
-    $d = int(rand() * $size);
-    print `./mcreate $path$d`;
+    $d = int(rand() * $files);
+    $path = "$mtpt$which/$d";
+    my $tmp = `./mcreate $path`;
+    if ($tmp) {
+        $tmp =~ /.*error: (.*)\n/;
+        print "Created  $path: $1\n";
+    } else {
+        print "Created  $path: Success\n";
+    }
 
     $which = int(rand() * $mount_count) + 1;
-    $path = "$mtpt$which/";
-
-    $d = int(rand() * $size);
-    unlink("$path$d") || print "unlink($path$d): $!\n"
+    $d = int(rand() * $files);
+    $path = "$mtpt$which/$d";
+    if (unlink($path)) {
+        print "Unlinked $path: Success\n";
+    } else {
+        print "Unlinked $path: $!\n";
+    }
 }
 print "Done.\n";
index 922bf64..0141c53 100755 (executable)
@@ -15,8 +15,8 @@ $DBGCTL debug_kernel /tmp/debug.2.$TIME
 cleanup_server
 
 $DBGCTL debug_kernel /tmp/debug.3.$TIME
-cleanup_lustre
-$DBGCTL debug_kernel /tmp/debug.4.$TIME
 cleanup_ldlm
+$DBGCTL debug_kernel /tmp/debug.4.$TIME
+cleanup_lustre
 $DBGCTL debug_kernel /tmp/debug.5.$TIME
 cleanup_portals