Whamcloud - gitweb
LU-795 osd api: Commit callback per transaction
authorMikhail Pershin <tappro@whamcloud.com>
Thu, 27 Oct 2011 04:03:33 +0000 (08:03 +0400)
committerOleg Drokin <green@whamcloud.com>
Sun, 13 Nov 2011 20:07:47 +0000 (15:07 -0500)
- ability to add commit callback per transaction in addition to
  per-device hooks. Now it is much simpler if only commit callback
  is needed.
- rewrite commit callbacks for last_commit and new_client, add commit
  callback in seq manager
- cleanup not-needed code: old commit callbacks, txn_keys
- remove osd od_env_for_commit environment and env param from commit
  callbacks
- use th_sync to mark sync operations

Change-Id: If5f8f2a6d3cd2f3e77fd13c802213a181043a2d7
Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1621
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
16 files changed:
lustre/fid/fid_handler.c
lustre/fid/fid_store.c
lustre/include/dt_object.h
lustre/include/lu_target.h
lustre/include/lustre_fid.h
lustre/mdd/mdd_device.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdd/mdd_trans.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c
lustre/obdclass/dt_object.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/ptlrpc/target.c

index 2a33c43..a762edb 100644 (file)
@@ -171,7 +171,6 @@ static int __seq_set_init(const struct lu_env *env,
         range_alloc(&seq->lss_hiwater_set, space, seq->lss_set_width);
 
         rc = seq_store_update(env, seq, NULL, 1);
-        seq->lss_set_transno = 0;
 
         return rc;
 }
@@ -209,31 +208,21 @@ static int range_alloc_set(const struct lu_env *env,
         if (range_is_exhausted(loset)) {
                 /* reached high water mark. */
                 struct lu_device *dev = seq->lss_site->ms_lu.ls_top_dev;
-                struct lu_target *tg = dev->ld_obd->u.obt.obt_lut;
                 int obd_num_clients = dev->ld_obd->obd_num_exports;
                 __u64 set_sz;
-                int sync = 0;
 
                 /* calculate new seq width based on number of clients */
                 set_sz = max(seq->lss_set_width,
-                               obd_num_clients * seq->lss_width);
+                             obd_num_clients * seq->lss_width);
                 set_sz = min(range_space(space), set_sz);
 
                 /* Switch to hiwater range now */
-                loset = hiset;
+                *loset = *hiset;
                 /* allocate new hiwater range */
                 range_alloc(hiset, space, set_sz);
 
-                if (seq->lss_set_transno > dev->ld_obd->obd_last_committed)
-                        sync = 1;
-
                 /* update ondisk seq with new *space */
-                rc = seq_store_update(env, seq, NULL, sync);
-
-                /* set new hiwater transno */
-                cfs_spin_lock(&tg->lut_translock);
-                seq->lss_set_transno = tg->lut_last_transno;
-                cfs_spin_unlock(&tg->lut_translock);
+                rc = seq_store_update(env, seq, NULL, seq->lss_need_sync);
         }
 
         LASSERTF(!range_is_exhausted(loset) || range_is_sane(loset),
index 2226d5e..66ce462 100644 (file)
@@ -73,37 +73,37 @@ static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
         return buf;
 }
 
