Whamcloud - gitweb
LU-1943 fld: Simplify transaction handling in FID/FLD
authorMikhail Pershin <tappro@whamcloud.com>
Wed, 3 Oct 2012 20:36:23 +0000 (00:36 +0400)
committerOleg Drokin <green@whamcloud.com>
Thu, 4 Oct 2012 18:58:35 +0000 (14:58 -0400)
This is port of ORI-107 to the master
- cleanup FID/FLD transaction handling, fix IGIF_FLD_RANGE start value
- use th_local in all places where transno is not needed
- remove mti_no_need_transno flag

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: I4647dd7f8cd33b836cc959a9f74610c9fbc0e29c
Reviewed-on: http://review.whamcloud.com/4172
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: wangdi <di.wang@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/fid/fid_internal.h
lustre/fid/fid_store.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c

index 02ab9d8..b659107 100644 (file)
@@ -62,10 +62,6 @@ extern struct lu_context_key seq_thread_key;
 int seq_client_alloc_super(struct lu_client_seq *seq,
                            const struct lu_env *env);
 
-int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_seq_range *range,
-                            const struct lu_env *env);
-
 /* Store API functions. */
 int seq_store_init(struct lu_server_seq *seq,
                    const struct lu_env *env,
index 651d23e..77d4aed 100644 (file)
@@ -88,26 +88,6 @@ void seq_update_cb(struct lu_env *env, struct thandle *th,
        OBD_FREE_PTR(ccb);
 }
 
-struct thandle *seq_store_trans_create(struct lu_server_seq *seq,
-                                       const struct lu_env *env)
-{
-        struct dt_device *dt_dev;
-
-        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-        return dt_trans_create(env, dt_dev);
-}
-
-int seq_store_trans_start(struct lu_server_seq *seq, const struct lu_env *env,
-                          struct thandle *th)
-{
-        struct dt_device *dt_dev;
-        ENTRY;
-
-        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-
-        return dt_trans_start(env, dt_dev, th);
-}
-
 int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
 {
        struct seq_update_callback *ccb;
@@ -133,67 +113,25 @@ int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
        return rc;
 }
 
-int seq_declare_store_write(struct lu_server_seq *seq,
-                            const struct lu_env *env,
-                            struct thandle *th)
-{
-        struct dt_object *dt_obj = seq->lss_obj;
-        int rc;
-        ENTRY;
-
-        rc = dt_obj->do_body_ops->dbo_declare_write(env, dt_obj,
-                                                    sizeof(struct lu_seq_range),
-                                                    0, th);
-        return rc;
-}
-
 /* This function implies that caller takes care about locking. */
-int seq_store_write(struct lu_server_seq *seq,
-                    const struct lu_env *env,
-                    struct thandle *th)
-{
-        struct dt_object *dt_obj = seq->lss_obj;
-        struct seq_thread_info *info;
-        loff_t pos = 0;
-        int rc;
-        ENTRY;
-
-        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
-        LASSERT(info != NULL);
-
-        /* Store ranges in le format. */
-        range_cpu_to_le(&info->sti_space, &seq->lss_space);
-
-        rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
-                                            seq_store_buf(info),
-                                            &pos, th, BYPASS_CAPA, 1);
-        if (rc == sizeof(info->sti_space)) {
-                CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
-                       seq->lss_name, PRANGE(&seq->lss_space));
-                rc = 0;
-        } else if (rc >= 0) {
-                rc = -EIO;
-        }
-
-
-        RETURN(rc);
-}
-
 int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
                      struct lu_seq_range *out, int sync)
 {
-        struct dt_device *dt_dev;
-        struct thandle *th;
-        int rc;
-        ENTRY;
+       struct dt_device *dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+       struct seq_thread_info *info;
+       struct thandle *th;
+       loff_t pos = 0;
+       int rc;
 
-        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+       info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
+       LASSERT(info != NULL);
 
-        th = seq_store_trans_create(seq, env);
-        if (IS_ERR(th))
-                RETURN(PTR_ERR(th));
+       th = dt_trans_create(env, dt_dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
 
-        rc = seq_declare_store_write(seq, env, th);
+       rc = dt_declare_record_write(env, seq->lss_obj,
+                                    sizeof(struct lu_seq_range), 0, th);
         if (rc)
                 GOTO(exit, rc);
 
@@ -204,25 +142,27 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
                         GOTO(exit, rc);
         }
 
-        rc = seq_store_trans_start(seq, env, th);
+       rc = dt_trans_start_local(env, dt_dev, th);
         if (rc)
                 GOTO(exit, rc);
 
-        rc = seq_store_write(seq, env, th);
+       /* Store ranges in le format. */
+       range_cpu_to_le(&info->sti_space, &seq->lss_space);
+
+       rc = dt_record_write(env, seq->lss_obj, seq_store_buf(info), &pos, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
-                GOTO(exit,rc);
+               GOTO(exit, rc);
         } else if (out != NULL) {
                 rc = fld_server_create(seq->lss_site->ms_server_fld,
                                        env, out, th);
                 if (rc) {
                         CERROR("%s: Can't Update fld database, rc %d\n",
                                seq->lss_name, rc);
-                        GOTO(exit,rc);
+                       GOTO(exit, rc);
                 }
         }
-
         /* next sequence update will need sync until this update is committed
          * in case of sync operation this is not needed obviously */
         if (!sync)
@@ -242,7 +182,6 @@ exit:
 int seq_store_read(struct lu_server_seq *seq,
                    const struct lu_env *env)
 {
-        struct dt_object *dt_obj = seq->lss_obj;
         struct seq_thread_info *info;
         loff_t pos = 0;
         int rc;
@@ -251,8 +190,9 @@ int seq_store_read(struct lu_server_seq *seq,
         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
         LASSERT(info != NULL);
 
-        rc = dt_obj->do_body_ops->dbo_read(env, dt_obj, seq_store_buf(info),
-                                           &pos, BYPASS_CAPA);
+       rc = seq->lss_obj->do_body_ops->dbo_read(env, seq->lss_obj,
+                                                seq_store_buf(info),
+                                                &pos, BYPASS_CAPA);
 
         if (rc == sizeof(info->sti_space)) {
                 range_le_to_cpu(&seq->lss_space, &info->sti_space);
@@ -261,7 +201,7 @@ int seq_store_read(struct lu_server_seq *seq,
                 rc = 0;
         } else if (rc == 0) {
                 rc = -ENODATA;
-        } else if (rc >= 0) {
+       } else if (rc > 0) {
                 CERROR("%s: Read only %d bytes of %d\n", seq->lss_name,
                        rc, (int)sizeof(info->sti_space));
                 rc = -EIO;
index 4b1c2e2..d95193f 100644 (file)
@@ -102,7 +102,6 @@ int fld_declare_server_create(struct lu_server_fld *fld,
                               const struct lu_env *env,
                               struct thandle *th)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
         int rc;
 
         ENTRY;
@@ -114,16 +113,15 @@ int fld_declare_server_create(struct lu_server_fld *fld,
 
         /* for ldiskfs OSD it's enough to declare operation with any ops
          * with DMU we'll probably need to specify exact key/value */
-        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
-        if (rc)
-                GOTO(out, rc);
-        rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
-        if (rc)
-                GOTO(out, rc);
-        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
-                                                      NULL, NULL, th);
+       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
+       if (rc)
+               GOTO(out, rc);
+       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
+       if (rc)
+               GOTO(out, rc);
+       rc = dt_declare_insert(env, fld->lsf_obj, NULL, NULL, th);
 out:
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_declare_server_create);
 
index 2a8d314..da83bef 100644 (file)
 const char fld_index_name[] = "fld";
 
 static const struct lu_seq_range IGIF_FLD_RANGE = {
-        .lsr_start = 1,
-        .lsr_end   = FID_SEQ_IDIF,
-        .lsr_index   = 0,
-        .lsr_flags  = LU_SEQ_RANGE_MDT
+       .lsr_start = FID_SEQ_IGIF,
+       .lsr_end   = FID_SEQ_IGIF_MAX + 1,
+       .lsr_index = 0,
+       .lsr_flags = LU_SEQ_RANGE_MDT
 };
 
 const struct dt_index_features fld_index_features = {
@@ -109,42 +109,11 @@ static struct dt_rec *fld_rec(const struct lu_env *env,
         RETURN((void *)rec);
 }
 
-struct thandle *fld_trans_create(struct lu_server_fld *fld,
-                                const struct lu_env *env)
-{
-        struct dt_device *dt_dev;
-
-        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-
-        return dt_dev->dd_ops->dt_trans_create(env, dt_dev);
-}
-
-int fld_trans_start(struct lu_server_fld *fld,
-                                const struct lu_env *env, struct thandle *th)
-{
-        struct dt_device *dt_dev;
-
-        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-
-        return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th);
-}
-
-void fld_trans_stop(struct lu_server_fld *fld,
-                    const struct lu_env *env, struct thandle* th)
-{
-        struct dt_device *dt_dev;
-
-        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-        dt_dev->dd_ops->dt_trans_stop(env, th);
-}
-
 int fld_declare_index_create(struct lu_server_fld *fld,
                              const struct lu_env *env,
                              const struct lu_seq_range *range,
                              struct thandle *th)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        seqno_t start;
         int rc;
 
         ENTRY;
@@ -154,12 +123,10 @@ int fld_declare_index_create(struct lu_server_fld *fld,
                return 0;
        }
 
-        start = range->lsr_start;
         LASSERT(range_is_sane(range));
 
-        rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
-                                                      fld_rec(env, range),
-                                                      fld_key(env, start), th);
+       rc = dt_declare_insert(env, fld->lsf_obj, fld_rec(env, range),
+                             fld_key(env, range->lsr_start), th);
         RETURN(rc);
 }
 
@@ -173,14 +140,11 @@ int fld_declare_index_create(struct lu_server_fld *fld,
  *      \retval  0  success
  *      \retval  -ve error
  */
-
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
                      const struct lu_seq_range *range,
                      struct thandle *th)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        seqno_t start;
         int rc;
 
         ENTRY;
@@ -196,14 +160,10 @@ int fld_index_create(struct lu_server_fld *fld,
                }
        }
 
-        start = range->lsr_start;
         LASSERT(range_is_sane(range));
 
-        rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
-                                              fld_rec(env, range),
-                                              fld_key(env, start),
-                                              th, BYPASS_CAPA, 1);
-
+       rc = dt_insert(env, fld->lsf_obj, fld_rec(env, range),
+                      fld_key(env, range->lsr_start), th, BYPASS_CAPA, 1);
         CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n",
                fld->lsf_name, PRANGE(range), rc);
         RETURN(rc);
