Whamcloud - gitweb
b=2313
authorphil <phil>
Fri, 5 Dec 2003 03:15:19 +0000 (03:15 +0000)
committerphil <phil>
Fri, 5 Dec 2003 03:15:19 +0000 (03:15 +0000)
r=shaver
This bug happens when a file is opened twice for write, then both close it
at the same time.  If they both drop the writecount, then race to
compare it against 0, one will free the fsdata and the other will assert.

This looks like a big patch, but it's mostly plumbing. I had to do some
different argument passing, in order to keep everything protected under the
same lock.

I removed the writecount spinlock, and use the epoch semaphore for all three
things: management of the epoch, protection of the writecount, and atomicity of
writecount modifications which result in allocation or freeing of the fsdata.

lustre/mds/mds_open.c

index 4f6df75..6ddd8da 100644 (file)
@@ -111,43 +111,112 @@ static void mds_mfd_destroy(struct mds_file_data *mfd)
 }
 
 
-/* 
- * Write access to a file: executors cause a negative count, 
- * writers a positive count.  The spin lock is needed to perform
- * a check for the sign and then increment or decrement atomically.
- */
+/* Caller must hold mds->mds_epoch_sem */
+static int mds_alloc_filterdata(struct inode *inode)
+{
+        LASSERT(inode->i_filterdata == NULL);
+        OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data));
+        if (inode->i_filterdata == NULL)
+                return -ENOMEM;
+        LASSERT(igrab(inode) == inode);
+        return 0;
+}
 
-static spinlock_t mds_exec_lock = SPIN_LOCK_UNLOCKED;
-static int mds_get_write_access(struct inode * inode)
+/* Caller must hold mds->mds_epoch_sem */
+static void mds_free_filterdata(struct inode *inode)
 {
-        ENTRY;
-        spin_lock(&mds_exec_lock);
+        LASSERT(inode->i_filterdata != NULL);
+        OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data));
+        inode->i_filterdata = NULL;
+        iput(inode);
+}
+
+/* Write access to a file: executors cause a negative count, 
+ * writers a positive count.  The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the d_fsdata and the
+ * MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * We could use a different semaphore for each file, if it ever shows
+ * up in a profile, which it won't.
+ *
+ * epoch argument is nonzero during recovery */
+static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
+                                __u64 epoch)
+{
+        int rc = 0;
+
+        down(&mds->mds_epoch_sem);
+
         if (atomic_read(&inode->i_writecount) < 0) {
-                spin_unlock(&mds_exec_lock);
+                up(&mds->mds_epoch_sem);
                 RETURN(-ETXTBSY);
         }
-        atomic_inc(&inode->i_writecount);
-        spin_unlock(&mds_exec_lock);
-        RETURN(0);
+
+
+        if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
+                CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
+                       MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
+                       inode->i_generation);
+                goto out;
+        }
+
+        if (inode->i_filterdata == NULL)
+                mds_alloc_filterdata(inode);
+        if (inode->i_filterdata == NULL) {
+                rc = -ENOMEM;
+                goto out;
+        }
+        if (epoch > mds->mds_io_epoch)
+                mds->mds_io_epoch = epoch;
+        else
+                mds->mds_io_epoch++;
+        MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
+        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
+               mds->mds_io_epoch, inode->i_ino, inode->i_generation);
+ out:
+        if (rc == 0)
+                atomic_inc(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+        return rc;
 }
 
-static int mds_deny_write_access(struct inode *inode)
+/* Returns EAGAIN if the client needs to get size and/or cookies and close
+ * again -- which is never true if the file is about to be unlinked.  Otherwise
+ * returns the number of remaining writers. */
+static int mds_put_write_access(struct mds_obd *mds, struct inode *inode,
+                                struct mds_body *body, int unlinking)
 {
+        int rc = 0;
         ENTRY;
-        spin_lock(&mds_exec_lock);
-        if (atomic_read(&inode->i_writecount) > 0) {
-                spin_unlock(&mds_exec_lock);
-                RETURN(-ETXTBSY);
-        }
+
+        down(&mds->mds_epoch_sem);
         atomic_dec(&inode->i_writecount);
-        spin_unlock(&mds_exec_lock);
-        RETURN(0);
+        rc = atomic_read(&inode->i_writecount);
+        if (rc > 0)
+                GOTO(out, rc);
+#if 0
+        if (!unlinking && !(body->valid & OBD_MD_FLSIZE))
+                GOTO(out, rc = EAGAIN);
+#endif
+        mds_free_filterdata(inode);
+ out:
+        up(&mds->mds_epoch_sem);
+        return rc;
 }
 