-struct thandle *seq_store_trans_start(struct lu_server_seq *seq,
-                                      const struct lu_env *env, int credit,
-                                      int sync)
-{
-        struct seq_thread_info *info;
-        struct dt_device *dt_dev;
-        struct thandle *th;
-        ENTRY;
+struct seq_update_callback {
+        struct dt_txn_commit_cb suc_cb;
+        struct lu_server_seq   *suc_seq;
+};
 
-        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
-        LASSERT(info != NULL);
-
-        txn_param_init(&info->sti_txn, credit);
-        if (sync)
-                txn_param_sync(&info->sti_txn);
-
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
-        return th;
+void seq_update_cb(struct lu_env *env, struct thandle *th,
+                   struct dt_txn_commit_cb *cb, int err)
+{
+        struct seq_update_callback *ccb;
+        ccb = container_of0(cb, struct seq_update_callback, suc_cb);
+        ccb->suc_seq->lss_need_sync = 0;
+        cfs_list_del(&ccb->suc_cb.dcb_linkage);
+        OBD_FREE_PTR(ccb);
 }
 
-void seq_store_trans_stop(struct lu_server_seq *seq,
-                          const struct lu_env *env,
-                          struct thandle *th)
+int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
 {
-        struct dt_device *dt_dev;
-        ENTRY;
-
-        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-
-        dt_dev->dd_ops->dt_trans_stop(env, th);
+        struct seq_update_callback *ccb;
+        int rc;
+        OBD_ALLOC_PTR(ccb);
+        if (ccb == NULL)
+                return -ENOMEM;
+
+        ccb->suc_cb.dcb_func = seq_update_cb;
+        CFS_INIT_LIST_HEAD(&ccb->suc_cb.dcb_linkage);
+        ccb->suc_seq = seq;
+        seq->lss_need_sync = 1;
+        rc = dt_trans_cb_add(th, &ccb->suc_cb);
+        if (rc)
+                OBD_FREE_PTR(ccb);
+        return rc;
 }
 
 /* This function implies that caller takes care about locking. */
@@ -143,14 +143,21 @@ int seq_store_write(struct lu_server_seq *seq,
 int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
                      struct lu_seq_range *out, int sync)
 {
+        struct seq_thread_info *info;
+        struct dt_device *dt_dev;
         struct thandle *th;
         int rc;
         int credits = SEQ_TXN_STORE_CREDITS;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
 
         if (out != NULL)
                 credits += FLD_TXN_INDEX_INSERT_CREDITS;
 
-        th = seq_store_trans_start(seq, env, credits, sync);
+        txn_param_init(&info->sti_txn, credits);
+        th = dt_trans_start(env, dt_dev, &info->sti_txn);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
@@ -158,16 +165,26 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
+                GOTO(out,rc);
         } else if (out != NULL) {
                 rc = fld_server_create(seq->lss_site->ms_server_fld,
                                        env, out, th);
                 if (rc) {
                         CERROR("%s: Can't Update fld database, rc %d\n",
                                seq->lss_name, rc);
+                        GOTO(out,rc);
                 }
         }
 
-        seq_store_trans_stop(seq, env, th);
+        /* next sequence update will need sync until this update is committed
+         * in case of sync operation this is not needed obviously */
+        if (!sync)
+                /* if callback can't be added then sync always */
+                sync = !!seq_update_cb_add(th, seq);
+
+        th->th_sync |= sync;
+out:
+        dt_trans_stop(env, dt_dev, th);
         return rc;
 }
 
