Whamcloud - gitweb
LU-601 mdd: Fix transaction credits
authorBobi Jam <bobijam@whamcloud.com>
Tue, 23 Aug 2011 03:34:06 +0000 (11:34 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 7 Nov 2011 03:39:01 +0000 (22:39 -0500)
* mdd_create()/mdd_create_data() may need delete orphan objects on
  OSTs, so that we need preserve enough transaction credits for llog
  records.
* mdd_attr_set() may write lov llogs.
* orphan_object_destroy() also will write a llog record, we need
  reserve credit for it as well.
* add credit changelog record.

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: I5124d2f368e2ff794b2b2b8194bec86f63e971cf
Reviewed-on: http://review.whamcloud.com/1276
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mikhail Pershin <tappro@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdd/mdd_orphans.c
lustre/mdd/mdd_trans.c

index f88c837..7491ff3 100644 (file)
@@ -713,7 +713,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -857,7 +857,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
                  PFID(mdd_object_fid(mdd_cobj)));
 
-        rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
+        rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP, 1);
         if (rc)
                 RETURN(rc);
 
@@ -1019,7 +1019,7 @@ static int mdd_name_insert(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP, 0);
         handle = mdd_trans_start(env, mdo2mdd(pobj));
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1124,7 +1124,7 @@ static int mdd_name_remove(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1240,7 +1240,10 @@ static int mdd_rename_tgt(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
+        if (tobj && mdd_object_exists(mdd_tobj))
+                mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_TGT_OP,1);
+        else
+                mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1378,7 +1381,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
+        mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_CREATE_DATA_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
@@ -1735,7 +1738,7 @@ static int mdd_create(const struct lu_env *env,
                         got_def_acl = 1;
         }
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
+        mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_MKDIR_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
@@ -2037,7 +2040,10 @@ static int mdd_rename(const struct lu_env *env,
                 }
         }
 #endif
-        mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
+        if (tobj && mdd_object_exists(mdd_tobj))
+                mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_OP, 2);
+        else
+                mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP, 2);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
index 36bede9..c49b727 100644 (file)
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -417,11 +420,16 @@ int mdd_quota_finvalidate(const struct lu_env *env, struct md_device *m,
 
 /* mdd_trans.c */
 void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                         enum mdd_txn_op);
+                         enum mdd_txn_op, int changelog_cnt);
+int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
+                               struct lov_mds_md *lmm, enum mdd_txn_op op,
+                               int changelog_cnt);
 int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                            struct md_attr *ma, enum mdd_txn_op);
+                            struct md_attr *ma, enum mdd_txn_op,
+                            int changelog_cnt);
 int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                                struct md_attr *ma, enum mdd_txn_op);
+                                struct md_attr *ma, enum mdd_txn_op,
+                                int changelog_cnt);
 
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la);
index a5f4975..0c5c772 100644 (file)
@@ -1458,11 +1458,11 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct thandle *handle;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int  rc, lmm_size = 0, cookie_size = 0;
+        int  rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-#ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct mds_obd *mds = &obd->u.mds;
+#ifdef HAVE_QUOTA_SUPPORT
         unsigned int qnids[MAXQUOTAS] = { 0, 0 };
         unsigned int qoids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0, block_count = 0;
@@ -1481,11 +1481,6 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
             ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
                 RETURN(0);
 
-        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
-                                    MDD_TXN_ATTR_SET_OP);
-        handle = mdd_trans_start(env, mdd);
-        if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
         /*TODO: add lock here*/
         /* start a log jounal handle if needed */
         if (S_ISREG(mdd_object_type(mdd_obj)) &&
@@ -1493,15 +1488,26 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 lmm_size = mdd_lov_mdsize(env, mdd);
                 lmm = mdd_max_lmm_get(env, mdd);
                 if (lmm == NULL)
-                        GOTO(cleanup, rc = -ENOMEM);
+                        GOTO(no_trans, rc = -ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
                                 XATTR_NAME_LOV);
 
                 if (rc < 0)
-                        GOTO(cleanup, rc);
+                        GOTO(no_trans, rc);
         }
 
+        chlog_cnt = 1;
+        if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
+                chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
+                         lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
+        }
+        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
+                                    MDD_TXN_ATTR_SET_OP, chlog_cnt);
+        handle = mdd_trans_start(env, mdd);
+        if (IS_ERR(handle))
+                GOTO(no_trans, rc = PTR_ERR(handle));
+
         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
@@ -1582,6 +1588,7 @@ cleanup:
                 rc = mdd_attr_set_changelog(env, obj, handle,
                                             ma->ma_attr.la_valid);
         mdd_trans_stop(env, mdd, rc, handle);
+no_trans:
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
@@ -1657,7 +1664,7 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
         /* security-replated changes may require sync */
         if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
             mdd->mdd_sync_permission == 1)
@@ -1695,7 +1702,7 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 RETURN(rc);
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
@@ -1741,7 +1748,7 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
 
         LASSERT(mdd_object_exists(mdd_obj) > 0);
 