-static void mds_put_write_access(struct inode * inode)
+static int mds_deny_write_access(struct mds_obd *mds, struct inode *inode)
 {
         ENTRY;
+        down(&mds->mds_epoch_sem);
+        if (atomic_read(&inode->i_writecount) > 0) {
+                up(&mds->mds_epoch_sem);
+                RETURN(-ETXTBSY);
+        }
         atomic_dec(&inode->i_writecount);
+        up(&mds->mds_epoch_sem);
+        RETURN(0);
 }
 
 static void mds_allow_write_access(struct inode *inode)
@@ -168,7 +237,9 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
                                              struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_obd *mds = mds_req2mds(req);
         struct mds_file_data *mfd;
+        struct mds_body *body;
         int error;
         ENTRY;
         
@@ -178,12 +249,16 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
                 GOTO(cleanup_dentry, error = -ENOMEM);
         }
 
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+
         if (flags & FMODE_WRITE) {
-                error = mds_get_write_access(dentry->d_inode);
+                /* FIXME: in recovery, need to pass old epoch here */
+                error = mds_get_write_access(mds, dentry->d_inode, 0);
                 if (error)
                         GOTO(cleanup_mfd, error);
+                body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
         } else if (flags & FMODE_EXEC) {
-                error = mds_deny_write_access(dentry->d_inode);
+                error = mds_deny_write_access(mds, dentry->d_inode);
                 if (error)
                         GOTO(cleanup_mfd, error);
         }
@@ -201,6 +276,9 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
         list_add(&mfd->mfd_list, &med->med_open_head);
         spin_unlock(&med->med_open_lock);
         mds_mfd_put(mfd);
+
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
+
         RETURN(mfd);
 
 cleanup_mfd:
@@ -362,79 +440,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         RETURN(rc);
 }
 
-/* Caller must hold mds->mds_epoch_sem */
-static int mds_alloc_filterdata(struct inode *inode)
-{
-        LASSERT(inode->i_filterdata == NULL);
-        OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data));
-        if (inode->i_filterdata == NULL)
-                return -ENOMEM;
-        LASSERT(igrab(inode) == inode);
-        return 0;
-}
-
-/* Caller must hold mds->mds_epoch_sem */
-static void mds_free_filterdata(struct inode *inode)
-{
-        LASSERT(inode->i_filterdata != NULL);
-        OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data));
-        inode->i_filterdata = NULL;
-        iput(inode);
-}
-
-/* epoch argument is nonzero during recovery */
-static int mds_open_io_epoch(struct mds_obd *mds, struct inode *inode,
-                             __u64 epoch)
-{
-        int rc = 0;
-
-        down(&mds->mds_epoch_sem);
-        if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
-                CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
-                       MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
-                       inode->i_generation);
-                goto out;
-        }
-
-        if (inode->i_filterdata == NULL)
-                mds_alloc_filterdata(inode);
-        if (inode->i_filterdata == NULL) {
-                rc = -ENOMEM;
-                goto out;
-        }
-        if (epoch > mds->mds_io_epoch)
-                mds->mds_io_epoch = epoch;
-        else
-                mds->mds_io_epoch++;
-        MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
-        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
-               mds->mds_io_epoch, inode->i_ino, inode->i_generation);
- out:
-        up(&mds->mds_epoch_sem);
-        return rc;
-}
-
-/* Returns EAGAIN if the client needs to get size and/or cookies and close
- * again -- which is never true if the file is about to be unlinked. */
-static int mds_close_io_epoch(struct mds_obd *mds, struct inode *inode,
-                              struct mds_body *body, int unlinking)
-{
-        int rc = 0;
-        ENTRY;
-
-        down(&mds->mds_epoch_sem);
-        if (mds_query_write_access(inode) > 0)
-                GOTO(out, rc);
-#if 0
-        if (!unlinking && !(body->valid & OBD_MD_FLSIZE))
-                GOTO(out, rc = EAGAIN);
-#endif
-        mds_free_filterdata(inode);
- out:
-        up(&mds->mds_epoch_sem);
-        return rc;
-}
-
 static void reconstruct_open(struct mds_update_record *rec, int offset,
                              struct ptlrpc_request *req,
                              struct lustre_handle *child_lockh)