index d02d664..39fde4b 100644 (file)
@@ -84,6 +84,21 @@ struct dt_device_param {
 };
 
 /**
+ * Per-transaction commit callback function
+ */
+struct dt_txn_commit_cb;
+typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th,
+                        struct dt_txn_commit_cb *cb, int err);
+/**
+ * Special per-transaction callback for cases when just commit callback
+ * is needed and per-device callback are not convenient to use
+ */
+struct dt_txn_commit_cb {
+        cfs_list_t dcb_linkage;
+        dt_cb_t    dcb_func;
+};
+
+/**
  * Basic transaction credit op
  */
 enum dt_txn_op {
@@ -123,6 +138,11 @@ struct dt_device_operations {
         void  (*dt_trans_stop)(const struct lu_env *env,
                                struct thandle *th);
         /**
+         * Add commit callback to the transaction.
+         */
+        int   (*dt_trans_cb_add)(struct thandle *th,
+                                 struct dt_txn_commit_cb *dcb);
+        /**
          * Return fid of root index object.
          */
         int   (*dt_root_get)(const struct lu_env *env,
@@ -538,8 +558,6 @@ static inline int dt_object_exists(const struct dt_object *dt)
 struct txn_param {
         /** number of blocks this transaction will modify */
         unsigned int tp_credits;
-        /** sync transaction is needed */
-        __u32        tp_sync:1;
 };
 
 static inline void txn_param_init(struct txn_param *p, unsigned int credits)
@@ -554,11 +572,6 @@ static inline void txn_param_credit_add(struct txn_param *p,
         p->tp_credits += credits;
 }
 
-static inline void txn_param_sync(struct txn_param *p)
-{
-        p->tp_sync = 1;
-}
-
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -583,6 +596,8 @@ struct thandle {
         /** the last operation result in this transaction.
          * this value is used in recovery */
         __s32             th_result;
+        /** whether we need sync commit */
+        int               th_sync;
 };
 
 /**
@@ -601,8 +616,7 @@ struct dt_txn_callback {
                              struct txn_param *param, void *cookie);
         int (*dtc_txn_stop)(const struct lu_env *env,
                             struct thandle *txn, void *cookie);
-        int (*dtc_txn_commit)(const struct lu_env *env,
-                              struct thandle *txn, void *cookie);
+        void (*dtc_txn_commit)(struct thandle *txn, void *cookie);
         void                *dtc_cookie;
         __u32                dtc_tag;
         cfs_list_t           dtc_linkage;
@@ -614,7 +628,7 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb);
 int dt_txn_hook_start(const struct lu_env *env,
                       struct dt_device *dev, struct txn_param *param);
 int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn);
-int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn);
+void dt_txn_hook_commit(struct thandle *txn);
 
 int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj);
 
@@ -671,11 +685,17 @@ static inline struct thandle *dt_trans_start(const struct lu_env *env,
 }
 
 static inline void dt_trans_stop(const struct lu_env *env,
-                                 struct dt_device *d,
-                                 struct thandle *th)
+                                 struct dt_device *d, struct thandle *th)
 {
         LASSERT(d->dd_ops->dt_trans_stop);
         return d->dd_ops->dt_trans_stop(env, th);
 }
+
+static inline int dt_trans_cb_add(struct thandle *th,
+                                  struct dt_txn_commit_cb *dcb)
+{
+        LASSERT(th->th_dev->dd_ops->dt_trans_cb_add);
+        return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
+}
 /** @} dt */
 #endif /* __LUSTRE_DT_OBJECT_H */
index 1ec6e66..59fd19c 100644 (file)
@@ -67,8 +67,9 @@ struct lut_commit_cb {
 };
 
 void lut_boot_epoch_update(struct lu_target *);
-void lut_cb_last_committed(struct lu_target *, __u64, void *, int);
-void lut_cb_client(struct lu_target *, __u64, void *, int);
+int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
+                           struct obd_export *exp, __u64 transno);
+int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp);
 int lut_init(const struct lu_env *, struct lu_target *,
              struct obd_device *, struct dt_device *);
 void lut_fini(const struct lu_env *, struct lu_target *);
index 6894735..adb2671 100644 (file)
@@ -218,8 +218,8 @@ struct lu_server_seq {
          */
         __u64                   lss_set_width;
 
-        /* transaction no of seq update write operation */
-        __u64                   lss_set_transno;
+        /* sync is needed for update operation */
+        __u32                   lss_need_sync;
         /**
          * Pointer to site object, required to access site fld.
          */
index 5f3a492..876a51a 100644 (file)
@@ -93,7 +93,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
         /* Prepare transactions callbacks. */
         mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
         mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
-        mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
+        mdd->mdd_txn_cb.dtc_txn_commit = NULL;
         mdd->mdd_txn_cb.dtc_cookie = mdd;
         mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD;
         CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage);
index c49b727..e2ee0f2 100644 (file)
@@ -427,9 +427,10 @@ int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
 int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
                             struct md_attr *ma, enum mdd_txn_op,
                             int changelog_cnt);
-int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                                struct md_attr *ma, enum mdd_txn_op,
-                                int changelog_cnt);
+void mdd_setattr_txn_param_build(const struct lu_env *env,
+                                 struct md_object *obj,
+                                 struct md_attr *ma, enum mdd_txn_op,
+                                 int changelog_cnt);
 
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la);
@@ -452,8 +453,6 @@ int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param,
 int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                     void *cookie);
 
-int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
-                      void *cookie);
 /* mdd_device.c */
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
index 0c5c772..ecfe82b 100644 (file)
@@ -1502,12 +1502,17 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                 chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
                          lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
         }
+
         mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
                                     MDD_TXN_ATTR_SET_OP, chlog_cnt);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 GOTO(no_trans, rc = PTR_ERR(handle));
 
+        /* permission changes may require sync operation */
+        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+                handle->th_sync |= mdd->mdd_sync_permission;
+
         if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
@@ -1665,15 +1670,14 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                 RETURN(rc);
 
         mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
-        /* security-replated changes may require sync */
-        if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
-            mdd->mdd_sync_permission == 1)
-                txn_param_sync(&mdd_env_info(env)->mti_param);
-
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
+        /* security-replated changes may require sync */
+        if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+                handle->th_sync |= mdd->mdd_sync_permission;
+
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
 
         /* Only record user xattr changes */
index 001cac3..9519e38 100644 (file)
@@ -100,12 +100,6 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
         return mds_lov_write_objids(obd);
 }
 
