Whamcloud - gitweb
LU-81 deadlock of changelog adding vs. changelog cancelling
authorNiu Yawei <niu@whamcloud.com>
Thu, 18 Aug 2011 04:22:19 +0000 (21:22 -0700)
committerOleg Drokin <green@whamcloud.com>
Wed, 4 Jan 2012 13:12:49 +0000 (08:12 -0500)
This is a workaround for the deadlock of changelog adding vs.
changelog cancelling. Changelog adding always start transaction
before acquiring the catlog lock(lgh_lock), whereas, changelog
cancelling do start transaction after holding the catlog lock.

We start transaction earlier to avoid above deadlock.

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I9647b9a559f68a27dc0d4b4885857d3cf73b5b8e
Reviewed-on: http://review.whamcloud.com/1260
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/mdd/mdd_device.c
lustre/mds/mds_log.c

index d69839b..2747214 100644 (file)
@@ -1357,6 +1357,8 @@ struct mdd_changelog_user_data {
         __u32 mcud_minid;  /**< user id with lowest rec reference */
         __u32 mcud_usercount;
         int   mcud_found:1;
         __u32 mcud_minid;  /**< user id with lowest rec reference */
         __u32 mcud_usercount;
         int   mcud_found:1;
+        struct mdd_device   *mcud_mdd;
+        const struct lu_env *mcud_env;
 };
 #define MCUD_UNREGISTER -1LL
 
 };
 #define MCUD_UNREGISTER -1LL
 
@@ -1399,12 +1401,28 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh,
         /* Special case: unregister this user */
         if (mcud->mcud_endrec == MCUD_UNREGISTER) {
                 struct llog_cookie cookie;
         /* Special case: unregister this user */
         if (mcud->mcud_endrec == MCUD_UNREGISTER) {
                 struct llog_cookie cookie;
+                void *trans_h;
+                struct mdd_device *mdd = mcud->mcud_mdd;
+
                 cookie.lgc_lgl = llh->lgh_id;
                 cookie.lgc_index = hdr->lrh_index;
                 cookie.lgc_lgl = llh->lgh_id;
                 cookie.lgc_index = hdr->lrh_index;
+
+                /* XXX This is a workaround for the deadlock of changelog
+                 * adding vs. changelog cancelling. LU-81. */
+                mdd_txn_param_build(mcud->mcud_env, mdd, MDD_TXN_UNLINK_OP, 0);
+                trans_h = mdd_trans_start(mcud->mcud_env, mdd);
+                if (IS_ERR(trans_h)) {
+                        CERROR("fsfilt_start_log failed: %ld\n",
+                               PTR_ERR(trans_h));
+                        RETURN(PTR_ERR(trans_h));
+                }
+
                 rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle,
                                              1, &cookie);
                 if (rc == 0)
                         mcud->mcud_usercount--;
                 rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle,
                                              1, &cookie);
                 if (rc == 0)
                         mcud->mcud_usercount--;