@@ -552,11 +557,6 @@ static void reconstruct_open(struct mds_update_record *rec, int offset,
                 put_child = 0;
         }
 
-        if ((rec->ur_flags & FMODE_WRITE) &&
-            mds_open_io_epoch(mds, child->d_inode, 0) == 0)
-                body->io_epoch = MDS_FILTERDATA(child->d_inode)->io_epoch;
-        body->handle.cookie = mfd->mfd_handle.h_cookie;
-
  out_dput:
         if (put_child)
                 l_dput(child);
@@ -624,11 +624,6 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
         if (IS_ERR(mfd))
                 RETURN(PTR_ERR(mfd));
 
-        if ((rec->ur_flags & FMODE_WRITE) &&
-            mds_open_io_epoch(mds, dchild->d_inode, 0) == 0)
-                body->io_epoch = MDS_FILTERDATA(dchild->d_inode)->io_epoch;
-        body->handle.cookie = mfd->mfd_handle.h_cookie;
-
         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
                mfd->mfd_handle.h_cookie);
         if (ids != NULL) {
@@ -1003,17 +998,16 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
         last_orphan = mds_open_orphan_dec_test(inode) &&
                 mds_inode_is_orphan(inode);
 
-        /* this is the actual "close" */
+        /* this is half of the actual "close" */
         if (mfd->mfd_mode & FMODE_WRITE) {
-                mds_put_write_access(inode);
+                rc = mds_put_write_access(mds, inode, request_body,
+                                          last_orphan && unlink_orphan);
         } else if (mfd->mfd_mode & FMODE_EXEC) {
                 mds_allow_write_access(inode);
         }
 
         if (last_orphan && unlink_orphan) {
-                LASSERT(mds_query_write_access(inode) == 0);
-                if (mfd->mfd_mode & FMODE_WRITE)
-                        rc = mds_close_io_epoch(mds, inode, request_body, 1);
+                LASSERT(rc == 0); /* mds_put_write_access must have succeeded */
 
                 CWARN("destroying orphan object %s\n", fidname);
 
@@ -1055,16 +1049,15 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                         rc = vfs_unlink(pending_dir, pending_child);
                 if (rc)
                         CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
-        } else if (mfd->mfd_mode & FMODE_WRITE &&
-                   mds_query_write_access(inode) == 0) {
-                // XXX this should probably be abstracted with mds_reint_setattr
+        } else if (mfd->mfd_mode & FMODE_WRITE && rc == 0) {
+                /* Update the on-disk attributes if this was the last write
+                 * close, and all information was provided (i.e., rc == 0)
+                 *
+                 * XXX this should probably be abstracted with mds_reint_setattr
+                 */
+#if 0
                 struct iattr iattr;
 
-                rc = mds_close_io_epoch(mds, inode, request_body, 0);
-                if (rc == EAGAIN)
-                        goto dput;
-
-#if 0
                 /* XXX can't set block count with fsfilt_setattr (!) */
                 iattr.ia_valid = ATTR_CTIME | ATTR_ATIME |
                         ATTR_MTIME | ATTR_SIZE;
@@ -1082,7 +1075,6 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                         CERROR("error in setattr(%s): rc %d\n", fidname, rc);
 #endif
         }
- dput:
         l_dput(mfd->mfd_dentry);
         mds_mfd_destroy(mfd);