-int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
-                      void *cookie)
-{
-        return 0;
-}
-
 void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
                          enum mdd_txn_op op, int changelog_cnt)
 {
@@ -182,24 +176,17 @@ out:
         RETURN(rc);
 }
 
-int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
-                                struct md_attr *ma, enum mdd_txn_op op,
-                                int changelog_cnt)
+void mdd_setattr_txn_param_build(const struct lu_env *env,
+                                 struct md_object *obj,
+                                 struct md_attr *ma, enum mdd_txn_op op,
+                                 int changelog_cnt)
 {
         struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
-        ENTRY;
 
         mdd_txn_param_build(env, mdd, op, changelog_cnt);
         if (ma->ma_attr.la_valid & (LA_UID | LA_GID))
                 txn_param_credit_add(&mdd_env_info(env)->mti_param,
                                      dto_txn_credits[DTO_ATTR_SET_CHOWN]);
-
-        /* permission changes may require sync operation */
-        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID) &&
-            mdd->mdd_sync_permission == 1)
-                txn_param_sync(&mdd_env_info(env)->mti_param);
-
-        RETURN(0);
 }
 
 static void mdd_txn_init_dto_credits(const struct lu_env *env,
index e45abae..1398f1e 100644 (file)
@@ -5669,15 +5669,6 @@ LU_KEY_INIT_FINI(mdt, struct mdt_thread_info);
 /* context key: mdt_thread_key */
 LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
 
-/* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
-LU_KEY_INIT_FINI(mdt_txn, struct mdt_txn_info);
-
-struct lu_context_key mdt_txn_key = {
-        .lct_tags = LCT_TX_HANDLE,
-        .lct_init = mdt_txn_key_init,
-        .lct_fini = mdt_txn_key_fini
-};
-
 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
 {
         return md_ucred(info->mti_env);
@@ -5720,7 +5711,7 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
 }
 
 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
-LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
+LU_TYPE_INIT_FINI(mdt, &mdt_thread_key);
 
 static struct lu_device_type_operations mdt_device_type_ops = {
         .ldto_init = mdt_type_init,
index e5e2556..1317643 100644 (file)
@@ -412,32 +412,6 @@ enum mdt_txn_op {
         MDT_TXN_LAST_RCVD_WRITE_OP,
 };
 
-/*
- * Info allocated per-transaction.
- */
-#define MDT_MAX_COMMIT_CB       4
-struct mdt_txn_info {
-        __u64                 txi_transno;
-        unsigned int          txi_cb_count;
-        struct lut_commit_cb  txi_cb[MDT_MAX_COMMIT_CB];
-};
-
-extern struct lu_context_key mdt_txn_key;
-
-static inline void mdt_trans_add_cb(const struct thandle *th,
-                                    lut_cb_t cb_func, void *cb_data)
-{
-        struct mdt_txn_info *txi;
-
-        txi = lu_context_key_get(&th->th_ctx, &mdt_txn_key);
-        LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb));
-
-        /* add new callback */
-        txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func;
-        txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data;
-        txi->txi_cb_count++;
-}
-
 static inline const struct md_device_operations *
 mdt_child_ops(struct mdt_device * m)
 {
index 103fc7a..8ae3631 100644 (file)
@@ -123,17 +123,19 @@ struct thandle* mdt_trans_start(const struct lu_env *env,
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         p = &mti->mti_txn_param;
 
-        /* export can require sync operations */
-        if (mti->mti_exp != NULL)
-                p->tp_sync = mti->mti_exp->exp_need_sync;
-
-        return mdt->mdt_bottom->dd_ops->dt_trans_start(env, mdt->mdt_bottom, p);
+        return dt_trans_start(env, mdt->mdt_bottom, p);
 }
 
 void mdt_trans_stop(const struct lu_env *env,
                     struct mdt_device *mdt, struct thandle *th)
 {
-        mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th);
+        struct mdt_thread_info *mti;
+
+        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+        /* export can require sync operations */
+        if (mti->mti_exp != NULL)
+                th->th_sync |= mti->mti_exp->exp_need_sync;
+        dt_trans_stop(env, mdt->mdt_bottom, th);
 }
 
 static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
@@ -585,10 +587,15 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
          * transaction so that many connecting clients will not bring
          * server down with lots of sync writes.
          */