-        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
+        rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
         if (rc)
                 RETURN(rc);
 
@@ -1866,7 +1873,7 @@ static int mdd_object_create(const struct lu_env *env,
         }
 #endif
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1953,7 +1960,7 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         int rc;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+        mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(-ENOMEM);
@@ -2133,7 +2140,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         if (mdd_obj->mod_count == 1 &&
             (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
  again:
-                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
+                rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
                 if (rc)
                         RETURN(rc);
                 handle = mdd_trans_start(env, mdo2mdd(obj));
index 43eeec3..2902df5 100644 (file)
@@ -318,10 +318,19 @@ static int orphan_object_destroy(const struct lu_env *env,
 {
         struct thandle *th = NULL;
         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+        struct md_attr *ma = &mdd_env_info(env)->mti_ma;
         int rc = 0;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
+        /* init ma */
+        ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
+        ma->ma_lmm = mdd_max_lmm_get(env, mdd);
+        ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
+        ma->ma_cookie = mdd_max_cookie_get(env, mdd);
+        ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
+        ma->ma_valid = 0;
+
+        mdd_log_txn_param_build(env, &obj->mod_obj, ma, MDD_TXN_UNLINK_OP, 0);
         th = mdd_trans_start(env, mdd);
         if (IS_ERR(th)) {
                 CERROR("Cannot get thandle\n");
index 46a8b9b..001cac3 100644 (file)
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -104,30 +107,65 @@ int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
 }
 
 void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
-                         enum mdd_txn_op op)
+                         enum mdd_txn_op op, int changelog_cnt)
 {
         LASSERT(0 <= op && op < MDD_TXN_LAST_OP);
 
         txn_param_init(&mdd_env_info(env)->mti_param,
                        mdd->mdd_tod[op].mod_credits);
+        if (changelog_cnt > 0) {
+                txn_param_credit_add(&mdd_env_info(env)->mti_param,
+                                  changelog_cnt * dto_txn_credits[DTO_LOG_REC]);
+        }
+}
+
+int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
+                               struct lov_mds_md *lmm, enum mdd_txn_op op,
+                               int changelog_cnt)
+{
+        int stripes = 0;
+        ENTRY;
+
+        LASSERT(op == MDD_TXN_CREATE_DATA_OP || op == MDD_TXN_MKDIR_OP);
+
+        if (lmm == NULL)
+                GOTO(out, 0);
+        /* only replay create request will cause lov_objid update */
+        if (!mdd->mdd_obd_dev->obd_recovering)
+                GOTO(out, 0);
+
+        /* add possible orphan unlink rec credits used in lov_objid update */
+        if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) {
+                stripes = le32_to_cpu(((struct lov_mds_md_v1*)lmm)
+                                      ->lmm_stripe_count);
+        } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3){
+                stripes = le32_to_cpu(((struct lov_mds_md_v3*)lmm)
+                                      ->lmm_stripe_count);
+        } else {
+                CERROR("Unknown lmm type %X\n", le32_to_cpu(lmm->lmm_magic));
+                LBUG();
+        }
+out:
+        mdd_txn_param_build(env, mdd, op, stripes + changelog_cnt);
+        RETURN(0);
 }
 
 int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                            struct md_attr *ma, enum mdd_txn_op op)
+                            struct md_attr *ma, enum mdd_txn_op op,
+                            int changelog_cnt)
 {
         struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
-        int rc, log_credits, stripe;
+        int rc, stripe = 0;
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, op);
-
         if (S_ISDIR(lu_object_attr(&obj->mo_lu)))
-                RETURN(0);
+                GOTO(out, rc = 0);
 
-        LASSERT(op == MDD_TXN_UNLINK_OP || op == MDD_TXN_RENAME_OP);
+        LASSERT(op == MDD_TXN_UNLINK_OP || op == MDD_TXN_RENAME_OP ||
+                op == MDD_TXN_RENAME_TGT_OP);
         rc = mdd_lmm_get_locked(env, md2mdd_obj(obj), ma);
         if (rc || !(ma->ma_valid & MA_LOV))
-                RETURN(rc);
+                GOTO(out, rc);
 
         LASSERTF(le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1 ||
                  le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3,
@@ -138,18 +176,20 @@ int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
         else
                 stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count);
 
-        log_credits = stripe * dto_txn_credits[DTO_LOG_REC];
-        txn_param_credit_add(&mdd_env_info(env)->mti_param, log_credits);
+out:
+        mdd_txn_param_build(env, mdd, op, stripe + changelog_cnt);
+
         RETURN(rc);
 }
 
 int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                                struct md_attr *ma, enum mdd_txn_op op)
+                                struct md_attr *ma, enum mdd_txn_op op,
+                                int changelog_cnt)
 {
         struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, op);
+        mdd_txn_param_build(env, mdd, op, changelog_cnt);
         if (ma->ma_attr.la_valid & (LA_UID | LA_GID))
                 txn_param_credit_add(&mdd_env_info(env)->mti_param,
                                      dto_txn_credits[DTO_ATTR_SET_CHOWN]);