Whamcloud - gitweb
- more LDLM refcount locking infrastructure
authorpschwan <pschwan>
Mon, 1 Jul 2002 21:23:34 +0000 (21:23 +0000)
committerpschwan <pschwan>
Mon, 1 Jul 2002 21:23:34 +0000 (21:23 +0000)
- fixed the error handling paths of unlink and rmdir, so that unlinking a
non-existant file returns an error
- fixed the DLM bug that caused the perl script to hang (there's a new bug now
  that it triggers under heavier load that causes it to crash)
- fixed a second crashing bug that occurred if a blocked lock gets completed
  and then cancelled all before the server-side enqueue finishes (amazing but
  true)
- osc_open() now returns the actual rc instead of 0
- fixed a race condition in ptlrpc/service.c related to thread shutdown (nice
  catch, Mike)

lustre/include/linux/lustre_dlm.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/namei.c
lustre/mdc/mdc_request.c
lustre/osc/osc_request.c
lustre/ptlrpc/service.c
lustre/tests/create.pl

index 2aa3ff5..898e182 100644 (file)
@@ -197,10 +197,11 @@ extern char *ldlm_typename[];
 #define LDLM_DEBUG(lock, format, a...)                                  \
 do {                                                                    \
         CDEBUG(D_DLMTRACE, "### " format                                \
-               " (%s: lock %p(rc=%d) mode %s/%s on res %Lu(rc=%d) "     \
-               " type %s remote %Lx)\n" , ## a,                         \
+               " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res %Lu"       \
+               "(rc=%d) type %s remote %Lx)\n" , ## a,                  \
                lock->l_resource->lr_namespace->ns_name, lock,           \
-               lock->l_refc, ldlm_lockname[lock->l_granted_mode],       \
+               lock->l_refc, lock->l_readers, lock->l_writers,          \
+               ldlm_lockname[lock->l_granted_mode],                     \
                ldlm_lockname[lock->l_req_mode],                         \
                lock->l_resource->lr_name[0],                            \
                atomic_read(&lock->l_resource->lr_refcount),             \
index 25791c1..d1e6984 100644 (file)
@@ -348,9 +348,13 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                                 RETURN(-EINVAL);
                         }
                         rc = mds_getattr_name_p(2, req);
-                        if (rc) {
-                                req->rq_status = rc;
-                                RETURN(rc);
+                        /* FIXME: we need to sit down and decide on who should
+                         * set req->rq_status, who should return negative and
+                         * positive return values, and what they all mean. */
+                        if (rc || req->rq_status != 0) {
+                                mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
                         }
                         break;
                 case IT_READDIR|IT_OPEN:
@@ -366,7 +370,9 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
 
                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
                 rep->lock_policy_res2 = req->rq_status;
-                new_resid[0] = mds_rep->ino;
+                new_resid[0] = NTOH__u32(mds_rep->ino);
+                if (new_resid[0] == 0)
+                        LBUG();
                 old_res = lock->l_resource->lr_name[0];
 
                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
@@ -455,6 +461,7 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
                 lock->l_writers++;
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         ldlm_lock_get(lock);
+        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
 }
 
 /* Args: unlocked lock */
@@ -466,7 +473,7 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
         if (lock == NULL)
                 LBUG();
 
-        LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
+        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                 lock->l_readers--;
@@ -484,12 +491,12 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
                         LBUG();
                 }
 
-                CDEBUG(D_INFO, "final decref done on cbpending lock, "
-                       "calling callback.\n");
+                LDLM_DEBUG(lock, "final decref done on cbpending lock");
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
                 ldlm_lock2handle(lock, &lockh);
-                lock->l_blocking_ast(&lockh, NULL, lock->l_data,
+                /* FIXME: -1 is a really, really bad 'desc' */
+                lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
                                      lock->l_data_len);
         } else
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
index 381c755..4f778df 100644 (file)
@@ -86,20 +86,19 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
         EXIT;
  out:
-        if (lock)
+        if (lock) {
+                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply");
                 ldlm_lock_put(lock);
+        }
         req->rq_status = err;
         CDEBUG(D_INFO, "err = %d\n", err);
 
         if (ptlrpc_reply(req->rq_svc, req))
                 LBUG();
 