-        mdt_trans_add_cb(th, lut_cb_client, class_export_cb_get(mti->mti_exp));
-        cfs_spin_lock(&mti->mti_exp->exp_lock);
-        mti->mti_exp->exp_need_sync = 1;
-        cfs_spin_unlock(&mti->mti_exp->exp_lock);
+        rc = lut_new_client_cb_add(th, mti->mti_exp);
+        if (rc) {
+                /* can't add callback, do sync now */
+                th->th_sync = 1;
+        } else {
+                cfs_spin_lock(&mti->mti_exp->exp_lock);
+                mti->mti_exp->exp_need_sync = 1;
+                cfs_spin_unlock(&mti->mti_exp->exp_lock);
+        }
 
         rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
         CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
@@ -848,17 +855,13 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
                            struct thandle *txn, void *cookie)
 {
         struct mdt_device *mdt = cookie;
-        struct mdt_txn_info *txi;
         struct mdt_thread_info *mti;
         struct ptlrpc_request *req;
 
-        /* transno in two contexts - for commit_cb and for thread */
-        txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         req = mdt_info_req(mti);
 
         if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
-                txi->txi_transno = 0;
                 mti->mti_no_need_trans = 0;
                 return 0;
         }
@@ -899,35 +902,13 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
 
         req->rq_transno = mti->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
-        /* save transno for the commit callback */
-        txi->txi_transno = mti->mti_transno;
-
-        /* add separate commit callback for transaction handling because we need
-         * export as parameter */
-        mdt_trans_add_cb(txn, lut_cb_last_committed,
-                         class_export_cb_get(mti->mti_exp));
-
+        /* if can't add callback, do sync write */
+        txn->th_sync = !!lut_last_commit_cb_add(txn, &mdt->mdt_lut,
+                                                mti->mti_exp,
+                                                mti->mti_transno);
         return mdt_last_rcvd_update(mti, txn);
 }
 
-/* commit callback, need to update last_committed value */
-static int mdt_txn_commit_cb(const struct lu_env *env,
-                             struct thandle *txn, void *cookie)
-{
-        struct mdt_device *mdt = cookie;
-        struct mdt_txn_info *txi;
-        int i;
-
-        txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
-
-        /* iterate through all additional callbacks */
-        for (i = 0; i < txi->txi_cb_count; i++) {
-                txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno,
-                                           txi->txi_cb[i].lut_cb_data, 0);
-        }
-        return 0;
-}
-
 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
                  struct obd_device *obd,
                  struct lustre_sb_info *lsi)
@@ -943,7 +924,7 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         /* prepare transactions callbacks */
         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
-        mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
+        mdt->mdt_txn_cb.dtc_txn_commit = NULL;
         mdt->mdt_txn_cb.dtc_cookie = mdt;
         mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
         CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
index 3164187..3d08d2f 100644 (file)
@@ -126,22 +126,15 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
 }
 EXPORT_SYMBOL(dt_txn_hook_stop);
 
-int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn)
+void dt_txn_hook_commit(struct thandle *txn)
 {
-        struct dt_device       *dev = txn->th_dev;
         struct dt_txn_callback *cb;
-        int                     result;
 
-        result = 0;
-        cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
-                if (cb->dtc_txn_commit == NULL ||
-                    !(cb->dtc_tag & env->le_ctx.lc_tags))
-                        continue;
-                result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie);
-                if (result < 0)
-                        break;
+        cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
+                                dtc_linkage) {
+                if (cb->dtc_txn_commit)
+                        cb->dtc_txn_commit(txn, cb->dtc_cookie);
         }
-        return result;
 }
 EXPORT_SYMBOL(dt_txn_hook_commit);
 
index 9bdf41c..9e4f75b 100644 (file)
@@ -136,6 +136,7 @@ struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
         struct journal_callback ot_jcb;
+        cfs_list_t              ot_dcb_list;
         /* Link to the device, for debugging. */
         struct lu_ref_link     *ot_dev_link;
 