@@ -218,24 +178,19 @@ int fld_index_create(struct lu_server_fld *fld,
  *      \retval  0  success
  *      \retval  -ve error
  */
-
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
                      struct lu_seq_range *range,
                      struct thandle   *th)
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        seqno_t seq = range->lsr_start;
         int rc;
 
         ENTRY;
 
-        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq),
-                                              th, BYPASS_CAPA);
-
+       rc = dt_delete(env, fld->lsf_obj, fld_key(env, range->lsr_start), th,
+                      BYPASS_CAPA);
         CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
                fld->lsf_name, PRANGE(range), rc);
-
         RETURN(rc);
 }
 
@@ -304,26 +259,24 @@ static int fld_insert_igif_fld(struct lu_server_fld *fld,
         int rc;
         ENTRY;
 
-        /* FLD_TXN_INDEX_INSERT_CREDITS */
-        th = fld_trans_create(fld, env);
-        if (IS_ERR(th))
-                RETURN(PTR_ERR(th));
-        rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
-        if (rc) {
-                fld_trans_stop(fld, env, th);
-                RETURN(rc);
-        }
-        rc = fld_trans_start(fld, env, th);
-        if (rc) {
-                fld_trans_stop(fld, env, th);
-                RETURN(rc);
-        }
-
-        rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
-        fld_trans_stop(fld, env, th);
-        if (rc == -EEXIST)
-                rc = 0;
-        RETURN(rc);
+       th = dt_trans_create(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev));
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+       rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = dt_trans_start_local(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev),
+                                 th);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+       if (rc == -EEXIST)
+               rc = 0;
+out:
+       dt_trans_stop(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev), th);
+       RETURN(rc);
 }
 
 int fld_index_init(struct lu_server_fld *fld,
index a2975d0..c6f87f1 100644 (file)
@@ -148,15 +148,6 @@ struct fld_thread_info {
         struct lu_seq_range fti_irange;
 };
 
-
-struct thandle *fld_trans_create(struct lu_server_fld *fld,
-                                const struct lu_env *env);
-int fld_trans_start(struct lu_server_fld *fld,
-                    const struct lu_env *env, struct thandle *th);
-
-void fld_trans_stop(struct lu_server_fld *fld,
-                    const struct lu_env *env, struct thandle* th);
-
 extern struct lu_context_key fld_thread_key;
 
 int fld_index_init(struct lu_server_fld *fld,
index d9f7188..c6e54db 100644 (file)
@@ -3119,7 +3119,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_object = NULL;
         info->mti_dlm_req = NULL;
         info->mti_has_trans = 0;
-        info->mti_no_need_trans = 0;
         info->mti_cross_ref = 0;
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
@@ -3135,11 +3134,6 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info)
 
         req_capsule_fini(info->mti_pill);
         if (info->mti_object != NULL) {
-                /*
-                 * freeing an object may lead to OSD level transaction, do not
-                 * let it mess with MDT. bz19385.
-                 */
-                info->mti_no_need_trans = 1;
                 mdt_object_put(info->mti_env, info->mti_object);
                 info->mti_object = NULL;
         }
index f83d4e0..5d50e50 100644 (file)
@@ -336,7 +336,6 @@ struct mdt_thread_info {
         const struct ldlm_request *mti_dlm_req;
 
         __u32                      mti_has_trans:1, /* has txn already? */
-                                   mti_no_need_trans:1,
                                    mti_cross_ref:1;
 
         /* opdata for mdt_reint_open(), has the same as
index 8fb45fb..bdee514 100644 (file)
@@ -1515,7 +1515,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                         ma->ma_need = 0;
                         ma->ma_valid = 0;
                         ma->ma_cookie_size = 0;
-                        info->mti_no_need_trans = 1;
                         rc = mdo_unlink(info->mti_env,
                                         mdt_object_child(parent),
                                         mdt_object_child(child),
index 93fd646..1872119 100644 (file)
@@ -500,10 +500,8 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         req = mdt_info_req(mti);
 
-        if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
-                mti->mti_no_need_trans = 0;
-                return 0;
-        }
+       if (mti->mti_mdt == NULL || req == NULL)
+               return 0;
 
         if (mti->mti_has_trans) {
                 /* XXX: currently there are allowed cases, but the wrong cases