+
+                mdd_trans_stop(mcud->mcud_env, mdd, rc, trans_h);
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
@@ -1420,7 +1438,8 @@ static int mdd_changelog_user_purge_cb(struct llog_handle *llh,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdd_changelog_user_purge(struct mdd_device *mdd, int id,
+static int mdd_changelog_user_purge(const struct lu_env *env,
+                                    struct mdd_device *mdd, int id,
                                     long long endrec)
 {
         struct mdd_changelog_user_data data;
                                     long long endrec)
 {
         struct mdd_changelog_user_data data;
@@ -1435,6 +1454,8 @@ static int mdd_changelog_user_purge(struct mdd_device *mdd, int id,
         data.mcud_minrec = 0;
         data.mcud_usercount = 0;
         data.mcud_endrec = endrec;
         data.mcud_minrec = 0;
         data.mcud_usercount = 0;
         data.mcud_endrec = endrec;
+        data.mcud_mdd = mdd;
+        data.mcud_env = env;
         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
         endrec = mdd->mdd_cl.mc_index;
         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
         endrec = mdd->mdd_cl.mc_index;
         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
@@ -1498,7 +1519,8 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
         switch (cmd) {
         case OBD_IOC_CHANGELOG_CLEAR: {
                 struct changelog_setinfo *cs = karg;
         switch (cmd) {
         case OBD_IOC_CHANGELOG_CLEAR: {
                 struct changelog_setinfo *cs = karg;
-                rc = mdd_changelog_user_purge(mdd, cs->cs_id, cs->cs_recno);
+                rc = mdd_changelog_user_purge(env, mdd, cs->cs_id,
+                                              cs->cs_recno);
                 RETURN(rc);
         }
         case OBD_IOC_GET_MNTOPT: {
                 RETURN(rc);
         }
         case OBD_IOC_GET_MNTOPT: {
@@ -1524,7 +1546,7 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
                 rc = mdd_changelog_user_register(mdd, &data->ioc_u32_1);
                 break;
         case OBD_IOC_CHANGELOG_DEREG:
                 rc = mdd_changelog_user_register(mdd, &data->ioc_u32_1);
                 break;
         case OBD_IOC_CHANGELOG_DEREG:
-                rc = mdd_changelog_user_purge(mdd, data->ioc_u32_1,
+                rc = mdd_changelog_user_purge(env, mdd, data->ioc_u32_1,
                                               MCUD_UNREGISTER);
                 break;
         default:
                                               MCUD_UNREGISTER);
                 break;
         default:
index 70aac6e..91ea64c 100644 (file)
@@ -121,7 +121,10 @@ static int llog_changelog_cancel_cb(struct llog_handle *llh,
         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
         struct llog_cookie cookie;
         long long endrec = *(long long *)data;
         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
         struct llog_cookie cookie;
         long long endrec = *(long long *)data;
-        int rc;
+        int rc, err;
+        struct obd_device *obd;
+        void *trans_h;
+        struct inode *inode;
         ENTRY;
 
         /* This is always a (sub)log, not the catalog */
         ENTRY;
 
         /* This is always a (sub)log, not the catalog */
@@ -133,11 +136,34 @@ static int llog_changelog_cancel_cb(struct llog_handle *llh,
 
         cookie.lgc_lgl = llh->lgh_id;
         cookie.lgc_index = hdr->lrh_index;
 
         cookie.lgc_lgl = llh->lgh_id;
         cookie.lgc_index = hdr->lrh_index;
+        obd = llh->lgh_ctxt->loc_exp->exp_obd;
+        inode = llh->lgh_file->f_dentry->d_inode;
+
+        /* XXX This is a workaround for the deadlock of changelog adding vs.
+         * changelog cancelling. Changelog adding always start transaction
+         * before acquiring the catlog lock (lgh_lock), whereas, changelog
+         * cancelling do start transaction after holding catlog lock.
+         *
+         * We start the transaction earlier here to keep the locking ordering:
+         * 'start transaction -> catlog lock'. LU-81. */
+        trans_h = fsfilt_start_log(obd, inode, FSFILT_OP_CANCEL_UNLINK,
+                                   NULL, 1);
+        if (IS_ERR(trans_h)) {
+                CERROR("fsfilt_start_log failed: %ld\n", PTR_ERR(trans_h));
+                RETURN(PTR_ERR(trans_h));
+        }
 
         /* cancel them one at a time.  I suppose we could store up the cookies
            and cancel them all at once; probably more efficient, but this is
            done as a user call, so who cares... */
         rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie);
 
         /* cancel them one at a time.  I suppose we could store up the cookies
            and cancel them all at once; probably more efficient, but this is
            done as a user call, so who cares... */
         rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie);
+
+        err = fsfilt_commit(obd, inode, trans_h, 0);
+        if (err) {
+                CERROR("fsfilt_commit failed: %d\n", err);
+                rc = (rc >= 0) ? err : rc;
+        }
+
         RETURN(rc < 0 ? rc : 0);
 }
 
         RETURN(rc < 0 ? rc : 0);
 }