@@ -629,24 +630,19 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
 {
         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
         struct thandle     *th  = &oh->ot_super;
-        struct dt_device   *dev = th->th_dev;
-        struct lu_device   *lud = &dev->dd_lu_dev;
+        struct lu_device   *lud = &th->th_dev->dd_lu_dev;
+        struct dt_txn_commit_cb *dcb, *tmp;
 
-        LASSERT(dev != NULL);
         LASSERT(oh->ot_handle == NULL);
 
-        if (error) {
+        if (error)
                 CERROR("transaction @0x%p commit error: %d\n", th, error);
-        } else {
-                struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
-                /*
-                 * This od_env_for_commit is only for commit usage.  see
-                 * "struct dt_device"
-                 */
-                lu_context_enter(&env->le_ctx);
-                dt_txn_hook_commit(env, th);
-                lu_context_exit(&env->le_ctx);
-        }
+
+        dt_txn_hook_commit(th);
+
+        /* call per-transaction callbacks if any */
+        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+                dcb->dcb_func(NULL, th, dcb, error);
 
         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
         lu_device_put(lud);
@@ -686,6 +682,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                          * be used.
                          */
                         oti->oti_dev = dev;
+                        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
                         osd_th_alloced(oh);
                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
                         osd_th_started(oh);
@@ -694,7 +691,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                                 th = &oh->ot_super;
                                 th->th_dev = d;
                                 th->th_result = 0;
-                                jh->h_sync = p->tp_sync;
+                                th->th_sync = 0;
                                 lu_device_get(&d->dd_lu_dev);
                                 oh->ot_dev_link = lu_ref_add
                                         (&d->dd_lu_dev.ld_reference,
@@ -702,8 +699,6 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                                 /* add commit callback */
                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
                                 lu_context_enter(&th->th_ctx);
-                                osd_journal_callback_set(jh,osd_trans_commit_cb,
-                                                         &oh->ot_jcb);
                                 LASSERT(oti->oti_txns == 0);
                                 LASSERT(oti->oti_r_locks == 0);
                                 LASSERT(oti->oti_w_locks == 0);
@@ -737,6 +732,15 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
+                hdl->h_sync = th->th_sync;
+                /*
+                 * add commit callback
+                 * notice we don't do this in osd_trans_start()
+                 * as underlying transaction can change during truncate
+                 */
+                osd_journal_callback_set(hdl, osd_trans_commit_cb,
+                                         &oh->ot_jcb);
+
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
                 LASSERT(oti->oti_r_locks == 0);
@@ -749,10 +753,22 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
                                   result = ldiskfs_journal_stop(hdl));
                 if (result != 0)
                         CERROR("Failure to stop transaction: %d\n", result);
+        } else {
+                OBD_FREE_PTR(oh);
         }
         EXIT;
 }
 
+static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+        struct osd_thandle *oh = container_of0(th, struct osd_thandle,
+                                               ot_super);
+
+        cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+
+        return 0;
+}
+
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
@@ -1137,6 +1153,7 @@ static const struct dt_device_operations osd_dt_ops = {
         .dt_statfs         = osd_statfs,
         .dt_trans_start    = osd_trans_start,
         .dt_trans_stop     = osd_trans_stop,
+        .dt_trans_cb_add   = osd_trans_cb_add,
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
@@ -3873,17 +3890,7 @@ static struct lu_context_key osd_key = {
 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
-        int rc;
-        struct lu_context *ctx;
-
-        /* context for commit hooks */
-        ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
-        rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
-        if (rc == 0) {
-                rc = osd_procfs_init(osd_dev(d), name);
-                ctx->lc_cookie = 0x3;
-        }
-        return rc;
+        return osd_procfs_init(osd_dev(d), name);
 }
 
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
@@ -3957,7 +3964,6 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
                                  osd_dev(d)->od_mount->lmi_mnt);
         osd_dev(d)->od_mount = NULL;
 
-        lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
         RETURN(NULL);
 }
 