-        if (err)
-                LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
-        else {
+        if (!err)
                 ldlm_reprocess_all(lock->l_resource);
-                LDLM_DEBUG(lock, "server-side enqueue handler END");
-        }
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
 
         return 0;
 }
@@ -220,8 +219,8 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 
                 if (do_ast) {
-                        CDEBUG(D_INFO, "Lock already unused, calling "
-                               "callback (%p).\n", lock->l_blocking_ast);
+                        LDLM_DEBUG(lock, "already unused, calling "
+                                   "callback (%p)", lock->l_blocking_ast);
                         if (lock->l_blocking_ast != NULL) {
                                 struct lustre_handle lockh;
                                 ldlm_lock2handle(lock, &lockh);
index 268eb2f..1b055ca 100644 (file)
@@ -13,7 +13,8 @@
 
 #include <linux/lustre_dlm.h>
 
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct lustre_handle *connh, 
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+                     struct lustre_handle *connh, 
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -38,12 +39,11 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, s
                                 data, data_len);
         if (lock == NULL)
                 GOTO(out, rc = -ENOMEM);
+        LDLM_DEBUG(lock, "client-side enqueue START");
         /* for the local lock, add the reference */
         ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
 
-        LDLM_DEBUG(lock, "client-side enqueue START");
-
         if (req == NULL) {
                 req = ptlrpc_prep_req2(cl, conn, connh, 
                                        LDLM_ENQUEUE, 1, &size, NULL);
@@ -127,18 +127,18 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, s
         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
                                      callback);
 
-        LDLM_DEBUG(lock, "client-side enqueue END");
         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)) {
                 /* Go to sleep until the lock is granted. */
                 /* FIXME: or cancelled. */
-                CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
-                       "going to sleep.\n", lock);
+                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+                           " sleeping");
                 ldlm_lock_dump(lock);
                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
                                          lock->l_granted_mode);
-                CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
         }
+        LDLM_DEBUG(lock, "client-side enqueue END");
         ldlm_lock_put(lock);
         EXIT;
  out:
index 78e1c8b..4141c69 100644 (file)
@@ -177,7 +177,7 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                         RETURN(ERR_PTR(-abs(err)));
                 }
                 offset = 0;