index 343287c..5b3eaa4 100644 (file)
@@ -114,14 +114,6 @@ struct osd_device {
          */
         struct dt_object         *od_obj_area;
 
-        /* Environment for transaction commit callback.
-         * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
-         * is serialized, that is there is no more than one transaction commit
-         * at a time (JBD journal_commit_transaction() is serialized).
-         * This means that it's enough to have _one_ lu_context.
-         */
-        struct lu_env             od_env_for_commit;
-
         /*
          * Fid Capability
          */
index df732ad..0de17bf 100644 (file)
@@ -41,6 +41,7 @@
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
+#include <obd_class.h>
 
 /**
  * Update client data in last_rcvd file. An obd API
@@ -338,37 +339,115 @@ EXPORT_SYMBOL(lut_boot_epoch_update);
 /**
  * commit callback, need to update last_commited value
  */
-void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
-                           void *data, int err)
+struct lut_last_committed_callback {
+        struct dt_txn_commit_cb llcc_cb;
+        struct lu_target       *llcc_lut;
+        struct obd_export      *llcc_exp;
+        __u64                   llcc_transno;
+};
+
+void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
+                           struct dt_txn_commit_cb *cb, int err)
 {
-        struct obd_export *exp = data;
-        LASSERT(exp->exp_obd == lut->lut_obd);
-        cfs_spin_lock(&lut->lut_translock);
-        if (transno > lut->lut_obd->obd_last_committed)
-                lut->lut_obd->obd_last_committed = transno;
-
-        LASSERT(exp);
-        if (transno > exp->exp_last_committed) {
-                exp->exp_last_committed = transno;
-                cfs_spin_unlock(&lut->lut_translock);
-                ptlrpc_commit_replies(exp);
+        struct lut_last_committed_callback *ccb;
+
+        ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
+
+        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
+
+        cfs_spin_lock(&ccb->llcc_lut->lut_translock);
+        if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
+                ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
+
+        LASSERT(ccb->llcc_exp);
+        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
+                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
+                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
+                ptlrpc_commit_replies(ccb->llcc_exp);
         } else {
-                cfs_spin_unlock(&lut->lut_translock);
+                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
         }
-        class_export_cb_put(exp);
-        if (transno)
+        class_export_cb_put(ccb->llcc_exp);
+        if (ccb->llcc_transno)
                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
-                       lut->lut_obd->obd_name, transno);
+                       ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
+        cfs_list_del(&ccb->llcc_cb.dcb_linkage);
+        OBD_FREE_PTR(ccb);
+}
+
+int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
+                           struct obd_export *exp, __u64 transno)
+{
+        struct lut_last_committed_callback *ccb;
+        int rc;
+
+        OBD_ALLOC_PTR(ccb);
+        if (ccb == NULL)
+                return -ENOMEM;
+
+        ccb->llcc_cb.dcb_func = lut_cb_last_committed;
+        CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
+        ccb->llcc_lut = lut;
+        ccb->llcc_exp = class_export_cb_get(exp);
+        ccb->llcc_transno = transno;
+
+        rc = dt_trans_cb_add(th, &ccb->llcc_cb);
+        if (rc) {
+                class_export_cb_put(exp);
+                OBD_FREE_PTR(ccb);
+        }
+        return rc;
 }
-EXPORT_SYMBOL(lut_cb_last_committed);
+EXPORT_SYMBOL(lut_last_commit_cb_add);
+
+struct lut_new_client_callback {
+        struct dt_txn_commit_cb lncc_cb;
+        struct obd_export      *lncc_exp;
+};
 
-void lut_cb_client(struct lu_target *lut, __u64 transno,
-                       void *data, int err)
+void lut_cb_new_client(struct lu_env *env, struct thandle *th,
+                       struct dt_txn_commit_cb *cb, int err)
 {
-        LASSERT(lut->lut_obd);
-        target_client_add_cb(lut->lut_obd, transno, data, err);
+        struct lut_new_client_callback *ccb;
+
+        ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
+
+        LASSERT(ccb->lncc_exp->exp_obd);
+
+        CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+               ccb->lncc_exp->exp_obd->obd_name,
+               ccb->lncc_exp->exp_client_uuid.uuid);
+
+        cfs_spin_lock(&ccb->lncc_exp->exp_lock);
+        ccb->lncc_exp->exp_need_sync = 0;
+        cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
+        class_export_cb_put(ccb->lncc_exp);
+
+        cfs_list_del(&ccb->lncc_cb.dcb_linkage);
+        OBD_FREE_PTR(ccb);
+}
+
+int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+        struct lut_new_client_callback *ccb;
+        int rc;
+
+        OBD_ALLOC_PTR(ccb);
+        if (ccb == NULL)
+                return -ENOMEM;
+
+        ccb->lncc_cb.dcb_func = lut_cb_new_client;
+        CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
+        ccb->lncc_exp = class_export_cb_get(exp);
+
+        rc = dt_trans_cb_add(th, &ccb->lncc_cb);
+        if (rc) {
+                class_export_cb_put(exp);
+                OBD_FREE_PTR(ccb);
+        }
+        return rc;
 }
-EXPORT_SYMBOL(lut_cb_client);
+EXPORT_SYMBOL(lut_new_client_cb_add);
 
 int lut_init(const struct lu_env *env, struct lu_target *lut,
              struct obd_device *obd, struct dt_device *dt)