-        } else if (it->it_op == IT_UNLINK) { 
+        } else if (it->it_op == IT_UNLINK) {
                 struct obdo *obdo;
                 obdo = lustre_msg_buf(request->rq_repmsg, 1);
                 inode = new_inode(dir->i_sb);
@@ -186,14 +186,14 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                 /* XXX fix mem allocation error */
                 memcpy(ll_i2info(inode)->lli_obdo, obdo, sizeof(*obdo));
 
-                if (!inode) 
+                if (!inode)
                         GOTO(out_req, -ENOMEM);
                 inode->i_mode= S_IFREG;
                 inode->i_nlink = 1;
                 GOTO(out_req, 0);
-        } else if (it->it_op == IT_RMDIR) { 
+        } else if (it->it_op == IT_RMDIR) {
                 inode = new_inode(dir->i_sb);
-                if (!inode) 
+                if (!inode)
                         GOTO(out_req, -ENOMEM);
                 ll_i2info(inode)->lli_obdo = NULL;
                 inode->i_mode= S_IFDIR;
@@ -543,9 +543,9 @@ static int ll_unlink(struct inode * dir, struct dentry *dentry)
         struct page * page;
         int err = -ENOENT;
 
-        if (dentry->d_it && dentry->d_it->it_disposition) { 
+        if (dentry->d_it && dentry->d_it->it_disposition) {
                 inode->i_nlink = 0;
-                GOTO(out, err=0);
+                GOTO(out, err = dentry->d_it->it_status);
         }
 
         de = ext2_find_entry (dir, dentry, &page);
@@ -573,22 +573,23 @@ static int ll_rmdir(struct inode * dir, struct dentry *dentry)
         int err = 0;
         int intent_did = dentry->d_it && dentry->d_it->it_disposition;
 
-        if (!intent_did) { 
+        if (!intent_did) {
                 if (!ext2_empty_dir(inode))
                 LBUG();
 
                 err = ll_unlink(dir, dentry);
-                if (err) 
+                if (err)
                         RETURN(err);
-        }
+        } else
+                err = dentry->d_it->it_status;
         inode->i_size = 0;
         ext2_dec_count(inode);
         ext2_dec_count(dir);
         RETURN(err);
 }
 
-static int ll_rename (struct inode * old_dir, struct dentry * old_dentry,
-                      struct inode * new_dir, struct dentry * new_dentry )
+static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
+                     struct inode * new_dir, struct dentry * new_dentry)
 {
         struct inode * old_inode = old_dentry->d_inode;
         struct inode * new_inode = new_dentry->d_inode;
@@ -598,11 +599,8 @@ static int ll_rename (struct inode * old_dir, struct dentry * old_dentry,
         struct ext2_dir_entry_2 * old_de;
         int err = -ENOENT;
 
-        if (new_dentry->d_it) {
-                struct ptlrpc_request *req = new_dentry->d_it->it_data;
-                err = req->rq_status;
-                goto out;
-        }
+        if (new_dentry->d_it)
+                GOTO(out, err = new_dentry->d_it->it_status);
 
         err = ll_mdc_rename(old_dir, new_dir, old_dentry, new_dentry);
         if (err)
index c13119a..b3bffae 100644 (file)
@@ -124,9 +124,9 @@ int mdc_getattr(struct lustre_handle *conn,
         return rc;
 }
 
-static int mdc_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                             void *data, int data_len,
-                             struct ptlrpc_request **req)
+static int mdc_lock_callback(struct lustre_handle *lockh,
+                             struct ldlm_lock_desc *desc, void *data,
+                             int data_len, struct ptlrpc_request **req)
 {
         int rc;
         struct inode *inode = data;
@@ -201,7 +201,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, struct lookup_intent
                 size[4] = tgtlen + 1;
                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
                                        &mdc->mdc_connh, 
-                                      LDLM_ENQUEUE, 5, size, NULL);
+                                       LDLM_ENQUEUE, 5, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -278,12 +278,13 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, struct lookup_intent
                 size[0] = sizeof(struct ldlm_reply);
                 req->rq_replen = lustre_msg_size(1, size);
         } else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
-                     it->it_op == IT_OPEN ) {
+                    it->it_op == IT_OPEN ) {
                 size[2] = sizeof(struct mds_body);
                 size[3] = de->d_name.len + 1;
 
                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
-                                      &mdc->mdc_connh, LDLM_ENQUEUE, 4, size, NULL);
+                                       &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
+                                       NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
index 07dc96f..fd8e85d 100644 (file)
@@ -191,7 +191,7 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa)
         EXIT;
  out:
         ptlrpc_free_req(request);
-        return 0;
+        return rc;
 }
 
 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
index abb2bea..a979435 100644 (file)
@@ -299,7 +299,9 @@ static int ptlrpc_main(void *arg)
 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
+        spin_lock(&svc->srv_lock);
         thread->t_flags = SVC_STOPPING;
+        spin_unlock(&svc->srv_lock);
 
         wake_up(&svc->srv_waitq);
         wait_event_interruptible(thread->t_ctl_waitq,
index c9aa192..9a2c7f0 100644 (file)
@@ -1,19 +1,21 @@
 #!/usr/bin/perl
 
-$mtpt = shift || die;
-$mount_count = shift || die;
-$i = shift || die;
+my $mtpt = shift || die;
+my $mount_count = shift || die;
+my $i = shift || die;
+my $size = 2;
 
 while ($i--) {
     $which = int(rand() * $mount_count) + 1;
     $path = "$mtpt$which/";
 
-    $d = int(rand() * 5);
+    $d = int(rand() * $size);
     print `./mcreate $path$d`;
 
     $which = int(rand() * $mount_count) + 1;
     $path = "$mtpt$which/";
 
-    $d = int(rand() * 5);
+    $d = int(rand() * $size);
     unlink("$path$d") || print "unlink($path$d): $!\n"
 }
+print "Done.\n";